Elasticsearch:使用 categorize text aggregation 来创建更好的警报

2021年12月23日   |   by mebius

Elasticsearch 提供了丰富的 aggregation 来帮助我们来分析数据。在一些实际的使用中,我们想要针对一些信息进行分类统计,比如我们想知道 log.level 为 ERROR这样的错误日志里的所有错误的分类。我们首先想到的是使用 terms 聚合来完成,但是这里有一个问题就是,我们分类的信息可能像是:

  • Aborted process Execution
  • Unable to load file

显然这样的分类不能使用 terms 聚合来完成,这是因为它们每个信息里面含有多个 term。另外针对一些分类,我们可能想知道相似的信息的分类,比如:

  • process 1, Aborted process Execution
  • process 2, Aborted process Execution
  • precess 3, Unable to load file
  • precess 4, Unable to load file

显然,前面的两个还是可以认为是桶一个分类尽管它们前面的 process 1 及 process 2 是不一样的。

在实际的使用中,我们可以使用这样的分类来达到统计的目的,更有甚者,我们可以运用这些信息更好地对一些异常或错误进行及时报警。

那么我们该如何来完成这种情况的聚合呢?答案是 Elasticsearch 所提供的 Categorize text aggregation

简单地说,Categorize text aggregation 是一种多桶聚合,将半结构化文本分组到桶中。 使用自定义分析器重新分析每个文本字段。 然后对生成的 toktgcodeen 进行分类,创建类似格式文本值的桶。 这种聚合最适合机器生成的文本,如系统日志。 只有 100 个分析的 token 用于对文本进行分类。

警告:重新分析大数据集将需要大量时间和内存。 此聚合应与异步搜索(Async search)结合使用。 此外,你可以考虑将聚合用作采样器(sampler)或多样化采样器聚合(diversified sampler)的子项。 这通常会提高速度和内存使用。

示例

我们首先来针对我们的电脑的 syslog 来进行演示。我们使用 Filebeat 来收集电脑的系统日志。如果你对这个还不是很熟的话,请参阅我之前的文章:

在这里,我就不进行累述了。一旦系统日志被成功地收集起来了,我们可以在 Kibana 中进行如下的查看:

GET filebeat-7.16.2/_count

上述命令将显示有多少个系统日志被采集上来了。等我们收集到一定的日志后,我们将在下面进行使用演示。

展示

经过一段时间的运行后,在我的电脑上成功地收集了1130个日志文档。我们首先来查看一下 filebeat-* 中有一个叫做 message 的字段:

%title插图%num

它的类型为 text:

%title插图%num

我们知道 text 字段是不可以用于 aggregation 的。如上所示,在 Aggregatable 里显示是否定的。

现在我们想针对这个字段来进行一些统计。我们想找出这个 message 字段里相似的字段的统计。我们首先来运行如下的一个聚合:

POST filebeat-7.16.2/_search?filter_path=aggregations
{
  "aggs": {
    "categories": {
      "categorize_text": {
        "field": "message"
      }
    }
  }
}

在上面,我们针对 filebeat-* 索引来进行分类统计:

{
  "aggregations" : {
    "categories" : {
      "buckets" : [
        {
          "doc_count" : 672,
          "key" : "Service exited due to SIGKILL sent by *"
        },
        {
          "doc_count" : 293,
          "key" : "Libnotify notify_register_coalesced_registration failed with code on line"
        },
        {
          "doc_count" : 55,
          "key" : "last message repeated times"
        },
        {
          "doc_count" : 28,
          "key" : "last message repeated time"
        },
        {
          "doc_count" : 23,
          "key" : "entering bootstrap mode"
        },
        {
          "doc_count" : 23,
          "key" : "exiting bootstrap mode"
        },
        {
          "doc_count" : 19,
          "key" : "ASL Sender Statistics"
        },
        {
          "doc_count" : 15,
          "key" : "Configuration Notice"
        },
        {
          "doc_count" : 9,
          "key" : "DEPRECATED USE in libdispatch client Changing the target of source after it has been activated set breakpoint on dispatch_bug_deprecated to debug"
        },
        {
          "doc_count" : 9,
          "key" : "Invalid type for event event/type *"
        }
      ]
    }
  }
}

如上所示,信息 “Service exited due to SIGKILL sent by *” 出现了 672 次。这在 1130 个日志里是一个非常突出的统计数值,是非常值得我们注意的。

