使用 Vertex AI 和 Elasticsearch 通过 RAG 释放数据的力量

2024年11月6日   |   by mebius

作者:来自 ElasticJuan Bustos

%title插图%num

使用 Vertex AI 和 Elasticsearch 通过 RAG 释放数据的潜力。本博客系列介绍了如何将数据提取到 Elasticsearch,从而为创建基于 RAG 的高级搜索应用程序建立强大的知识库。

使用 Vertex AI 和 Elasticsearch 通过 RAG 释放数据的力量

世界正被数据淹没,但我们是否真正发挥了它的潜力?这就是检索增强生成 (Retrieval Augmented Generation – RAG) 的作用所在,它彻底改变了我们与信息的交互方式。这个由两部分组成的博客系列将为你提供工具,利用 Elasticsearch 和 RAG 的强大组合将原始数据转化为可操作的见解。

第 1 部分将指导你高效地将数据导入 Elasticsearch,为强大且可扩展的知识库奠定基础,该知识库为 AI 模型提供有关私有、特定领域和最新客户数据的相关上下文。第 2 部分深入探讨了令人兴奋的 RAG 管道世界,展示了如何使用 Vertex AI 和 Langchain 等尖端工具构建问答系统。准备好释放数据的力量并将你的应用程序提升到新的水平!

什么是 RAG?

RAG 管道利用检索和生成 AI 模型的强大功能,基于庞大的知识库生成富有洞察力的响应。它们的工作原理是:

  • 检索相关信息:使用嵌入模型,RAG 管道会分析你的查询并从精选数据集中查找相关文档。
  • 生成简洁的答案:然后,像 Gemini Pro 这样的语言模型会利用检索到的信息来制作全面且内容丰富的答案。

Elastic 和 Google Cloud:企业 GenAI 的完美搭配

将 Elastic 和 Google Cloud 技术相结合用于生成式 AI 可显著增强 AI 驱动型应用。Elastic 的向量和混合搜索功能提供了用于管理、搜索和分析大型数据集的高级可扩展解决方案。该平台可高效处理结构化和非结构化数据,是 GenAI 模型的完美选择。Elastic 的语义搜索有助于更深入地理解上下文,从而提高 AI 生成响应的准确性和相关性。这些功能对于特定知识至关重要的用例非常重要 – 例如个性化推荐、智能内容和产品搜索、依赖于用户行为和个人资料的对话等等。

并非所有向量数据库都相同。虽然许多数据库声称提供高级向量功能,但大多数数据库仅限于存储和以有限的方式搜索向量。大多数数据库仅提供最低限度的定制,并且通常缺乏在概念验证之外的实际生产用例中安全采用它们的核心要求。 Elastic 脱颖而出,提供强大、可定制的向量搜索平台,该平台可扩展至复杂场景,并提供即插即用选项以缩短上市时间。借助 Elastic,你可以在采集或查询时创建嵌入、使用内置转换器模型、集成 VertexAI 模型或构建自己的模型。Elastic 的企业级安全功能可确保对数据访问进行细致的控制,支持 Google 的负责任 AI 原则。

这些功能与 Google Cloud 的 Gemini LLM 和 Vertex 非常契合,它们在文本、图像、视频等多模式交互方面表现出色。Gemini 处理多种数据格式的能力与 Elastic 多功能向量搜索相得益彰。这可以实现创新应用程序,例如零售网站的时尚 AI 助手,它可以理解产品描述和视觉内容并与之交互。

示例提取管道

本篇博文探讨了专门为从图像中提取数据而设计的 RAG 管道的创建。我们将使用 Google Cloud Platform 资源来完成这项工作,提供可扩展且强大的解决方案。

1. 设置你的环境

首先,我们需要组装 RAG 管道的基本工具。我们将使用 Google Gemini Pro 作为我们的生成 AI 模型,利用 Vertex AI 平台的强大功能。Elasticsearch 将作为我们的向量数据库,实现图像数据的高效存储和检索。

