使用 Elasticsearch-DSL Python 客户端简化向量嵌入

2024年9月5日   |   by mebius

作者:来自 ElasticMiguel Grinberg

%title插图%num

在本文中,我们将介绍 Python 版 Elasticsearch-DSL 客户端,重点介绍它如何简化构建向量搜索解决方案的任务。

本文附带的代码实现了一个名言数据库。它包括一个使用 FastAPI Web 框架用 Python 编写的后端,以及一个用 TypeScriptReact 编写的前端。关于向量搜索,此应用程序演示了如何:

  • 使用 Docker 运行本地 Elasticsearch 服务,
  • 高效地批量提取大量文档,
  • 在提取文档时为其生成向量嵌入,
  • 利用 GPU 的强大功能通过并行化加速向量嵌入的生成,
  • 使用近似 kNN 算法运行向量搜索查询,
  • 聚合向量搜索的结果,
  • 将向量搜索结果与标准匹配 (BM25) 查询的结果进行比较。

你可以在上面看到该应用程序的屏幕截图。在本文中,你将找到有关提取和搜索功能如何工作的详细说明。然后,你可以选择在自己的计算机上安装并运行代码进行实验和学习!

什么是 Python 版 Elasticsearch-DSL 客户端?

Elasticsearch-DSL 有时被称为 “高级” Python 客户端,它提供对 Elasticsearch 数据库的惯用(或 “Pythonic”)访问,而官方(或 “低级”)Python 客户端则提供对 Elasticsearch 全部功能和端点的直接访问。

使用 Elasticsearch-DSL 时,Elasticsearch 索引的结构(或 “mapping – 映射”)被定义为类,其语法类似于 Python 数据类的语法。存储在这些索引中的文档由这些类的实例表示。在 Python 对象和 Elasticsearch 文档之间进行映射所需的所有转换都是自动且透明地执行的,从而产生简单且惯用的应用程序代码。

要将 Elasticsearch-DSL 添加到 Python 项目中,你可以使用 pip 安装它:

pip install elasticsearch-dsl

如果你的项目是异步的,则需要安装其他依赖项,因此在这种情况下请使用以下命令:

pip install "elasticsearch-dsl[async]"

索引定义

如上所述,使用 Elasticsearch-DSL,Elasticsearch 索引的结构被定义为 Python 类。本文中介绍的示例应用程序使用名言数据集,其中包含以下字段:

  • quote:名言文本,字符串
  • author:作者姓名,字符串
  • tags:适用于名言的标签名称列表,每个标签名称都是字符串

作为此应用程序的一部分,我们将添加一个附加字段,即我们将用于搜索名言的向量嵌入:

  • embedding:表示名言向量嵌入的浮点数列表

让我们编写一个初始文档类来描述我们的名言索引:

import elasticsearch_dsl as dsl

class QuoteDoc(dsl.AsyncDocument):
    quote: str
    author: str
    tags: list[str]
    embedding: list[float]

    class Index:
        name = 'quotes'

作为 QuoteDoc 类的基类使用的 AsyncDocument 类实现了将该类连接到 Elasticsearch 索引的所有功能。之所以选择异步文档基类,是因为本示例使用了 FastAPI Web 框架,该框架也是异步的。对于不使用异步 Python 的项目,在声明文档类时必须使用 Document 基类。

Index 内部类中给出的 name 属性定义了将与此类文档一起使用的 Elasticsearch 索引的名称。

如果你以前使用过 Python 数据类,你可能会发现字段的定义方式非常熟悉,每个字段都被赋予一个 Python 类型提示。这些 Python 类型被映射到最接近的 Elasticsearch 类型,因此例如,在 str 的情况下,Elasticsearch 索引中的相应字段将被赋予类型 text,这是用于需要索引以进行全文搜索的文本的标准类型,而 float 则被映射到 Elasticsearch 端同名的 float。

虽然保留 quote 字段原样很有用,这样我们就可以将其用于向量和全文搜索,但 author 和 tags 字段实际上并不需要与全文搜索相关的所有额外工作。这些字段的最佳 Elasticsearch 类型是 keyword,它只存储文本,而不进行任何索引。同样,嵌入字段不仅仅是一个简单的浮点数列表,我们将使用它进行向量搜索,这是与 Elastitgcodecsearch 中的 dense_vector类型相关的行为。

要为字段分配类型覆盖,我们使用 mapped_field() 函数添加赋值,如下方 QuoteDoc 类的改进版本所示:

class QuoteDoc(dsl.AsyncDocument):
    quote: str
    author: str = dsl.mapped_field(dsl.Keyword())
    tags: list[str] = dsl.mapped_field(dsl.Keyword())
    embedding: list[float] = dsl.mapped_field(dsl.DenseVector(), init=False)

    class Index:
        name = 'quotes'

正如你在更新版本中看到的,elasticsearch_dsl 包包含 Keyword 和 DenseVector 等类,用于表示所有原生 Elasticsearch 字段类型。

你是否注意到嵌入字段的新定义中给出了 init=False 参数?如果你熟悉 Python 数据类,你可能会认出 init 是数据类 field() 函数中可用的选项之一,用于指示应从类实例的构造函数中省略给定属性。这里的行为是相同的,这意味着在创建 QuoteDoc 实例时,不应给出此参数。

如果向量嵌入不会传递给文档构造函数,那么将如何生成它们?Elasticsearch-DSL 始终在所有文档中调用 clean() 方法,然后再序列化它们并将它们发送到 Elasticsearch。此方法是一个方便的入口点,应用程序可以在其中添加任何自定义字段处理逻辑。例如,可以在此方法中添加可选或自动生成的字段。这是 QuoteDoc 文档类的最终版本,包括生成嵌入的逻辑:

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

class QuoteDoc(dsl.AsyncDocument):
    quote: str
    author: str = dsl.mapped_field(dsl.Keyword())
    tags: list[str] = dsl.mapped_field(dsl.Keyword())
    embedding: list[float] = dsl.mapped_field(dsl.DenseVector(), init=False)

    class Index:
        name = 'quotes'

    def clean(self):
        if not self.embedding:
            self.embedding = model.encode(self.quote).tolist()

对于此示例,我们将使用来自 SentenceTransformers 模型的嵌入。这些嵌入很容易在本地生成,而且开源且免费,在实验时使用起来很方便。all-MiniLM-L6-v2 模型是一种很好的通用英语文本嵌入模型。还有许多其他模型也与 SentenceTransformers 框架兼容,因此如果你愿意,可以随意使用其他模型。

clean() 方法也可用于更高级的用例。例如,在处理大段文本时,通常会将文本拆分为较小的块,然后为每个块生成嵌入。Elasticsearch 通过嵌套对象来适应这种用例。如果你想查看实现此类解决方案的高级示例,请查看 Elasticsearch-DSL 存储库中的向量示例。

文档提取

有了索引结构,我们现在可以创建索引了。这是通过 init() 类方法完成的:

async def ingest_quotes():
    await QuoteDoc.init()

在许多情况下,删除先前存在的索引很有用,以确保摄取过程从干净的起点开始。这可以使用 _index 类属性来完成,该属性提供对 Elasticsearch 索引的访问,以及其 exist() 和 delete() 方法:

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

示例应用程序使用的示例数据集是近 37,000 条名言的集合。它以 CSV 文件的形式提供,其中包含引文、作者和标签列。标签以逗号分隔的字符串形式提供。该数据集可从示例 GitHub 存储库下载

要提取此数据集中包含的数据,可以使用 Python 的 csv 模块:

import csv

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

    with open('quotes.csv') as f:
        reader = csv.DictReader(f)
        for row in reader:
            q = QuoteDoc(quote=row['quote'], author=row['author'],
                         tags=row['tags'].split(','))
            await q.save()

csv.DictReader 类创建一个 CSV 文件导入器,它为数据文件中的每一行返回一个字典。对于每一行,我们创建一个 QuoteDoc 实例并在构造函数中传递 quote、author 和 tags。对于 tags,从 CSV 文件读取的字符串必须拆分为一个列表,这就是它将如何存储在 Elasticsearch 索引中。

要将文档写入索引,请调用 save() 方法。此方法将调用文档的 clean() 方法,该方法又将为 quote 生成向量嵌入。

启动 Elasticsearch 实例

在执行上述摄取脚本之前,你需要访问正在运行的 Elasticsearch 实例。到目前为止,最简单(也是 100% 免费)的方法是使用 Docker 容器。

要在你的计算机上启动单节点 Elasticsearch 服务,首先请确保 Docker 正在运行,然后执行以下命令:

docker run -p 127.0.0.1:9200:9200 -d --name elasticsearch 
  -e "discovery.type=single-node" 
  -e "xpack.security.enabled=false" 
  -e "xpack.license.self_generated.type=basic" 
  -v "./data:/usr/share/elasticsearch/data" 
  docker.elastic.co/elasticsearch/elasticsearch:8.15.0

要确保你运行的是最新和最佳版本,请打开发行说明页面以了解当前版本,然后替换上述命令最后一行中的版本号。

上述命令中的 -v 选项设置了本地系统中名为 data 的目录与 Elasticsearch 容器中的数据目录之间的映射。Elasticsearch 使用的所有数据文件都将保存在此目录中,这样如果你需要重新启动容器,就不会丢失任何数据。如果你不想将数据文件存储在计算机中,则可以删除 -v 行,数据将暂时存储在容器中。

请注意,使用此方法部署 Elasticsearch 仅适用于本地实验。如果你打算在生产服务器上部署 Elasticsearch,请考虑使用我们的 Docker Compose 上的 ElasticsearchKubernetes 上的 Elasticsearch 指南。

连接到 Elasticsearch

提取脚本需要知道如何连接到 Elasticsearch。如果你正在运行 Docker 容器(如上一节所示),请在导入和 QuoteDoc 类的定义之间添加以下行:

dsl.tgcodeasync_connections.create_connection(hosts=['http://localhost:9200'])

要完成脚本,应调用 ingest_quotes() 函数。在源文件底部添加以下代码片段:

if __name__ == '__main__':
    asyncio.run(ingest_quotes())

asyncio.run() 函数将启动异步应用程序。如果你的应用程序不是异步的,那么你只需直接调用 ingest 函数即可。

为方便起见,你可以在下面找到到目前为止的脚本的完整代码。你可以将此文件另存为 search.py​​。你可以在此处找到此文件的示例。

import asyncio
import csv
import elasticsearch_dsl as dsl
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
dsl.async_connections.create_connection(hosts=['http://localhost:9200'], serializer=OrjsonSerializer())


class QuoteDoc(dsl.AsyncDocument):
    quote: str
    author: str = dsl.mapped_field(dsl.Keyword())
    tags: list[str] = dsl.mapped_field(dsl.Keyword())
    embedding: list[float] = dsl.mapped_field(dsl.DenseVector(), init=False)

    class Index:
        name = 'quotes'

    def clean(self):
        if not self.embedding:
            self.embedding = model.encode(self.quote).tolist()

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

    with open('quotes.csv') as f:
        reader = csv.DictReader(f)
        for row in reader:
            q = QuoteDoc(quote=row['quote'], author=row['author'],
                         tags=row['tags'].split(','))
            await q.save()

if __name__ == '__main__':
    asyncio.run(ingest_quotes())

使用你选择的工具为你的项目创建一个虚拟环境,然后在其上安装依赖项:

pip install "elasticsearch-dsl[async]" sentence-transformers

确保当前目录中有 quotes.csv 文件,然后通过运行脚本开始提取:

python search.py

该脚本不会打印任何内容,因此它将运行一段时间,将 CSV 文件中的引文添加到你的 Elasticsearch 索引中。该文件有大约 37,000 条引文,因此预计该过程将运行几分钟。

幸运的是,你不需要等待那么长时间。如果你启动脚本并且没有出现错误,则确认一切正常。你可以按 Ctrl-C 停止它并继续阅读以了解摄取性能。

性能调优第 1 部分:批量处理

如果你的数据集很小,那么上述摄取解决方案就可以正常工作,并且它的优点是代码简单且易于理解。

