Observability:使用 OTEL 监控你的 Python 数据管道
2024年11月6日 | by mebius
作者:来自 ElasticTamara Dancheva•Almudena Sanz Oliv
了解如何为数据管道配置 OTEL、检测任何异常、分析性能以及使用 Elastic 设置相应的警报。
本文深入探讨了如何实施可观察性实践,特别是使用 Python 中的 OpenTelemetry (OTEL),以增强使用 Elastic 对数据管道的监控和质量控制。虽然本文中提供的示例主要关注 ETL(Extract, Transform, Load – 提取、转换、加载)流程,以确保对商业智能 (Business Intelligence – BI) 至关重要的数据管道的准确性和可靠性,但讨论的策略和工具同样适用于用于机器学习 (ML) 模型或其他数据处理任务的 Python 流程。
简介
数据管道,尤其是 ETL 流程,构成了现代数据架构的支柱。这些管道负责从各种来源提取原始数据,将其转换为有意义的信息,并将其加载到数据仓库或数据湖中进行分析和报告。
在我们的组织中,我们有基于 Python 的 ETL 脚本,它们在从 Elasticsearch (ES) 集群导出和处理数据并将其加载到 Google BigQuery (BQ) 中起着关键作用。然后,这些处理后的数据输入到 DBT(Data Build Tool – 数据构建工具)模型中,该模型进一步细化数据并使其可用于分析和报告。要查看完整的架构并了解我们如何使用 Elastic 监控我们的 DBT 管道,请参阅使用 Elastic Observability 监控你的 DBT 管道。在本文中,我们重点介绍 ETL 脚本。鉴于这些脚本的关键性质,必须建立机制来控制和确保它们生成的数据的质量。
这里讨论的策略可以扩展到处理数据处理或机器学习模型的任何脚本或应用程序,无论使用何种编程语言,只要存在支持 OTEL 仪器的相应代理。
动机
数据管道中的可观察性涉及监控数据处理的整个生命周期,以确保一切按预期运行。它包括:
1)数据质量控制:
- 检测数据中的异常,例如记录数意外下降。
- 验证数据转换是否正确且一致地应用。
- 确保加载到数据仓库中的数据的完整性和准确性。
2)性能监控:
- 跟踪 ETL 脚本的执行时间以识别瓶颈并优化性能。
- 监控资源使用情况,例如内存和 CPU 消耗,以确保高效使用基础设施。
3)实时警报:
- 设置警报以立即通知问题,例如 ETL 作业失败、数据质量问题或性能下降。
- 识别此类事件的根本原因
- 主动解决事件以最大限度地减少停机时间和对业务运营的影响
诸如 ETL 作业失败之类的问题甚至可以指向更大的基础设施或数据源数据质量问题。
检测步骤
以下是自动检测 Python 脚本以导出 OTEL 跟踪、指标和日志的步骤。
步骤 1:导入所需库
我们首先需要安装以下库。
pip install elastic-opentelemetry google-cloud-bigquery[opentelemetry]
你还可以将它们添加到项目的 requirements.txt 文件中,并使用 pip install -r requirements.txt 进行安装。
依赖项说明
1)elastic-opentelemetry:此软件包是 OpenTelemetry Python 的 Elastic 发行版。在底层,它将安装以下软件包:
- opentelemetry-distro:此软件包是 OpenTelemetry 的便捷发行版,其中包括 OpenTelemetry SDK、API 和各种仪表包。它简化了应用程序中 OpenTelemetry 的设置和配置。
- opentelemetry-exporter-otlp:此软件包提供了一个导出器,可将遥测数据发送到 OpenTelemetry Collector 或支持 OpenTelemetry 协议 (OTLP) 的任何其他端点。这包括跟踪、指标和日志。
- opentelemetry-instrumentation-system-metrics:此软件包提供了用于收集系统指标(例如 CPU 使用率、内存使用率和其他系统级指标)的检测。
2)google-cloud-bigquery[opentelemetry]:此软件包将 Google Cloud BigQuery 与 OpenTelemetry 集成,允许你跟踪和监控 BigQuery 操作。
第 2 步:导出 OTEL 变量
通过从 Elastic 的 APM OTEL 获取配置来设置必要的 OpenTelemetry (OTEL) 变量。
转到 APM -> Services -> Add data (top left corner)
在本节中,你将找到如何配置各种 APM 代理的步骤。导航到 OpenTelemetry 以查找需要导出的变量。
查找 OTLP 端点:
- 查找与 OpenTelemetry 或 OTLP 配置相关的部分。
- OTEL_EXPORTER_OTLP_ENDPOINT 通常作为将 OpenTelemetry 与 Elastic APM 集成的设置说明的一部分提供。它可能看起来像 https:///otlp。
获取 OTLP 标头:
- 在同一部分中,你应该找到 OTLP 标头的说明或字段。这些标头通常用于身份验证目的。
- 复制接口提供的必要标头。它们可能看起来像 Authorization: Bearer 。
注意:请注意,使用 Python 时,你需要将 OTEL_EXPORTER_OTLP_HEADERS 变量中的 Bearer 和你的 token 之间的空格替换为 %20。
或者,你可以使用不同的方法使用 API 密钥进行身份验证(请参阅说明)。如果你使用我们的 serverless 产品,则需要使用此方法。
设置变量:
- 将脚本中的占位符替换为从 Elastic APM 接口获取的实际值,并通过源命令 source env.sh 在 shell 中执行。
以下是设置这些变量的脚本:
#!/bin/bash
echo "--- :otel: Setting OTEL variables"
export OTEL_EXPORTER_OTLP_ENDPOINT='https://your-apm-server/otlp:443'
export OTEL_EXPORTER_OTLP_HEADERS='Authorization=Bearer%20your-token'
export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
export OTEL_PYTHON_LOG_CORRELATION=true
export ELASTIC_OTEL_SYSTEM_METRICS_ENABLED=true
export OTEL_METRIC_EXPORT_INTERVAL=5000
export OTEL_LOGS_EXPORTER="otlp,console"
设置这些变量后,我们就可以进行自动检测,而不需要在代码中添加任何内容。
变量说明:
- OTEL_EXPORTER_OTLP_ENDPOINT:此变量指定将发送 OTLP 数据(跟踪、指标、日志)的端点。将占位符替换为你的实际 OTLP 端点。
- OTEL_EXPORTER_OTLP_HEADERS:此变量指定发送 OTLP 数据时身份验证或其他目的所需的任何标头。将占位符替换为你的实际 OTLP 标头。
- OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED:此变量启用 Python 中日志的自动检测,允许使用跟踪上下文自动丰富日志。
- OTEL_PYTHON_LOG_CORRELATION:此变量启用日志关联,其中包括日志条目中的跟踪上下文,以将日志与跟踪关联起来。
- OTEL_METRIC_EXPORT_INTERVAL:此变量指定指标导出间隔(以毫秒为单位),在本例中为 5 秒。
- OTEL_LOGS_EXPORTER:此变量指定用于日志的导出器。将其设置为 “otlp” 意味着将使用 OTLP 协议导出日志。添加 “console” 指定应将日志导出到 OTLP 端点和控制台。在我们的案例中,为了在 infa 端获得更好的可见性,我们选择也导出到控制台。
- ELASTIC_OTEL_SYSTEM_METRICS_ENABLED:使用 Elastic 分布时需要使用此变量,因为默认情况下它设置为 false。
注意:OTEL_METRICS_EXPORTER 和 OtgcodeTEL_TRACES_EXPORTER:此变量指定用于指标/跟踪的导出器,默认情况下设置为 “otlp”,这意味着将使用 OTLP 协议导出指标和跟踪。
运行 Python ETL
我们使用以下命令运行 Python ETL:
OTEL_RESOURCE_ATTRIBUTES="service.name=x-ETL,service.version=1.0,deployment.environment=production" && opentelemetry-instrument python3 X_ETL.py
命令说明:
- OTEL_RESOURCE_ATTRIBUTES:此变量指定将包含在所有遥测数据中的其他资源属性,例如服务名称、服务版本和部署环境,你可以根据需要自定义这些值。你可以为每个脚本使用不同的服务名称。
- opentelemetry-instrument:此命令自动为 OpenTelemetry 检测指定的 Python 脚本。它设置了收集跟踪、指标和日志所需的钩子。
- python3 X_ETL.py:这将运行指定的 Python 脚本 (X_ETL.py)。
跟踪
我们通过默认的 OTLP 协议导出跟踪。
跟踪是监控和了解应用程序性能的关键方面。跨度(spans)构成了跟踪的构建块。它们封装了有关特定代码路径执行的详细信息。它们记录活动的开始和结束时间,并且可以与其他跨度具有层次关系,形成父/子结构。
跨度包括基本属性,例如事务 ID、父 ID、开始时间、持续时间、名称、类型、子类型和操作。此外,跨度可能包含堆栈跟踪,它提供函数调用的详细视图,包括函数名称、文件路径和行号等属性,这对于调试特别有用。这些属性帮助我们分析脚本的执行流程,识别性能问题并增强优化工作。
使用默认检测,整个 Python 脚本将是一个跨度。在我们的案例中,我们决定根据 Python 进程的不同阶段手动添加特定跨度,以便能够单独测量它们的延迟、吞吐量、错误率等。这是我们手动定义跨度的方式:
from opentelemetry import trace
if __name__ == "__main__":
tracer = trace.get_tracer("main")
with tracer.start_as_current_span("initialization") as span:
# Init code
…
with tracer.start_as_current_span("search") as span:
# Step 1 - Search code
…
with tracer.start_as_current_span("transform") as span:
# Step 2 - Transform code
…
with tracer.start_as_current_span("load") as span:
# Step 3 - Load code
…
你可以在 APM 界面中探索轨迹,如下所示:
指标
我们也通过默认的 OTLP 协议导出指标,例如 CPU 使用率和内存。脚本本身无需添加额外代码。
注意:请记住将 ELASTIC_OTEL_SYSTEM_METRICS_ENABLED 设置为 true。
日志记录
我们也通过默认的 OTLP 协议导出日志。
对于日志记录,我们修改日志记录调用以使用字典结构 (bq_fields) 添加额外字段,如下所示:
job.result() # Waits for table load to complete
job_details = client.get_job(job.job_id) # Get job details
# Extract job information
bq_fields = {
# "slot_time_ms": job_details.slot_ms,
"job_id": job_details.job_id,
"job_type": job_details.job_type,
"state": job_details.state,
"path": job_details.path,
"job_created": job_details.created.isoformat(),
"job_ended": job_details.ended.isoformat(),
"execution_time_ms": (
job_details.ended - job_details.created
).total_seconds()
* 1000,
"bytes_processed": job_details.output_bytes,
"rows_affected": job_details.output_rows,
"destination_table": job_details.destination.table_id,
"event": "BigQuery Load Job", # Custom event type
"status": "success", # Status of the step (success/error)
"category": category # ETL category tag
}
logging.info("BigQuery load operation successful", extra=bq_fields)
此代码显示了如何提取 BQ 作业统计数据、执行时间、处理的字节数、受影响的行数以及其中的目标表。你可以像我们一样添加其他元数据,例如自定义事件类型、状态和类别。
对日志记录的任何调用(高于设定阈值的所有级别,在本例中为 INFO logs.getLogger().setLevel(logging.INFO))都将创建一个将导出到 Elastic 的日志。这意味着在已经使用日志记录的 Python 脚本中,无需进行任何更改即可将日志导出到 Elastic。
对于每条日志消息,你可以进入详细信息视图(将鼠标悬停在日志行上时单击 … 并进入查看详细信息)以检查附加到日志消息的元数据。你还可以在 Discover 中探索日志。
日志修改说明
- logging.info:记录一条信息性消息。记录消息 “BigQuery load operation successful”。
- extra=bq_fields:使用 bq_fields 字典向日志条目添加额外上下文。此上下文可以包含详细信息,使日志条目更具信息性且更易于分析。此数据稍后将用于tgcode设置警报和数据异常检测作业。
在 Elastic 的 APM 中进行监控
如图所示,我们可以在 APM 界面中检查跟踪、指标和日志。为了充分利用这些数据,我们利用了 Elastic Observability 中几乎全部的功能以及 Elastic Analytic 的 ML 功能。
规则和警报
我们可以设置规则和警报来检测脚本中的异常、错误和性能问题。
error count threshold rule 用于在服务中的错误数量超过定义的阈值时创建触发器。
要创建规则,请转到 Alerts and Insights -> Rules -> Create Rule -> Error count threshold,,设置错误计数阈值、要监控的服务或环境(你还可以设置跨服务的错误分组键)、运行检查的频率,然后选择连接器。
接下来,我们在给定的 ETL 日志data view上创建一个 custom threshold 类型的规则(为你的索引创建一个),过滤 “labels.status: error”,以获取来自任何失败的 ETL 步骤的所有状态为错误的日志。规则条件设置为 documenttgcode count > 0。在我们的例子中,在规则配置的最后一部分,我们还设置了每次激活规则时的 Slack 警报。你可以从 Elastic 支持的众多connectors进行选择
然后我们可以设置失败警报。我们将状态添加到日志元数据中,如下面的代码示例所示,针对 ETL 中的每个步骤添加状态。然后它可以通过 labels.status 在 ES 中可用。
logging.info(
"Elasticsearch search operation successful",
extra={
"event": "Elasticsearch Search",
"status": "success",
"category": category,
"index": index,
},
)
更多规则
我们还可以添加规则来检测我们定义的不同跨度的执行时间异常。这可以通过选择 transaction/span -> Alerts and rules -> Custom threshold rule -> Latency 来完成。在下面的示例中,我们希望在搜索步骤花费超过 25 秒时生成警报。
或者,为了进行更细粒度的控制,你可以使用 Alerts and rules -> Anomaly rule,设置异常作业,并选择阈值严重性级别。
异常检测作业
在此示例中,我们在转换前针对文档数量设置了异常检测作业。
我们使用 [Single metric job] (https://www.elastic.co/guide/en/machine-learning/current/ml-anomaly-detection-job-types.html#multi-metric-jobs) 在转换前针对文档数量设置了异常检测作业,以检测传入数据源是否存在任何异常。
在最后一步中,你可以像我们之前所做的那样创建警报,通过设置严重性级别阈值,每当检测到异常时都会收到警报。使用分配给每个异常的异常分数,每个异常都具有严重性级别。
与前面的示例类似,我们设置了一个 Slack 连接器,以便在检测到异常时接收警报。
你可以通过 Add Panel -> ML -> Anomaly Swim Lane -> Pick your job 进入自定义仪表板。
类似地,我们为转换后的文档数量添加作业,并为 “execution_time_ms”、“bytes_processed” 和 “rows_affected” 添加多指标作业,类似于在使用 Elastic Observability 监控 DBT 管道中所做的操作。
自定义仪表板
现在你的日志、指标和跟踪都在 Elastic 中,你可以充分利用我们的 Kibana 仪表板,从中提取最大价值。我们可以创建一个自定义仪表板,如下所示:基于 labels.event(ETL 中每种步骤的类别字段)的饼图、按状态细分的每种步骤的图表、按状态细分的步骤时间线、ETL 的 BQ 统计数据以及各种异常作业的异常检测泳道(swim lane)面板。
结论
Elastic 的 APM 与其他可观察性和 ML 功能相结合,为我们的数据管道提供了统一视图,使我们能够以最少的代码更改带来很多价值:
- 记录新日志(无需添加自定义日志)及其执行上下文
- 监控我们模型的运行时行为
- 跟踪数据质量问题
- 识别和排除实时事件故障
- 优化性能瓶颈和资源使用情况
- 识别对其他服务的依赖关系及其延迟
- 优化数据转换过程
- 设置有关延迟、数据质量问题、交易错误率或 CPU 使用率的警报
借助这些功能,我们可以确保数据管道的弹性和可靠性,从而实现更强大和准确的 BI 系统和报告。
总之,使用 Python 设置 OpenTelemetry (OTEL) 以实现数据管道可观察性,大大提高了我们主动监控、检测和解决问题的能力。这带来了更可靠的数据转换、更好的资源管理,并提高了我们的数据转换、BI 和机器学习系统的整体性能。
原文:Monitor your Python data pipelines with OTEL — Elastic Observability Labs