Elasticsearch 的新 range 丰富策略使上下文数据分析更上一层楼 – 7.16

2022年2月24日   |   by mebius

Elasticsearch 7.16 引入了一个新的丰富策略:range。 range 策略允许将传入文档中的数字、日期或 IP 地址与丰富索引中相同类型的范围相匹配。 能够与 IP 范围进行匹配在安全用例中特别有用,其中额外的元数据可用于进一步细化检测规则。 由于我们已经在文档中添加了一个使用 IP 范围的示例,因此我们将在此处使用 date_range 类型进行示例。

%title插图%num

在之前我的文章 “Elasticsearch:enrich processor (7.5发行版新功能)” 已经详细描述了geo_matchmatch 的丰富策略。详细使用,请阅读那篇文章。

我们虚构的例子:事件和待命时间表

假设我们有许多待命(随传随到)时间表,我们希望将它们添加到 Elasticsearch,以便每个连续班次都是一个文档。 让我们介绍一下我们虚构的测试用例:Bob、Alice、Dan、Matt 和 Lizzie。

Bob 喜欢朝九晚六的工作,中午午休一小时。 我们可以像这样添加他在 11 月 29 日星期一的日程安排:

PUT /on_call_schedules
{
  "mappings": {
    "properties": {
      "shift": { 
        "type": "date_range", 
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
POST on_call_schedules/_doc
{
  "engineer" : { 
    "name" : "Bob"
  },
  "shift" : {
    "gte" : "2021-11-29 08:00:00", 
    "lte" : "2021-11-29 12:00:00"
  }
}
POST on_call_schedules/_doc
{
  "engineer" : { 
    "name" : "Bob"
  },
  "shift" : {
    "gte" : "2021-11-29 13:00:00", 
    "lte" : "2021-11-29 17:00:00"
  }
}

对于其他工程师,他们的故事如下:Alice 有类似的时间表,但从 13:00 开始吃午饭,Dan 和 Matt 在不同的时区,Matt 工作半天,0:00 – 4:00,Dan 工作 3 :00-8:00 午休至 9:00,12:00 下班,Lizzie 晚上 16:00 工作至午夜 20:00 休息。

填充索引的其余请求如下所示(为简洁起见,将对象放在同一行):

POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Alice"  },
  "shift" : { "gte" : "2021-11-29 09:00:00", "lte" : "2021-11-29 13:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Alice"  }, 
  "shift" : {  "gte" : "2021-11-29 14:00:00", "lte" : "2021-11-29 18:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Dan"  },
  "shift" : { "gte" : "2021-11-29 03:00:00", "lte" : "2021-11-29 08:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Dan"  }, 
  "shift" : {  "gte" : "2021-11-29 09:00:00", "lte" : "2021-11-29 12:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Matt"  }, 
  "shift" : {  "gte" : "2021-11-29 00:00:00", "lte" : "2021-11-29 04:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Lizzie"  }, 
  "shift" : {  "gte" : "2021-11-29 16:00:00", "lte" : "2021-11-29 20:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Lizzie"  }, 
  "shift" : {  "gte" : "2021-11-29 21:00:00", "lte" : "2021-11-30 00:00:00"  }
}

现在我们有了一个包含所有时间tgcode表的索引,我们可以继续创建一个丰富的策略,以便在我们提供日期时通过将其与包含 date_range 的 shift 字段进行匹配来查找待命工程师:

PUT /_enrich/policy/add-oncall-engineers-policy
{
  "range": {
    "indices": "on_call_schedules",
    "match_field": "shift",
    "enrich_fields": ["engineer.name"]
  }
}

上面的 range 策略的意思是:当 shift 字段的值是在输入文档中匹配的时间范围内,那么 engineer.name 的值将被丰富进来。有了策略,我们可以执行它,以便可以准备源索引中的数据以供使用:

POST /_enrich/policy/add-oncall-engineers-policy/_execute?wait_for_completion=true

现在我们将创建一个摄入管道,以便我们可以处理传入的文档:

PUT /_ingest/pipeline/engineer_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add on-call engineer based on 'date'",
        "policy_name": "add-oncall-engineers-policy",
        "field" : "@timestamp",
        "target_field": "oncall_engineers",
        "max_matches": "25"
      }
    }
  ]
}

在这一点上,我们都准备好记录一些事件,并用预定的工程师来丰富它们。这个摄入管道的意思是:当一个文档 @timestamp 的时间落在策略中的所定义的索引on_call_schedules 中的 shift 范围内,那么相应的 engineer.name 将被丰富于新的文档中。

让我们使用一个事件来测试

让我们记录一下 Dan 在早上 6:12 处理的事件一:

PUT /incidents/_doc/incident1?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 06:12:33",
  "severity": "high",
  "handled_by": "Dan"
}

