Elasticsearch:Elasticsearch 开发入门 – Golang
2021年1月9日 | by mebius
在本文中,我将分享如何在 Golang 中如何使用Elasticsearch 来开发的经验。 顺便说一句,以防万一你从未听说过 Elasticsearch:
Elasticsearch 是一个高度可扩展的开源全文本搜索和分析引擎。 它使你可以快速,近乎实时地存储,搜索和分析大量数据。 它通常用作支持具有复杂搜索功能和要求的应用程序的基础引擎/技术。
如果你想了解更多关于 Elasticsearch 的介绍,你可以参阅我之前的文章 “Elasticsearch 简介”。
针对 Golang 的 Elasticsearch 支持,你可以访问 Elastic 的官方 githubhttps://github.com/elastic/go-elasticsearch。
前提条件
- 你需要在你的电脑上安装 Golang,并且 $GOPATH 和 $GOROOT 都需要导出到 bash 配置文件中。 你可以使用 go version 和 go env 命令来确认已安装 Golang 并设置了正确的路径。
- 你需要安装 docker 18.03.0-ce 或以上的版本
创建一个 Golang项目
我们在自己的电脑里创建一个如下的目录:
mkdir go-elasticsearch
cd go-elasticsearch
接着我们在这个目录里创建一个叫做 main.go 的文件。你可以使用你喜欢的编辑器,比如:
vi main.go
在上面我们使用 vi 编辑器来创建 main.go 文件。
用于 Elasticsearch 的 Golang 驱动程序(go-elasticsearch)必须安装在服务器的 $GOPATH 中。 使用 git 将库的存储库克隆到 $GOPATH 中,如下例所示:
git clone --branch master https://github.com/elastic/go-elasticsearch.git $GOPATH/src/github.com/elastic/go-elasticsearch
在编译 Go 应用时,有时遇到库不能从 github 上下载的错误信息。我们需要在 terminal 中打入如下的命令:
export GO111MODULE=on
export GOPROXY=https://goproxy.io
我们也可以使用如下的方法来达到安装的 go-elasticsearch 的目的。我们需要在go-elasticsearch 目录下创建一个叫做 go.mod 的文件。它的内容如下:
go.mod
require github.com/elastic/go-elasticsearch/v7 master
客户端主要版本与兼容的 Elasticsearch 主要版本相对应:要连接到 Elasticsearch 7.x,请使用客户端的 7.x 版本,要连接到Elasticsearch 6.x,请使用客户端的 6.x 版本。
require github.com/elastic/go-elasticsearch/v7 7.x
require github.com/elastic/go-elasticsearch/v7 7.0.0
可以在一个项目中使用客户端的多个版本:
// go.mod
github.com/elastic/go-elasticsearch/v6 6.x
github.com/elastic/go-elasticsearch/v7 7.x
// main.go
import (
elasticsearch6 "github.com/elastic/go-elasticsearch/v6"
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
)
// ...
es6, _ := elasticsearch6.NewDefaultClient()
es7, _ := elasticsearch7.NewDefaultClient()
安装 Elasticsearch 及 Kibana
如果你之前从来没有安装过 Elasticsearch 或 Kibana。你可以阅读我之前的文章 “Elastic:菜鸟上手指南” 来进行安装。在本练习中,我们将使用 docker 来安装 Elasticsearch 及 Kibana。我们首先来创建一个叫做 docker-compose.yml 的文件:
docker-compose.yml
---
version: "3"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0
container_name: es01
environment:
- node.name=es01
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- esdata:/usr/share/elasticsearch/data
ports:
- 9200:9200
kibana:
image: docker.elastic.co/kibana/kibana:7.10.0
ports:
- 5601:5601
depends_on:
- elasticsearch
volumes:
esdata:
driver: local
在上面,我们使用了 Elastic Stack 7.10.0 发行版作为实验的版本。在你实际的使用中,你可以根据自己的版本需求而进行修改。
我们必须先启动 docker,然后在命令行中执行:
docker-compose up
上面命令必须执行于 docker-compose.yml 文件所在的目录中。
它将启动 http://localhost:9200 中的 Elasticsearch 和 http://localhost:5601 中的 Kibana。 你可以通过在浏览器中打开链接来进行验证。
测试 elasticsearch 包
elasticsearch 软件包将两个单独的软件包捆绑在一起,分别用于调用 Elasticsearch API 和通过 HTTP 传输数据:esapi 和 estransport。
使用 elasticsearch.NewDefaultClient() 函数创建具有默认设置的客户端。
main.go
package main
import (
"log"
// Import the Elasticsearch library packages
"github.com/elastic/go-elasticsearch/v7"
)
func main() {
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Println(res)
}
我们使用如下的命令来运行:
go run main.go
上面的命令显示的结果为:
$ go run main.go
go: finding github.com/elastic/go-elasticsearch latest
2020/12/24 10:56:23 [200 OK] {
"name" : "es01",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A",
"version" : {
"number" : "7.10.0",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
"build_date" : "2020-11-09T21:30:33.964949Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
注意:关闭并使用响应 body 至关重要,以便在默认的 HTTP 传输中重新使用持久性 TCP 连接。 如果你对响应正文不感兴趣,请调用 io.Copy(ioutil.Discard,res.Body)。
当你 export ELASTICSEARCH_URL环境变量时,它将用于设置集群端点。 用逗号分隔多个地址。
要以编程方式设置集群端点,请将配置对象传递给 elasticsearch.NewClient() 函数。
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
"http://localhost:9201",
},
// ...
}
es, err := elasticsearch.NewClient(cfg)
要设置用户名和密码,请将其包括在端点 URL 中,或使用相应的配置选项。
cfg := elasticsearch.Config{
// ...
Username: "foo",
Password: "bar",
}
若要设置用于对群集节点的证书进行签名的自定义证书颁发机构,请使用 CACert 配置选项。
cert, _ := ioutil.ReadFile(*cacert)
cfg := elasticsearch.Config{
// ...
CACert: cert,
}
插入文档到索引
在这个章节中,我将一步一步地指导如何如何使用 go-elasticsearch 驱动来把文档导入到 Elasticsearch 中。
创建一个 Go 脚本并导入包
现在,我们已经确保正确安装和设置了我们需要的所有内容,我们可以开始使用 Go 脚本了。 编辑之前的 main.go 文件,然后将 main 包放在顶部。 请确保导入所有必需的程序包和库,如以下示例所示:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"reflect"
"strconv"
"strings"
// Import the Elasticsearch library packages
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
在上面,我们使用 v7 版本,它对应于 Elastic Stack 7.x 版本的发布。在之前的部署中,我们使用的版本是 7.10。
为 Elasticsearch 文档的字段创建结构数据类型
我们将使用 Golang struct 数据类型为要编制索引的 Elasticsearch 文档以及索引的相应字段创建框架:
// Declare a struct for Elasticsearch fields
type ElasticDocs struct {
SomeStr string
SomeInt int
SomeBool bool
}
声明一个将 Elasticsearch 结构数据转换为 JSON 字符串的函数
接下来,让我们看一个简单的函数,它将 Elasticsearch struct 文档实例转换为 JSON 字符串。 下面显示的代码可能看起来有些复杂,但是实际上发生的事情很简单–所有功能所做的就是将结构转换为字符串文字,然后将该字符串传递给 Golang 的 json.Marshal() 方法以使其返回字符串的JSON编码:
// A function for marshaling structs to JSON string
func jsonStruct(doc ElasticDocs) string {
// Create struct instance of the Elasticsearch fields struct object
docStruct := &ElasticDocs{
SomeStr: doc.SomeStr,
SomeInt: doc.SomeInt,
SomeBool: doc.SomeBool,
}
fmt.Println("ndocStruct:", docStruct)
fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))
// Marshal the struct to JSON and check for errors
b, err := json.Marshal(docStruct)
if err != nil {
fmt.Println("json.Marshal ERROR:", err)
return string(err.Error())
}
return string(b)
}
声明 main() 函数并创建一个新的 Elasticsearch Golang 客户端实例
在我们的 Go 脚本中,所有 API 方法调用都必须位于 main() 函数内部或从另一个函数内部进行调用。 让我们为 API 调用创建一个新的上下文对象,并为 Elasticsearch 文档创建一个 map 对象:
func main() {
// Allow for custom formatting of log output
log.SetFlags(0)
// Create a context object for the API calls
ctx := context.Background()
// Create a mapping for the Elasticsearch documents
var (
docMap map[string]interface{}
)
fmt.Println("docMap:", docMap)
fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))
实例化 Elasticsearch 客户端配置和 Golang 客户端实例
在这一步中,我们将实例化一个新的 Elasticsearch 配置对象。 确保将正确的主机和端口信息以及任何用户名或密码传递给其 “Adressess” 属性。
// Declare an Elasticsearch configuration
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
Username: "user",
Password: "pass",
}
// Instantiate a new Elasticsearch client object instance
client, err := elasticsearch.NewClient(cfg)
if err != nil {
fmt.Println("Elasticsearch connection error:", err)
}
检查用于 Elasticsearch 的 Golang 客户端在连接到集群时是否返回了任何错误
接下来,我们将检查与 Elasticsearch 的连接是否成功或是否返回了任何错误:
// Have the client instance return a response
res, err := client.Info()
// Deserialize the response into a map.
if err != nil {
log.Fatalf("client.Info() ERROR:", err)
} else {
log.Printf("client response:", res)
}
创建 Elasticsearch 结构文档并将其放入数组
我们将声明一个空字符串数组,以存储当前以 JSON 字符串表示的 Elasticsearch 文档。 以下代码显示了一些将用于索引的 Elasticsearch 文档示例。 要设置其字段的值,你需要做的就是修改结构实例的属性:
我们会将这些文档实例传递给我们先前声明的 jsonStruct() 函数,并使它们返回代表每个文档的 JSON 字符串。 然后,我们将使用 Golang 的 append() 函数将 JSON 字符串添加到字符串数组中:
迭代 Elasticsearch 文档数组并调用 Golang 客户端的 IndexRequest() 方法
现在我们已经建立了一个文档数组,我们将对其进行迭代,并在进行过程中向 Elasticsearch 集群发出 API 请求。 这些API调用将通过调用 Golang 驱动程序的 esapi.IndexRequest() 方法来索引文档:
// Iterate the array of string documents
for i, bod := range docs {
fmt.Println("nDOC _id:", i+1)
fmt.Println(bod)
// Instantiate a request object
req := esapi.IndexRequest {
Index: "some_index",
DocumentID: strconv.Itoa(i + 1),
Body: strings.NewReader(bod),
Refresh: "true",
}
fmt.Println(reflect.TypeOf(req))
在上面一定要注意的是:我们设置 Refresh 为 true。这在实际的使用中并不建议,原因是每次写入的时候都会 refresh。当我们面对大量的数据时,这样的操作会造成效率的底下。
检查 IndexRequest() API 方法调用是否返回任何错误
在文档数组上进行迭代的最后一步是从 API 调用中获取响应,并检查是否存在错误:
// Return an API response object from request
res, err := req.Do(ctx, client)
if err != nil {
log.Fatalf("IndexRequest ERROR: %s", err)
}
defer res.Body.Close()
在下面显示的代码中,如果没有错误返回,我们将解析 API 响应返回的结果对象:
if res.IsError() {
log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)
} else {
// Deserialize the response into a map.
var resMap map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
log.Printf("nIndexRequest() RESPONSE:")
// Print the response status and indexed document version.
fmt.Println("Status:", res.Status())
fmt.Println("Result:", resMap["result"])
fmt.Println("Version:", int(resMap["_version"].(float64)))
fmt.Println("resMap:", resMap)
fmt.Println("n")
}
}
}
}
每个文档迭代都应打印出一个map[string]interface{}对象响应,如下所示:
resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:32 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:2 forced_refresh:true result:updated]
在上面,我们讲了很多代码。为了方便大家练习,我把整个 main.go 的代码贴出来:
main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"reflect"
"strconv"
"strings"
// Import the Elasticsearch library packages
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
// Declare a struct for Elasticsearch fields
type ElasticDocs struct {
SomeStr string
SomeInt int
SomeBool bool
}
// A function for marshaling structs to JSON string
func jsonStruct(doc ElasticDocs) string {
// Create struct instance of the Elasticsearch fields struct object
docStruct := &ElasticDocs{
SomeStr: doc.SomeStr,
SomeInt: doc.SomeInt,
SomeBool: doc.SomeBool,
}
fmt.Println("ndocStruct:", docStruct)
fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))
// Marshal the struct to JSON and check for errors
b, err := json.Marshal(docStruct)
if err != nil {
fmt.Println("json.Marshal ERROR:", err)
return string(err.Error())
}
return string(b)
}
func main() {
// Allow for custom formatting of log output
log.SetFlags(0)
// Create a context object for the API calls
ctx := conttgcodeext.Background()
// Create a mapping for the Elasticsearch documents
var (
docMap map[string]interface{}
)
fmt.Println("docMap:", docMap)
fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))
// Declare an Elasticsearch configuration
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
Username: "user",
Password: "pass",
}
// Instantiate a new Elasticsearch client object instance
client, err := elasticsearch.NewClient(cfg)
if err != nil {
fmt.Println("Elasticsearch connection error:", err)
}
// Have the client instance return a response
res, err := client.Info()
// Deserialize the response into a map.
if err != nil {
log.Fatalf("client.Info() ERROR:", err)
} else {
log.Printf("client response:", res)
}
// Declare empty array for the document strings
var docs []string
// Declare documents to be indexed using struct
doc1 := ElasticDocs{}
doc1.SomeStr = "Some Value"
doc1.SomeInt = 123456
doc1.SomeBool = true
doc2 := ElasticDocs{}
doc2.SomeStr = "Another Value"
doc2.SomeInt = 42
doc2.SomeBool = false
// Marshal Elasticsearch document struct objects to JSON string
docStr1 := jsonStruct(doc1)
docStr2 := jsonStruct(doc2)
// Append the doc strings to an array
docs = append(docs, docStr1)
docs = append(docs, docStr2)
// Iterate the array of string documents
for i, bod := range docs {
fmt.Println("nDOC _id:", i+1)
fmt.Println(bod)
// Instantiate a request object
req := esapi.IndexRequest {
tgcodeIndex: "some_index",
DocumentID: strconv.Itoa(i + 1),
Body: strings.NewReader(bod),
Refresh: "true",
}
fmt.Println(reflect.TypeOf(req))
// Return an API response object from request
res, err := req.Do(ctx, client)
if err != nil {
log.Fatalf("IndexRequest ERROR: %s", err)
}
defer res.Body.Close()
if res.IsError() {
log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)
} else {
// Deserialize the response into a map.
var resMap map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
log.Printf("nIndexRequest() RESPONSE:")
// Print the response status and indexed document version.
fmt.Println("Status:", res.Status())
fmt.Println("Result:", resMap["result"])
fmt.Println("Version:", int(resMap["_version"].(float64)))
fmt.Println("resMap:", resMap)
fmt.Println("n")
}
}
}
}
运行上面的代码,我们将看到如下的输出:
$ go run main.go
go: finding github.com/elastic/go-elasticsearch latest
docMap: map[]
docMap TYPE: map[string]interface {}
client response:%!(EXTRA *esapi.Response=[200 OK] {
"name" : "es01",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A",
"version" : {
"number" : "7.10.0",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
"build_date" : "2020-11-09T21:30:33.964949Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_versiotgcoden" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
)
docStruct: &{Some Value 123456 true}
docStruct TYPE: *main.ElasticDocs
docStruct: &{Another Value 42 false}
docStruct TYPE: *main.ElasticDocs
DOC _id: 1
{"SomeStr":"Some Value","SomeInt":123456,"SomeBool":true}
esapi.IndexRequest
IndexRequest() RESPONSE:
Status: 200 OK
Result: updated
Version: 4
resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:36 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:4 forced_refresh:true result:updated]
DOC _id: 2
{"SomeStr":"Another Value","SomeInt":42,"SomeBool":false}
esapi.IndexRequest
IndexRequest() RESPONSE:
Status: 200 OK
Result: updated
Version: 18
resMap: map[_id:2 _index:some_index _primary_term:1 _seq_no:37 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:18 forced_refresh:true result:updated]
我们可以在 Kibana 中使用如下的命令来进行查看被导入的文档:
GET some_index/_search
搜索文档
我们接下来搜索已经建立好的文档。我们接下来搜索在 SomeStr 这个字段含有 Another 的文档。在 main.go 里添加如下的代码:
// Search for the indexed document
// Build the request body
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"SomeStr": "Another",
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
// Perform the search request.
res, err = client.Search(
client.Search.WithContext(context.Background()),
client.Search.WithIndex("some_index"),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
client.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
} else {
// Print the response status and error information.
log.Fatalf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print the response status, number of results, and request duration.
log.Printf(
"[%s] %d hits; took: %dms",
res.Status(),
int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
int(r["took"].(float64)),
)
// Print the ID and document source for each hit.
for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
}
同时由于我们使用了 bytes 模块,我们需要在文档的开始部分添加:
import (
"context"
"encoding/json"
"fmt"
"log"
"reflect"
"strconv"
"strings"
"bytes"
// Import the Elasticsearch library packages
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
运行上面的代码。我们可以看到如下新添加的结果:
[200 OK] 1 hits; took: 1ms
* ID=2, map[SomeBool:%!s(bool=false) SomeInt:%!s(float64=42) SomeStr:Another Value]
删除文档
删除一个文档非常容易。在 main.go 文件中,我们添加如下的代码来删除文档 id 为 1 的文档:
// Set up the request object.
req := esapi.DeleteRequest{
Index: "some_index",
DocumentID: strconv.Itoa(1),
}
res, err = req.Do(context.Background(), client)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
重新运行 main.go 应用。我们再到 Kibana 中去查询一下:
这次查询我们会发现只有一个文档存在。那个 id 为 2 的文档虽然也被导入,但是又被删除了。
为了方便大家的学习,我把代码放在 github 上:https://github.com/liu-xiao-guo/go-elasticsearch-demo