Logstash:在 Logstash 管道中的定制的 Elasticsearch update by query
2023年2月27日 | by mebius
我们知道 Elasticsearch output plugin 为我们在 Logstash 的 pipeline 中向 Elasticsearch 的写入提供了可能。我们可以使用如下的格式向 Elasticsearch 写入数据:
elasticsearch {
hosts => ["https://localhost:9200"]
index => "data-%{+YYYY.MM.dd}"
user => "elastic"
password => "NtC7cM-GKQWOxqamHd1R"
ssl => true
ca_trusted_fingerprint => "d464eed5d00a20908318b6a1de38f88daf3a867177123def4c34aa2272571aaf"
}
在向 Elasticsearch 写入数据的时候,目前它有四种操作:
- index:索引文档(来自 Logstash 的事件)。
- delete:通过 id 删除文档(此操作需要一个id)
- create:索引文档,如果索引中已存在该 id 的文档,则失败。
- update:通过 id 更新文档。 Update 有一个特殊情况,你可以 upsert — 更新文档(如果不存在)。 请参阅 doc_as_upsert 选项。 注意:这在 Elasticsearch 1.x 中不起作用且不受支持。 请升级到 ES 2.x 或更高版本以将此功能与 Logstash 一起使用!
一个 sprintf 样式的字符串,用于根据事件的内容更改操作。 值 %{[foo]} 将使用 foo 字段进行操作。 如果 resolved action 不在 [index, delete, create, update] 中,事件将不会发送到 Elasticsearch。 相反,事件将被发送到管道的死信队列 (DLQ)(如果启用),或者将被记录并删除。
在实际的使用中,假如我们的操作不是 index,delete create 或 update 其中的一种,那么我们该怎么办呢?比如我们想根据一定的条件来更新文档,就像 update by query 那样?我们该怎么办呢?
幸运的是,Logstash 提供了一个叫做 HTTP output plugin。它可以帮我解决这个问题。
准备数据
首先,我们来创建如下的一个索引:
PUT customer/_doc/2
{
"id": 2,
"timestamp": "2019-08-11T17:55:56Z",
"paymentType": "Visa",
"name": "Darby Dacks",
"gender": "Female",
"ip_address": "77.72.239.47",
"purpose": "Shoes",
"country": "Poland",
"age": 55,
"offer": false
}
我们在 Kibana 中输入上面的命令来创建一个叫做 customer 的索引。它的 id 为 2。
更新数据
接下来,我们需要按照一定的条件来更新我们的数据。比如,我们想把 paymentType 为 Visa,并且年龄大于或等于 55 岁的人的 offer 设置为 true。在 Kibana 中正常的命令是这样的:
POST customer/_update_by_query
{
"query": {
"bool": {
"must": [
{
"match": {
"paymentType.keyword": "Visa"
}
},
{
"range": {
"age": {
"gte": 50
}
}
}
]
}
},
"script": {
"source": "ctx._source.offer = params.offer",
"lang": "painless",
"params": {
"offer": true
}
}
}
我们可以对 Logsthash 做如下的 pitgcodepeline 设计:
logstash.conf
input {
generator {
message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
count => 1
}
}
filter {
json {
source => "message"
}
if [paymentType] == "Mastercard" {
drop {}
}
mutate {
remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
}
}
output {
stdout {
codec => rubydebug
}
http {
url => "https://localhost:9200/customer/_update_by_query"
user => "elastic"
password => "Y+6tv9jejPl=W4IGrTD="
http_method => "post"
format => "message"
content_type => "application/json"
message => '{"query":{"bool":{"must":[{"match":{"paymentType.keyword":"%{paymentType}"}},{"range":{"age":{"gte":"%{age}"}tgcode}}]}},"script":{"source":"ctx._source.offer = params.offer","lang":"painless","params":{"offer":true}}}'
cacert => "/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt"
}
}
在上面,我们在 message 中通过一个查询,匹配到 paymentType.keyword 为 Visa,并且 age 为大于等于 55 的文档,我们设置该用户为促销对象。把他的 offer 值设置为 true。这个在实际的使用中,依据自己的条件来进行配置。在上面,cacert 为我们的 Elasticsearch 的证书文件位置。具体使用,请参考文档。
我们接下来运行 Logstash 的 pipeline:
./bin/logstash -f logstash.conf
在上面我们可以看出来信息的输出。我们在 Kibana 中使用如下的命令来检查更新后的文档:
GET customer/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "customer",
"_id": "2",
"_score": 1,
"_source": {
"offer": true,
"country": "Poland",
"gender": "Female",
"purpostgcodee": "Shoes",
"name": "Darby Dacks",
"id": 2,
"ip_address": "77.72.239.47",
"age": 55,
"timestamp": "2019-08-11T17:55:56Z",
"paymentType": "Visa"
}
}
]
}
}
很显然,我们上面的 offer 值现在变为 true,而不是之前的 false。
文章来源于互联网:Logstash:在 Logstash 管道中的定制的 Elasticsearch update by query
相关推荐: Elasticsearch:解析和丰富日志数据以在 Elastic 平台上进行故障排除
作者:Luca Wintergerst 在较早的博客文章 “日志监控和非结构化日志数据中,超越 tail -f”,我们讨论了收集和使用非结构化日志数据。 我们了解到,将数据添加到 Elastic Stack 非常容易。 到目前为止,我们所做的唯一解析是从该数据…