Elasticsearch: How to handle exceptions in an ingest pipeline
January 9, 2021 | by mebius
In my earlier article "How to use the pipeline API in Elasticsearch to process events", I described in detail how to create and use an ingest pipeline. In short, a pipeline is a definition of a series of processors that are executed in the order they are declared. A pipeline has two main fields: a description and a list of processors.
It is worth pointing out here that pipelines run on ingest nodes, and all ingest pipelines are stored in the cluster state.
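Because the definitions live in the cluster state, a stored pipeline can be inspected at any time with the ingest APIs. For example, once the apache-log pipeline from the next section has been created:

# Retrieve a single pipeline by id
GET _ingest/pipeline/apache-log

# List every pipeline stored in the cluster state
GET _ingest/pipeline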
How a pipeline works
Here is an example that defines a pipeline:
PUT _ingest/pipeline/apache-log
{
"description": "This is an example for apache logs",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{COMMONAPACHELOG}"]
}
},
{
"date": {
"field": "timestamp",
"formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
}
},
{
"remove": {
"field": "message"
}
}
]
}
The processors above are executed in order. We can invoke the pipeline as follows:
PUT logs/_doc/1?pipeline=apache-log
{
"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"
}
The command indexes the document through the pipeline. Searching the index afterwards with GET logs/_search returns:
{
"took" : 20,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "logs",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
}
}
]
}
}
We can retrieve the document that has been processed by the apache-log pipeline with the following command:
GET logs/_doc/1
The above command returns:
{
"_index" : "logs",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
}
}
From the above we can see that, through the processors of the apache-log pipeline, we have successfully structured the log and enriched our data: the grok processor structured the data, the date processor set @timestamp to the same value as timestamp, and the remove processor dropped the message field.
When designing a pipeline, we rarely apply it to our real documents right away. More often we want to verify the pipeline's correctness with some test documents first; otherwise an incorrect pipeline could corrupt our data. We can use the _simulate API for this. In our case:
POST _ingest/pipeline/apache-log/_simulate
{
"docs": [
{
"_source": {
"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"
}
}
]
}
The docs array above can hold documents of any shape for testing. Since it is an array, we can test several documents at once. The command returns:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"timestamp" : "2020-11-17T11:09:35.351117Z"
}
}
}
]
}
We can see the simulated result.
The document above went through a whole chain of pipeline processors, but we cannot see the result of each individual processor. In that case, we can add the verbose parameter to see what each processor did:
POST _ingest/pipeline/apache-log/_simulate?verbose
{
"docs": [
{
"_source": {
"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"
}
}
]
}
The result returned is:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "apache-log",
"timestamp" : "2020-11-17T11:11:43.039149Z"
}
}
},
{
"processor_type" : "date",
"status" : "success",
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "apache-log",
"timestamp" : "2020-11-17T11:11:43.039149Z"
}
}
},
{
"processor_type" : "remove",
"status" : "success",
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "apache-log",
"timestamp" : "2020-11-17T11:11:43.039149Z"
}
}
}
]
}
]
}
The output above records the result of each processor in detail. It makes it very easy to break the pipeline down processor by processor and to track down errors.
How to handle pipeline errors
When a pipeline processes documents, not all of them are well formed, and a document may fail to be parsed or processed correctly.
By default, when a document cannot be processed, an error is returned to the client indicating that it could not be handled. The alternative is to handle the error ourselves with on_failure.
When an error occurs, we can define another list of processors that handles it. In that case, the remaining processors of the pipeline are not executed and the client no longer receives a failure response. A common approach is to use a set processor there: for example, it can record the error details and store the failed document in a separate index, which we can inspect later, using the error message to fix the problem.
We can define several processors inside on_failure, such as a remove and a set processor, and we can even attach an additional on_failure to that failure handler itself. With a set processor we can also repair the document: for instance, if the current date is invalid, we can set a default date, or use the current date, and let the document continue to be processed.
How exactly to handle failures depends entirely on your own business requirements. A minimal sketch of a pipeline-level handler follows below.
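As a minimal sketch of what such a handler can look like (the pipeline name my-pipeline and the index name failed-docs are illustrative, not from the examples below):

PUT _ingest/pipeline/my-pipeline
{
  "description": "Sketch of pipeline-level failure handling",
  "processors": [
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-docs"
      }
    }
  ]
}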
Below, I will demonstrate with an example:
GET _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
Above, we write the document into my_index, even though we are only simulating:
{
"docs" : [
{
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
tgcode"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"timestamp" : "2020-11-17T11:55:43.679709Z"
}
}
}
]
}
From the above we can see that after grok processing, bytes is a string. We can use the convert processor to turn this field into an integer:
GET _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
The result is now:
{
"docs" : [
{
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : 24,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"timestamp" : "2020-11-17T12:01:38.662559Z"
}
}
}
]
}
From the above we can see that bytes has now become an integer. We can, of course, do the same as before and add the verbose parameter to this call to see how each processor executes. To make debugging easier, we can even add a tag to each processor, so that when we use verbose it is easy to tell which processor is which:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
Above, we added a tag called convert_response to the convert processor for response. This makes it much easier to find; otherwise, with two convert processors, it would be hard to tell them apart, even though they are executed in order.
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:07:36.432606Z"
}
}
},
{
"processor_type" : "date",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:07:36.432606Z"
}
}
},
{
"processor_type" : "remove",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:07:36.432606Z"
}
}
},
{
"processor_type" : "convert",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : 24,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:07:36.432606Z"
}
}
},
{
"processor_type" : "convert",
"status" : "success",
"tag" : "convert_reponse",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : 200,
"bytes" : 24,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:07:36.432606Z"
}
}
}
]
}
]
}
We can see the convert_response tag in the output above.
Next, let us simulate a malformed document, so that the processors cannot parse it correctly. We remove the "5" from "2015" in the document:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
This obviously produces a document that cannot be parsed correctly. The error returned is:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""
}
],
"type" : "illegal_argument_exception",
"reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""
}
}
]
}
]
}
With an error like this, the problem is easy to find: the output shows that the grok pattern does not match. Let us instead modify the document as follows:
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +200] "GET / HTTP/1.1" 200 24"""
Above, we changed +0000 in the time to +000, that is, one 0 is missing. Next we run:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
The above command returns:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:20:43.098763Z"
}
}
},
{
"processor_type" : "date",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]"
}
],
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]",
"caused_by" : {
"type" : "date_time_parse_exception",
"reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
}
}
}
}
]
}
]
}
This is clearly different from before: the grok pattern parses our document correctly, but the date processor fails when parsing the time.
To handle this kind of problem, we have two options:
- handle it at the pipeline level
- handle it at the processor level
Handling at the pipeline level
We add an on_failure block after the pipeline's processors:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
Above, I added the following code:
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
}
]
Here we specify a different index called failed. Running the pipeline above gives:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:25:50.517958Z"
}
}
},
{
"processor_type" : "date",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]"
}
],
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]",
"caused_by" : {
"type" : "date_time_parse_exception",
"reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
}
}
}
},
{
"processor_type" : "set",
"status" : "success",
"doc" : {
"_index" : "failed",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:25:50.517958Z",
"on_failure_processor_type" : "date"
}
}
}
]
}
]
}
Clearly the first step succeeds and the second step fails; on_failure then runs, and inside it the set processor changes the index to failed. You can later look at the failed documents directly in the failed index.
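Outside of a simulation, once real documents have been routed this way, you can inspect them with an ordinary search against that index:

GET failed/_search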
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:25:50.517958Z",
"on_failure_processor_type" : "date"
}
The _ingest metadata above points out the error message produced at ingest time. We can go on and record this error information:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
},
{
"set": {
"tag": "mark_failure",
"field": "failure",
"value": {
"message": "{{_ingest.on_failure_message}}"
}
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
Above we set a failure field and record an object in it. Running the pipeline:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:39:09.206999Z"
}
}
},
{
"processor_type" : "date",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]"
}
],
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]",
"caused_by" : {
"type" : "date_time_parse_exception",
"reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
}
}
}
},
{
"processor_type" : "set",
"status" : "success",
"doc" : {
"_index" : "failed",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:39:09.206999Z",
"on_failure_processor_type" : "date"
}
}
},
{
"processor_type" : "set",
"status" : "success",
"tag" : "mark_failure",
"doc" : {
"_index" : "failed",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"failure" : {
"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
},
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:39:09.206999Z",
"on_failure_processor_type" : "date"
}
}
}
]
}
]
}
Clearly a new field called failure, containing the corresponding error message, has been added to _source above. Since failure is an object, we can in fact add more fields to it, for example:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
},
{
"set": {
"tag": "mark_failure",
"field": "failure",
"value": {
"message": "{{_ingest.on_failure_message}}",
"processor": "{{_ingest.on_failure_processor_type}}"
}
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
We added a processor field, which makes it easier to know which processor went wrong:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:42:27.811805Z"
}
}
},
{
"processor_type" : "date",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]"
}
],
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]",
"caused_by" : {
"type" : "date_time_parse_exception",
"reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
}
}
}
},
{
"processor_type" : "set",
"status" : "success",
"doc" : {
"_index" : "failed",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:42:27.811805Z",
"on_failure_processor_type" : "date"
}
}
},
{
"processor_type" : "set",
"status" : "success",
"tag" : "mark_failure",
"doc" : {
"_index" : "failed",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"failure" : {
"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"processor" : "date"
},
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:42:27.811805Z",
"on_failure_processor_type" : "date"
}
}
}
]
}
]
}
The handling above is at the pipeline level.
Handling at the processor level
We can also catch and handle errors directly on each individual processor. For example, for the date processor:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
],
"on_failure": [
{
"set": {
"tag": "set_default_date",
"field": "@timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer"
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
},
{
"set": {
"tag": "mark_failure",
"field": "failure",
"value": {
"message": "{{_ingest.on_failure_message}}",
"processor": "{{_ingest.on_failure_processor_type}}"
}
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
},
"_index": "my_index"
}
]
}
Above, we added the following on_failure code to the date processor:
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
],
"on_failure": [
{
"set": {
"tag": "set_default_date",
"field": "@timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}
}
When the error occurs, we simply use _ingest.timestamp as the value of @timestamp. Running the pipeline above:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:49:49.720153Z"
}
}
},
{
"processor_type" : "date",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]"
}
],
"type" : "illegal_argument_exception",
"reason" : "unable to parse date [17/May/2015:10:05:03 +000]",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]",
"caused_by" : {
tgcode"type" : "date_time_parse_exception",
"reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"
}
}
}
},
{
"processor_type" : "set",
"status" : "success",
"tag" : "set_default_date",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""",
"@timestamp" : "2020-11-17T12:49:49.720153Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:49:49.720153Z",
"on_failure_processor_type" : "date"
}
}
},
{
"processor_type" : "remove",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2020-11-17T12:49:49.720153Z",
"response" : "200",
"bytes" : "24",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:49:49.720153Z"
}
}
},
{
"processor_type" : "convert",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2020-11-17T12:49:49.720153Z",
"response" : "200",
"bytes" : 24,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:49:49.720153Z"
}
}
},
{
"processor_type" : "convert",
"status" : "success",
"tag" : "convert_reponse",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2020-11-17T12:49:49.720153Z",
"response" : 200,
"bytes" : 24,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:49:49.720153Z"
}
}
}
]
}
]
}
Clearly, in this run, when the error occurred, set_default_date was invoked and @timestamp became "2020-11-17T12:49:49.720153Z", which is obviously the time at which the ingest pipeline was executed. That is far from the time in the original document; which value to use depends entirely on your own business design.
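If an ingest-time default is not acceptable for your use case, one possible alternative (not used in this article) is to discard the failing document entirely inside on_failure with the drop processor. A minimal sketch of the date processor with this behavior:

{
  "date": {
    "field": "timestamp",
    "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
    "on_failure": [
      {
        "drop": {}
      }
    ]
  }
}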
Next, suppose we have corrected our time back to +0000. This time we change the bytes value to a character that cannot be converted to a number, such as "-":
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""
Rerunning the pipeline, we will see the following error:
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
],
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
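The reason is that the %{COMMONAPACHELOG} grok pattern only captures bytes when the value is numeric; with "-" the field is never created, so convert fails because the field is absent. For this particular error, the convert processor also supports an ignore_missing option that simply skips the processor when the field is missing, a minimal sketch being:

{
  "convert": {
    "field": "bytes",
    "type": "integer",
    "ignore_missing": true
  }
}

Here, however, we will handle the failure explicitly.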
In the same way as before, we can define a custom on_failure for this processor:
GET _ingest/pipeline/_simulate?verbose
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
],
"on_failure": [
{
"set": {
"tag": "set_default_date",
"field": "@timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}
},
{
"remove": {
"field": "message"
}
},
{
"convert": {
"field": "bytes",
"type": "integer",
"on_failure":[
{
"set": {
"field": "bytes",
"value": -1
}
}
]
}
},
{
"convert": {
"tag": "convert_reponse",
"field": "response",
"type": "integer"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
},
{
"set": {
"tag": "mark_failure",
"field": "failure",
"value": {
"message": "{{_ingest.on_failure_message}}",
"processor": "{{_ingest.on_failure_processor_type}}"
}
}
}
]
},
"docs": [
{
"_source": {
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""
},
"_index": "my_index"
}
]
}
We added the following code:
{
"convert": {
"field": "bytes",
"type": "integer",
"on_failure":[
{
"set": {
"field": "bytes",
"value": -1
}
}
]
}
}
That is, when the error occurs, we simply set bytes to -1:
{
"docs" : [
{
"processor_results" : [
{
"processor_type" : "grok",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"response" : "200",
"clientip" : "83.149.9.216",
"verb" : "GET",
"httpversion" : "1.1",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:59:19.385189Z"
}
}
},
{
"processor_type" : "date",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:59:19.385189Z"
}
}
},
{
"processor_type" : "remove",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"auth" : "-",
"ident" : "-",
"response" : "200",
"clientip" : "83.149.9.216",
"verb" : "GET",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:59:19.385189Z"
}
}
},
{
"processor_type" : "convert",
"status" : "error",
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
],
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
},
{
"processor_type" : "set",
"status" : "success",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : "200",
"bytes" : -1,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "field [bytes] not present as part of path [bytes]",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:59:19.385189Z",
"on_failure_processor_type" : "convert"
}
}
},
{
"processor_type" : "convert",
"status" : "success",
"tag" : "convert_reponse",
"doc" : {
"_index" : "my_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"request" : "/",
"auth" : "-",
"ident" : "-",
"verb" : "GET",
"@timestamp" : "2015-05-17T10:05:03.000Z",
"response" : 200,
"bytes" : -1,
"clientip" : "83.149.9.216",
"httpversion" : "1.1",
"timestamp" : "17/May/2015:10:05:03 +0000"
},
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"timestamp" : "2020-11-17T12:59:19.385189Z"
}
}
}
]
}
]
}
We can see from the output above that bytes has been set to -1.
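For reference, putting everything together, a stored version of the pipeline with both processor-level and pipeline-level handlers might look like this (a sketch assembled from the snippets above):

PUT _ingest/pipeline/apache-log
{
  "description": "Parse apache logs, with failure handling",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{COMMONAPACHELOG}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
        "on_failure": [
          {
            "set": {
              "tag": "set_default_date",
              "field": "@timestamp",
              "value": "{{_ingest.timestamp}}"
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "message"
      }
    },
    {
      "convert": {
        "field": "bytes",
        "type": "integer",
        "on_failure": [
          {
            "set": {
              "field": "bytes",
              "value": -1
            }
          }
        ]
      }
    },
    {
      "convert": {
        "tag": "convert_response",
        "field": "response",
        "type": "integer"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed"
      }
    },
    {
      "set": {
        "tag": "mark_failure",
        "field": "failure",
        "value": {
          "message": "{{_ingest.on_failure_message}}",
          "processor": "{{_ingest.on_failure_processor_type}}"
        }
      }
    }
  ]
}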
That is all for today. I hope you now know how to handle errors in a pipeline and deal with them accordingly.