!pip install --upgrade google-cloud-aiplatform elasticsearch


# # Automatically restart kernel after installs so that your environment can access the new packages
import IPython


app = IPython.Application.instance()
app.kernel.do_shutdown(True)


import vertexai
from vertexai.language_models import TextEmbeddingModel
from vertexai.generative_models import GenerativeModel, Part, FinishReason
import vertexai.preview.generative_models as generative_models


import base64
import json
import os
import time


#Elastic Search
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

建立环境变量并进行身份验证:

# GCP Parameters
PROJECT = "your-project" #@param {type:"string"}
LOCATION = "us-central1" #@param {type:"string"}


vertexai.init(project=PROJECT, location=LOCATION)
model = GenerativeModel("gemini-1.5-pro-002")


#ES Parameters
ES_URL = "your-elastic-cluster-url" #@param {type:"string"}
ES_INDEX_NAME = "your-index-name"  #@param {type:"string"}
ES_API_KEY = "your-api-key"  #@param {type:"string"}
THRESHOLD = 0.86  #@param {type:"tgcodenumber"}


from google.colab import auth as google_auth
google_auth.authenticate_user()

2. 图像处理

代码首先定义包含目录图像文件的目录并初始化计数器。然后对目录中的文件进行排序并遍历每个文件,检查它是否以 “.jpg” 结尾。

对于每个图像文件,代码使用 get_image 加载图像,使用 generate 函数提取信息,将响应转换为 JSON 对象,并将提取的数据(包括文件名以供参考)附加到项目列表中。

为了防止潜在的 API 限制,代码每处理 5 个项目就会暂停 30 秒。最后,它将处理后的图像移动到 “processed” 目录并更新计数器。

该过程持续进行,直到目录中的所有 “.jpg” 文件都处理完毕。完成后,代码将打印已处理项目的总数,提供提取信息的计数。此过程演示了前面讨论的函数的实际应用,展示了如何将 RAG 管道应用于图像集合,以系统和有组织的方式提取有价值的数据。

def get_image(jpg_filepath):
   """Opens a JPG file, encodes it in base64, and returns a MIMEImage object.


   Args:
       jpg_filepath (str): The path to the JPG file.


   Returns:
       MIMEImage: An email MIMEImage object representing the JPG image,
                  ready to be used with Part.from_data
   """


   with open(jpg_filepath, "rb") as image_file:
       image_data = image_file.read()  # Read the binary data of the image


   image = Part.from_data(
   mime_type="image/jpeg",
   data=image_data)


   return image




!mkdir processed
items = []
directory = "catalog_pages"  # Replace with the actual directory path
counter = 0
files = os.listdir(directory)
files.sort()
for filename in files:
   #Pause every 5 items to prevent throttling
   if counter % 5 == 0:
     time.sleep(30)
   if filename.endswith(".jpg"):
       print(filename)
       image = get_image(f"{directory}/{filename}")
       result = generate(image)
       json_result = json.loads(result.texttgcode)
       for item in json_result:
         item['catalog_page']  = filename
         items.append(item)
       #Move processed files
       !mv {directory}/{filename}  processed
   counter = counter+1


print(f"Processed {len(items)} items")

为了处理图像并提取有意义的数据,我们创建了一个名为 generate 的函数,该函数以图像作为输入。该函数是我们图像数据提取过程的核心,利用 Google Gemini Pro 的强大功能来分析图像并返回结构化信息。

generate 函数使用精心设计的提示来指导 Gemini Pro 进行分析。此提示在 text1 变量中定义,指示模型充当 “专业乐器技师 – expert musical instrument technician”,执行特定任务:提取图像中描绘的每种产品的描述性细节。