在实际的使用中,我们还可以有许许多多的微调参数来供我们进行选择。我们可以参考文档Categorize text aggregation

假如我们有如下的一个索引:

PUT test_index/_doc/1
{
  "message": "process trivial_1, Aborted process Execution"
}

PUT test_index/_doc/2
{
  "message": "process trivial_2, Aborted process Execution"
}

PUT test_index/_doc/3
{
  "message": "process trivial_3, Aborted process Execution"
}

上面的三个索引长的几乎一模一样,除了其中有一个 trivial_1,trivial_2 及 trivial_3 是不同的。如果我们直接使用上面的方法来进行统计:

POST test_index/_search?filter_path=aggregations
{
  "aggs": {
    "categories": {
      "categorize_text": {
        "field": "message"
      }
    }
  }
}

我们会发现如下的结果:

{
  "aggregations" : {
    "categories" : {
      "buckets" : [
        {
          "doc_count" : 1,
          "key" : "process trivial_1 Aborted process Execution"
        },
        {
          "doc_count" : 1,
          "key" : "process trivial_2 Aborted process Execution"
        },
        {
          "doc_count" : 1,
          "key" : "process trivial_3 Aborted process Execution"
        }
      ]
    }
  }
}

显然是三个分类。这在实际的使用中,可能它不是我们想要的情况。我们想把这三种情况一起统计为一种情况。那么我们该怎么办呢?我们可以使用categorization_filters:

POST test_index/_search?filter_path=aggregations
{
  "aggs": {
    "categories": {
      "categorize_text": {
        "field": "message",
        "categorization_filters": ["trivial\_\d{1}"]
      }
    }
  }
}

在上面,我们使用了 regex 来过滤掉 trivial_1,trivial_2 及 trivial_3。更为通用的格式:

POST test_index/_search?filter_path=aggregations
{
  "aggs": {
    "categories": {
      "categorize_text": {
        "field": "message",
        "categorization_filters": ["\w+\_\d{1}"]
      }
    }
  }
}

它可以帮我们过滤掉任何诸如 foo_1 这样格式的文字。重新运行上面的统计,我们可以看到:

{
  "aggregations" : {
    "categories" : {
      "buckets" : [
        {
          "doc_count" : 3,
          "key" : "process Aborted process Execution"
        }
      ]
    }
  }
}

显然这次,我们把三个文档统计为同样的一个类里。

案例

我们假设有一个案例。我们希望做如下的事情:

  1. 在过去的10分钟之内,有超过5个的错误信息,需要发送一个警告
  2. 发送的警告必须含有错误的类别

通常我们解决这类问题会使用 terms 聚合配合 top_hits 聚合来完成。当然,它不能完全解决上面我们所提到的在错误信息里面含有一些变量的日志信息。

下面,我们使用 categorize text aggregation 来解决这个问题:

GET logs-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [ # filter on time range
        {
          "range": {
            "@timestamp": {
              "gttgcodee": "now-10m/m"
            }
          }
        },
        {
          "term": {
            "log.level": "ERROR" # return only ertgcoderors
          }
        }
      ]
    }
  },
  "aggs": { # get error count for 10 minutes buckets if elastic find more than 1 doc
    "per_10_min": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "10m",
        "min_doc_count": 1
      },
      "aggs": { # within these errors split by log message informations
        "categories": {
          "categorize_text": {
            "field": "message",
             "categorization_filters": ["\\.*"], # add a regex filter to remove specific data of log (id, filename etc…)
            "size": 10
          }
        }
      }
    }
  }
}

上面的搜索会生成类似如下的结果:

      "buckets" : [
        {
          "key_as_string" : "2021-12-08T09:30:00.000Z",
          "key" : 1638955800000,
          "doc_count" : 26, #total number of errors in the last 10 minutes
          "categories" : {
            "buckets" : [
              {
                "doc_count" : 6, # 6 errors of that type
                "key" : "Aborted process Execution"
              },
              {
                "doc_count" : 20, # 20 files not found
                "key" : "Unable to load file" # file names has been removed by the regex filter
              }
            ]
          }
        }
      ]
    }
  }
}

现在可以快速轻松地为电子邮件警报创建正文,其中包含所有错误信息。

文章来源于互联网:Elasticsearch:使用 categorize text aggregation 来创建更好的警报