Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中

2022年8月25日   |   by mebius

在我之前的文章 “Observability:使用 Elastic Agent 来摄入日志及指标 – Elastic Stack 8.0”,我详细地描述了如何安装 Elasticsearch,Stack 及 Elastic Agents 来采集系统日志及指标。很多开发者可能会有疑问,在我们的实际使用中,我们更多的可能是需要采集定制的应用日志,而不是系统日志。那么在这个时候,我们该如何使用 Elastic Agents 来把这些日志摄入呢?在以前的系统中,我们可以使用如下的几种方式来采集日志:

%title插图%num

  1. 我们可以直接使用 Beats 把数据传入到 Elasticsearch 中。对数据的处理,我们可以使用 Beats 的 processors 来处理数据,或者通过 Elasticsearch 集群的 ingest nodes 来处理数据。
  2. 我们可以tgcode通过 Beats => Logstash => Elasticsearch。针对这种情况,我们可以分别在 Beats,Logstash 或者 Elasticsearch 集群的 ingest nodes 来处理数据。
  3. 我们可以直接使用各种编程语言来直接向 Elasticsearch 集群进行写入。我们可以使用 Elasticsearch 集群的 ingest nodes 来处理数据。

在今天的文章里,我们来详细地描述如何使用 Elastic Agents 把应用中的定制日志摄入到 Elasticsearch 中并进行分析。在今天的演示中,我将使用如下测试环境:

%title插图%num

我将使用 Elastic Stack 8.3来进行安装并展示。

如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中

准备日志

为了方便,我们使用我之前的一个教程写的文章里的例子来生成日志。我使用 Python 应用来生成日志。请参考文章 “Beats: 使用 Filebeat 进行日志结构化 – Python”。我们按照如下的步骤在 Ubuntu OS 的机器上来运行应用:

liuxg@liuxgu:~/python/logs$ pwd
/home/liuxg/python/logs
liuxg@liuxgu:~/python/logs$ ls
createlogs.py  createlogs_1.py  createlogs_2.py  filebeat_json.yml  json_logs  test.log
liuxg@liuxgu:~/python/logs$ python createlogs_2.py 
liuxg@liuxgu:~/python/logs$ cat json_logs 
{"user_name": "arthur", "id": 42, "verified": false, "event": "logged_in"}
{"user_name": "arthur", "id": 42, "verified": true, "event": "changed_state"}

可以看出来在我的应用目录里会生成一个叫做 json_logs 的文件。它的内容如上所示。上面的文档路径及文件名将在下面的配置中要用到。我们的日志路径是:

/home/liuxg/python/logs/json_logs

安装

在进行下面的练习之前,我们必须安装好 Elasticsearch 及 Kibana。我们可以参考之前的文章:

我们按照上面的要求进行安装 Elasticsearch 及 Kibana。为了能够让 fleet 正常工作,内置的 API service必须启动。我们必须为 Elasticsearch 的配置文件 config/elasticsearch.yml 文件配置:

xpack.security.authc.api_key.enabled: true

配置完后,我们再重新启动 Elasticsearch。针对 Kibana,我们也需要做一个额外的配置。我们需要修改 config/kibana.yml 文件。在这个文件的最后面,添加如下的一行:


xpack.encryptedSavedObjects.encryptionKey: 'fhjskloppd678ehkdfdlliverpoolfcr'

如果你不想使用上面的这个设置,你可以使用如下的方式来tgcode获得:

%title插图%num

从上面的输出中,我们可以看出来,有三个输出的 key。我们可以把这三个同时拷贝,并添加到 config/kibana.yml 文件的后面。当然,我们也可以tgcode只拷贝其中的一个也可。我们再重新启动 Kibana。

这样我们对 Elasticsearch 及 Kibana 的配置就完成。 针对 Elastic Stack 8.0 以前的版本安装,请阅读我之前的文章 “Observability:如何在最新的 Elastic Stack 中使用 Fleet 摄入 system 日志及指标”。

除此之外,Kibana 需要 Internet 连接才能从 Elastic Package Registry 下载集成包。 确保 Kibana 服务器可以连接到https://epr.elastic.co 的端口 443 上。如果你的环境有网络流量限制,有一些方法可以解决此要求。 有关详细信息,请参阅气隙环境