在上面,我们输入一个事件。我们可以看到它里面还有一个字段叫做 @timestamp。我们可以用这个值和之前输入的文档中的 shift 进行比较。如果这个 @timestamp 的值的范围落于其中的一个文档时间范围,那么我们可以把它的 name 这字段丰富过来,并写入到 target_field 中。

当我们检索文档时,我们可以看到 Dan 是唯一安排好的人:

GET /incidents/_doc/incident1

上面的命令响应是:

{
  "_index" : "incidents",
  "_type" : "_doc",
  "_id" : "incident1",
  "_version" : 8,
  "_seq_no" : 7,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "severity" : "high",
    "@timestamp" : "2021-11-29 06:12:33",
    "handled_by" : "Dan",
    "oncall_engineers" : [
      {
        "shift" : {
          "gte" : "2021-11-29 03:00:00",
          "lte" : "2021-11-29 08:00:00"
        },
        "engineer" : {
          "name" : "Dan"
        }
      }
    ]
  }
}

在上面我们可以看到被丰富的字段 engineer.name 是 Dan,也就是如下的文档:

POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Dan"  },
  "shift" : { "gte" : "2021-11-29 03:00:00", "lte" : "2021-11-29 08:00:00"  }
}

中的 shift 时间范围包含之前的 @timestamp 时间2021-11-29 06:12:33,从而 engineer.name 这个字段被添加到新的文档中。

更多事件

让我们再记录三个事件,两个由 Dan 在 11:12 和 14:08 处理,一个由 Alice 在 16:12 处理:

PUT /incidents/_doc/incident2?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 11:12:52",  "severity": "high",  "handled_by": "Dan"
}

PUT /incidents/_doc/incident3?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 14:08:06",   "severity": "high",  "handled_by": "Dan"
}

PUT /incidents/_doc/incident4?pipeline=engineer_lookup&refretgcodesh=wait_for
{
  "@timestamp": "2021-11-29 16:12:16",  "severity": "high",  "handled_by": "Alice"
}

根据我们的日程安排,当第二次事故发生时,我们应该有 3 名工程师随叫随到。 让我们验证一下:

GET /incidents/_doc/incident2

响应是:

{
  "_index" : "incidents",
  "_type" : "_doc",
  "_id" : "incident2",
  "_version" : 1,
  "_seq_no" : 8,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "severity" : "high",
    "@timestamp" : "2021-11-29 11:12:52",
    "handled_by" : "Dan",
    "oncall_engineers" : [
      {
        "shift" : {
          "gte" : "2021-11-29 08:00:00",
          "lte" : "2021-11-29 12:00:00"
        },
        "engtgcodeineer" : {
          "name" : "Bob"
        }
      },
      {
        "shift" : {
          "gte" : "2021-11-29 09:00:00",
          "lte" : "2021-11-29 13:00:00"
        },
        "engineer" : {
          "name" : "Alice"
        }
      },
      {
        "shift" : {
          "gte" : "2021-11-29 09:00:00",
          "lte" : "2021-11-29 12:00:00"
        },
        "engineer" : {
          "name" : "Dan"
        }
      }
    ]
  }
}

如我们所见,shift 匹配正确。

事件三有点奇怪,Dan 处理了这件事,但没有被安排做这件事! Dan 工作太辛苦了。 与其直接检索事件,不如搜索 Dan 在未安排时间时处理的所有事件:

GET incidents/_search
{
  "query": {
    "bool": {
      "must_not": [
        {
          "term": {
            "oncall_engineers.engineer.name.keyword": "Dan"
          }
        }
      ], 
      "filter": [
        {
          "term": {
            "handled_by.keyword": "Dan"
          }
        }
      ]
    }
  }
}

事实上,我们将事件三作为命中:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [
      {
        "_index" : "incidents",
        "_type" : "_doc",
        "_id" : "incident3",
        "_score" : 0.0,
        "_source" : {
          "severity" : "high",
          "@timestamp" : "2021-11-29 14:08:06",
          "handled_by" : "Dan",
          "oncall_engineers" : [
            {
              "shift" : {
                "gte" : "2021-11-29 13:00:00",
                "lte" : "2021-11-29 17:00:00"
              },
              "engineer" : {
                "name" : "Bob"
              }
            },
            {
              "shift" : {
                "gte" : "2021-11-29 14:00:00",
                "lte" : "2021-11-29 18:00:00"
              },
              "engineer" : {
                "name" : "Alice"
              }
            }
          ]
        }
      }
    ]
  }
}

除了搜索之外,我们还可以运行聚合。 让我们按照每个工程师处理事件的频率以及他们待命的事件数量来分解事情:

GET incidents/_search
{
  "aggs": {
    "on_call_per_incident": {
      "terms": {
        "field": "oncall_engineers.engineer.name.keyword",
        "size": 10
      }
    },
    "handled_incidents": {
      "terms": {
        "field": "handled_by.keyword",
        "size": 10
      }
    }
  }
  , "size": 0
}

