Elasticsearch: a deep dive into the dissect ingest processor
January 9, 2021 | by mebius
Like the Grok processor, the dissect processor extracts structured fields from a single text field in a document. Unlike Grok, however, dissect does not use regular expressions. This makes dissect's syntax simpler and, in some cases, faster than the Grok processor.
Dissect matches a single text field against a defined pattern. In my earlier article "Elastic可观测性 – 运用 pipeline 使数据结构化" we already introduced the Grok and dissect processors. In this article we will take a deeper look at the dissect processor, working through a number of examples.
Hands-on practice
A simple example
Let's start with a simple example:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp} [%{loglevel}] %{status}"
}
}
]
},
"docs": [
{
"_source": {
"message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
}
}
]
}
Above, we use a pattern to extract fields from message. With dissect, pay particular attention to whitespace: if the spaces in the pattern do not match the input, parsing fails. The result of the request above is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29T00:39:02.912Z",
"loglevel" : "Debug",
"message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T04:40:40.894589Z"
}
}
}
]
}
It extracts loglevel and status from message. Note that the [ and ] characters are dropped as well.
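To build intuition for how the pattern is applied, here is a toy Python emulation of a dissect match. The real processor splits on literal delimiters rather than compiling a regular expression, so this is only an illustration of the behavior, not the actual implementation:

```python
import re

def dissect(pattern, text):
    """Toy dissect emulation: each %{key} token becomes a lazy capture
    group, everything else in the pattern must match literally."""
    keys, regex = [], ""
    for part in re.split(r"(%\{[^}]*\})", pattern):
        if part.startswith("%{"):
            keys.append(part[2:-1])
            regex += "(.*?)"
        else:
            regex += re.escape(part)
    m = re.fullmatch(regex, text)
    return dict(zip(keys, m.groups())) if m else None

fields = dissect("%{@timestamp} [%{loglevel}] %{status}",
                 "2019-09-29T00:39:02.912Z [Debug] MyApp stopped")
print(fields)
# {'@timestamp': '2019-09-29T00:39:02.912Z', 'loglevel': 'Debug',
#  'status': 'MyApp stopped'}
```

If a literal delimiter is missing from the input, the match simply fails, mirroring dissect's strictness.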
Skipping fields
Dissect performs an exact match, but in practice we may not want a particular field to appear in the resulting document even though it can be parsed. Consider the following example:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp} [%{?loglevel}] %{status}"
}
}
]
},
"docs": [
{
"_source": {
"message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
}
}
]
}
In the example above we use %{?loglevel}, which indicates that we do not want loglevel to appear in the result:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29T00:39:02.912Z",
"message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T04:47:24.7823Z"
}
}
}
]
}
As expected, the loglevel field no longer appears in this output.
Handling multiple spaces
The dissect processor is strict: the whitespace must match exactly, or parsing will not succeed. For example:
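The named skip modifier can be emulated the same way. In this hedged sketch, any key written as %{?name} is matched against the text but omitted from the result:

```python
import re

def dissect_skip(pattern, text):
    """Toy dissect emulation where keys written as %{?name} are matched
    but dropped from the returned document."""
    keys, regex = [], ""
    for part in re.split(r"(%\{[^}]*\})", pattern):
        if part.startswith("%{"):
            keys.append(part[2:-1])
            regex += "(.*?)"
        else:
            regex += re.escape(part)
    m = re.fullmatch(regex, text)
    if m is None:
        return None
    return {k: v for k, v in zip(keys, m.groups())
            if not k.startswith("?")}  # drop %{?...} named skip keys

print(dissect_skip("%{@timestamp} [%{?loglevel}] %{status}",
                   "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"))
# {'@timestamp': '2019-09-29T00:39:02.912Z', 'status': 'MyApp stopped'}
```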
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp} %{status}"
}
}
]
},
"docs": [
{
"_source": {
"message": "2019-09-29  MyApp stopped"
}
}
]
}
Above, we deliberately put an extra space before MyApp stopped. The parse result is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29",
"message" : "2019-09-29  MyApp stopped",
"status" : ""
},
"_ingest" : {
"timestamp" : "2020-12-09T05:01:58.065065Z"
}
}
}
]
}
As the result shows, the message is not parsed the way we want: the status field comes out empty. How do we handle this?
We can use the right padding modifier -> to ignore the extra padding:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp->} %{status}"
}
}
]
},
"docs": [
{
"_source": {
"message": "2019-09-29  MyApp stopped"
}
}
]
}
The result of running the above is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29",
"message" : "2019-09-29  MyApp stopped",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:07:23.294188Z"
}
}
}
]
}
We can also use an empty key to skip the unwanted spaces:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "[%{@timestamp}]%{->}[%{status}]"
}
}
]
},
"docs": [
{
"_source": {
"message": "[2019-09-29] [MyApp stopped]"
}
},
{
"_source": {
"message": "[2019-09-29]  [MyApp stopped]"
}
}
]
}
Above we use %{->} to match the unwanted spaces. We submit two documents: one contains a single space and the other contains two. The result is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29",
"message" : "[2019-09-29] [MyApp stopped]",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:21:14.752694Z"
}
}
},
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29",
"message" : "[2019-09-29]  [MyApp stopped]",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:21:14.752701Z"
}
}
}
]
}
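Both padding techniques above can be reproduced in the same toy Python emulation. Here %{key->} lets the delimiter that follows the key repeat, which absorbs runs of extra spaces, and empty keys such as %{} or %{->} are dropped from the result (again, an illustration of the behavior, not the real implementation):

```python
import re

def dissect(pattern, text):
    """Toy dissect emulation with the right padding modifier: after a
    %{key->} token, the literal delimiter that follows may repeat.
    Empty keys (%{} or %{->}) are matched but dropped."""
    keys, regex, pad_next = [], "", False
    for part in re.split(r"(%\{[^}]*\})", pattern):
        if part.startswith("%{"):
            key = part[2:-1]
            if key.endswith("->"):
                key, pad_next = key[:-2], True
            keys.append(key)
            regex += "(.*?)"
        elif part:
            # a padded delimiter is allowed to occur one or more times
            regex += "(?:%s)+" % re.escape(part) if pad_next else re.escape(part)
            pad_next = False
    m = re.fullmatch(regex, text)
    if m is None:
        return None
    return {k: v for k, v in zip(keys, m.groups()) if k}

print(dissect("%{@timestamp->} %{status}", "2019-09-29   MyApp stopped"))
# {'@timestamp': '2019-09-29', 'status': 'MyApp stopped'}
print(dissect("[%{@timestamp}]%{->}[%{status}]", "[2019-09-29]  [MyApp stopped]"))
# {'@timestamp': '2019-09-29', 'status': 'MyApp stopped'}
```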
Appending fields
In many cases we may want to append several captured values to a single field, for example:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp} %{+@timestamp} %{+@timestamp} %{loglevel} %{status}",
"append_separator": " "
}
}
]
},
"docs": [
{
"_source": {
"message": "Oct 29 00:39:02 Debug MyApp stopped"
}
}
]
}
Here the time expression is Oct 29 00:39:02, which consists of three strings. With %{@timestamp} %{+@timestamp} %{+@timestamp} we combine those three strings into a single @timestamp field. The result of the run is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "Oct 29 00:39:02",
"loglevel" : "Debug",
"message" : "Oct 29 00:39:02 Debug MyApp stopped",
"status" : "MyApp stopped"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:27:29.785206Z"
}
}
}
]
}
Note that in this example we use append_separator and set it to a single space. Without it, the three strings would be concatenated directly, producing Oct2900:39:02, which is usually not what we want in practice.
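The append modifier can be sketched like this: every value captured by %{+key} is joined onto the existing value of key using append_separator (a toy emulation under the same assumptions as the earlier sketches):

```python
import re

def dissect_append(pattern, text, append_separator=" "):
    """Toy dissect emulation of the append modifier %{+key}."""
    keys, regex = [], ""
    for part in re.split(r"(%\{[^}]*\})", pattern):
        if part.startswith("%{"):
            keys.append(part[2:-1])
            regex += "(.*?)"
        else:
            regex += re.escape(part)
    m = re.fullmatch(regex, text)
    if m is None:
        return None
    out = {}
    for key, value in zip(keys, m.groups()):
        name = key.lstrip("+")
        if key.startswith("+") and name in out:
            out[name] += append_separator + value  # %{+key}: append
        else:
            out[name] = value
    return out

print(dissect_append(
    "%{@timestamp} %{+@timestamp} %{+@timestamp} %{loglevel} %{status}",
    "Oct 29 00:39:02 Debug MyApp stopped"))
# {'@timestamp': 'Oct 29 00:39:02', 'loglevel': 'Debug',
#  'status': 'MyApp stopped'}
```

Passing append_separator="" reproduces the concatenated Oct2900:39:02 behavior described above.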
Extracting key-value pairs
We can use %{*field} as the key and %{&field} as the corresponding value:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Example using dissect processor key-value",
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{@timestamp} %{*field1}=%{&field1} %{*field2}=%{&field2}"
}
}
]
},
"docs": [
{
"_source": {
"message": "2019-09-29T00:39:02.912Z host=AppServer status=STATUS_OK"
}
}
]
}
The result of running the above is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"@timestamp" : "2019-09-29T00:39:02.912Z",
"host" : "AppServer",
"message" : "2019-09-29T00:39:02.912Z host=AppServer status=STATUS_OK",
"status" : "STATUS_OK"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:34:30.47561Z"
}
}
}
]
}
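The reference-key behavior can also be emulated in a few lines of Python: %{*name} captures a field name and %{&name} captures the value that belongs to it (illustration only, same toy approach as before):

```python
import re

def dissect_kv(pattern, text):
    """Toy dissect emulation of reference keys %{*name} / %{&name}."""
    keys, regex = [], ""
    for part in re.split(r"(%\{[^}]*\})", pattern):
        if part.startswith("%{"):
            keys.append(part[2:-1])
            regex += "(.*?)"
        else:
            regex += re.escape(part)
    m = re.fullmatch(regex, text)
    if m is None:
        return None
    captured = dict(zip(keys, m.groups()))
    out = {}
    for key, value in captured.items():
        if key.startswith("*"):            # key side of a pair
            out[value] = captured["&" + key[1:]]
        elif not key.startswith("&"):      # plain key
            out[key] = value
    return out

print(dissect_kv(
    "%{@timestamp} %{*field1}=%{&field1} %{*field2}=%{&field2}",
    "2019-09-29T00:39:02.912Z host=AppServer status=STATUS_OK"))
# {'@timestamp': '2019-09-29T00:39:02.912Z', 'host': 'AppServer',
#  'status': 'STATUS_OK'}
```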
A challenge
From the exercises above you have probably noticed that the dissect processor is both useful and easy to use. Now let's work through a more realistic example:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
This is a haproxy log line, and it is long. How can we use processors to turn this message into a structured document?
We can use the dissect processor. Applying what we have learned so far, a first attempt:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp}"
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
Here we try to join the first three strings into a single timestamp field. Running the command gives:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"timestamp" : """Mar2201:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
},
"_ingest" : {
"timestamp" : "2020-12-09T05:38:44.674567Z"
}
}
}
]
}
The three strings are indeed joined, but the match is greedy: because %{+timestamp} is the last key in the pattern, it swallows all of the remaining text as well. We need to revise the pattern:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host}",
"append_separator": " "
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
We added append_separator and use %{host} to match all of the remaining text:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"host" : """localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"timestamp" : "Mar 22 01:27:39"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:41:53.667182Z"
}
}
}
]
}
This time the timestamp field comes out cleanly, but host is still one long string. Let's continue:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{rest}",
"append_separator": " "
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
Above, we extract process and its id, and put everything else into %{rest}:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"rest" : """ Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"process" : "haproxy",
"host" : "localhost",
"id" : "14415",
"message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"timestamp" : "Mar 22 01:27:39"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:46:11.833548Z"
}
}
}
]
}
Looking at rest, the leading part is a status and the remainder is key-value style data, which we can handle with the kv processor.
First let's extract status:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
"append_separator": " "
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
Running the command gives:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"process" : "haproxy",
"host" : "localhost",
"id" : "14415",
"message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"status" : " Server updates /appServer02 is UP",
"timestamp" : "Mar 22 01:27:39"
},
"_ingest" : {
"timestamp" : "2020-12-09T05:50:18.300969Z"
}
}
}
]
}
Now we have the status field. What remains in rest is clearly key-value data, which we can process with the kv processor:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
"append_separator": " "
}
},
{
"kv": {
"field": "rest",
"field_split": ", ",
"value_split": ":"
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
Above we added a kv processor; the result is:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"reason" : " Layer7 check passed",
"process" : "haproxy",
"code" : "2000",
"check duration" : "3ms.",
"message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
"host" : "localhost",
"id" : "14415",
"status" : " Server updates /appServer02 is UP",
"timestamp" : "Mar 22 01:27:39",
"info" : "\"OK\""
},
"_ingest" : {
"timestamp" : "2020-12-09T06:00:37.990909Z"
}
}
}
]
}
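The kv step here amounts to splitting rest on ", " and then splitting each pair on the first ":". This rough Python sketch reproduces the result above, including the leading space in reason and the quotes around OK (the real kv processor has many more options, such as trim_key and trim_value):

```python
# Emulate field_split=", " and value_split=":" on the `rest` field.
rest = 'reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.'
fields = {}
for pair in rest.split(", "):            # field_split
    key, _, value = pair.partition(":")  # value_split (first ":" only)
    fields[key] = value
print(fields)
# {'reason': ' Layer7 check passed', 'code': '2000',
#  'info': '"OK"', 'check duration': '3ms.'}
```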
The result now contains all the fields we want. Next, let's remove the message and rest fields, which we no longer need:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
"append_separator": " "
}
},
{
"kv": {
"field": "rest",
"field_split": ", ",
"value_split": ":"
}
},
{
"remove": {
"field": "message"
}
},
{
"remove": {
"field": "rest"
}
}
]
},
"docs": [
{
"_source": {
"message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
}
}
]
}
Above, we use the remove processor to delete the message and rest fields:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"reason" : " Layer7 check passed",
"process" : "haproxy",
"code" : "2000",
"check duration" : "3ms.",
"host" : "localhost",
"id" : "14415",
"status" : " Server updates /appServer02 is UP",
"timestamp" : "Mar 22 01:27:39",
"info" : "\"OK\""
},
"_ingest" : {
"timestamp" : "2020-12-09T05:59:44.138394Z"
}
}
}
]
}
Step by step, we have seen how to turn unstructured data into a structured document.
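Once the simulated pipeline behaves as expected, it can be stored as a named pipeline and applied at index time. A sketch of what that could look like; note that the pipeline id haproxy-logs is just an example name, not from the steps above:

```
PUT _ingest/pipeline/haproxy-logs
{
  "description": "Structure haproxy log lines with dissect, kv and remove",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
        "append_separator": " "
      }
    },
    {
      "kv": {
        "field": "rest",
        "field_split": ", ",
        "value_split": ":"
      }
    },
    {
      "remove": {
        "field": ["message", "rest"]
      }
    }
  ]
}
```

Documents indexed with ?pipeline=haproxy-logs will then be structured the same way as in the simulations above.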