把 CSV 文件摄入到 Elasticsearch 中 – CSVES

2025年2月15日   |   by mebius

在我们之前的很多文章里,我有讲到这个话题。在今天的文章中,我们就提重谈。我们使用一种新的方法来实现。这是一个基于 golang 的开源项目。项目的源码在https://github.com/githubesson/csves/。由于这个原始的代码并不支持 basic security 及带有安全的 SSL ES 连接。我把它进行了一些小的改造。它是一种灵活的工具,用于将 CSV 数据导入 Elasticsearch,具有自动字段检测和映射功能。

克隆项目

我们在 terminal 中打入如下的命令:

git clone https://github.com/liu-xiao-guo/csves
$ pwd
/Users/liuxg/go
$ git clone https://github.com/liu-xiao-guo/csves
cd csves
$ tree -L 3
.
├── LICENSE
├── README.md
├── cmd
│ └── csves
│     └── main.go
├── example.csv
├── fields.json
├── go.mod
├── go.sum
└── pkg
    ├── config
    │ └── config.go
    ├── csv
    │ └── service.go
    ├── elasticsearch
    │ └── service.go
    └── models
        └── document.go

为了能够使得我们在下面能够运行应用,我们在项目的根目录下创建如下的一个 .env 文件:

.env

ELASTICSEARCH_URL=https://localhost:9200
INDEX_NAME=csv_test
CSV_FILE_PATH=./example.csv
USER_NAME=elastic
PASSWORD="y9NWnPq0++V=WxMXxSmr"
FIELD_CONFIG_PATH=fields.json
ELASTICSEARCH_CERT_PATH=/Users/liuxg/elastic/elasticsearch-8.17.1/config/certs/http_ca.crt

依赖你的配置,你需要根据上面的变量值做相应的修改。特别值得注意的是:

  • 如果你是 basic 安全设置,那么你需要提供 PASSWORD 及 USER_NAME
  • 如果你的 Elasticsearch 是带有 SSL 安全的,那么针对自签名Elasticsearch,你需要提供ELASTICSEARCH_CERT_PATH。否则设置它为 “”。

更多有关如何在 golang 里连接到 Elasticsearch,请参考文章 “Elasticsearch:运用 Go 语言实现 Elasticsearch 搜索 – 8.x”。

前提条件

为方便测试,我们在本文中采用使用 start-local 脚本在本地运行 Elasticsearch 来进行安装。这个方法的好处是没有安全的安装。它便于测试!

编译项目

go build -o csves cmd/csves/main.go
$ pwd
/Usetgcoders/liuxg/go/csves
$ go build -o csves cmd/csves/main.go
go: downloading github.com/joho/godotenv v1.5.1
go: downloading github.com/elastic/go-elasticsearch/v8 v8.17.1
go: downloading go.opentelemetry.io/otel/trace v1.28.0
go: downloading github.com/elastic/elastic-transport-go/v8 v8.6.1
go: downloading go.opentelemetry.io/otel v1.28.0
go: downloading go.opentelemetry.io/otel/metric v1.28.0
go: downloading github.com/go-logr/logr v1.4.2
go: downloading github.com/go-logr/stdr v1.2.2
$ ls
LICENSE     cmd         example.csv go.mod      pkg
README.md   csves       fields.json go.sum

我们看到一个新生成的csves 执行文件。

用法

基本使用

我们使用如下的方法来测试一个 csv 文件是否可以行:

./csves -csv="example.csv" -test
$ ./csves -csv="example.csv" -test
es-url: https://localhost:9200
index: csv_test
csv: ./example.csv
fields: fields.json
fields: fields.json
test: false
username: elastic
password: y9NWnPq0++V=WxMXxSmr
certpath: /Users/liuxg/elastic/elasticsearch-8.17.1/config/certs/http_ca.crt
Detected delimiter: ';'
CSV Header mapping: map[age:1 name:0 sex:2]
Test Mode - Printing all processed records:
Record 1:
  name: Jerry
  age: 28
  sex: M
  source_csv: example.csv

Record 2:
  name: Tom
  age: 40
  sex: F
  source_csv: example.csv

Record 3:
  name: Cherry
  age: 20
  sex: F
  source_csv: example.csv

Total records processed: 3

我们的测试 example.csv 文件内容如下:

$ vi example.csv 

name;age;sex
Jerry;28;M
Tom;40;F
Cherry;20;F

我们只选中其中的 name 及 age 字段:

./csves -csv="example.csv" -select="name,age" -test
$ ./csves -csv="example.csv" -select="name,age" -test
es-url: https://localhost:9200
index: csv_test
csv: ./example.csv
fields: fields.json
fields: fields.json
test: false
username: elastic
password: y9NWnPq0++V=WxMXxSmr
certpath: /Users/liuxg/elastic/elasticsearch-8.17.1/config/certs/http_ca.crt
Detected delimiter: ';'
CSV Header mapping: map[age:1 name:0 sex:2]
Test Mode - Printing all processed records:
Record 1:
  name: Jerry
  age: 28
  source_csv: example.ctgcodesv

Record 2:
  name: Tom
  age: 40
  source_csv: example.csv

Record 3:
  name: Cherry
  age: 20
  source_csv: example.csv
tgcode
Total records processed: 3

写入数据到 Elasticsearch 中

我们使用如下的命令来把数据写入到 Elasticsearch 中:

./csves -csv="example.csv"
$ ./csves -csv="example.csv"
es-url: https://localhost:9200
index: csv_test
csv: ./example.csv
fields: fields.json
fields: fields.json
test: false
username: elastic
password: y9NWnPq0++V=WxMXxSmr
certpath: /Users/liuxg/elastic/elasticsearch-8.17.1/config/certs/http_ca.crt
Detected delimiter: ';'
CSV Header mapping: map[age:1 name:0 sex:2]
Sample of processed records:
Fields: map[age:28 name:Jerry sex:M source_csv:example.csv]
Fields: map[age:40 name:Tom sex:F source_csv:example.csv]
Total records processed: 3
2025/02/15 13:28:16 All documents indexed successfully

我们在 Kibana 中进行查看:

GET csv_test/_search

%title插图%num

我们看到有三个文档被成功地写入。

下面我们有选择地写入其中的两个字段:name 及 age。我们首先在 Kibana 中执行如下的命令:

DELETE csv_test

我们再执行如下的命令:

./csves -csv="example.csv" -select="name,age"
$ ./csves -csv="example.csv" -select="name,age" 
es-url: https://localhost:9200
index: csv_test
csv: ./example.csv
fields: fields.json
fields: fields.json
test: false
username: elastic
password: y9NWnPq0++V=WxMXxSmr
certpath: /Users/liuxg/elastic/elasticsearch-8.17.1/config/certs/http_ca.crt
Detected delimiter: ';'
CSV Header mapping: map[age:1 name:0 sex:2]
Sample of processed records:
Fields: map[age:28 name:Jerry source_csv:example.csv]
Fields: map[age:40 name:Tom source_csv:example.csv]
Total records processed: 3
2025/02/15 13:31:06 All documents indexed successfully

在 Kibana 中进行查看:

%title插图%num

文章来源于互联网:把 CSV 文件摄入到 Elasticsearch 中 – CSVES

相关推荐: 使用 Elastic Cloud Hosted 优化长期数据保留:确保政府合规性和效率

作者:来自 ElasticJennie Davidowitz 在数字时代,州和地方政府越来越多地承担着管理大量数据的任务,同时确保遵守严格的监管要求。这些法规可能因司法管辖区而异,通常要求将数据保留较长时间 —— 有时从一年到七年不等。遵守刑事司法信息服务 (…