Elasticsearch:使用 Elasticsearch 和 Cohere 构建 RAG

2024年11月29日   |   by mebius

在我之前的文章 “将 Cohere 与 Elasticsearch 结合使用” 里,我详述了如何使用 Cohere 及 inference API 来创建 RAG 应用。鉴于 semantic_text 已经推出,我们将使用 semantic_text 及 semantic 搜索来完成之前的练习。

%title插图%num

Elasticsearch 拥有开发人员使用生成式 AI 构建下一代搜索体验所需的所有工具,并且它通过其推理 API(inference API)支持与 Cohere 的本机集成。

如果你想要使用以下工具构建,请使用 Elastic:

  • 向量数据库
  • 部署多个 ML 模型
  • 执行文本、向量和混合搜索
  • 使用过滤器、方面、聚合进行搜索
  • 应用文档和字段级安全性
  • 在本地、云或 serverless 运行(预览)

本指南使用维基百科文章数据集来设置语义搜索管道。它将涵盖:

  • 使用 Cohere 嵌入创建 Elastic 推理处理器
  • 使用嵌入创建 Elasticsearch 索引
  • 对 Elasticsearch 索引执行混合搜索并重新排名结果
  • 执行基本 RAG

要查看完整的代码示例,请参阅此笔记本。你还可以在此处找到集成指南。

:在上面所示的笔记本中,它没有使用 semantic_text 字段。在下面的展示中,我将使用最新的 semantic_text 字段来代替之前的 dense_vector 字段。

要求

  • 一个 Cohere 帐户。你可以在地址申请一个 API key。你可以在地址Login | Cohere进行申请。
  • 一个本地安装的集群。安装指令如下
  • Python 3.7 或更高版本

安装

Elasticsearch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:

在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。特别值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下载 Elastic Stack 8.11 及以后得版本来进行安装。

在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:

%title插图%num

在上面,我们可以看到 elastic 超级用户的密码。我们记下它,并将在下面的代码中进行使用。

我们还可以在安装 Elasticsearch 目录中找到 Elasticsearch 的访问证书:

$ pwd
/Users/liuxg/elastic/elasticsearch-8.16.0/config/certs
$ ls
http.p12      http_ca.crt   transport.p12

在上面,http_ca.crt 是我们需要用来访问 Elasticsearch 的证书。

我们首先克隆已经写好的代码

git clone https://github.com/liu-xiao-guo/elasticsearch-labs

我们然后进入到该项目的根目录下:

$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/cohere
$ ls
cohere-elasticsearch.ipynb           inference-cohere.ipynb
inference-cohere-semantic-text.ipynb

如上所示,inference-cohere-semantic-text.ipynb 就是我们今天想要工作的 notebook。

我们通过如下的命令来拷贝所需要的证书:

$ cp ~/elastic/elasticsearch-8.16.0/config/certs/http_ca.crt .
$ ls
cohere-elasticsearch.ipynb           inference-cohere-semantic-text.ipynb
http_ca.crt                          inference-cohere.ipynb

安装所需要的 python 依赖包

pip3 install elasticsearch==8.16.0 python-dotenv cohere

我们通过如下的命令来查看 Elasticsearch 客户端的版本:

$ pip3 list | grep cohere
cohere                                  5.5.8
$ pip3 list | grep elasticsearch
elasticsearch                           8.16.0

启动白金试用

在下面,我们需要使用 ELSER。这是一个白金试用的功能。我们按照如下的步骤来启动白金试用:

%title插图%num

%title插图%num

这样我们就完成了白金试用功能。

创建环境变量

为了能够使得下面的应用顺利执行,在项目当前的目录下创建如下的一个叫做 .env 的文件:

.env

export ES_ENDPOINT="localhost"
export ES_USER="elastic"
export ES_PASSWORD="uK+7WbkeXMzwk9YvP-H3"
export COHERE_API_KEY="YourCohereAPIkey"

你需要根据自己的 Elasticsearch 配置进行相应的修改。你需要在地址获得你的 COHER_API_KEY。