但是,对于较大的摄取作业,必须牺牲代码清晰度并关注性能,因此让我们看看可以在此应用程序中进行哪些优化。

首先,要评估性能,我们需要能够衡量现有解决方案的性能。下面是更新后的 ingest_quotes() 函数,它现在每摄取 100 个文档调用 ingest_progress() 以显示已摄取的文档数量以及每秒的平均文档数量。

from time import time

# ...

def ingest_progress(count, start):
    elapsed = time() - start
    print(f'rIngested {count} quotes. ({count / elapsed:.0f}/sec)', end='')

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

    with open('quotes.csv') as f:
        reader = csv.DictReader(f)
        count = 0
        start = time()
        for row in reader:
            q = QuoteDoc(quote=row['quote'], author=row['author'],
                         tags=row['tags'].split(','))
            await q.save()
            count += 1
            if count % 100 == 0:
                ingest_progress(count, start)
        ingest_progress(count, start)

# ...

此版本的采集比上一个版本更好,因为它会打印定期的状态更新。如果让脚本运行一段时间,你可能会看到类似下面的输出:

❯ python search.py
Ingested 4900 quotes. (97/sec)

数据文件包含近 37,000 条引文,因此现在你可以很好地了解摄取需要多长时间。假设整个摄取作业中平均每秒摄取 97 个文档,则摄取整个数据集应该需要不到 7 分钟的时间。你可以按 Ctrl-C 停止此摄取过程,目前无需让它运行完成。

Elasticsearch 提供了一个非常灵活的批量摄取功能,该功能在 Elasticsearch-DSL 包的 bulk() 方法中可用。无需保存每个文档,可以将整个导入循环移到生成器函数中,该函数作为参数提供给 bulk() 方法:

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

    async def get_next_quote():
        with open('quotes.csv') as f:
            reader = csv.DictReader(f)
            count = 0
            start = time()
            for row in reader:
                q = QuoteDoc(quote=row['quote'], author=row['author'],
                             tags=row['tags'].split(','))
                yield q
                count += 1
                if count % 100tgcode == 0:
                    ingest_progress(count, start)
            ingest_progress(count, start)

    await QuoteDoc.bulk(get_next_quote())

这里 get_next_quote() 内部生成器函数生成 QuoteDoc 实例。QuoteDoc.bulk() 方法将运行生成器并向 Elasticsearch 发出批量更新。通过此更改,你可以期待看到速度略有提升:

❯ python s.py
Ingested 5500 quotes. (108/sec)

另一个小改进是,Elasticsearch 客户端使用的 JSON 序列化器可以更改为 orjson 库,其性能比 Python 自己的更好:

from elasticsearch import OrjsonSerializer
# ...

dsl.async_connections.create_connection(hosts=['http://localhost:9200'],
                                        serializer=OrjsonSerializer())

# ...

这应该会带来另一个小的性能改进:

❯ python s.py
Ingested 5100 quotes. (111/sec)

性能调优第 2 部分:GPU 加速嵌入

你在上一节中已经看到,通过批量处理摄取请求,我们获得了一些适度的性能改进。但是,虽然摄取请求现在被分组,但嵌入仍然在 QuoteDoc 类的 clean() 方法中逐个生成。

有没有办法优化嵌入生成?SentenceTransformers 模型使用 PyTorch,如果有 GPU,它会使用 GPU。但是嵌入是单独生成的,这不会导致 GPU 硬件的最佳利用。GPU 非常擅长并行化,因此我们可以重新组织摄取函数以批量生成嵌入。我们为此付出的代价再次是代码复杂性的增加。

因此,我们将停止使用 clean() 方法来生成文档嵌入,而是将 QuoteDoc 实例累积在一个列表中,一旦达到一个好的数量,我们将在一次操作中为所有实例生成嵌入。

让我们首先编写一个辅助函数,为 QuoteDoc 实例列表生成嵌入:

def embed_quotes(quotes):
    embeddings = model.encode([q.quote for q in quotes])
    for q, e in zip(quotes, embeddings):
        q.embedding = e.tolist()

请注意,现在 model.encode() 方法被赋予要嵌入的引语列表,而不是单个引语。当输入参数是列表时,模型会为每个列表元素生成一个嵌入。该方法接受一个可选的 batch_size 参数(上例中未使用),默认值为 32,可用于控制发送到模型进行计算的每批样本的大小。根据 GPU 硬件的不同,你可能会发现此参数的不同值有助于将性能调整到最佳状态。生成嵌入后,使用 for 循环将它们分配给每个 quote。

现在可以重构 ingest 函数以累积引语并使用辅助函数生成嵌入:

async def ingest_quotes():
    if await QuoteDoc._index.exists():
        await QuoteDoc._index.delete()
    await QuoteDoc.init()

    async def get_next_quote():
        quotes = []
        with open('quotes.csv') as f:
            reader = csv.DictReader(f)
            count = 0
            start = time()
            for row in reader:
                q = QuoteDoc(quote=row['quote'], author=row['author'],
                             tags=row['tags'].split(','))
                quotes.append(q)
                if len(quotes) == 512:
                    embed_quotes(quotes)
                    for q in quotes:
                        yield q
                    count += len(quotes)
                    ingest_progress(count, start)
                    quotes = []
            if len(quotes) > 0:
                embed_quotes(quotes)
                for q in quotes:
                    yield q
            ingest_progress(count, start)

在此版本的 ingest_quotes() 中,每个 QuoteDoc 实例都会添加到 quotes 列表中,当积累了 512 个元素时,上面添加的 embed_quotes() 函数用于更有效地生成嵌入。一旦对象有了嵌入,就会产生它们,以便 Elasticsearch-DSL 中的 bulk() 方法可以像以前一样将它们添加到索引中。

512 这个数字有什么意义?没有任何意义。我们知道该模型使用 32 的批处理大小,因此积累至少那么多文档是有意义的。从 32 开始,你可以尝试更大的 2 的幂是否能提供更好的性能。凭借我可用的硬件,我发现 512 可以提供最佳性能。

以下是使用批处理嵌入运行的示例:

❯ python search.py
Ingested 36864 quotes. (481/sec)

现在,提取过程运行得更快,整个数据集提取大约需要 1 分 16 秒。

如果你决定尝试优化提取,我们鼓励你尝试不同的选项,看看哪种方式最适合你的硬件。

查询索引

如果你一直在关注,那么你现在有一个名为 quotes 的 Elasticsearch 索引,其中包含大约 37K 条名言,每条名言都有一个可搜索的向量嵌入。现在是时候学习如何查询此索引了。

使用 Elasticsearch-DSL 时,文档类会从其 search() 方法返回一个搜索对象:

s = QuoteDoc.search()

搜索对象有大量方法可以映射到 Elasticsearch queryDSL 中的查询选项。

可以发出的最简单的查询是匹配所有查询,它返回所有元素。使用 Elasticsearch-DSL 使用的基于类的方法,运行查询的方法如下:

s = QuoteDoc.search()
s = s.query(dsl.query.MatchAll())
async for q in s:
    print(q.quote)

这显然会打印索引中存储的整个报价列表,最多 10,000 个,这是 Elasticsearch 默认返回的最大结果数。

在许多情况下,请求结果的子集很有用。搜索对象为此使用 Python 样式切片。以下是如何仅请求前 25 个结果:

async for q in s[:25]:
    print(q.quote)

以下是如何请求第二页结果(每页 25 个结果):

async for q in s[25:50]:
    print(q.quote)

Elasticsearch 提供近似和精确向量搜索查询,也称为 k 最近邻 (kNN) 查询。要使用近似 k 最近邻算法运行向量搜索查询,应使用 Knn 查询:

s = QuoteDoc.search()
s = s.query(dsl.query.Knn(field=QuoteDoc.embedding, query_vector=model.encode(q).tolist()))

Knn 查询类接受存储嵌入和搜索向量的字段作为参数。在上面的代码片段中,变量 q 包含用户输入的搜索文本。

如果你更喜欢运行常规全文搜索,则可以使用 Match 查询类:

s = QuoteDoc.search()
s = s.query(dsl.query.Match(quote=q))

过滤器

