Elasticsearch:使用 Elasticsearch 和 Cohere 构建 RAG
2024年11月29日 | by mebius
在我之前的文章 “将 Cohere 与 Elasticsearch 结合使用” 里,我详述了如何使用 Cohere 及 inference API 来创建 RAG 应用。鉴于 semantic_text 已经推出,我们将使用 semantic_text 及 semantic 搜索来完成之前的练习。
Elasticsearch 拥有开发人员使用生成式 AI 构建下一代搜索体验所需的所有工具,并且它通过其推理 API(inference API)支持与 Cohere 的本机集成。
如果你想要使用以下工具构建,请使用 Elastic:
- 向量数据库
- 部署多个 ML 模型
- 执行文本、向量和混合搜索
- 使用过滤器、方面、聚合进行搜索
- 应用文档和字段级安全性
- 在本地、云或 serverless 运行(预览)
本指南使用维基百科文章数据集来设置语义搜索管道。它将涵盖:
- 使用 Cohere 嵌入创建 Elastic 推理处理器
- 使用嵌入创建 Elasticsearch 索引
- 对 Elasticsearch 索引执行混合搜索并重新排名结果
- 执行基本 RAG
要查看完整的代码示例,请参阅此笔记本。你还可以在此处找到集成指南。
注:在上面所示的笔记本中,它没有使用 semantic_text 字段。在下面的展示中,我将使用最新的 semantic_text 字段来代替之前的 dense_vector 字段。
要求
- 一个 Cohere 帐户。你可以在地址申请一个 API key。你可以在地址Login | Cohere进行申请。
- 一个本地安装的集群。安装指令如下
- Python 3.7 或更高版本
安装
Elasticsearch 及 Kibana
如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:
- 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
- Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana
在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。特别值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下载 Elastic Stack 8.11 及以后得版本来进行安装。
在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:
在上面,我们可以看到 elastic 超级用户的密码。我们记下它,并将在下面的代码中进行使用。
我们还可以在安装 Elasticsearch 目录中找到 Elasticsearch 的访问证书:
$ pwd
/Users/liuxg/elastic/elasticsearch-8.16.0/config/certs
$ ls
http.p12 http_ca.crt transport.p12
在上面,http_ca.crt 是我们需要用来访问 Elasticsearch 的证书。
我们首先克隆已经写好的代码:
git clone https://github.com/liu-xiao-guo/elasticsearch-labs
我们然后进入到该项目的根目录下:
$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/cohere
$ ls
cohere-elasticsearch.ipynb inference-cohere.ipynb
inference-cohere-semantic-text.ipynb
如上所示,inference-cohere-semantic-text.ipynb 就是我们今天想要工作的 notebook。
我们通过如下的命令来拷贝所需要的证书:
$ cp ~/elastic/elasticsearch-8.16.0/config/certs/http_ca.crt .
$ ls
cohere-elasticsearch.ipynb inference-cohere-semantic-text.ipynb
http_ca.crt inference-cohere.ipynb
安装所需要的 python 依赖包
pip3 install elasticsearch==8.16.0 python-dotenv cohere
我们通过如下的命令来查看 Elasticsearch 客户端的版本:
$ pip3 list | grep cohere
cohere 5.5.8
$ pip3 list | grep elasticsearch
elasticsearch 8.16.0
启动白金试用
在下面,我们需要使用 ELSER。这是一个白金试用的功能。我们按照如下的步骤来启动白金试用:
这样我们就完成了白金试用功能。
创建环境变量
为了能够使得下面的应用顺利执行,在项目当前的目录下创建如下的一个叫做 .env 的文件:
.env
export ES_ENDPOINT="localhost"
export ES_USER="elastic"
export ES_PASSWORD="uK+7WbkeXMzwk9YvP-H3"
export COHERE_API_KEY="YourCohereAPIkey"
你需要根据自己的 Elasticsearch 配置进行相应的修改。你需要在地址获得你的 COHER_API_KEY。
然后,我们在运行上面命令的 terminal 中打入如下的命令:
$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/cohere
$ ls
cohere-elasticsearch.ipynb inference-cohere-semantic-text.ipynb
http_ca.crt inference-cohere.ipynb
$ jupter notebook inference-cohere-semantic-text.ipynb
准备数据
我们通过如下的命令来活动数据集:
wget https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jtgcodeobs_sampletgcode_data.jsonl
$ wget https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl
--2024-11-21 18:41:18-- https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1545639 (1.5M) [text/plain]
Saving to: ‘embed_jobs_sample_data.jsonl’
embed_jobs_sample_data.jso 100%[=====================================>] 1.47M 1.23MB/s in 1.2s
2024-11-21 18:41:21 (1.23 MB/s) - ‘embed_jobs_sample_data.jsonl’ saved [1545639/1545639]
$ ls
cohere-elasticsearch.ipynb inference-cohere-semantic-text.ipynb
embed_jobs_sample_data.jsonl inference-cohere.ipynb
http_ca.crt
上面的embed_jobs_sample_data.jsonl 具有如下的格式:
展示
读入变量并连接到 Elasticsearch
现在我们可以实例化 Python Elasticsearch 客户端。
from elasticsearch import Elasticsearch, helpers
import cohere
import json
import requests
from dotenv import load_dotenv
import os
load_dotenv()
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")
url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)
client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(client.info())
创建推理端点
构建向量搜索索引的最大痛点之一是计算大量数据的嵌入。幸运的是,Elastic 提供了推理端点,可用于摄取管道,以便在执行批量索引操作时自动计算嵌入。
要设置用于摄取的推理管道,我们首先必须创建一个使用 Cohere 嵌入的推理端点。你需要一个 Cohere API 密钥,你可以在 Cohere 帐户的 API 密钥部分下找到它。在上面我们已经把它从 .env 文件中读出来了。
我们将创建一个使用 embed-english-v3.0 和 int8 或字节压缩来节省存储空间的推理端点。
from elasticsearch import BadRequestError
try:
client.inference.delete_model(inference_id="cohere_embeddings")
except:
;
try:
client.inference.put(
task_type="text_embedding",
inference_id="cohere_embeddings",
body={
"service": "cohere",
"service_settings": {
"api_key": COHERE_API_KEY,
"model_id": "embed-english-v3.0",
"embedding_type": "int8",
"similarity": "cosine"
},
},
)
except BadRequestError as e:
print(e)
我们可以在 Kibana 中进行查看:
GET _inference/_all
或者:
GET _inference/cohere_embeddings
创建索引
必须创建目标索引的映射(包含模型将根据你的输入文本生成的嵌入的索引)。目标索引必须具有具有 semantic_text 字段类型的字段,以便索引 Cohere 模型的输出。
让我们创建一个名为 cohere-wiki-embeddings 的索引,其中包含我们需要的映射:
index_name="cohere-wiki-embeddings"
try:
client.indices.delete(index=index_name, ignore_unavailable=True)
except:
;
if not client.indices.exists(index=index_name):
client.indices.create(
index=index_name,
settingtgcodes={"index": {"default_pipeline": "cohere_embeddings"}},
mappings={
"properties": {
"text_semantic": {
"type": "semantic_text",
"inference_id": "cohere_embeddings"
},
"text": {"type": "text", "copy_to": "text_semantic"},
"wiki_id": {"type": "integer"},
"url": {"type": "text"},
"views": {"type": "float"},
"langs": {"type": "integer"},
"title": {"type": "text"},
"paragraph_id": {"type": "integer"},
"id": {"type": "integer"}
}
},
)
我们可以到 Kibana 里查看已经创建的索引:
GET cohere-wiki-embeddings
让我们注意一下该 API 调用中的几个重要参数:
- semantic_text:字段类型使用推理端点自动生成文本内容的嵌入。
- inference_id:指定要使用的推理端点的 ID。在此示例中,模型 ID 设置为 cohere_embeddings。
- copy_to:指定包含推理结果的输出字段
创建摄入管道
现在,你已拥有一个推理端点和一个可用于存储嵌入的索引。下一步是创建一个摄取管道,该管道使用推理端点创建嵌入并将其存储在索引中。
client.ingest.put_pipeline(
id="cohere_embeddings",
description="Ingest pipeline for Cohere inference.",
processors=[
{
"inference": {
"model_id": "cohere_embeddings",
"input_output": {
"input_field": "text",
"output_field": "text_embedding",
},
}
}
],
)
注意:这个部署可以省去,如果我们在上面是以如下的方式来创建索引的:
index_name="cohere-wiki-embeddings"
try:
client.indices.delete(index=index_name, ignore_unavailable=True)
except:
;
if not client.indices.exists(index=index_name):
client.indices.create(
index=index_name,
mappings={
"properties": {
"text_semantic": {
"type": "semantic_text",
"inference_id": "cohere_embeddings"
},
"text": {"type": "text", "copy_to": "text_semantic"},
"wiki_id": {"type": "integer"},
"url": {"type": "text"},
"views": {"type": "float"},
"langs": {"type": "integer"},
"title": {"type": "text"},
"paragraph_id": {"type": "integer"},
"id": {"type": "integer"}
}
},
)
插入文档
让我们插入示例 wiki 数据集。你需要一个生产 Cohere 帐户来完成此步骤,否则文档摄取将因 API 请求速率限制而超时。
注:在本例中,我们采用一个试用的账号来进行展示。我们只摄取少量的数据,以避免限流。
# url = "https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl"
# response = requests.get(url)
# Load the response data into a JSON object
#jsonl_data = response.content.decode('utf-8').splitlines()
import json
from elasticsearch.helpers import BulkIndexError
with open('./embed_jobs_sample_data.jsonl', 'r') as file:
content = file.read()
# Split the content by new lines and parse each line as JSON
data = [json.loads(line) for line in content.strip().split("n") if line]
# We just take the very first 10 documents
data = data[:10]
print(f"Successfully loaded {len(data)} documents")
# Prepare the documents to be indexed
documents = []
for line in data:
data_dict = line
documents.append({
"_index": index_name,
"_source": data_dict,
}
)
# Use the bulk endpoint to index
try:
helpers.bulk(client, documents)
except BulkIndexError as exc:
print(f"Failed to index {len(exc.errors)} documents:")
for error in exc.errors:
print(error)
print("Done indexing documents into `cohere-wiki-embeddings` index!")
我们可以通过 Kibana 进行查看:
GET cohere-wiki-embeddings/_search
语义搜索
在使用嵌入丰富数据集后,你可以使用 Elasticsearch 提供的语义查询来查询数据。Elasticsearch 中的 semantic_text大大简化了语义搜索。详细了解 Elasticsearch 中的语义文本如何让你专注于模型和结果而不是技术细节。
query = "What are the Video categories on YouTube?"
response = client.search(
index="cohere-wiki-embeddings",
size=100,
query = {
"semantic": {
"query": query,
"field": "text_semantic"
}
}
)
raw_documents = response["hits"]["hits"]
# Display the first 10 results
for document in raw_documents[0:10]:
print(f'Title: {document["_source"]["title"]}nText: {document["_source"]["text"]}n')
# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
documents.append(hit["_source"]["text"])
混合搜索
使用嵌入丰富数据集后,你可以使用混合搜索查询数据。传递语义查询,并提供查询文本和用于创建嵌入的模型。
query = "What are the Video categories on YouTube?"
response = client.search(
index="cohere-wiki-embeddings",
size=100,
query={
"bool": {
"must": {
"multi_match": {
"query": query,
"fields": ["text", "title"]
}
},
"should": {
"semantic": {
"query": query,
"field": "text_semantic"
}
},
}
}
)
raw_documents = response["hits"]["hits"]
# Display the first 10 results
for document in raw_documents[0:10]:
print(f'Title: {document["_source"]["title"]}nText: {document["_source"]["text"]}n')
# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
documents.append(hit["_source"]["text"])
排名
为了有效地结合向量和 BM25 检索的结果,我们可以通过推理 API 使用 Cohere 的 Rerank 3 模型来对我们的结果进行最终、更精确的语义重新排名。
首先,使用你的 Cohere API 密钥创建一个推理端点。确保为你的端点指定一个名称,以及其中一个重新排名模型的 model_id。在此示例中,我们将使用 Rerank 3。
# Delete the inference model if it already exists
client.options(ignore_status=[404]).inference.delete(inference_id="cohere_rerank")
client.inference.put(
task_type="rerank",
inference_id="cohere_rerank",
body={
"service": "cohere",
"service_settings":{
"api_key": COHERE_API_KEY,
"model_id": "rerank-english-v3.0"
},
"task_settings": {
"top_n": 10,
},
}
)
现在,你可以使用该推理端点对结果进行重新排序。在这里,我们将传入用于检索的查询,以及我们刚刚使用混合搜索检索到的文档。
推理服务将以相关性降序排列的文档列表作为响应。每个文档都有一个对应的索引(反映文档发送到推理端点时的顺序),如果 “return_documents” 任务设置为 True,则文档文本也将包含在内。
在这种情况下,我们将响应设置为 False,并根据响应中返回的索引重建输入文档。
response = client.inference.inference(
inference_id="cohere_rerank",
body={
"query": query,
"input": documents,
"task_settings": {
"return_documents": False
}
}
)
# Reconstruct the input documents based on the index provided in the rereank response
ranked_documents = []
for document in response.body["rerank"]:
ranked_documents.append({
"title": raw_documents[int(document["index"])]["_source"]["title"],
"text": raw_documents[int(document["index"])]["_source"]["text"]
})
# Print the top 10 results
for document in ranked_documents[0:10]:
print(f"Title: {document['title']}nText: {document['text']}n")
检索增强生成 – RAG
现在我们已经对结果进行了排序,我们可以使用 Cohere 的 Chat API 轻松地将其转变为 RAG 系统。传入检索到的文档以及查询,并使用 Cohere 最新的生成模型 Command R+ 查看基础响应。
首先,我们将创建 Cohere 客户端。
co = cohere.Client(COHERE_API_KEY)
接下来,我们可以轻松地从 Cohere Chat API 获得带有引文的接地生成。我们只需将用户查询和从 Elastic 检索到的文档传递给 API,然后打印出我们的接地响应。
response = co.chat(
message=query,
documents=ranked_documents,
model='command-r-plus-08-2024'
)
source_documents = []
for citation in response.citations:
for document_id in citation.document_ids:
if document_id not in source_documents:
source_documents.append(document_id)
print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
if document['id'] in source_documents:
print(f"{document['title']}: {document['text']}")
就这样!使用 Cohere 和 Elastic 快速轻松地实现混合搜索和 RAG。
整个 notebook 的源码可以在地址https://github.com/liu-xiao-guo/elasticsearch-labs/blob/main/notebooks/cohere/inference-cohere-semantic-text.ipynb下载。
我们也可以在上面的步骤中省去 pipeline 的创建,代码可以在地址https://github.com/liu-xiao-guo/elasticsearch-labs/blob/main/notebooks/cohere/inference-cohere-semantic-text-without-pipeline.ipynb下载。