然后,我们在运行上面命令的 terminal 中打入如下的命令:

$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/cohere
$ ls
cohere-elasticsearch.ipynb           inference-cohere-semantic-text.ipynb
http_ca.crt                          inference-cohere.ipynb
$ jupter notebook inference-cohere-semantic-text.ipynb

准备数据

我们通过如下的命令来活动数据集:

 wget https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jtgcodeobs_sampletgcode_data.jsonl
$ wget https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl
--2024-11-21 18:41:18--  https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1545639 (1.5M) [text/plain]
Saving to: ‘embed_jobs_sample_data.jsonl’

embed_jobs_sample_data.jso 100%[=====================================>]   1.47M  1.23MB/s    in 1.2s    

2024-11-21 18:41:21 (1.23 MB/s) - ‘embed_jobs_sample_data.jsonl’ saved [1545639/1545639]

$ ls
cohere-elasticsearch.ipynb           inference-cohere-semantic-text.ipynb
embed_jobs_sample_data.jsonl         inference-cohere.ipynb
http_ca.crt

上面的embed_jobs_sample_data.jsonl 具有如下的格式:

%title插图%num

展示

读入变量并连接到 Elasticsearch

现在我们可以实例化 Python Elasticsearch 客户端。

from elasticsearch import Elasticsearch, helpers
import cohere
import json
import requests
from dotenv import load_dotenv
import os
load_dotenv()
 
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")
 
url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)
 
client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(client.info())

%title插图%num

创建推理端点

构建向量搜索索引的最大痛点之一是计算大量数据的嵌入。幸运的是,Elastic 提供了推理端点,可用于摄取管道,以便在执行批量索引操作时自动计算嵌入。

要设置用于摄取的推理管道,我们首先必须创建一个使用 Cohere 嵌入的推理端点。你需要一个 Cohere API 密钥,你可以在 Cohere 帐户的 API 密钥部分下找到它。在上面我们已经把它从 .env 文件中读出来了。

我们将创建一个使用 embed-english-v3.0 和 int8 或字节压缩来节省存储空间的推理端点。

from elasticsearch import BadRequestError

try:
    client.inference.delete_model(inference_id="cohere_embeddings")
except:
    ;

try: 
    client.inference.put(
        task_type="text_embedding",
        inference_id="cohere_embeddings",
        body={
            "service": "cohere",
            "service_settings": {
                "api_key": COHERE_API_KEY,
                "model_id": "embed-english-v3.0",
                "embedding_type": "int8",
                "similarity": "cosine"
            },
        },
    )
except BadRequestError as e:
    print(e)

%title插图%num

我们可以在 Kibana 中进行查看:

GET _inference/_all

%title插图%num

或者:

GET _inference/cohere_embeddings

%title插图%num

创建索引

必须创建目标索引的映射(包含模型将根据你的输入文本生成的嵌入的索引)。目标索引必须具有具有 semantic_text 字段类型的字段,以便索引 Cohere 模型的输出。

让我们创建一个名为 cohere-wiki-embeddings 的索引,其中包含我们需要的映射:

index_name="cohere-wiki-embeddings"

try:
    client.indices.delete(index=index_name, ignore_unavailable=True)
except:
    ;

if not client.indices.exists(index=index_name):
    client.indices.create(
        index=index_name,
        settingtgcodes={"index": {"default_pipeline": "cohere_embeddings"}},
        mappings={
            "properties": {
                "text_semantic": {
                    "type": "semantic_text",
                    "inference_id": "cohere_embeddings"
                },
                "text": {"type": "text", "copy_to": "text_semantic"},
                "wiki_id": {"type": "integer"},
                "url": {"type": "text"},
                "views": {"type": "float"},
                "langs": {"type": "integer"},
                "title": {"type": "text"},
                "paragraph_id": {"type": "integer"},
                "id": {"type": "integer"}
            }
        },
    )

我们可以到 Kibana 里查看已经创建的索引:

GET cohere-wiki-embeddings

%title插图%num

让我们注意一下该 API 调用中的几个重要参数:

  • semantic_text:字段类型使用推理端点自动生成文本内容的嵌入。
  • inference_id:指定要使用的推理端点的 ID。在此示例中,模型 ID 设置为 cohere_embeddings。
  • copy_to:指定包含推理结果的输出字段

创建摄入管道

现在,你已拥有一个推理端点和一个可用于存储嵌入的索引。下一步是创建一个摄取管道,该管道使用推理端点创建嵌入并将其存储在索引中。

client.ingest.put_pipeline(
    id="cohere_embeddings",
    description="Ingest pipeline for Cohere inference.",
    processors=[
        {
            "inference": {
                "model_id": "cohere_embeddings",
                "input_output": {
                    "input_field": "text",
                    "output_field": "text_embedding",
                },
            }
        }
    ],
)

注意:这个部署可以省去,如果我们在上面是以如下的方式来创建索引的:

index_name="cohere-wiki-embeddings"

try:
    client.indices.delete(index=index_name, ignore_unavailable=True)
except:
    ;

if not client.indices.exists(index=index_name):
    client.indices.create(
        index=index_name,
        mappings={
            "properties": {
                "text_semantic": {
                    "type": "semantic_text",
                    "inference_id": "cohere_embeddings"
                },
                "text": {"type": "text", "copy_to": "text_semantic"},
                "wiki_id": {"type": "integer"},
                "url": {"type": "text"},
                "views": {"type": "float"},
                "langs": {"type": "integer"},
                "title": {"type": "text"},
                "paragraph_id": {"type": "integer"},
                "id": {"type": "integer"}
            }
        },
    )

插入文档

让我们插入示例 wiki 数据集。你需要一个生产 Cohere 帐户来完成此步骤,否则文档摄取将因 API 请求速率限制而超时。

:在本例中,我们采用一个试用的账号来进行展示。我们只摄取少量的数据,以避免限流。

# url = "https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl"
# response = requests.get(url)

# Load the response data into a JSON object
#jsonl_data = response.content.decode('utf-8').splitlines()

import json
from elasticsearch.helpers import BulkIndexError

with open('./embed_jobs_sample_data.jsonl', 'r') as file:
    content = file.read()
 
# Split the content by new lines and parse each line as JSON
data = [json.loads(line) for line in content.strip().split("n") if line]

# We just take the very first 10 documents
data = data[:10]
print(f"Successfully loaded {len(data)} documents")

# Prepare the documents to be indexed
documents = []
for line in data:
    data_dict = line
    documents.append({
        "_index": index_name,
        "_source": data_dict,
        }
      )

# Use the bulk endpoint to index
try:
    helpers.bulk(client, documents)
except BulkIndexError as exc:
    print(f"Failed to index {len(exc.errors)} documents:")
    for error in exc.errors:
        print(error)

print("Done indexing documents into `cohere-wiki-embeddings` index!")

%title插图%num

我们可以通过 Kibana 进行查看:

GET cohere-wiki-embeddings/_search

%title插图%num

语义搜索

在使用嵌入丰富数据集后,你可以使用 Elasticsearch 提供的语义查询来查询数据。Elasticsearch 中的 semantic_text大大简化了语义搜索。详细了解 Elasticsearch 中的语义文本如何让你专注于模型和结果而不是技术细节。

query = "What are the Video categories on YouTube?"

response = client.search(
    index="cohere-wiki-embeddings",
    size=100,
    query = {
        "semantic": {
            "query": query,
             "field": "text_semantic"
        }
    }
)

raw_documents = response["hits"]["hits"]

# Display the first 10 results
for document in raw_documents[0:10]:
  print(f'Title: {document["_source"]["title"]}nText: {document["_source"]["text"]}n')

# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])

%title插图%num

混合搜索

使用嵌入丰富数据集后,你可以使用混合搜索查询数据。传递语义查询,并提供查询文本和用于创建嵌入的模型。

query = "What are the Video categories on YouTube?"