响应为:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "handled_incidents" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "Dan",
          "doc_count" : 3
        },
        {
          "key" : "Alice",
          "doc_count" : 1
        }
      ]
    },
    "on_call_per_incident" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "Alice",
          "doc_count" : 3
        },
        {
          "key" : "Bob",
          "doc_count" : 3
        },
        {
          "key" : "Dan",
          "doc_count" : 2
        },
        {
          "key" : "Lizzie",
          "doc_count" : 1
        }
      ]
    }
  }
}

作为最后一个例子,让我们也分解一下每个工程师以及当该工程师处理事件时谁也在待命:

GET incidents/_search
{
  "aggs": {
    "incidents_handled_by": {
      "terms": {
        "field": "handled_by.keyword",
        "size": 10
      }
      , "aggs": {
        "supporting": {
          "terms": {
            "field": "oncall_engineers.engineer.name.keyword",
            "size": 10
          }
        }
      }
    }
  }
  , "size": 0
}

更多例子

以下示例创建一个 range 丰富策略,该策略根据 IP 地址将描述性网络名称和负责部门添加到传入文档。 然后,它将丰富策略添加到摄取管道中的处理器。

使用带有适当映射的 create index API 创建源索引。

PUT /networks
{
  "mappings": {
    "properties": {
      "range": { "type": "ip_range" },
      "name": { "type": "keyword" },
      "department": { "type": "keyword" }
    }
  }
}

以下索引 API 请求将新文档索引到该索引。

PUT /networks/_doc/1?refresh=wait_for
{
  "range": "10.100.0.0/16",
  "name": "production",
  "department": "OPS"
}

使用创建丰富策略 API 创建具有 range 策略类型的丰富策略。 该政策必须包括:

  • 一个或多个源索引
  • match_field,源索引中用于匹配传入文档的字段
  • 丰富你想要附加到传入文档的源索引中的字段

由于我们计划根据 IP 地址丰富文档,因此策略的 match_field 必须是 ip_range 字段。

PUT /_enrich/policy/networks-policy
{
  "range": {
    "indices": "networks",
    "match_field": "range",
    "enrich_fields": ["name", "department"]
  }
}

使用 execute enrich policyAPI 为策略创建丰富索引。

POST /_enrich/policy/networks-policy/_execute

使用创建或更新管道 API 创建摄取管道。 在管道中,添加一个丰富的处理器,其中包括:

  • 你的丰富策略。
  • 用于匹配来自丰富索引的文档的传入文档的 field。
  • target_field 用于存储传入文档的附加丰富数据。 此字段包含你的丰富策略中指定的 match_field 和enrich_fields。
PUT /_ingest/pipeline/networks_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add 'network' data based on 'ip'",
        "policy_name": "networks-policy",
        "field" : "ip",
        "target_field": "network",
        "max_matches": "10"
      }
    }
  ]
}

使用摄入管道来索引文档。 传入的文档应包括你的丰富处理器中指定的字段。

PUT /my-index-000001/_doc/my_id?pipeline=networks_lookup
{
  "ip": "10.100.34.1"
}

在上面,由于”ip”: “10.100.34.1” 这个 IP 地址是在10.100.0.0/16 范围内,所以当我们摄入数据时 networks_lookup 摄入管道会起作用。它会把丰富的内容添加到 target_field 字段里。

要验证丰富处理器匹配并附加了适当的字段数据,请使用 get API 查看索引文档。

GET /my-index-000001/_doc/my_id

API 返回以下响应:

{
  "_index" : "my-index-000001",
  "_type" : "_doc",
  "_id" : "my_id",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 8,
  "found" : true,
  "_source" : {
    "ip" : "10.100.34.1",
    "network" : [
      {
        "name" : "production",
        "range" : "10.100.0.0/16",
        "department" : "OPS"
      }
    ]
  }
}

从上面我们可以看出来,除了我们已经输入的字段 ip 以外,我们还可以看到被丰富的字段 network 作为起 target_field。在它里面含有我们之前在networks-policy 定义的 name 及 department 字段。

Range丰富政策的好处

Range 丰富政策开辟了新的匹配选项和丰富文档的新方法。 在这篇博文中,我们展示了一家虚构的公司,其中包含预定的工程师和记录的事件。 使用 Elasticsearch 的功能,我们可以记录事件,丰富工程师安排的事件,并分析数据。

文章来源于互联网:Elasticsearch 的新 range 丰富策略使上下文数据分析更上一层楼 – 7.16

相关推荐: Elasticsearch:了解 Elasticsearch combined fields 和 multi match 查询

原文:Understanding Elasticsearch combined fields and multi match queries 这篇博文深入探讨了 Elasticsearch 7.13 中添加的新的 combined_fields 查询以及为什么…

Tags: