Elasticsearch: How to index documents into Elasticsearch with the bulk API in Python
April 2, 2023 | by mebius
When we need to create an Elasticsearch index, the source data is usually not normalized and cannot be imported directly. The raw data may live in a database, in raw CSV/XML files, or may even come from a third-party API. In these cases we need to preprocess the data so that it can be used with the bulk API. In this tutorial we will demonstrate how to index Elasticsearch documents from a CSV file with simple Python code, using both the native Elasticsearch bulk API and the API from the helpers module. You will learn how to pick the right tool for indexing Elasticsearch documents in different situations.
In the earlier article "Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 – 8.x", I showed how to use the bulk API to index documents into Elasticsearch. Careful readers may have noticed that this approach does not scale well when there are many documents, because all of the operations are built up in memory. If the source data is large, it can easily exhaust the available memory. In this article I will explore an implementation based on Python generators.
For convenience, the test data can be obtained from https://github.com/liu-xiao-guo/py-elasticsearch8. The file data.csv in that repository is the raw data we will use.
Installation
For easy testing, we deploy Elasticsearch as described in my earlier article "Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发", using docker compose to install Elasticsearch and Kibana without security enabled. For how to connect to Elasticsearch from Python with security enabled, please refer to the earlier article "Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 – 8.x"; that article also covers installing the required Python packages.
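If your cluster does have security enabled, a minimal connection sketch would look like the following (the certificate path and credentials are placeholders, not from this article):

from elasticsearch import Elasticsearch

# Placeholders: copy http_ca.crt out of the Elasticsearch container and use your own password
es = Elasticsearch(
    "https://localhost:9200",
    ca_certs="./http_ca.crt",
    basic_auth=("elastic", "password"),
)
print(es.info())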
Creating the index in Python
We will create the same laptops-demo index demonstrated in the previous article. First, we create the index directly with the Elasticsearch client. Note that settings and mappings are passed as top-level parameters rather than through the body parameter. The code to create the index is:
main.py
# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json

# Connect to Elasticsearch cluster
es = Elasticsearch("http://localhost:9200")
resp = es.info()
print(resp)

settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        },
    },
}

mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            },
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            },
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            },
        },
    },
}
# The same configuration combined into a single object, used only by the
# deprecated body-based call shown at the bottom of this script
configurations = {
    "settings": settings,
    "mappings": mappings,
}
INDEX_NAME = "laptops-demo"

# Check the existence of the index. If yes, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
The index is now created. We can check it in Kibana with the following command:
GET _cat/indices
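If you prefer to stay in Python instead of Kibana, the same check can be done with the client. A minimal sketch, reusing the es client and INDEX_NAME from the script above:

# List the index, similar to GET _cat/indices in the Dev Tools console
print(es.cat.indices(index=INDEX_NAME))
# Or fetch the full settings and mappings of the index
print(es.indices.get(index=INDEX_NAME))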
We can now start adding documents to it.
Using the native Elasticsearch bulk API
When you have a small dataset to load, the native Elasticsearch bulk API is convenient because its syntax is the same as a native Elasticsearch query and can be run directly in the Dev Tools console. You do not need to learn anything new.
The data file to load can be downloaded from the GitHub repository mentioned above. Save it as data.csv; it will be used in the following Python code:
main.py
# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json
# Connect to Elasticsearch cluster
es = Elasticsearch( "http://localhost:9200")
resp = es.info()
# print(resp)
settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        },
    },
}

mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            },
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            },
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            },
        },
    },
}
# The same configuration combined into a single object, used only by the
# deprecated body-based call shown below
configurations = {
    "settings": settings,
    "mappings": mappings,
}
INDEX_NAME = "laptops-demo"

# Check the existence of the index. If yes, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
with open("data.csv", "r") as fi:
    reader = csv.DictReader(fi, delimiter=",")
    actions = []
    for row in reader:
        # Each document needs an action line followed by the document source
        action = {"index": {"_index": INDEX_NAME, "_id": int(row["id"])}}
        doc = {
            "id": int(row["id"]),
            "name": row["name"],
            "price": float(row["price"]),
            "brand": row["brand"],
            "attributes": [
                {"attribute_name": "cpu", "attribute_value": row["cpu"]},
                {"attribute_name": "memory", "attribute_value": row["memory"]},
                {"attribute_name": "storage", "attribute_value": row["storage"]},
            ],
        }
        actions.append(action)
        actions.append(doc)

es.bulk(index=INDEX_NAME, operations=actions, refresh=True)
# Check the results:
result = es.count(index=INDEX_NAME)
print(result)
print(result.body['count'])
Run the code above:
$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
{'count': 200, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
200
Note: in the bulk call above we pass refresh=True; otherwise the count we read back right afterwards may still be 0.
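Equivalently, you can leave refresh off the bulk request and refresh the index explicitly before counting. A small sketch reusing es, INDEX_NAME, and the actions list built above:

# Index without an inline refresh, then make the documents searchable explicitly
es.bulk(index=INDEX_NAME, operations=actions)
es.indices.refresh(index=INDEX_NAME)
print(es.count(index=INDEX_NAME).body["count"])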
The code above has one fatal flaw: the whole actions list is built in memory. If the dataset is large, the memory needed for actions grows with it, so this approach clearly does not suit very large datasets.
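If you want to stay with the native bulk API, one mitigation is to flush the buffer every fixed number of rows so that the actions list never holds the whole file. A rough sketch, not from the original article; BATCH_SIZE and the shortened document are only illustrative:

BATCH_SIZE = 500  # documents per bulk request (illustrative value)
actions = []
with open("data.csv", "r") as fi:
    reader = csv.DictReader(fi, delimiter=",")
    for row in reader:
        actions.append({"index": {"_index": INDEX_NAME, "_id": int(row["id"])}})
        actions.append({"id": int(row["id"]), "name": row["name"]})  # document shortened for brevity
        if len(actions) >= 2 * BATCH_SIZE:  # two entries (action + doc) per document
            es.bulk(operations=actions)
            actions = []
    if actions:  # send the remaining documents
        es.bulk(operations=actions)
es.indices.refresh(index=INDEX_NAME)

The cleaner solution, however, is the bulk helper described in the next section, which achieves the same effect with a generator.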
Note that we use the csv library to conveniently read the data from the CSV file. As you can see, the syntax of the native bulk API is very simple and works across different languages (including the Dev Tools console).
Using the bulk helper
As mentioned above, one problem with the native bulk API is that all the data needs to be loaded into memory before it can be indexed. This can be problematic and very inefficient for a large dataset. To solve it we can use the bulk helper, which can index Elasticsearch documents from iterators or generators. It therefore does not need to load all the data into memory first, which makes it very memory efficient. The syntax is slightly different, though, as we will see shortly.
Before indexing documents with the bulk helper, we should delete the documents in the index to confirm that the bulk helper really works. This is already done in the code above, since the index is deleted and recreated. We can then run the following code to load the data into Elasticsearch with the bulk helper:
main.py
# Import Elasticsearch package
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import csv
import json
# Connect to Elasticsearch cluster
es = Elasticsearch( "http://localhost:9200")
resp = es.info()
# print(resp)
settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        },
    },
}

mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            },
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            },
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            },
        },
    },
}
# The same configuration combined into a single object, used only by the
# deprecated body-based call shown below
configurations = {
    "settings": settings,
    "mappings": mappings,
}
INDEX_NAME = "laptops-demo"

# Check the existence of the index. If yes, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
def generate_docs():
    # Yield one action dictionary per CSV row so documents are streamed, not buffered
    with open("data.csv", "r") as fi:
        reader = csv.DictReader(fi, delimiter=",")
        for row in reader:
            doc = {
                "_index": INDEX_NAME,
                "_id": int(row["id"]),
                "_source": {
                    "id": int(row["id"]),
                    "name": row["name"],
                    "price": float(row["price"]),
                    "brand": row["brand"],
                    "attributes": [
                        {"attribute_name": "cpu", "attribute_value": row["cpu"]},
                        {"attribute_name": "memory", "attribute_value": row["memory"]},
                        {"attribute_name": "storage", "attribute_value": row["storage"]},
                    ],
                },
            }
            yield doc

helpers.bulk(es, generate_docs())
# (200, []) -- 200 indexed, no errors.

es.indices.refresh()

# Check the results:
result = es.count(index=INDEX_NAME)
print(result.body['count'])
Run the code above. The output is as follows:
$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
200
From the output we can see that we have successfully ingested 200 documents.
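If you also want per-document feedback or control over the chunk size, the helpers module offers streaming_bulk (and parallel_bulk). A minimal sketch, reusing the es client and the generate_docs() generator from the script above:

from elasticsearch import helpers

# Send documents in chunks and inspect the result of each one as it streams through
success = 0
for ok, item in helpers.streaming_bulk(es, generate_docs(), chunk_size=500):
    if not ok:
        print("Failed to index a document:", item)
    else:
        success += 1
print(f"Indexed {success} documents")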