Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch

2022年11月10日   |   by mebius

在我之前的文章,我详细地介绍了如何通过 Filebeat 来收集日志并写入到 Elasticsearch。你可以阅读我之前的文章:

%title插图%num

tgcode

在今天的文章中,我将分享如何使用 Logstash 把日志文件发送到 Elasticsearch。使用 Logstash 的好处是它可以很方便地使用它丰富的过滤器对数据进行清洗以便更好地对数据进行分析。我们使用如下的架构:

%title插图%num

在今天的展示中,我将使用最新的 Elastic Stack 8.4.3 来进行展示。

安装

如果你还没有安装好自己的 Elasticsearch,Kibana 及 Logstash,你可以按照如下的文章来进行安装:

首先,我们参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群” 来生成 truststore.p12 证书文件:

$ pwd
/Users/liuxg/test/elasticsearch-8.4.3/config/certs
$ ls 
http.p12      http_ca.crt   transport.p12
$ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
Certificate was added to keystore
$ ls
http.p12       http_ca.crt    transport.p12  truststore.p12

在上面,我们生产的 truststore.p12 的密码为 password。

我们针对 Logstash 配置如下的配置文件:

logstash.conf

input {
  udp {
    port => 5959
    codec => json {
          target => "[document]"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
      index => "logdb"
      hosts => ["https://192.168.0.3:9200"]
      user => "elastic"
      password => "6bTlJp388KkgJKWi+hQr"
      ssl_certificate_verification => true
      truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
      truststore_password => "password"
  }
}

在上面,我们需要根据自己的 Elasticsearch 账号及密码进行修改。另外你也需要根据自己的证书位置进行相应的调整。 上面的 hosts 是我的本地 Elasticsearch 集群的访问地址。你需要根据自己的进行配置。在上面,我们使用 udp input 来收集日志,并传入到 Elasticsearch。在本示例中,我们忽略了 filter 部分,以简化问题的描述。我们可以把这个 logstash.conf 置于 Logstash 的安装根目录中。

我们可以使用如下的命令来运行:

%title插图%num

%title插图%num

Python 日志应用

我们首先来安装一个叫做 python-logstash 的包:

 pip install python-logstash

我们设计如下的 Python 应用来通过 Logstash 写入日志:

app.py

import logging
import logstash
import sys


class Logging(object):
    def __init__(self, logger_name='python-logger',
                 log_stash_host='localhost',
                 log_stash_upd_port=5959

                 ):
        self.logger_name = logger_name
        self.log_stash_host = log_stash_host
        self.log_stash_upd_port = log_stash_upd_port


    def get(self):
        logging.basicConfig(
            filename="logfile",
            filemode="a",
            format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
            datefmt="%H:%M:%S",
            level=logging.INFO,
        )

        self.stderrLogger = logging.StreamHandler()
        logging.getLogger().addHandler(self.stderrLogger)
        self.logger = logging.getLogger(self.logger_name)
        self.logger.addHandler(logstash.LogstashHandler(self.log_stash_host,
                                                        self.log_stash_upd_port,
                                                tgcode        version=1))
        return self.logger



instance = Logging(log_stash_upd_port=5959, log_stash_host='localhost', logger_name='soumil')
logger = instance.get()

count = 0
from time import sleep
while True:

    count = count + 1

    if count % 2 == 0:
        logger.error('Error Message Code Faield :{} '.format(count))
    else:
        logger.info('python-logstash: test logstash info message:{} '.format(count))

我们在和 Logstash 运行的同一个机器上运行上面的应用。我们使用如下的方法来运行:

python app.py

%title插图%num

我们在 Logstash 的 terminal 中可以看到:

%title插图%num

它表明 Logstash 运作正常。

我们再到 Kibana 中打入如下的命令:

GET _cat/indices

%title插图%num

从上面的输出中,我们可以看到新生成的 logdb 索引。

我们可以对这个索引进行搜索:

%title插图%num

我们可以看到日志被正常地解析并可以被搜索。

采用异步方式写入日志

在很多的时候如果我们的日志是大量的,那么我们可以采取使用异步的方式来写如日志。这样的好处是应用不用等待数据完全写入后才继续向下执行。tgcode我们创建如下的 Logstash 的配置文件:

logstash_async.conf

input {
    tcp {
        port => 6000
    }
}


output {
  stdout {
    codec => rubydebug
  }
 
  elasticsearch {
      index => "logstash_async"
      hosts => ["https://192.168.0.3:9200"]
      user => "elastic"
      password => "6bTlJp388KkgJKWi+hQr"
      ssl_certificate_verification => true
      truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
      truststore_password => "password"
  }
}

我们把这个文件拷贝到 Logstash 的安装根目录下,并进行如下的运行:

%title插图%num

%title插图%num

我们接下来创建如下的 Python 应用:

python-logstash-logging.py

import logging
import time
from logstash_async.handler import AsynchronousLogstashHandler
                                   
host = 'localhost'
port = 6000

test_logger = logging.getLogger('python-logging-test')
test_logger.setLevel(logging.DEBUG)
async_handler = AsynchronousLogstashHandler(host, port, database_path = None)
test_logger.addHandler(async_handler)

while True:
    test_logger.info("this is an info message at %s", time.time())
    time.sleep(0.5)    

在上面,我们每隔 0.5 秒的时间写入一条日志到 TCP 端口 6000,进而通过 Logstash 写入到 Elasticsearch。我们可以通过如下的命令来运行这个 Python 应用:

python python-logstash-logging.py 

在 Logstash 运行的 terminal 中,我们可以看到如下的输出:

%title插图%num

在 Kibana 中,我们可以查看索引logstash_async:

%title插图%num

文章来源于互联网:Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch

相关推荐: Elasticsearch:通过热、温、冷和冻结层管理数据自动化 — 无需编码!

如果你想完全按照本文标题的建议去做,那就别无所求。 这篇文章旨在指导如何使用 Kibana Dashboard 的 “堆栈管理(Stack Management)” 功能集通过热、温、冷和冻结层自动移动数据,而无需进行任何编码或执行命令行动作。 在下面的展示中…

Tags: , , , ,