将 Logstash 管道转换为 OpenTelemetry Collector 管道

2024年11月6日   |   by mebius

作者:来自 ElasticMirko BezTaha Derouiche

%title插图%num

本指南通过演示如何将常见的 Logstash 管道转换为等效的 OpenTelemetry Collector 配置,帮助 Logstash 用户过渡到 OpenTelemetry。我们将重点介绍日志信号。

简介

Elastic 可观察性策略与 OpenTelemetry 越来越一致。随着 OpenTelemetry 的 Elastic Distributions 的最近发布,我们正在扩展我们的产品,使 OpenTelemetry 更容易使用,Elastic Agent 现在提供 “otel” 模式,使其能够运行 OpenTelemetry Collector 的自定义发行版,无缝增强你的可观察性入门和 Elastic 体验。

这篇文章旨在通过演示如何将一些标准 Logstash 管道转换为相应的 OpenTelemetry Collector 配置,帮助熟悉 Logstash 的用户过渡到 OpenTelemetry。

什么是 OpenTelemetry Collector,我为什么要关心它?

OpenTelemetry 是一个开源框架,可确保与供应商无关的数据收集,为可观察性数据的收集、处理和提取提供标准化方法。 Elastic 完全致力于这一原则,旨在使可观察性真正与供应商无关,并消除用户在切换平台时重新检测其可观察性的需要。

通过采用 OpenTelemetry,你可以获得以下好处:

  • 统一的可观察性:通过使用 OpenTelemetry Collector,你可以从单个工具收集和管理日志、指标和跟踪,从而提供对系统性能和行为的整体可观察性。这简化了微服务等复杂分布式环境中的监控和调试。
  • 灵活性和可扩展性:无论你运行的是小型服务还是大型分布式系统,OpenTelemetry Collector 都可以扩展以处理生成的数据量,从而提供部署为代理(与应用程序一起运行)或网关(集中式集线器)的灵活性。
  • 开放标准:由于 OpenTelemetry 是云原生计算基金会 (CNCF) 下的一个开源项目,因此它可确保你使用广泛接受的标准,从而有助于实现可观测性堆栈的长期可持续性和兼容性。
  • 简化的遥测管道:使用接收器、处理器和导出器构建管道的能力通过集中数据流并最大限度地减少对多个代理的需求,简化了遥测管理。

在接下来的部分中,我们将解释 OTEL Collector 和 Logstash 的结构。

OTEL 收集器配置

OpenTelemetry 收集器配置有不同的部分:

  • 接收器(receivers):从不同来源收集数据。
  • 处理器(processors):转换接收器收集的数据
  • 导出器(exporters):将数据发送到不同的收集器
  • 连接器(connectors):将两个管道连接在一起
  • 服务(service):定义哪些组件处于活动状态

    • 管道(pipelines):结合定义的接收器、处理器、导出器和连接器来处理数据
    • 扩展(extensision)是可选组件,可扩展收集器的功能以完成不直接涉及处理遥测数据的任务(例如,健康监测)
    • 遥测(telemetry),你可以在其中设置收集器本身的可观察性(例如,日志记录和监控)

我们可以将其以以下示意图形式可视化:

%title插图%num

我们参考官方文档 Configuration| OpenTelemetry 深入了解组件。

Logstash 管道定义

Logstash 管道由三个主要组件组成:

  • 输入插件:允许我们从不同来源读取数据
  • 过滤器插件:允许我们转换和过滤数据
  • 输出插件:允许我们发送数据

Logstash 还有一个特殊的输入和一个特殊的输出,允许管道到管道通信,我们可以将其视为与 OpenTelemetry 连接器类似的概念。更多有关 Logstash 的介绍,请参考文章:

Logstash 管道与 Otel Collector 组件的比较

我们可以将 Logstash 管道和 OTEL Collector 管道组件之间的关系图示如下:

%title插图%num

理论讲得够多了!让我们深入研究一些例子。

将 Logstash 管道转换为 OpenTelemetry Collector 管道

示例 1:解析和转换日志行

让我们考虑以下行:

2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404