prompt_pt1 = """You are an expert musical instrument technician your job is to extract for every page a description of the product depicted.


To achieve your task, carefully follow the instructions.



[{
"item_name": ,
"item_type": ,
"color": ,
"model": ,
"description_from_text":  ,
"description_from_vision": ,
"fretboard_options":[] ,
"color_options": [],
"part_number": ONLY IF AVAILABLE
},
{
"item_name": ,
"item_type": ,
"color": ,
"model": ,
"description_from_text":  ,
"description_from_vision": ,
"fretboard_options":[] ,
"color_options": [],
"part_number": ONLY IF AVAILABLE
}]



"""


prompt_pt2="""


1. Carefully analyze every page of the catalog, you will create a single entry per item presented.
2. Look at the item image and carefully read the text.
3. Always think step by step, and NEVER make up information.
4. Return results as well format JSONL.

output:
"""

提示(prompt)为模型提供了分步指南:

  • 分析图像:指示模型仔细分析每张图像,识别所呈现的单个产品。
  • 提取信息:模型应从图像本身和任何附带文本(如果有)中提取信息。
  • 格式化输出:输出应采用结构良好的 JSON 格式格式化,确保清晰度和一致性。
  • 批判性思考:鼓励模型逐步思考,避免做出假设或捏造信息。

text1 提示还包括 JSON 输出的特定格式,定义应包含的字段:

{
"item_name": ,
"item_type": ,
"color": ,
"model": ,
"description_from_text":  ,
"description_from_vision": ,
"ftgcoderetboard_options":[] ,
"color_options": [],
"part_number": ONLY IF AVAILABLE
}

这种结构化格式可确保提取的信息一致且易于用于进一步分析或与其他系统集成。

然后,该函数使用 model.generate_content 方法将图像、提示和配置设置传递给 Gemini Pro。此方法指示模型处理图像、应用提供的指令并生成结构化的 JSON 输出。

最后,generate 函数返回生成的响应,以易于访问的格式提供有关图像的提取信息。

def generate(image):


 responses = model.generate_content(
     [prompt_pt1, image, prompt_pt2],
     generation_config=generation_config,
     safety_settings=safety_settings,
     stream=False,
 )
 if(responses.candidates[0].finish_reason.value == 1):
    if responses.candidates[0].content.parts[0].text.startswith("```json") and responses.candidates[0].content.parts[0].text.endswith("```"):
     # Slice to remove the markers
     return responses.candidates[0].content.parts[0].text[7:-3].strip()  # Remove 7 chars at start, 3 at end, strip whitespace
    else:
     # Return original string if markers are missing
     return(responses.candidates[0].content.parts[0].text)
 else:
     return(f"Content has been blocked for {responses.candidates[0].finish_reason.name} reasons.")

数据集中的所有文件都通过此代码片段进行处理,该代码片段遍历图像文件目录,使用生成函数处理每个图像以提取数据,并将提取的信息存储在名为 items 的列表中。它还将处理后的图像移动到单独的目录中。

!mkdir processed
items = []
directory = "catalog_pages"  # Replace with the actual directory path
counter = 0
files = os.listdir(directory)
files.sort()
for filename in files:
   #Pause every 5 items to prevent throttling
   if counter % 5 == 0:
     time.sleep(30)
   if filename.endswith(".jpg"):
       print(filename)
       image = get_image(f"{directory}/{filename}")
       result = generate(image)
       print(result)
       json_result = json.loads(result)
       for item in json_result:
         item['catalog_page']  = filename
         items.append(item)
       #Move processed files
       !mv {directory}/{filename}  processed
   counter = counter+1


print(f"Processed {len(items)} items")

3. 生成嵌入并存储

生成函数成功从图像中提取数据并以结构化 JSON 格式返回后,将处理此文本以创建嵌入。这些嵌入是提取文本的数字表示,使我们能够以一种易于比较和搜索的方式表示数据的含义。

#Split items into text and metadata


embeddings = []
metadatas = []
for item in items:
 item_text=(item['description_from_text']+"n"+item['description_from_vision'])
 embeddings.append(text_embedding(item_text))
 metadatas.append(item)
print(len(embeddings))

为了生成这些嵌入,我们使用了 text_embedding 函数,该函数利用了 Vertex AI text-embedding-004 模型。此模型将提取的文本作为输入,并将其转换为数值向量。生成的嵌入捕获了提取文本的语义含义,使我们能够根据其含义(而不仅仅是其字面形式)搜索类似内容。

def text_embedding(query):
   """Text embedding with a Large Language Model."""
   model = TextEmbeddingModel.from_pretrained("text-embedding-004")
   #Working with a single
   embeddings = model.get_embeddings([query])
   return embeddings[0].values

生成嵌入后,下一步是将它们提取到 Elasticsearch 索引中。此过程由 store_embeddings_with_metadata 函数处理。此函数获取生成的嵌入以及相关元数据(如原始图像名称或其他相关信息),并将它们添加到 Elasticsearch 数据库中。

store_embeddings_with_metadata 函数确保每个嵌入都与其相应的元数据一起存储,使我们能够在执行搜索时检索嵌入及其相关信息。此元数据对于理解每个嵌入的上下文以及在搜索期间提供更相关的结果至关重要。

def store_embeddings_with_metadata(
   client,
   index_name,
   embeddings,
   metadatas,
   vector_query_field,
):
   """
   Stores embeddings and metadata in an Elastic search cluster.


   Args:
       client: Elastic search client instance.
       index_name: Name of the Elastic search index.
       embeddings: List of embeddings to store.
       metadatas: List of metadata dictionaries corresponding to the embeddings.
       vector_query_field: Name of the field to store the embedding vectors in.
       refresh_indices: Whether to refresh the index after adding documents.
   """
   try:
       from elasticsearch.helpers import bulk
   except ImportError:
       raise ImportError(
           "Could not import elasticsearch python package. "
           "Please install it with `pip install elasticsearch`."
       )


   requests = []
   for embedding, metadata in zip(embeddings, metadatas):
       request = {
           "_op_type": "index",
           "_index": index_name,
           vector_query_field: embedding,
           "metadata": metadata,
       }
       requests.append(request)


   if len(requests) > 0:
       try:
           success, failed = bulk(
               client, requests, stats_only=True, refresh=True
           )
           print(f"Added {success} and failed to add {failed} documents to index")
       except BulkIndexError as e:
           print(f"Error adding documents: {e}")
           raise e
   else:
       print("No documents to add to index")

通过将嵌入和元数据存储在 Elasticsearch 中,我们利用强大的向量数据库,能够基于语义相似性高效地搜索和检索与图像相关的信息。这使我们能够根据提取数据的含义找到相关图像,为高级图像搜索和检索功能开辟了可能性。

结论

在本博客系列的第一部分中,我们探讨了如何从图像中提取有价值的数据并将其存储在 Elasticsearch 索引中。下次,我们将讨论如何组合一个示例应用程序来与这些数据进行交互。

想要了解有关 Elastic 和 Google Cloud 为你的 GenAI 应用程序带来的强大功能的更多信息?请访问 Elastic Search Labs(https://www.elastic.co/search-labs/integrations/google),探索尖端集成并释放数据的全部潜力。

准备好自己尝试了吗?开始免费试用

Elasticsearch 集成了 LangChain、Cohere 等工具。加入我们的高级语义搜索网络研讨会,构建你的下一个 GenAI 应用程序!

原文:Unlock the Power of Your Data with RAG using Vertex AI and Elasticsearch – Search Labs

文章来源于互联网:使用 Vertex AI 和 Elasticsearch 通过 RAG 释放数据的力量

相关推荐: Elasticsearch 中的高效按位匹配

作者:来自 ElasticAlexander Marquardt 探索在 Elasticsearch 中编码和匹配二进制数据的六种方法,包括术语编码(我喜欢的方法)、布尔编码、稀疏位位置编码、具有精确匹配的整数编码、具有脚本按位匹配的整数编码以及使用 ESQL…

Tags: , ,