将 OneLake 数据索引到 Elasticsearch – 第 1 部分

2025年2月6日   |   by mebius

作者:来自 ElasticGustavo Llermaly

%title插图%num

学习配置 OneLake,使用 Python 消费数据并在 Elasticsearch 中索引文档,然后运行语义搜索。

OneLake 是一款工具,可让你连接到不同的 Microsoft 数据源,例如 Power BI、Data Activator 和 Data factory 等。它支持将数据集中在 DataLakes 中,DataLakes 是支持全面数据存储、分析和处理的大容量存储库。

在本文中,我们将学习如何配置 OneLake、使用 Python 消费数据以及在 Elasticsearch 中索引文档,然后运行语义搜索。

有时,你可能希望在非结构化数据和来自不同来源和软件提供商的结构化数据中运行搜索,并使用 Kibana 创建可视化。对于这种任务,在 Elasticsearch 中索引文档作为中央存储库会变得非常有用。

在这个例子中,我们将使用一家名为 Shoestic 的虚拟公司,这是一家在线鞋店。我们在结构化文件 (CSV) 中列出了产品列表,而一些产品的数据表则采用非结构化格式 (DOCX)。这些文件存储在 OneLake 中。

你可以在此处找到包含完整示例(包括测试文档)的笔记本。

步骤

  • OneLake 初始配置
  • 使用 Python 连接到 OneLake
  • 索引文档
  • 查询

OneLake 初始配置

OneLake 架构可以总结如下:

%title插图%num

要使用 OneLake 和 Microsoft Fabric,我们需要一个 Office 365 帐户。如果你没有,可以在此处tgcode建一个试用帐户。

使用你的帐户登录 Microsoft Fabric。然后,创建一个名为 “ShoesticWorkspace” 的工作区。进入新创建的工作区后,创建一个 Lakehouse 并将其命名为“ShoesticDatalake”。最后一步是在 “Files” 中创建一个新文件夹。单击 “new subfolder” 并将其命名为 “ProductsData”。

%title插图%num

完成了!我们准备开始提取数据了。

使用 Python 连接到 OneLake

配置完 OneLake 后,我们现在可以准备 Python 脚本。Azure 有处理凭据并与 OneLake 通信的库。

pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx

“azure-identity azure-storage-file-datalake” 库让我们可以与 OneLake 交互,同时 “azure-cli” 可以访问凭据并授予权限。为了读取文件内容以便稍后将其索引到 Elasticsearch,我们使用 python-docx。

在我们的本地环境中保存 Microsoft 凭据

我们将使用 “az login” 进入我们的 Microsoft 帐户并运行:

 az login --allow-no-subscriptions

标志 “ –allow-no-subscriptions”允许我们在没有有效订阅的情况下向 Microsoft Azure 进行身份验证。

此命令将打开一个浏览器窗口,你必须在其中访问你的帐户,然后选择你帐户的订阅号。

%title插图%num

现在我们可以开始编写代码了!

创建一个名为 onelake.py 的文件并添加以下内容:

_onelake.py_

# Importing dependencies 
import chardet 
from azure.identity import DefaultAzureCredential 
from docx import Document 
from azure.storage.filedatalake import DataLakeServiceClient 
  
# Initializing the OneLake client 
ONELAKE_ACCOUNT_NtgcodeAME = "onelake" 
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace" 
# Path in format .Lakehouse/files/ 
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData" 
  
# Microsoft token 
token_credential = DefaultAzureCredential() 
  
# OneLake services 
service_client = DataLakeServiceClient( 
	account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com", 
	credential=token_credential, 
) 
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME) 
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH) 
 
# OneLake functions   
  
# Upload a file to a LakeHouse directory 
def upload_file_to_directory(directory_client, local_path, file_name): 
	file_client = directory_client.get_file_client(file_name) 
  
	with open(local_path, mode="rb") as data: 
    	file_client.upload_data(data, overwrite=True) 
  
	print(f"File: {file_name} uploaded to the data lake.") 
  
  
# Get directory contents from your lake folder 
def list_directory_contents(file_system_client, directory_name): 
	paths = file_system_client.get_paths(path=directory_name) 
  
	for path in paths: 
    	print(path.name + "n") 
  
  
# Get a file by name from your lake folder 
def get_file_by_name(file_name, directory_client): 
	return directory_client.get_file_client(file_name) 
  
  
# Decode docx 
def get_docx_content(file_client): 
	download = file_client.download_file() 
	file_content = download.readall() 
	temp_file_path = "temp.docx" 
  
	with open(temp_file_path, "wb") as temp_file: 
    	temp_file.write(file_content) 
  
	doc = Document(temp_file_path) 
	text = [] 
  
	for paragraph in doc.paragraphs: 
    	text.append(paragraph.text) 
  
	return "n".join(text) 
  
  
# Decode csv 
def get_csv_content(file_client): 
	download = file_client.download_file() 
	file_content = download.readall() 
  
	result = chardet.detect(file_content) 
	encoding = result["encoding"] 
  
	return file_content.decode(encoding) 

将文件上传到 OneLake

在此示例中,我们将使用一个 CSV 文件和一些包含有关我们鞋店产品信息的 .docx 文件。虽然你可以使用 UI 上传它们,但我们将使用 Python 来完成。在此处下载文件。

我们将文件放在文件夹 /data 中,位于名为 upload_files.py 的新 Python 脚本旁边:

# upload_files.py 
  
# Importing dependencies 
from azure.identity import DefaultAzureCredential 
from azure.storage.filedatalake import DataLakeServiceClient 
  
from functions import list_directory_contents, upload_file_to_directory 
from onelake import ONELAKE_DATA_PATH, directory_client, file_system_client 
  
csv_file_name = "products.csv" 
csv_local_path = f"./data/{csv_file_name}" 
  
docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] 
  
# Upload files to Lakehouse 
upload_file_to_directory(directory_client, csv_local_path, csv_file_name) 
 
for docx_local_path in docx_local_paths: 
	docx_file_name = docx_local_path.split("/")[-1] 
 	upload_file_to_directory(directory_client, docx_local_path, docx_file_name) 
  
# To check that the files have been uploaded, run "list_directory_contents" function to show the contents of the /ProductsData folder in our Datalake: 
print("Upload finished, Listing files: ") 
list_directory_contents(file_system_client, ONELAKE_DATA_PATH) 

运行上传脚本:

python upload_files.py

结果应该是:

Upload finished, Listing files: 
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv 
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx 

现在我们已经准备好文件了,让我们开始使用 Elasticsearch 分析和搜索我们的数据!

索引文档

我们将使用 ELSER 作为向量数据库的嵌入提供程序,以便我们可以运行语义查询。

我们选择 ELSER 是因为它针对 Elasticsearch 进行了优化,在域外检索方面胜过大多数竞争对手,这意味着按原样使用模型,而无需针对你自己的数据进行微调。

配置 ELSER

首先创建推理端点:

PUT _inference/sparse_embedding/onelake-inference-endpoint 
{ 
 "service": "elser", 
 "service_settings": { 
   "num_allocations": 1, 
   "num_threads": 1 
 } 

在后台加载模型时,如果你以前没有使用过 ELSER,则可能会收到 502 Bad Gateway 错误。在 Kibana 中,你可以在 “Machine Learning” > “Trained Models” 中检查模型状态。等到模型部署完成后再继续执行后续步骤。

%title插图%num

索引数据

现在,由于我们同时拥有结构化数据和非结构化数据,因此我们将在 Kibana DevTools 控制台中使用具有不同映射的两个不同索引。

对于我们的结构化销售,让我们创建以下索引:

PUT shoestic-products 
{ 
  "mappings": { 
	"properties": { 
  	"product_id": { 
    	"type": "keyword" 
      }, 
  	"product_name": { 
    	"type": "text" 
      }, 
  	"amount": { 
    	"type": "float" 
      }, 
  	"tags": { 
    	"type": "keyword" 
  	} 
	} 
  } 
} 

为了索引我们的非结构化数据(产品数据表),我们将使用:

PUT shoestic-products-descriptions 
{ 
  "mappings": { 
	"properties": { 
  	"title": { 
    	"type": "text", 
    	"analyzer": "english" 
  	}, 
  	"super_body": { 
    	"type": "semantic_text", 
    	"inference_id": "onelake-inference-endpoint" 
  	}, 
  	"body": { 
    	"type": "text", 
    	"copy_to": "super_body" 
  	} 
	} 
  } 
} 

注意:使用带有 copy_to 的字段很重要,这样还可以运行全文搜索,而不仅仅是在正文字段上运行语义搜索。

读取 OneLake 文件

在开始之前,我们需要使用这些命令(使用你自己的云 ID 和 API 密钥)初始化我们的 Elasticsearch 客户端。

创建一个名为 indexing.py 的 Python 脚本并添加以下几行:

# Importing dependencies 
import csv 
from io import StringIO 
  
from onelake import directory_client 
from elasticsearch import Elasticsearch, helpers 
  
from functions import get_csv_content, get_docx_content, get_file_by_name 
from upload_files_to_onelake import csv_file_client 
 
ELASTIC_CLUSTER_ID = "your-cloud-id" 
ELASTIC_API_KEY = "your-api-key" 
 
# Elasticsearch client 
es_client = Elasticsearch( 
	cloud_id=ELASTIC_CLUSTER_ID, 
	api_key=ELASTIC_API_KEY, 
) 
 
docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] 
  
csv_file_client = get_file_by_name("products.csv", directory_client) 
docx_files_clients = [] 
  
  
for docx_file_name in docx_files: 
	docx_files_clients.append(get_file_by_name(docx_file_name, directory_client)) 
  
  
# We use these functions to extract data from the files: 
csv_content = get_csv_content(csv_file_client) 
reader = csv.DictReader(StringIO(csv_content)) 
docx_contents = [] 
  
  
for docx_file_client in docx_files_clients: 
	docx_contents.append(get_docx_content(docx_file_client)) 
  
  
print("CSV FILE CONTENT: ", csv_content) 
print("DOCX FILE CONTENT: ", docx_contents) 
  
  
# The CSV tags are separated by commas (,). We'll turn these tags into an array: 
rows = csv_content.splitlines() 
reader = csv.DictReader(rows) 
modified_rows = [] 
  
for row in reader: 
	row["tags"] = row["tags"].replace('"', "").split(",") 
	modified_rows.append(row) 
	print(row["tags"]) 
  
# We can now index the files into Elasticsearch 
reader = modified_rows 
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in reader] 
  
docx_actions = [ 
	{ 
    	"_index": "shoestic-products-descriptions", 
    	"_source": {"title": docx_file_name, "body": docx}, 
	} 
	for docx_file_name, docx in zip(docx_files, docx_contents) 
] 
  
  
helpers.bulk(es_client, csv_actions) 
print("CSV data indexed successfully.") 
helpers.bulk(es_client, docx_actions) 
print("DOCX data indexed successfully.") 

现在运行脚本:

python indexing.py

查询

在 Elasticsearch 中对文档进行索引后,我们就可以测试语义查询了。在本例中,我们将在某些产品(tag)中搜索唯一术语。我们将针对结构化数据运行关键字搜索,针对非结构化数据运行语义搜索。

1. 关键字搜索

GET shoestic-products/_search 
{ 
  "query": { 
   "term": { 
  	"tags": "summer" 
	} 
  } 
} 

结果:

"_source": { 
      	"product_id": "P-118", 
      	"product_name": "Casual Sandals", 
      	"amount": "128.22", 
      	"tags": [ 
        	"casual", 
        	"summer" 
      	] 
    	} 

2. 语义搜索:

GET shoestic-products-descriptions/_search 
{ 
  "_source": { 
	"excludes": [ 
  	"*embeddings", 
  	"*chunks" 
	] 
  }, 
  "query": { 
	"semantic": { 
  	"field": "super_body", 
 	"query": "summer" 
	} 
  } 
} 

*我们排除了嵌入和块只是为了便于阅读。

结果:

"hits": { 
	"total": { 
  	"value": 3, 
  	"relation": "eq" 
	}, 
	"max_score": 4.3853106, 
	"hits": [ 
  	{ 
    	"_index": "shoestic-products-descriptions", 
    	"_id": "P2Hj6JIBF7lnCNFTDQEA", 
    	"_score": 4.3853106, 
    	"_source": { 
      	"super_body": { 
        	"inference": { 
          	"inference_id": "onelake-inference-endpoint", 
          	"model_settings": { 
            	"task_type": "sparse_embedding" 
          	} 
        	} 
      	}, 
      	"title": "beach-flip-flops.docx", 
      	"body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun." 
    	} 
  	} 
	] 
  } 

如你所见,当使用关键字搜索时,我们会得到与其中一个标签的完全匹配,相反,当我们使用语义搜索时,我们会得到与描述中的含义匹配的结果,而无需完全匹配。

结论

OneLake 使使用来自不同 Microsoft 来源的数据变得更容易,然后索引这些文档 Elasticsearch 允许我们使用高级搜索工具。在第一部分中,我们学习了如何连接到 OneLake 并在 Elasticsearch 中索引文档。在第二部分中,我们将使用 Elastic 连接器框架制作更强大的解决方案。敬请期待!

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch – Part 1 – Elasticsearch Labs

文章来源于互联网:将 OneLake 数据索引到 Elasticsearchtgcode – 第 1 部分

Tags: , , , ,