response = client.search(
    index="cohere-wiki-embeddings",
    size=100,
    query={
        "bool": {
            "must": {
                "multi_match": {
                "query": query,
                "fields": ["text", "title"]
        }
            },
            "should": {
                "semantic": {
                    "query": query,
                     "field": "text_semantic"
                }
            },
        }
    }

)

raw_documents = response["hits"]["hits"]

# Display the first 10 results
for document in raw_documents[0:10]:
  print(f'Title: {document["_source"]["title"]}nText: {document["_source"]["text"]}n')

# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])

%title插图%num

排名

为了有效地结合向量和 BM25 检索的结果,我们可以通过推理 API 使用 Cohere 的 Rerank 3 模型来对我们的结果进行最终、更精确的语义重新排名。

首先,使用你的 Cohere API 密钥创建一个推理端点。确保为你的端点指定一个名称,以及其中一个重新排名模型的 model_id。在此示例中,我们将使用 Rerank 3。

# Delete the inference model if it already exists
client.options(ignore_status=[404]).inference.delete(inference_id="cohere_rerank")

client.inference.put(
    task_type="rerank",
    inference_id="cohere_rerank",
    body={
        "service": "cohere",
        "service_settings":{
            "api_key": COHERE_API_KEY,
            "model_id": "rerank-english-v3.0"
           },
        "task_settings": {
            "top_n": 10,
        },
    }
)

%title插图%num

现在,你可以使用该推理端点对结果进行重新排序。在这里,我们将传入用于检索的查询,以及我们刚刚使用混合搜索检索到的文档。

推理服务将以相关性降序排列的文档列表作为响应。每个文档都有一个对应的索引(反映文档发送到推理端点时的顺序),如果 “return_documents” 任务设置为 True,则文档文本也将包含在内。

在这种情况下,我们将响应设置为 False,并根据响应中返回的索引重建输入文档。

response = client.inference.inference(
    inference_id="cohere_rerank",
    body={
        "query": query,
        "input": documents,
        "task_settings": {
            "return_documents": False
            }
        }
)

# Reconstruct the input documents based on the index provided in the rereank response
ranked_documents = []
for document in response.body["rerank"]:
  ranked_documents.append({
      "title": raw_documents[int(document["index"])]["_source"]["title"],
      "text": raw_documents[int(document["index"])]["_source"]["text"]
  })

# Print the top 10 results
for document in ranked_documents[0:10]:
  print(f"Title: {document['title']}nText: {document['text']}n")

%title插图%num

检索增强生成 – RAG

现在我们已经对结果进行了排序,我们可以使用 Cohere 的 Chat API 轻松地将其转变为 RAG 系统。传入检索到的文档以及查询,并使用 Cohere 最新的生成模型 Command R+ 查看基础响应。

首先,我们将创建 Cohere 客户端。

co = cohere.Client(COHERE_API_KEY)

接下来,我们可以轻松地从 Cohere Chat API 获得带有引文的接地生成。我们只需将用户查询和从 Elastic 检索到的文档传递给 API,然后打印出我们的接地响应。

response = co.chat(
    message=query,
    documents=ranked_documents,
    model='command-r-plus-08-2024'
)

source_documents = []
for citation in response.citations:
  for document_id in citation.document_ids:
    if document_id not in source_documents:
      source_documents.append(document_id)

print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
  if document['id'] in source_documents:
    print(f"{document['title']}: {document['text']}")

%title插图%num

就这样!使用 Cohere 和 Elastic 快速轻松地实现混合搜索和 RAG。

整个 notebook 的源码可以在地址https://github.com/liu-xiao-guo/elasticsearch-labs/blob/main/notebooks/cohere/inference-cohere-semantic-text.ipynb下载。

我们也可以在上面的步骤中省去 pipeline 的创建,代码可以在地址https://github.com/liu-xiao-guo/elasticsearch-labs/blob/main/notebooks/cohere/inference-cohere-semantic-text-without-pipeline.ipynb下载。

文章来源于互联网:Elasticsearch:使用 Elasticsearch 和 Cohere 构建 RAG