Logstash:为 Logstash 日志启动索引生命周期管理
2021年1月9日 | by mebius
在 Elastic Stack 中,Logstash 作为一种 ETL 的摄入工具,它为大量摄入数据提供可能。Elastic Stack 提供索引生命周期管理可以帮我们管理被摄入的数据到不同的冷热节点中,并删除不需要保存的索引。在今天的文章中,我们将讲述如何为 Logstash 配置索引生命周期管理。在开始之前,建议你阅读如下的文档:
在本实验中,我将使用 Elastic Stack 7.10 来进行展示。
前提条件
你需要完成在文章 “Logstash:Logstash 入门教程 (二)” 中的实验内容 。在那篇文章中,它详细地描述了如何安装及配置所需要的 Elastic Stack:Elasticsearch,Kibana 及 Logstash。
在今天的实践中,我们对启动 Elasticsearch 稍有不同。我们来创建两个节点的 Elasticsearch 集群。一个节点是 hot,而另外一个节点是 warm。我们安装如下的方式来进行启动。
我们可以参考文章 “Elasticsearch:运用shard filtering来控制索引分配给哪个节点” 运行起来两个节点的 cluster。其实非常简单,当我们安装好 Elasticsearch 后,打开一个 terminal,并运行如下的命令:
./bin/elasticsearch -E node.name=node1 -E node.attr.data=hot -Enode.max_local_storage_nodes=2
它将运行起来一个叫做 node1 的节点。同时在另外 terminal 中运行如下的命令:
./bin/elasticsearch -E node.name=node2 -E node.attr.data=warm -Enode.max_local_storage_nodes=2
它运行另外一个叫做 node2 的节点。我们可以通过如下的命令来进行查看:
GET _cat/nodes?v
显示两个节点:
我们可以用如下的命令来检查这两个 node 的属性:
GET _cat/nodeattrs?v&s=name
显然其中的一个 node 是 hot,另外一个是 warm。
这样,我们就创建好了我们的 Elasticsearch 集群了。
测试环境如下:
一般来说索引生命周期有如下的几个阶段:
如上所示,一个典型的 ILM 通常有4个阶段:Hot, Warm, Cold, 以及 Delete。针对自己的业务需求,你可以分别启动相应的阶段。
动手实践
在接下来的实践中,我们将安按照如下的步骤来进行:
- 创建 ILM (Index Lifecycle Management)Policy
- 创建 Index template
- 导入数据并观察 ILM 的结果
创建 ILM Policy
首先打开 Kibana。我们来定义一个叫做 logstash 的 ILM policy:
我们接下来定义 hot phase 的配置:
如上图所示,当文档数满足如下的任何一个条件时就会滚动到另外一个索引:
- 索引的大小超过 1G
- 文档数超过 5 个
- 索引的最大时间跨度超过 30 天
这样做的目的就是为了防止一个索引的大小变的很大。
接下来我们来定义在 warm phase 的配置:
在上面,我们启动 Warm phase。在这个 phase 里,数据将保存于带有 warm 标签的节点上。由于我们只有一个 warm 节点,在本练习中,我将 number of replica 设置为 0。在实际的使用中,有更多的 replica 代表有更多的 read 能力。这个可以根据自己的业务需求和配置进行设置。我也同时启动了 Shrink index,也就是说它将在 warm phase 里把所有的 primary shard 压缩到一个。通常 primary shard 代表导入数据的能力。在 warm phase 中,我们通常不需要导入数据,我们只在 hot 节点中导入数据。
在今天的练习中,为了简单起见,我们不再定义其它的 phase 了。
点击上面的 Save as new policy。这样我们就完成了 policy 的定义了。
我们可以使用如下的 API 来进行查找,或者点击上面的 Show request 按钮来进行查看:
我们可以使用如下的命令来进行查看:
GET _ilm/policy/logstash
{
"logstash" : {
"version" : 1,
"modified_date" : "2020-12-07T06:21:20.988Z",
"policy" : {
"phases" : {
"warm" : {
"min_age" : "0ms",
"actions" : {
"allocate" : {
"number_of_replicas" : 0,
"include" : { },
"exclude" : { },
"require" : {
"data" : "warm"
}
},
"shrink" : {
"number_of_shards" : 1
},
"set_priority" : {
"priority" : 50
}
}
},
"hot" : {
"min_age" : "0ms",
"actions" : {
"rollover" : {
"max_size" : "1gb",
"max_age" : "30d",
"max_docs" : 5
},
"set_priority" : {
"priority" : 100
}
}
}
}
}
}
}
创建 logstash template
接下来我们需要创建一个 tgcodeindex template。我们取名为 logstash_template:
如果你想使用 data stream 的话,请参阅我之前的文章 “Elastic:Data stream 在索引生命周期管理中的应用”。点击 Next 按钮:
我们把如下的配置拷贝到编辑框中:
{
"lifecycle": {
"name": "logstash",
"rollover_alias": "logstash"
},
"routing": {
"allocation": {
"require": {
"data": "hot"
}
}
},
"refresh_interval": "1s",
"number_of_shards": "2"
}
在上面我们定义了 lifecyle 的名称以及 rollover_alias 的名字。点击 Next 按钮:
点击上面的 Create template:
这样我们就创建了 logstash_template。在上面的创建过程中,它除了创建 logstash_template,也同时当第一个文档被导入时会生成logstash 这个 alias。我们可以通过如下的命令来进行来查看:
GET _index_template/logstash_template
我们可以通过如上的命令来进行查看被定义的 logstash_template。注意这个 API 和之前版本的有所不同。之前的 index template 是 _template 终点而不是 _index_template。在实际的使用中,你需要进行查看。上面的命令生成的结果为:
{
"index_templates" : [
{
"name" : "logstash_template",
"index_template" : {
"index_patterns" : [
"logstash-*"
tgcode ],
"template" : {
"settings" : {
"index" : {
"lifecycle" : {
"name" : "logstash",
"rollover_alias" : "logstash"
},
"routing" : {
"allocation" : {
"require" : {
"data" : "hot"
}
}
},
"refresh_interval" : "1s",
"number_of_shards" : "2"
}
}
},
"composed_of" : [ ]
}
}
]
}
定义 Logstash 的配置文件
由于引入了 ILM,我们需要对先前文章 “Logstash:Logstash 入门教程 (二)” 中的 web_log.conf 来进行重新的定义:
weblog.conf
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
remove_field => [ "message" ]
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
remove_field => [ "timestamp" ]
}
}
output {
stdout { }
elasticsearch {
hosts => ["localhost:9200"]
ilm_rollover_alias => "logstash"
ilm_pattern => "000001"
ilm_policy => "logstash"
#user => "elastic"
#password => "password"
}
}
请注意在上面,我们在 elasticsearch 的 output 中定义了如下的几行字:
ilm_rollover_alias => "logstash"
ilm_pattern => "000001"
ilm_policy => "logstash"
如果你们感兴趣的话,请参阅 Elastic 的官方文档 “Elasticsearch output plugin”。我们把之前定义的 policy 以及 rollover_alias 填入。
通过 logstash 导入数据并检查结果
我们在一个 terminal 中启动 logstash:
./bin/logstash -f /Users/liuxg/demos/logstash/weblog.conf
请根据自己配置文件的路径的不同而修改上面的路径。
为了能够快速地看到 ILM 起作用,我们把 ILM 的检查时间从默认的 10 分钟缩减到 10 秒。
PUT _cluster/settings
{
"transient": {
"indices.lifecycle.poll_interval": "10s"
}
}
我们在另外一个 terminal 中打入如下的命令:
head -n 5 weblog-sample.log | nc localhost 9900
我们可以通过如下的命令进行查看:
GET logstash/_search
我们可以看到所有的文档都被写入到logstash-000001 这个索引中:
等过一会的时候,我们打入如下命令:
GET logstash/_ilm/explain
它显示:
{
"indices" : {
"logstash-000002" : {
"index" : "logstash-000002",
"managed" : true,
"policy" : "logstash",
"lifecycle_date_millis" : 1607324430865,
"age" : "1.93m",
"phase" : "hot",
"phase_time_millis" : 1607324432647,
"action" : "rollover",
"action_time_millis" : 1607324443170,
"step" : "check-rollover-ready",
"step_time_millis" : 1607324443170,
"phase_execution" : {
"policy" : "logstash",
"phase_definition" : {
"min_age" : "0ms",
"actions" : {
"rollover" : {
"max_size" : "1gb",
"max_age" : "30d",
"max_docs" : 5
},
"set_priority" : {
"priority" : 100
}
}
},
"version" : 1,
"modified_date_in_millis" : 1607322080988
}
},
"shrink-logstash-000001" : {
"index" : "shrink-logstash-000001",
"managed" : true,
"policy" : "logstash",
"lifecycle_date_millis" : 1607324430710,
"age" : "1.93m",
"phase" : "warm",
"action" : "complete",
"action_time_millis" : 1607324455330,
"step" : "complete",
"step_time_millis" : 1607324455330
}
}
}
在上面显示有两个索引:logstash-000002 以及shrink-logstash-000001。这个 shrink-logstash-000001 是由于从 hot phase 有5个文档满足触发的条件,从而转入到 warm phase。在 warm phase 中,我们把所有的 primary shards 都合并到一个 primary shard 中。我们可以查看上面的 policy 的定义。
在 terminal 中,我们接着打入如下的命令:
head -n 1 weblog-sample.log | nc localhost 9900
我们接着使用如下的命令来进行查看:
GET logstash/_search
{
"_source": "_index"
}
上面的命令显示:
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 6,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "shrink-logstash-000001",
"_type" : "_doc",
"_id" : "rAEAPHYB9dcfabeQVSHM",
"_score" : 1.0,
"_source" : { }
},
{
"_index" : "shrink-logstash-000001",
"_type" : "_doc",
"_id" : "qgEAPHYB9dcfabeQVSHM",
"_score" : 1.0,
"_source" : { }
},
{
"_index" : "shrink-logstash-000001",
"_type" : "_doc",
"_id" : "qwEAPHYB9dcfabeQVSHM",
"_score" : 1.0,
"_source" : { }
},
{
"_index" : "shrink-logstash-000001",
"_type" : "_doc",
"_id" : "rQEAPHYB9dcfabeQVSHN",
"_score" : 1.0,
"_sourcetgcode" : { }
},
{
"_index" : "shrink-logstash-000001",
"_type" : "_doc",
"_id" : "rgEAPHYB9dcfabeQVSHT",
"_score" : 1.0,
"_source" : { }
},
{
"_index" : "logstash-000002",
"_type" : "_doc",
"_id" : "1QEGPHYB9dcfabeQYSH_",
"_score" : 1.0,
"_source" : { }
}
]
}
}
也就是说有两个索引:shrink-logstash-000001 已及logstash-000002。最新被导入的文档被保存于logstash-000002 (hot node) 索引中。之前的 5 个文档被保存于shrink-logstash-000001 (warm node) 中。
我们在数据被导入后,我们可以使用如下的命令来查看已经被创建的 logstash alias:
GET _cat/aliases?v
上面的命令显示:
alias index filter routing.index routing.search is_write_index
.kibana .kibana_1 - - - -
.kibana_task_manager .kibana_task_manager_1 - - - -
apm-7.10.0-span apm-7.10.0-span-000001 - - - true
apm-7.10.0-transaction apm-7.10.0-transaction-000001 - - - true
logstash shrink-logstash-000001 - - - -
logstash-000001 shrink-logstash-000001 - - - -
apm-7.10.0-metric apm-7.10.0-metric-000001 - - - true
apm-7.10.0-profile apm-7.10.0-profile-000001 - - - true
.kibana-event-log-7.10.0 .kibana-event-log-7.10.0-000001 - - - true
logstash logstash-000002 - - - true
metricbeat-7.10.0 metricbeat-7.10.0-2020.12.06-000001 - - - true
apm-7.10.0-error apm-7.10.0-error-000001 - - - true
ilm-history-3 ilm-history-3-000001 - - - true
从上面的列表中,我们可以看出来 logstash 这个 alias 指向 logstash-00002。
文章来源于互联网:Logstash:为 Logstash 日志启动索引生命周期管理
相关推荐: Beats:使用 Elastic Stack 记录 Python 应用日志
日志记录实际上是每个应用程序都必须具备的功能。无论你选择基于哪种技术,都需要监视应用程序的运行状况和操作。随着应用程序扩展,这变得越来越困难,你需要查看不同的文件,文件夹甚至服务器来查找所需的信息。虽然你可以使用内置功能从应用程序本身编写 Python 日志,…