使用 Elasticsearch 作为向量数据库的最重要的好处之一是它是一个强大的数据库系统,并且你可以从数据库中获得的所有选项都可以与你的向量搜索查询完美集成。

过滤器就是一个很好的例子。著名的引语数据库存储了每个引语的标签列表,因此将查询限制为具有特定标签的引语是很自然的。

给定存储在 tags 变量中的标签过滤器列表,以下代码片段使用 “terms” 过滤器将搜索对象配置为仅返回包含给定标签的结果:

for tag in tags:
    s = s.filter(dsl.query.Terms(tags=[tag]))

聚合

与向量搜索完全集成的有用数据库功能的另一个示例是聚合。给定一个查询,Elasticsearch 可以聚合 tags 并提供每个 tag 的 quotes 计数。

下一个代码片段显示如何将 terms 聚合添加到现有查询,这将返回结果中引用最多的 100 个标签:

s.aggs.bucket('tags', dsl.aggs.Terms(field=QuoteDoc.tags, size=100))

回想一下,tags 字段是用 Keyword() 类型声明的,这意味着标签将按原样存储在索引中,而无需进行任何处理。这是 Terms 聚合所必需的,它将计算结果中每个 tag 的出现次数。

完整的查询示例

你已经看到了一些独立的查询示例。在本节中,你可以看到如何将它们全部集成到示例应用程序中执行查询的函数中。

下面显示的 search_quotes() 函数接受查询字符串 q、过滤器标签列表和 use_knn 标志,以在 kNN 或全文搜索查询之间进行选择。它还接受开始和大小分页参数。

该函数根据输入参数决定发出上面看到的三个查询中的哪一个。如果 q 为空,则它会选择 “match all” 查询,在任何其他情况下,它会根据 use_knn 标志选择 kNN 或匹配查询,用户可以通过应用程序用户界面中的复选框控制该标志。

该函数以元组形式返回三个结果:

  • 搜索结果的 QuoteDoc 实例列表,
  • 标签聚合作为元组列表,每个元组包含标签名称和文档计数,
  • 结果总数,可用于分页查询

以下是该函数的完整代码:

async def search_quotes(q, tags, use_knn=True, start=0, size=25):
    s = QuoteDoc.search()
    if q == '':
        s = s.query(dsl.query.MatchAll())
    elif use_knn:
        s = s.query(dsl.query.Knn(field=QuoteDoc.embedding, query_vector=model.encode(q).tolist()))
    else:
        s = s.query(dsl.query.Match(quote=q))
    for tag in tags:
        s = s.filter(dsl.query.Terms(tags=[tag]))
    s.aggs.bucket('tags', dsl.aggs.Terms(field=QuoteDoc.tags, size=100))
    r = await s[start:start + size].execute()
    tags = [(tag.key, tag.doc_count) for tag in r.aggs.tags.buckets]
    return r.hits, tags, r['hits'].total.value

为了能够同时访问搜索结果和聚合结果,我们现在通过 execute() 方法显式发出请求,并将响应存储在 r 中。响应对象的 hits 属性包含实际的搜索结果,而 aggs 属性提供对聚合的访问。聚合结果的提供格式在术语聚合文档中进行了描述。

结论

完整的引文示例可在 GitHub 存储库中找到,你可以在计算机上安装并运行该存储库。按照 README.md 文件中的说明进行设置。

欢迎你使用此示例试验向量嵌入和 Elasticsearch!

准备好自己尝试了吗?开始免费试用
Elasticsearch 集成了 LangChain、Cohere 等工具。加入我们的高级语义搜索网络研讨会,构建你的下一个 GenAI 应用程序!

原文:Vector embeddings made simple with the Elasticsearch-DSL client for Python — Search Labs

文章来源于互联网:使用 Elasticsearch-DSL Python 客户端简化向量嵌入

相关推荐: Elasticsearch:介绍 retrievers – 搜索一切事物

tgcode 作者:来自 ElasticJeff Vestal, Jack Conradson 在 8.14 中,Elastic 在 Elasticsearch 中引入了一项名为 “retrievers – 检索器” 的新搜索功能。继续阅读以了解它们的简单性和…