我们将应用以下步骤:

  1. 从文件 /tmp/demo-line.log 中读取行。
  2. 将输出定义为 Elasticsearch 数据流 logs-access-default。
  3. 提取 @timestamp、user.name、client.ip、client.port、url.path 和 http.status.code。
  4. 删除与 SYSTEM 用户相关的日志消息。
  5. 使用相关日期格式解析日期时间戳并将其存储在 @timestamp 中。
  6. 根据已知代码的描述添加代码 http.status.code_description。
  7. 将数据发送到 Elasticsearch。

Logstash pipeline

input {
    file {
        path => "/tmp/demo-line.log" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "access_log"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
    grok { #[3]
        match => {
            "message" => "%{TIMESTAMP_ISO8601:[date]}: user %{WORD:[user][name]} accessed from %{IP:[client][ip]}:%{NUMBER:[client][port]:int} path %{URIPATH:[url][path]} with error %{NUMBER:[http][status][code]}"
        }
    }
    if "_grokparsefailure" not in [tags] {
        if [user][name] == "SYSTEM" { #[4]
            drop {}
        }
        date { #[5]
            match => ["[date]", "ISO8601"]
            target => "[@timestamp]"
            timezone => "UTC"
            remove_field => [ "date" ]
        }
        translate { #[6]
            source => "[http][status][code]"
            target => "[http][status][code_description]"
            dictionary => {
                "200" => "OK"
                "403" => "Permission denied"
                "404" => "Not Found"
                "500" => "Server Error"
            }
            fallback => "Unknown error"
        }
    }
}

output {
    elasticsearch { #[7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector 配置

receivers:
  filelog: #[1]
    start_at: beginning
    include:
      - /tmp/demo-line.log
    include_file_name: false
    include_file_path: true
    storage: file_storage 
    operators:
    # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "access_log_otel" 
    - type: add #[2]
      field: attributes["data_stream.namespace"]
      value: "default"

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/grok: #[3]
    log_statements:
      - context: log
        statements:
        - 'merge_maps(attributes, ExtractGrokPatterns(attributes["event.original"], "%{TIMESTAMP_ISO8601:date}: user %{WORD:user.name} accessed from %{IP:client.ip}:%{NUMBER:client.port:int} path %{URIPATH:url.path} with error %{NUMBER:http.status.code}", true), "insert")'

  filter/exclude_system_user:  #[4]
    error_mode: ignore
    logs:
      log_record:
        - atgcodettributes["user.name"] == "SYSTEM"

  transform/parse_date: #[5]
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["date"], "%Y-%m-%dT%H:%M:%S"))
          - delete_key(attributes, "date")
        conditions:
          - attributes["date"] != nil

  transform/translate_status_code:  #[6]
    log_statements:
      - context: log
        conditions:
        - attributes["http.status.code"] != nil
        statements:
        - set(attributes["http.status.code_description"], "OK")                where attributes["http.status.code"] == "200"
        - set(attributes["http.status.code_description"], "Permission Denied") where attributes["http.status.code"] == "403"
        - set(attributes["http.status.code_description"], "Not Found")         where attributes["http.status.code"] == "404"
        - set(attributes["http.status.code_description"], "Server Error")      where attributes["http.status.code"] == "500"
        - set(attributes["http.status.code_description"], "Unknown Error")     where attributes["http.status.code_description"] == nil

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - resourcedetection/system
        - transform/grok
        - filter/exclude_system_user
        - transform/parse_date
        - transform/translate_status_code
      exporters:
        - elasticsearch

这些将在 Elasticsearch 中生成以下文档:

{
    "@timestamp": "2024-09-20T08:33:27.000Z",
    "client": {
        "ip": "89.66.167.22",
        "port": 10592
    },
    "data_stream": {
        "dataset": "access_log",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "http": {
        "status": {
            "code": "404",
            "code_description": "Not Found"
        }
    },
    "log": {
        "file": {
            "path": "/tmp/demo-line.log"
        }
    },
    "message": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404",
    "url": {
        "path": "/blog"
    },
    "user": {
        "name": "frank"
    }
}

示例 2:解析和转换 NDJSON 格式的日志文件

让我们考虑以下 json 行:

{"log_level":"INFO","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 +0100","user":{"id":"A1230","name":"john_doe"}}

我们将应用以下步骤:

从文件 /tmp/demo.ndjson 中读取一行。

  1. 将输出定义为 Elasticsearch 数据流 logs-json-default
  2. 解析 JSON 并分配相关键和值。
  3. 解析日期。
  4. 覆盖消息字段。
  5. 重命名字段以遵循 ECS 约定。
  6. 将数据发送到 Elasticsearch。

Logstash pipeline

input {
    file {
        path => "/tmp/demo.ndjson" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "json"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
  if [message] =~ /^{.*/ {
    json { #[3] & #[5]
        source => "message"
    }
  }
  date { #[4]
    match => ["[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    remove_field => "[timestamp]"
  }
  mutate {
    rename => { #[6]
      "service" => "[service][name]"
      "log_level" => "[log][level]"
    }
  }
}


output {
    elasticsearch { # [7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector 配置

receivers:
  fitgcodelelog/json: # [1]
    include: 
      - /tmp/demo.ndjson
    retry_on_failure:
      enabled: true
    start_at: beginning
    storage: file_storage 
    operators:
     # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"      
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "otel" #[2]
    - type: add
      field: attributes["data_stream.namespace"]
      value: "default"     


extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/json_parse:  #[3]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, ParseJSON(body), "upsert")
        conditions: 
          - IsMatch(body, "^{")
      

  transform/parse_date:  #[4]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["timestamp"], "%Y-%m-%d %H:%M:%S.%L %z"))
          - delete_key(attributes, "timestamp")
        conditions: 
          - attributes["timestamp"] != nil

  transform/override_message_field: [5]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(body, attributes["message"])
          - delete_key(attributes, "message")

  transform/set_log_severity: # [6]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(severity_text, attributes["log_level"])          

  attributes/rename_attributes: #[6]
    actions:
      - key: service.name
        from_attribute: service
        action: insert
      - key: service
        action: delete
      - key: log_level
        action: delete

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs/json:
      receivers: 
        - filelog/json
      processors:
        - resourcedetection/system    
        - transform/json_parse
        - transform/parse_date        
        - transform/override_message_field
        - transform/set_log_severity
        - attributes/rename_attributes
      exporters: 
        - elasticsearch

这些将在 Elasticsearch 中生成以下文档:

{
    "@timestamp": "2024-10-11T12:34:56.123000000Z",
    "data_stream": {
        "dataset": "otel",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "{"log_level":"WARNING","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 +0100","user":{"id":"A1230","name":"john_doe"}}"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "log": {
        "file": {
            "name": "json.log"
        },
        "level": "WARNING"
    },
    "message": "User login successful",
    "service": {
        "name": "auth-service"
    },
    "user": {
        "id": "A1230",
        "name": "john_doe"
    }
}

结论

在这篇文章中,我们展示了如何将典型的 Logstash 管道转换为用于日志的 OpenTelemetry Collector 管道的示例。虽然 OpenTelemetry 提供了用于收集和导出日志的强大工具,但如果你的管道依赖于复杂的转换或脚本,Logstash 仍然是更好的选择。这是因为 Logstash 提供了更广泛的内置功能和更灵活的方法来处理高级数据操作任务。

下一步是什么?

现在你已经看到了将 Logstash 管道转换为 OpenTelemetry 的基本(但现实)示例,现在轮到你深入研究了。根据你的需要,你可以进一步探索并在以下存储库中找到更详细的资源:

如果你遇到特定挑战或需要处理更高级的用例,这些存储库将是发现可以增强你的管道的其他组件或集成的绝佳资源。所有这些存储库都具有类似的结构,其中包含名为接收器、处理器、导出器和连接器的文件夹,阅读本博客后应该会熟悉这些文件夹。无论你是迁移简单的 Logstash 管道还是处理更复杂的数据转换,这些工具和社区都将为你提供成功实施 OpenTelemetry 所需的支持。

原文:Convert Logstash pipelines to OpenTelemetry Coltgcodelector Pipelines — Elastic Observability Labs

文章来源于互联网:将 Logstash 管道转换为 OpenTelemetry Collector 管道

相关推荐: Vertex AI 与 Elasticsearch 开放推理 API 集成,为你的 RAG 应用程序带来重新排名

作者:来自 ElasticTim Grein 你可以使用 Google Vertex AI 和 Elasticsearch 开放推理 API 构建语义搜索和语义重新排名! 在我们与 Google Vertex AI 团队密切合作之后,我们很高兴地宣布 Elas…

Tags: , , ,