目前,Fleet 只能被具有 superuser role 的用户所使用。

配置 Fleet

使用 Kibana 中的 Fleet 将日志、指标和安全数据导入 Elastic Stack。第一次使用 Fleet 时,你可能需要对其进行设置并添加 Fleet Server。在做配置之前,我们首先来查看一下有没有任何的 integration 被安装:

%title插图%num

%title插图%num从上面我们可以看出来没有任何安装的 integrations。

我们打开 Fleet 页面:

%title插图%num

%title插图%num

%title插图%num

%title插图%num

%title插图%num

我们接下来添加 Agent:

%title插图%num

%title插图%num上面显示我们的 Fleet Sever policy 被成功地创建了。我们需要把我们的 Fleet Server 安装到 Ubuntu OS 机器上。

%title插图%num

我们的目标机器是 Linux OS。我们点击上面的拷贝按钮,并在 Linux OS 上进行安装:

curl -L -O https://artifacts.elastic.co/downloads/beats/elastic-agent/elastic-agent-8.3.0-linux-x86_64.tar.gz
tar xzvf elastic-agent-8.3.0-linux-x86_64.tar.gz
cd elastic-agent-8.3.0-linux-x86_64
sudo ./elastic-agent install 
  --fleet-server-es=https://192.168.0.3:9200 
  --fleet-server-service-token=AAEAAWVsYXN0aWMvZmxlZXQtc2VydmVyL3Rva2VuLTE2NTY1Njg5MTE1NTk6clBaX1pidXNTdTZXc2Fvb0ROcXVhUQ 
  --fleet-server-policy=fleet-server-policy 
  --fleet-server-es-ca-trusted-fingerprint=764021beb30446365d829986a362ffba82d03f4ff7861839a60f7951b8e83e7a

我们按照 Kibana 中的提示来安装:

%title插图%num

%title插图%num

等过一段时间,我们可以看到这个运用于 192.168.0.4 机器上的 Agents 的状态也变为 healthy:

%title插图%num

由于我们的 Elastic Agent 和 Fleet Server 是在一个服务器上运行的,所以,我们直接在 Fleet Server Policy 里添加我们想要的 integration。如果你的 Elastic Agent 可以运行于另外的一个机器上,而不和 Fleet Server 在同一个机器上,你可以创建一个新的 policy,比如 logs。然后让 agent赋予给这个 新创建的 policy。

我们直接在这个 Fleet Server Policy 里添加一个叫做 custom log 的集成:

%title插图%num

%title插图%num

%title插图%num

%title插图%num

在上面,我们把 Ubuntu OS 上的日志的路径添加进去:

%title插图%num

%title插图%num

在上面,我们可以看到新增加的 log-1 集成。

如果你之前已经生成过 json_logs 日志文件,我们可以删除当前目录的文件,并再次生成该文件:

liuxg@liuxgu:~/python/logs$ pwd
/home/liuxg/python/logs
liuxg@liuxgu:~/python/logs$ ls
createlogs.py  createlogs_1.py  createlogs_2.py  filebeat_json.yml  json_logs  test.log
liuxg@liuxgu:~/python/logs$ rm json_logs 
liuxg@liuxgu:~/python/logs$ python createlogs_2.py 

我们或者使用如下的命令来追加一写文档到 json_logs 文件中:

liuxg@liuxgu:~/python/logs$ pwd
/home/liuxg/python/logs
liuxg@liuxgu:~/python/logs$ cp json_logs temp
liuxg@liuxgu:~/python/logs$ cat temp >> json_logs 
liuxg@liuxgu:~/python/logs$ cat json_logs 
{"user_name": "arthur", "id": 42, "verified": false, "event": "logged_in"}
{"user_name": "arthur", "id": 42, "verified": true, "event": "changed_state"}
{"user_name": "arthur", "id": 42, "verified": false, "event": "logged_in"}
{"user_name": "arthur", "id": 42, "verified": true, "event": "changed_state"}

在上面,我们追加了两个文档到 json_logs 里去了。

我们接下来回到 Discover 去查看:

