使用 Elastic Observability 监控 dbt 管道
2024年11月6日 | by mebius
tgcode
作者:来自 ElasticAlmudena Sanz Oliv•Tamara Dancheva
了解如何使用 Elastic 设置 dbt 监控系统,该系统可主动发出数据处理成本峰值、每张表的行数异常以及数据质量测试失败的警报。
在 Elastic 可观察性组织内的数据分析团队中,我们使用 dbt(dbt™,data build tool– 数据构建工具)来执行我们的 SQL 数据转换管道。dbt 是一个 SQL 优先的转换工作流,可让团队快速协作地部署分析代码。具体来说,我们使用开源项目 dbt core,你可以在其中从命令行进行开发并运行你的 dbt 项目。
我们的数据转换管道每天运行并处理数据,这些数据将提供给我们的内部仪表板、报告、分析和机器学习 (ML) 模型。
过去曾发生过管道故障、源表包含错误数据或我们对 SQL 代码进行了更改导致数据质量问题的事件,而我们只有在每周报告中看到显示异常数量的记录时才意识到这一点。这就是为什么我们建立了一个监控系统,该系统会在这些类型的事件发生时主动向我们发出警报,并帮助我们进行可视化和分析以了解其根本原因,从而为我们节省了几个小时或几天的手动调查时间。
我们利用自己的可观察性解决方案来帮助解决这一挑战,监控 dbt 实施的整个生命周期。此设置使我们能够跟踪模型的行为并对最终表进行数据质量测试。我们将运行作业和测试中的 dbt 流程日志导出到 Elasticsearch,并利用 Kibana 创建仪表板、设置警报和配置机器学习作业来监控和评估问题。
下图显示了我们的完整架构。在后续文章中,我们还将介绍如何使用 OTEL 和 Elastic 观察我们的 python 数据处理和 ML 模型流程 – 敬请期待。
为什么要使用 Elastic 监控 dbt 管道?
每次调用时,dbt 都会生成并保存一个或多个 JSON 文件(称为 artifacts),其中包含有关调用结果的日志数据。根据 dbt 文档,dbt run 和 dbt test 调用日志存储在文件 run_results.json 中:
This file contains information about a completed invocation of dbt, including timing and status info for each node (model, test, etc) that was executed. In aggregate, manyrun_results.json can be combined to calculate average model runtime, test failure rates, the number of record changes captured by snapshots, etc.
监控 dbt 运行调用日志有助于解决多个问题,包括跟踪和警告表容量、检测资源密集型模型的过多时隙时间、识别由于时隙时间或容量导致的成本峰值以及查明可能表明存在调度问题的缓慢执行时间。当我们将 PR 与存在问题的代码更改合并时,此系统至关重要,导致上游表 A 中的每日行数突然下降。通过将 dbt 运行日志导入 Elastic,我们的异常检测作业快速识别出表 A 及其下游表 B、C 和 D 的每日行数异常。数据分析团队收到了有关该问题的警报通知,使我们能够及时排除故障、修复和回填表,以免影响每周仪表板和下游 ML 模型。
监控 dbt 测试调用日志还可以解决多个问题,例如识别表中的重复项、通过验证所有枚举字段来检测特定字段允许值的未注意到的更改,以及解决各种其他数据处理和质量问题。借助仪表板和数据质量测试警报,我们可以主动识别重复键、意外类别值和空值增加等问题,从而确保数据完整性。在我们的团队中,我们遇到了一个问题,即我们的原始查找表之一的更改导致用户表中出现重复行,从而使报告的用户数量翻倍。通过将 dbt 测试日志导入 Elastic,我们的规则检测到一些重复测试失败。团队收到了有关该问题的警报通知,使我们能够通过找到根本原因的上游表立即对其进行故障排除。这些重复意味着下游表必须处理 2 倍的数据量,从而导致处理的字节数和时隙时间激增。dbt 运行日志上的异常检测和警报还帮助我们发现各个表的这些峰值,并使我们能够量化对账单的影响。
使用 Elastic 和 Kibana 处理 dbt 日志使我们能够获得实时洞察,帮助我们快速排除潜在问题,并确保我们的数据转换过程顺利运行。我们在 Kibana 中设置了异常检测作业和警报,以监控 dbt 处理的行数、时隙时间和测试结果。这让我们能够捕捉实时事件,通过及时识别和修复这些问题,Elastic 使我们的数据管道更具弹性,我们的模型更具成本效益,帮助我们掌握成本峰值或数据质量问题。
我们还可以将这些信息与 Elastic 中采集的其他事件关联起来,例如使用 Elastic Github 连接器,我们可以将数据质量测试失败或其他异常与代码更改关联起来,以找到导致问题的提交或 PR 的根本原因。通过将应用程序日志采集到 Elastic,我们还可以使用 APM 分析管道中的这些问题是否影响了下游应用程序,从而增加了延迟、吞吐量或错误率。采集账单、收入数据或网络流量,我们还可以看到业务指标的影响。
如何将 dbt 调用日志导出到 Elasticsearch
我们在生产中每天运行 dbt run 和 dbt test 流程后,使用 Python Elasticsearch 客户端将 dbt 调用日志发送到 Elastic。设置只需要你安装 Elasticsearch Python 客户端并获取你的 Elastic Cloud ID(转到 https://cloud.elastic.co/deployments/,选择你的部署并找到 Cloud ID)和 Elastic Cloud API 密钥(按照本指南操作)
此 python 辅助函数将把 run_results.json 文件中的结果索引到指定的索引。你只需将变量导出到环境:
- RESULTS_FILE:run_results.json 文件的路径
- DBT_RUN_LOGS_INDEX:你想要在 Elastic 中为 dbt 运行日志索引指定的名称,例如 dbt_run_logs
- DBT_TEST_LOGS_INDEX:你想要在 Elastic 中为 dbt 测试日志索引指定的名称,例如dbt_test_logs
- ES_CLUSTER_CLOUD_ID
- ES_CLUSTER_API_KEY
然后从你的 Python 代码中调用函数 log_dbt_es,或者将此代码保存为 Python 脚本,并在执行你的 dbt run 或 dbt test 命令后运行它:
from elasticsearch import Elasticsearch, helpers
import os
import sys
import json
def log_dbt_es():
RESULTS_FILE = os.environ["RESULTS_FILE"]
DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]
DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]
es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]
es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]
es_client = Elasticsearch(
cloud_id=es_cluster_cloud_id,
api_key=es_cluster_api_key,
request_timeout=120,
)
if not os.path.exists(RESULTS_FILE):
print(f"ERROR: {RESULTS_FILE} No dbt run results found.")
sys.exit(1)
with open(RESULTS_FILE, "r") as json_file:
results = json.load(json_file)
timestamp = results["metadata"]["generated_at"]
metadata = results["metadata"]
elapsed_time = results["elapsed_time"]
args = results["args"]
docs = []
for result in results["results"]:
if result["unique_id"].split(".")[0] == "tetgcodest":
result["_index"] = DBT_TEST_LOGS_INDEX
else:
result["_index"] = DBT_RUN_LOGS_INDEX
result["@timestamp"] = timestamp
result["metadata"] = metadata
result["elapsed_time"] = elapsed_time
result["args"] = args
docs.append(result)
_ = helpers.bulk(es_client, docs)
return "Done"
# Call the function
log_dbt_es()
如果你想从 run_retgcodesults.json 中添加/删除任何其他字段,你可以修改上述函数来执行此操作。
将结果编入索引后,你可以使用 Kibana 为两个索引创建数据视图,并开始在 Discover 中探索它们。
转到 Discover,单击左上角的数据视图选择器,然后单击 “Create a data view”。
现在,你可以用你喜欢的名称创建数据视图。对 dbt run(代码中的 DBT_RUN_LOGS_INDEX)和 dbt test(代码中的 DBT_TEST_LOGS_INDEX)索引执行此操作:
返回 Discover,你将能够选择 Data Views 并探索数据。
dbt run 警报、仪表板和 ML 作业
dbt run 的调用针对当前数据库执行已编译的 SQL 模型文件。dbt run 调用日志包含以下字段:
- unique_id:唯一模型标识符
- execution_time:执行此模型运行所花费的总时间
日志还包含有关适配器作业执行的以下指标:
- adapter_response.bytes_processed
- adapter_response.bytes_billed
- adapter_response.slot_ms
- adapter_response.rows_affected
我们已使用 Kibana 根据上述指标设置异常检测作业。你可以配置按 unique_id 拆分的 multi-metric job,以便在每个表受影响的行数、消耗的时隙时间或计费的字节数的总和异常时发出警报。你可以跟踪每个指标的一个作业。如果你已为每个表构建了指标的仪表板,则可以使用此快捷方式直接从可视化中创建异常检测作业。创建作业并运行传入数据后,你可以 view the jobs 并使用异常时间线中的三个点按钮将其添加到仪表板:
我们使用 ML 作业设置了警报,当检测到异常时,会向我们发送电子邮件/Slack 消息。可以直接从 “Machine Learning > Anomaly Detection Jobs)” 页面创建警报,方法是单击 ML 作业行末尾的三个点:
我们还使用 Kibana 仪表板可视化每个表的异常检测作业结果和相关指标,以确定哪些表消耗了我们的大部分资源,了解它们的时间演变,并测量可以帮助我们了解月度变化的汇总指标。
dbt test 警报和仪表板
你可能已经熟悉 dbt 中的测试,但如果你不熟悉,dbt 数据测试就是你对模型做出的断言(assertions)。使用命令 dbt test,dbt 将告诉你项目中的每个测试是通过还是失败。这里是如何设置它们的示例。在我们的团队中,我们使用开箱即用的 dbt 测试(unique、not_null、accepted_values 和 relationships)以及包 dbt_utils 和 dbt_expectations 进行一些额外的测试。运行命令 dbt test 时,它会生成存储在 run_results.json 中的日志。
dbt 测试日志包含以下字段:
- unique_id:唯一测试标识符,测试在其唯一标识符中包含 “test” 前缀
- status:测试结果,pass 或 fail
- execution_time:执行此测试所花费的总时间
- failures:如果测试通过,则为 0,如果测试失败,则为 1
- message:如果测试失败,则为失败的原因
日志还包含来自适配器的有关作业执行的指标。
我们已经设置了文档计数警报(参见指南),当有任何失败的测试时,它将向我们发送电子邮件/Slack 消息。警报规则是在我们之前创建的 dbt 测试数据视图上设置的,查询过滤 status:fail 以获取失败测试的日志,规则条件是文档计数大于 0。每当生产中的任何测试失败时,我们都会收到一条警报,其中包含指向警报详细信息和仪表板的链接,以便能够对其进行故障排除:
我们还构建了一个仪表板来可视化运行的测试、失败的测试及其执行时间和时间段,以查看测试运行的历史视图:
使用 AI 助手查找根本原因
对于我们来说,分析这些多种信息源的最有效方法是使用 AI 助手帮助我们排除故障。在我们的案例中,我们收到了有关测试失败的警报,并使用 AI 助手为我们提供发生的事情的背景信息。然后我们询问是否有任何下游后果,AI 助手解释了异常检测作业的结果,这表明我们的下游表之一的时隙时间激增,并且时隙时间与基线相比有所增加。然后,我们询问根本原因,AI 助手能够从我们的 Github 变更日志中找到并向我们提供 PR 链接,该 PR 与事件的开始相匹配,并且是最可能的原因。
结论
作为数据分析团队,我们有责任保证我们向利益相关者提供的表格、图表、模型、报告和仪表板准确无误,并包含正确的信息来源。随着团队的发展,我们拥有的模型数量变得越来越大,相互联系也越来越紧密,要保证一切顺利运行并提供准确的结果并不容易。拥有一个可以主动提醒我们成本飙升、行数异常或数据质量测试失败的监控系统,就像拥有一个值得信赖的伙伴,如果出现问题,它会提前提醒你,并帮助你找到问题的根本原因。
dbt 调用日志是有关我们数据管道状态的重要信息来源,而 Elastic 是从中挖掘最大潜力的完美工具。使用这篇博文作为利用 dbt 日志的起点,帮助你的团队实现更高的可靠性和安心,让他们专注于更具战略性的任务,而不必担心潜在的数据问题。
原文:Monitor dbt pipelines with Elastic Observability — Elastic Observability Labs
文章来源于互联网:使用 Elastic Observability 监控 dbt 管道