Logstash:配置例子
2021年7月12日 | by mebius
今天看了一下 Elastic 的官方例子。觉得还是很不错。在今天的文章中,我就常见的几种格式tgcode的日志来做一个说明,希望对大家有所帮助。关于 Logstash,我之前有做一个 meetup。里面还含有一个完整的例子。你可以在如下的地址进行查看:
另外,我的另外一篇文章 “Logstash:Data转换,分析,提取,丰富及核心操作” 里面含有丰富的例子供大家阅读。
以下示例说明了如何配置 Logstash 以过滤事件、处理 Apache 日志和 syslog 消息,以及使用条件来控制过滤器或输出处理哪些事件。
配置过滤器
过滤器是一种 in-line 处理机制,可以灵活地对数据进行切片和切块以满足您的需求。 让我们来看看一些实际的过滤器。 以下配置文件设置 grok 和日期过滤器。
我们首先定义一个叫做 logstash.conf 的配置文件:
logstash.conf
input { stdin { } }
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
在上面,我们使用 stdin 来做为输入,也就是使用 console 来进行输入。关于上面的 grok 过滤器,我们可以使用 Elastic 的 grok debugger 来进行验证:
我们可以使用如下的一个 apache 日志例子来进行测试:
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
在上面,我们可以看到日志被成功结构化。我可以看到 request,agent, verb 等字段。这是一个非常标准的日志,我们使用 grok pattern:
%{COMBINEDAPACHELOG}
来进行解析。
在上面,我们可以看到经过 grok 过滤器处理后,有一个叫做 timestamp 的字段。在 Elasticsearch 的日志中,我们习惯使用 @timestamp 来作为日志的时间标签。我们使用 date 过滤器来对它做转换。经过这样的处理后,日志的 @timestamp 的值将取自 timestamp 字段的值。
我们在 console 下做如下的运行:
./bin/logstash -f logstash.conf
等 Logstash 已经完全运行起来后,我们粘贴上面的例子日志:
从上面,我们可以看出来 apache 日志已经被成功地解析并结构化。我们甚至可以在 Elasticsearch 中查看到最新生成的日志文件:
在上面,我们使用 stdin 作为输入。在每次进行测试的时候,我们需要拷贝再粘贴。测试起来非常不方便。一种办法是创建一个叫做 test.log 的文件,并使用如下的方式来启动 Logstash:
$ pwd
/Users/liuxg/elastic3/logstash-7.13.0
$ ls test.log
test.log
$ cat test.log
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
$ cat test.log | ./bin/logstash -f logstash.conf
我们将看到和上面一样的运行结果。这样我们就不用每次粘贴了。
当然,我们还有一种方式就使用 file input 来读取这个文件 test.log。这个在我之前的文章 “Logstash:Data转换,分析,提取,丰富及核心操作” 有许多的例子。我们可以把我们的 logstash.conf 的文件修改为:
logstash.conf:
input {
file {
type => "apache"
path => "/Users/liuxg/elastic3/logstash-7.13.0/test.log"
start_position => "beginning"
sincedb_path => "null"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
处理 Apache 日志
让我们做一些真正有用的事情:处理 apache2 访问日志文件! 我们将从本地主机上的文件中读取输入,并根据我们的需要使用条件来处理事件。 首先,创建一个名为 logstash-apache.conf 的文件,内容如下(你可以根据需要更改日志的文件路径):
logstash-apache.conf:
input {
file {
path => "/tmp/access_log"
start_position => "beginning"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
然后,使用以下日志条目(或使用你自己的网络服务器中的一些条目)创建你在上面配置的输入文件(在本例中为“/tmp/access_log”):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3" 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPathtgcode.2; .NET4.0C; .NET4.0E)" 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
在上面的 filter 中,我们添加了对 path 路径的检查。如果路径中含有 access 字符串,则添加字段 type 为 apache_access。
我们重新运行 Logstash:
bin/logstash -f logstash-apache.conf
现在你应该可以在 Elasticsearch 中看到你的 apache 日志数据! Logstash 打开并读取指定的输入文件,处理它遇到的每个事件。 记录到此文件的任何其他行也将被 Logstash 作为事件捕获、处理并存储在 Elasticsearch 中。 作为一个额外的好处,它们隐藏在字段 “type” 设置为 “apache_access” 的情况下(这是由输入配置中的 type⇒“apache_access” 行完成的)。
在此配置中,Logstash 仅查看 apache access_log,但通过更改上述配置中的一行,很容易同时查看 access_log 和 error_log(实际上是任何匹配 *log 的文件):
input {
file {
path => "/tmp/*_log"
...
当你重新启动 Logstash, 你会发现 access_log 里的日志被分解为各个字段,而 error_log 里的则没有。这是因为在 filter 里检查 path 里是否含有 access。如果有,则对它进行解析。而针对 error_log,它没有。
请注意,Logstash 没有重新处理在 access_log 文件中已经看到的事件。 从文件中读取时,Logstash 会保存其位置,并且仅在添加新行时对其进行处理。 整洁的!
如果你确实需要重新把日志文件从头再来一遍,你可以这样来配置:
input {
file {
path => "/tmp/access_log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
使用条件
你可以使用条件来控制过滤器或输出处理哪些事件。 例如,你可以根据每个事件出现在哪个文件(access_log、error_log 和其他以 “log” 结尾的随机文件)来标记每个事件。
input {
file {
path => "/tmp/*_log"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
此示例使用 type 字段标记所有事件,但实际上并未解析错误或随机文件。 有很多类型的错误日志,如何标记它们实际上取决于你正在使用的日志。
同样,你可以使用条件将事件定向到特定输出。 例如,你可以:
- 警告 nagios 任何状态为 5xx 的 apache 事件
- 将任何 4xx 状态记录到 Elasticsearch
- 通过 statsd 记录所有状态代码命中
要将任何具有 5xx 状态代码的 http 事件告诉 nagios,你首先需要检查 type 字段的值。 如果是 apache,那么你可以检查状态字段是否包含 5xx 错误。 如果是,请将其发送给 nagios。 如果不是 5xx 错误,请检查状态字段是否包含 4xx 错误。 如果是这样,请将其发送到 Elasticsearch。 最后,无论 status 字段包含什么,都将所有 apache 状态代码发送到 statsd:
output {
if [type] == "apache" {
tgcodeif [status] =~ /^5dd/ {
nagios { ... }
} else if [status] =~ /^4dd/ {
elasticsearch { ... }
}
statsd { increment => "apache.%{status}" }
}
}
处理 syslog 信息
Syslog 是 Logstash 最常见的用例之一,它处理得非常好(只要日志行大致符合 RFC3164)。 Syslog 是事实上的 UNIX 网络日志记录标准,将消息从客户端计算机发送到本地文件,或通过 rsyslog 发送到中央日志服务器。 对于这个例子,你不需要一个正常运行的 syslog 实例; 我们将从命令行伪造它,以便你了解发生了什么。如果你想使用真正的 syslog,你在 Linux 的环境下,可以在路径/var/log/syslog 或者/var/log/messages。
首先我们为 Logstash + syslog 做一个简单的配置文件,叫做 logstash-syslog.conf。
logstash-syslog.conf
input {
tcp {
port => 5000
type => syslog
}
udp {
port => 5000
type => syslog
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
我们使用如下的命令来运行 Logstash:
./bin/logstash -f logstash-syslog.conf
通常,客户端计算机会连接到端口 5000 上的 Logstash 实例并发送其消息。 在这个例子中,我们将 telnet 到 Logstash 并输入一个日志行(类似于我们之前在 STDIN 中输入日志行的方式)。 等 Logstash 完全运行起来后,打开另一个 shell 窗口以与 Logstash syslog 输入交互并输入以下命令:
telnet localhost 5000
复制并粘贴以下几行作为示例。 (随意尝试一些你自己的,但请记住,如果 grok 过滤器对你的数据不正确,它们可能无法解析)。
我们回到 Logstash 运行的 console 中,我们可以看到:
我们可以看到日志被正确地解析。当然我们也可以在 Elasticsearch 中查看到最新导入的日志:
文章来源于互联网:Logstash:配置例子