%title插图%num

在搜索框中输入 json_logs,我们发现在过去 15分钟之内有两个新摄入的文档。我们再次查看 message 的内容:

%title插图%num

显然,我们可以看到 message 字段显示的就是我们之前在日志中的信息。它是一个 JSON 格式的信息。这个虽然好,但是它不是结构化的日志信息。我们想要的是 user_name 为文档的一个字段,id 为另外一个字段这样的结构化信息。在之前的 Filebeat 中,我们可以轻松地使用 Filebeat 所提供的 processors 或者就像如同在文章 “Beats: 使用 Filebeat 进行日志结构化 – Python” 使用的那样。我们可以使用 Filebeat input type 所提供的固有功能来完成。再者,我们还可以使用 Elasticsearch 集群的 ingest node 来完成。

那么针对我们目前的 Elastic Agent 摄入方式,我们该如何结构化这个 message 信息呢?答案是使用 ingest pipeline。

我们在 Kibana 的 Dev Tools 中创建如下的 ingest pipeline:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "structure a JSON format message",
    "processors": [
      {
        "json": {
          "field": "message",
          "target_field": "json_fields"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "{"user_name": "arthur", "id": 42, "verified": false, "event": "logged_in"}"
      }
    }
  ]
}

在上面,我们通过 _simulate 来测试我们的 pipeline:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "json_fields": {
            "verified": false,
            "id": 42,
            "event": "logged_in",
            "user_name": "arthur"
          },
          "message": """{"user_name": "arthur", "id": 42, "verified": false, "event": "logged_in"}"""
        },
        "_ingest": {
          "timestamp": "2022-07-01T00:03:30.040008Z"
        }
      }
    }
  ]
}

如上所示,我们的 josn processor 能够非常出色地完成 message 的结构化,并把结构化的信息保存于一个叫做 json_fields 的字段中。在完成上面的模拟后,我们使用如下的命令来创建一个 pipeline:

PUT _ingest/pipeline/message_structure
{
  "description": "structure a JSON format message",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "json_fields"
      }
    }
  ]
}

在上面,我们创建了一个叫做 message_structure 的 ingest pipeline。

我们接下来展示如何在 custom logs 里来使用这个 ingest pipeline。我们重新打开 log-1 集成:

%title插图%num

%title插图%num

在上面,选定默认的 generic 为 dataset 的名称,那么最终形成的索引名称将会是 logs-generic-*。我们也可以设定自己想要的 dataset,比如 my-app。如上所示,我们在 Custom congfiguration 里填写 pipeline 的定义。点击上面的 Save integration:

%title插图%num

%title插图%num

上面显示,我们的更新完毕。

我们接下来再次删除在日志目录下的 json_logs 文件,并再次运行 python 应用:

liuxg@liuxgu:~/python/logs$ pwd
/home/liuxg/python/logs
liuxg@liuxgu:~/python/logs$ ls
createlogs.py  createlogs_1.py  createlogs_2.py  filebeat_json.yml  json_logs  test.log
liuxg@liuxgu:~/python/logs$ rm json_logs 
liuxg@liuxgu:~/python/logs$ python createlogs_2.py 

我们再次回到 Discover 界面来进行查看:

%title插图%num

我们可以看到最新的日志信息被收集起来了。我们展开该文档进行查看:

%title插图%num

我们可以看到 message 的信息被结构化了,并且保存于一个叫做 json_fields 的字段中。

好了,今天我的分享就写到这里。希望对大家从 Beats 转换到 Elastic Agents 的使用提供一个平滑的过度。在未来,Elastic 更推崇 Elastic Agents 的使用虽然之前的 Beats 方式还可以继续使用。使用 Elastic Agents 可以使我们的 Agents 更容易集中管理。

文章来源于互联网:Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中

相关推荐: Elasticsearch:使用不同的 CA 更新安全证书 (一)

如果你必须添加组织中的新 CA 证书,或者你需要自己生成新 CA,请使用此新 CA 签署新节点证书并指示你的节点信任新 CA 证书。在今天的展示中,我将来演示如何更新一个 Elasticsearch 8.x 集群的 CA 证书。 Elasticsearch:使…