飞道的博客

Elasticsearch:运用 Go 语言实现 Elasticsearch 搜索 - 8.x

1509人阅读  评论(0)

在我之前的文章 “Elasticsearch:Go 客户端简介 - 8.x”,我对 Elasticsearch golang 客户端做了一个简单的介绍。在今天的这篇文章中,我将详细介绍如何使用这个客户端来一步一步地连接到 Elasticsearch,进而创建索引,搜索等。关于 golang 客户端的使用,完整的文档托管在 GitHubPkgGoDev 上。

在我们的展示中,我们将使用 Elastic Stack 8.5.3 来进行展示。

安装

Elasticsearch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话,那么请参考我之前的文章:

在今天的展示中,我将使用 Elastic Stack 8.x 来进行展示。在安装的时候,请参考相应的 Elastic Stack 8.x 的文章来进行安装。
 

Golang 安装

要安装客户端的 8.x 版本,请将包添加到你的 go.mod 文件中:

require github.com/elastic/go-elasticsearch/v8 8.5

或者,clone 存储库:

git clone --branch 8.5 https://github.com/elastic/go-elasticsearch.git $GOPATH/src/github

要安装另一个版本,请相应地修改路径或分支名称。 客户端主要版本对应于 Elasticsearch 主要版本。

你可以在下面找到完整的安装示例:


  
  1. mkdir my-elasticsearch-app8 && cd my-elasticsearch-app8
  2. cat > go.mod <<- END
  3. module my-elasticsearch-app8
  4. require github.com/elastic/go-elasticsearch/v8 main
  5. END
  6. cat > main.go <<- END
  7. package main
  8. import (
  9. "log"
  10. "github.com/elastic/go-elasticsearch/v8"
  11. )
  12. func main() {
  13. es, _ := elasticsearch.NewDefaultClient()
  14. log.Println(elasticsearch.Version)
  15. log.Println(es.Info())
  16. }
  17. END

在我的电脑上面,我运行如上的命令:


  
  1. $ pwd
  2. /Users/liuxg/go
  3. $ mkdir my-elasticsearch-app8 && cd my-elasticsearch-app8
  4. $
  5. $ cat > go.mod <<- END
  6. > module my-elasticsearch-app8
  7. >
  8. > require github.com/elastic/go-elasticsearch/v8 main
  9. > END
  10. $
  11. $ cat > main.go <<- END
  12. > package main
  13. >
  14. > import (
  15. > "log"
  16. >
  17. > "github.com/elastic/go-elasticsearch/v8"
  18. > )
  19. >
  20. > func main() {
  21. > es, _ := elasticsearch.NewDefaultClient()
  22. > log.Println(elasticsearch.Version)
  23. > log.Println(es.Info())
  24. > }
  25. > END
  26. $ ls
  27. go.mod main.go
  28. $ pwd
  29. /Users/liuxg/go/my-elasticsearch-app8

很显然,它生成了两个文件:go.mod 及 main.go。我们还不能直接运行上面的命令,除非我们按照我之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 进行安装。请参考其中的 “如何配置 Elasticsearch 不带安全性” 章节。这样的配置不需要安全性,索引在连接的时候,我们也不需要任何的验证。一旦我们按照完毕后,我们在 terminal 中打入如下的命令:


  
  1. $ pwd
  2. /Users/liuxg/ go/my-elasticsearch-app8
  3. $ go run main. go
  4. go: updates to go.mod needed; to update it:
  5. go mod tidy
  6. $ go mod tidy
  7. go: downloading github.com/elastic/ go-elasticsearch/v8 v8 .4 .0-alpha .1 .0 .20221227164349-c40d762a40ad
  8. go: downloading github.com/elastic/elastic-transport- go/v8 v8 .0 .0 -20211216131617-bbee439d559c
  9. $ go run main. go
  10. 2023/ 01/ 10 17: 27: 35 8.7 .0-SNAPSHOT
  11. 2023/ 01/ 10 17: 27: 35 [ 200 OK] {
  12. "name" : "liuxgm.local",
  13. "cluster_name" : "elasticsearch",
  14. "cluster_uuid" : "c7GQIJYaQ-yeesPYys24fw",
  15. "version" : {
  16. "number" : "8.5.3",
  17. "build_flavor" : "default",
  18. "build_type" : "tar",
  19. "build_hash" : "4ed5ee9afac63de92ec98f404ccbed7d3ba9584e",
  20. "build_date" : "2022-12-05T18:22:22.226119656Z",
  21. "build_snapshot" : false,
  22. "lucene_version" : "9.4.2",
  23. "minimum_wire_compatibility_version" : "7.17.0",
  24. "minimum_index_compatibility_version" : "7.0.0"
  25. },
  26. "tagline" : "You Know, for Search"
  27. }
  28. < nil>

上面显示我们已经成功地连接到 Elasticsearch 了。Hooray! 小试牛刀,我们对如何连接到 Elasticsearch 有一个基本的印象。

在默认的情况下,我们可以通过设置环境变量 ELASTICSEARCH_URL 来配置 Elasticsearch 的端点地址:

 export ELASTICSEARCH_URL="https://localhost:9200"

如果你有多个 Elasticsearch 端点地址,请用逗号分隔它们。

警告:不建议在未启用安全性的情况下运行 Elasticsearch。

Elasticsearch 版本兼容性

语言客户端向前兼容; 这意味着客户端支持与更大或相等的次要版本的 Elasticsearch 进行通信。 Elasticsearch 语言客户端仅向后兼容默认发行版,并且不作任何保证。

Elasticsearch server 8.0 版本引入了新的兼容模式,让你从 7.x 到 8x 的升级体验更流畅。简而言之,你可以将最新的 7.x go-elasticsearch Elasticsearch 客户端与 8.x Elasticsearch 服务器一起使用,提供更多协调将代码库升级到下一个主要版本的空间。

如果你想利用此功能,请确保你使用的是最新的 7.x go-elasticsearch 客户端,并将环境变量 ELASTIC_CLIENT_APIVERSIONING 设置为 true 或在客户端配置中设置配置选项 config.EnableCompatibilityMode。 客户端在内部处理其余部分。 对于每个 8.0 及更高版本的 go-elasticsearch 客户端,你都准备好了! 默认情况下启用兼容模式。

使用 Go 模块时,在导入路径中包含版本,并指定显式版本或分支:


  
  1. require github.com/elastic/go-elasticsearch/v8 v8. 0.0
  2. require github.com/elastic/go-elasticsearch/v7 7.17

可以在一个项目中使用多个版本的客户端:


  
  1. // go.mod
  2. github.com/elastic/ go-elasticsearch/v7 v7 .17 .0
  3. github.com/elastic/ go-elasticsearch/v8 v8 .0 .0
  4. // main.go
  5. import (
  6. elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
  7. elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
  8. )
  9. // ...
  10. es7, _ := elasticsearch7.NewDefaultClient()
  11. es8, _ := elasticsearch8.NewDefaultClient()

客户端的 main 分支兼容当前 Elasticsearch 的 master 分支。

连接到 Elasticsearch

在今天的文章中,我主要来讲述如何连接到自托管的 Elasticsearch 集群。

连接到没有设置安全的集群

在上面,我们已经显示了如何连接到没有设置安全的集群。在上面,它默认连接的是 http://localhost:9200。在实际的部署中,你可能并不是使用的默认的地址。这个时候我们需要对连接进行配置:

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "http://localhost:9200",
  10. },
  11. }
  12. es, err := elasticsearch.NewClient(cfg)
  13. log.Println(err)
  14. if err == nil {
  15. log.Println(elasticsearch.Version)
  16. log.Println(es.Info())
  17. } else {
  18. log.Println( "Something wrong with connection to Elasticsearch")
  19. }
  20. }

  
  1. $ pwd
  2. /Users/liuxg/ go/my-elasticsearch-app8
  3. $ ls
  4. go.mod go.sum main. go
  5. $ go run main. go
  6. 2023/ 01/ 10 19: 21: 49 < nil>
  7. 2023/ 01/ 10 19: 21: 49 8.7 .0-SNAPSHOT
  8. 2023/ 01/ 10 19: 21: 49 [ 200 OK] {
  9. "name" : "liuxgm.local",
  10. "cluster_name" : "elasticsearch",
  11. "cluster_uuid" : "c7GQIJYaQ-yeesPYys24fw",
  12. "version" : {
  13. "number" : "8.5.3",
  14. "build_flavor" : "default",
  15. "build_type" : "tar",
  16. "build_hash" : "4ed5ee9afac63de92ec98f404ccbed7d3ba9584e",
  17. "build_date" : "2022-12-05T18:22:22.226119656Z",
  18. "build_snapshot" : false,
  19. "lucene_version" : "9.4.2",
  20. "minimum_wire_compatibility_version" : "7.17.0",
  21. "minimum_index_compatibility_version" : "7.0.0"
  22. },
  23. "tagline" : "You Know, for Search"
  24. }
  25. < nil>

从上面,我们可以看出来,我们成功地连接到 Elasticsearch。此外,由于 Addresses 是一个 slice,它可以由多个 Elasticsearch 的端点组成。比如,我们可以有一下的格式:


  
  1. cfg := elasticsearch.Config{
  2. Addresses: [] string{
  3. "http://localhost:9200",
  4. "http://localhost:9201",
  5. },
  6. }
  7. es, err := elasticsearch.NewClient(cfg)

注意:如果你的 Elasticsearch 集群位于负载均衡器后面,就像在使用 Elastic Cloud 时一样,你将不需要配置多个节点。 而是使用负载平衡器主机和端口。

连接到带有基本安全的集群

我们可以连接到带有基本安全的集群。针对 Elastic Stack 8.x,在默认的安装下,集群是带有 HTTPS 的访问。我们可以通过参考文章  “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 中的 “如何配置 Elasticsearch 只带有基本安全” 章节来进行安装。

基本认证

要以编程方式设置集群端点、用户名和密码,请将配置对象传递给 elasticsearch.NewClient() 函数。

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "https://localhost:9200",
  10. },
  11. Username: "elastic",
  12. Password: "password",
  13. }
  14. es, err := elasticsearch.NewClient(cfg)
  15. log.Println(err)
  16. if err == nil {
  17. log.Println(elasticsearch.Version)
  18. log.Println(es.Info())
  19. } else {
  20. log.Println( "Something wrong with connection to Elasticsearch")
  21. }
  22. }

在上面,我使用了超级用户 elastic 来进行验证,尽管在实际的使用中,我们并不建议这么做。你可以使用一个带有一定权限的用户来进行连接。运行上面的代码:

go run main.go 

  
  1. $ go run main. go
  2. 2023/ 01/ 10 19: 44: 29 < nil>
  3. 2023/ 01/ 10 19: 44: 29 8.7 .0-SNAPSHOT
  4. 2023/ 01/ 10 19: 44: 29 [ 200 OK] {
  5. "name" : "liuxgm.local",
  6. "cluster_name" : "elasticsearch",
  7. "cluster_uuid" : "jBt9oXsxT4y_2YOWOw8QRQ",
  8. "version" : {
  9. "number" : "8.5.3",
  10. "build_flavor" : "default",
  11. "build_type" : "tar",
  12. "build_hash" : "4ed5ee9afac63de92ec98f404ccbed7d3ba9584e",
  13. "build_date" : "2022-12-05T18:22:22.226119656Z",
  14. "build_snapshot" : false,
  15. "lucene_version" : "9.4.2",
  16. "minimum_wire_compatibility_version" : "7.17.0",
  17. "minimum_index_compatibility_version" : "7.0.0"
  18. },
  19. "tagline" : "You Know, for Search"
  20. }
  21. < nil>

很显然,我们的连接是成功的。

你还可以在端点 URL 中包含用户名和密码:

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "http://elastic:password@localhost:9200",
  10. },
  11. }
  12. es, err := elasticsearch.NewClient(cfg)
  13. log.Println(err)
  14. if err == nil {
  15. log.Println(elasticsearch.Version)
  16. log.Println(es.Info())
  17. } else {
  18. log.Println( "Something wrong with connection to Elasticsearch")
  19. }
  20. }

连接到带有 HTTPS 的集群

在 Elastic Stack 8.x 的默认安装中,Elasticsearch 是带有 HTTPS 的访问权限的。特别是针对自签名证书的安装,我们需要使用证书来进行连接。请按照如下的文档进行安装:

在 Elasticsearch 第一次启动的时候:

我们从上面可以看到超级用户 elastic 的信息。记下 elastic 用户密码和 HTTP CA 指纹。我们在下面的示例中将使用到。

根据具体情况,有两种验证 HTTPS 连接的选项,要么使用 CA 证书本身进行验证,要么通过 HTTP CA 证书指纹进行验证。

使用 CA 证书来验证 HTTPS

生成的根 CA 证书可以在 Elasticsearch 配置位置 ($ES_CONF_PATH/certs/http_ca.crt) 的 certs 目录中找到。 如果你在 Docker 中运行 Elasticsearch,则还有用于检索 CA 证书的其他文档。一旦你在某个地方获得了 http_ca.crt 文件,就可以通过 CACert 将文件的内容传递给客户端:

我们可以在 Elasticsearch 的安装目录中查看到证书的信息:


  
  1. $ pwd
  2. /Users/liuxg /elastic/elasticsearch- 8.5. 3/config/certs
  3. $ ls
  4. http.p12 http_ca.crt transport.p12

我们可以通过如下的方式来连接到 Elasticsearch:

main.go


  
  1. package main
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "github.com/elastic/go-elasticsearch/v8"
  6. )
  7. func main() {
  8. cert, _ := ioutil.ReadFile( "/Users/liuxg/elastic/elasticsearch-8.5.3/config/certs/http_ca.crt")
  9. cfg := elasticsearch.Config{
  10. Addresses: [] string{
  11. "https://localhost:9200",
  12. },
  13. Username: "elastic",
  14. Password: "YQ7kq-gh3K4xi9l4akd-",
  15. CACert: cert,
  16. }
  17. es, err := elasticsearch.NewClient(cfg)
  18. log.Println(err)
  19. if err == nil {
  20. log.Println(elasticsearch.Version)
  21. log.Println(es.Info())
  22. } else {
  23. log.Println( "Something wrong with connection to Elasticsearch")
  24. }
  25. }

运行上面的代码,我们可以看到和上面一样的运行结果。

使用证书 fingerprint 来验证 HTTPS

这种验证 HTTPS 连接的方法利用了前面记下的证书指纹值。 获取此 SHA256 指纹值并通过 ca_fingerprint 将其传递给 Go Elasticsearch 客户端:

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "https://localhost:9200",
  10. },
  11. Username: "elastic",
  12. Password: "YQ7kq-gh3K4xi9l4akd-",
  13. CertificateFingerprint: "2d1bcafa3cb22f6a0c4b2c087409c6b0b59017d444c49456fe9e87c0c6a2db60",
  14. }
  15. es, err := elasticsearch.NewClient(cfg)
  16. log.Println(err)
  17. if err == nil {
  18. log.Println(elasticsearch.Version)
  19. log.Println(es.Info())
  20. } else {
  21. log.Println( "Something wrong with connection to Elasticsearch")
  22. }
  23. }

我们运行上面的代码,我们可以看到和之前输出一样的结果。

可以使用带有证书文件的 openssl x509 计算证书指纹:

openssl x509 -fingerprint -sha256 -noout -in /path/to/http_ca.crt

  
  1. $ pwd
  2. /Users/liuxg/elastic/elasticsearch- 8.5. 3/config/certs
  3. $ ls
  4. http.p12 http_ca.crt transport.p12
  5. $ openssl x509 -fingerprint -sha256 -noout - in http_ca.crt
  6. sha256 Fingerprint=2 D:1 B: CA :FA :3 C: B2 :2 F:6 A:0 C:4 B:2 C:08 : 74 :09 :C6 :B0 :B5 : 90 : 17 :D4 : 44 :C4 : 94 : 56 :FE :9 E: 87 :C0 :C6 :A2 :DB : 60

如果你无权访问 Elasticsearch 生成的 CA 文件,你可以使用以下脚本通过 openssl s_client 输出 Elasticsearch 实例的根 CA 指纹:


  
  1. # Replace the values of 'localhost' and '9200' to the
  2. # corresponding host and port values for the cluster.
  3. openssl s_client - connect localhost: 9200 -servername localhost -showcerts < /dev / null 2 > /dev / null \
  4. | openssl x509 -fingerprint -sha256 -noout - in /dev /stdin

  
  1. $ openssl s_client -connect localhost: 9200 -servername localhost -showcerts < /dev/ null 2> /dev/ null \
  2. > | openssl x509 -fingerprint -sha256 -noout - in /dev/stdin
  3. sha256 Fingerprint= 92: 54: 07: A7: BF: FE: AA:6 C: A9:4 C: 17:7 E: A8: E7:7 D: F9: B7: 27:2 E: 99: BF: DC:9 C: D0: 51: D1:9 F: F2:2 E: D7:9 A:4A

在上面的代码中,千万要注意的是我们代码中的 fingerprint 是没有冒号的。我们可以使用如下的命令来直接进行获得:


  
  1. $ pwd
  2. /Users/liuxg /elastic/elasticsearch- 8.5. 3/config/certs
  3. $ ls
  4. http.p12 http_ca.crt transport.p12
  5. $ openssl x509 -in http_ca.crt -sha256 -fingerprint | grep sha256 | sed 's/://g'
  6. sha256 Fingerprint= 2D1BCAFA3CB22F6A0C4B2C087409C6B0B59017D444C49456FE9E87C0C6A2DB60

HTTP Bearer 认证

HTTP Bearer 身份验证通过将令牌作为字符串传递来使用 ServiceToken 参数。 此身份验证方法由服务帐户令牌和不记名令牌使用。关于如何生成 service token,请参考我之前的文章 “Elasticsearch:无需基本身份验证即可创建用于访问的不记名令牌”。


  
  1. POST /_security/oauth2/token
  2. {
  3. "grant_type": "client_credentials"
  4. }

 main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "https://localhost:9200",
  10. },
  11. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  12. ServiceToken: "363rAxZZbmFaTnROSVRjT1Q0ZEVmQmszelhRAAAAAAAAAAAA",
  13. }
  14. es, err := elasticsearch.NewClient(cfg)
  15. log.Println(err)
  16. if err == nil {
  17. log.Println(elasticsearch.Version)
  18. log.Println(es.Info())
  19. } else {
  20. log.Println( "Something wrong with connection to Elasticsearch")
  21. }
  22. }

我们运行上面的代码,它会输出和上面一样的结果。我们或者使用如下的格式:


  
  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. "github.com/elastic/go-elasticsearch/v8"
  6. )
  7. func main() {
  8. cfg := elasticsearch.Config{
  9. Addresses: [] string{
  10. "https://localhost:9200",
  11. },
  12. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  13. Header: http.Header( map[ string][] string{
  14. "Authorization": { "Bearer 363rAxZsR0Q0RDMzb1MtaXU1alJPMnFHMjZ3AAAAAAAAAAAA"},
  15. }),
  16. }
  17. es, err := elasticsearch.NewClient(cfg)
  18. log.Println(err)
  19. if err == nil {
  20. log.Println(elasticsearch.Version)
  21. log.Println(es.Info())
  22. } else {
  23. log.Println( "Something wrong with connection to Elasticsearch")
  24. }
  25. }

在上面,我们使用 Bearer 在 header 中的定义来实现请求。

API key 验证

我们也可以使用 API key 的方法来进行验证。我们可以参考文章 “Elasticsearch:创建 API key 接口访问 Elasticsearch” 来获取  API key。我们也可以使用如下的方法来获取 API key:

 

 

 

点击上面的 copy 按钮。拷贝生成的 API key。我们把这个 API key 应用到如下的代码中:

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "github.com/elastic/go-elasticsearch/v8"
  5. )
  6. func main() {
  7. cfg := elasticsearch.Config{
  8. Addresses: [] string{
  9. "https://localhost:9200",
  10. },
  11. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  12. APIKey: "dTM5Tm40VUJ2OUFNdUFLUDNnRUU6NWQ2QzhTZmhURk82MUFsUFE0a2ltUQ==",
  13. }
  14. es, err := elasticsearch.NewClient(cfg)
  15. log.Println(err)
  16. if err == nil {
  17. log.Println(elasticsearch.Version)
  18. log.Println(es.Info())
  19. } else {
  20. log.Println( "Something wrong with connection to Elasticsearch")
  21. }
  22. }

 运行上面的代码。它将成功地连接到 Elasticsearch 集群。

Retries

我们已经了解了客户端如何管理连接并针对特定条件重试请求。 现在让我们看看相关的配置选项。

默认情况下,客户端最多重试请求 3 次; 要设置不同的限制,请使用 MaxRetries 字段。 要更改应重试的响应状态代码列表,请使用 RetryOnStatus 字段。 与 RetryBackoff 选项一起,您可以使用它在服务器发送 429 Too Many Requests 响应时重试请求:

main.go


  
  1. package main
  2. import (
  3. "log"
  4. "math"
  5. "time"
  6. "github.com/elastic/go-elasticsearch/v8"
  7. )
  8. func main() {
  9. cfg := elasticsearch.Config{
  10. Addresses: [] string{
  11. "https://localhost:9200",
  12. },
  13. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  14. APIKey: "dTM5Tm40VUJ2OUFNdUFLUDNnRUU6NWQ2QzhTZmhURk82MUFsUFE0a2ltUQ==",
  15. RetryOnStatus: [] int{ 429, 502, 503, 504},
  16. RetryBackoff: func(i int) time.Duration {
  17. // A simple exponential delay
  18. d := time.Duration(math.Exp2( float64(i))) * time.Second
  19. log.Printf( "Attempt: %d | Sleeping for %s...\n", i, d)
  20. return d
  21. },
  22. }
  23. es, err := elasticsearch.NewClient(cfg)
  24. log.Println(err)
  25. if err == nil {
  26. log.Println(elasticsearch.Version)
  27. log.Println(es.Info())
  28. } else {
  29. log.Println( "Something wrong with connection to Elasticsearch")
  30. }
  31. }

配置其它 HTTP 设置

要配置其他 HTTP 设置,请在配置对象中传递一个 http.Transport 对象。


  
  1. cfg := elasticsearch.Config{
  2. Transport: &http.Transport{
  3. MaxIdleConnsPerHost: 10,
  4. ResponseHeaderTimeout: time.Second,
  5. TLSClientConfig: &tls.Config{
  6. MinVersion: tls.VersionTLS12,
  7. // ...
  8. },
  9. // ...
  10. },
  11. }

有关客户端配置和自定义的更多示例,请参阅 _examples/configuration.go_examples/customization.go 文件。 有关安全配置的示例,请参阅 _examples/security

完整例子

以下示例演示了更复杂的用法。 它从集群中获取 Elasticsearch 版本,同时索引几个文档,并使用响应主体周围的轻量级包装器打印搜索结果。我们从上面的代码作为基础进行编码:

main.go


  
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. "math"
  6. "time"
  7. "github.com/elastic/go-elasticsearch/v8"
  8. )
  9. func main() {
  10. cfg := elasticsearch.Config{
  11. Addresses: [] string{
  12. "https://localhost:9200",
  13. },
  14. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  15. APIKey: "dTM5Tm40VUJ2OUFNdUFLUDNnRUU6NWQ2QzhTZmhURk82MUFsUFE0a2ltUQ==",
  16. RetryOnStatus: [] int{ 429, 502, 503, 504},
  17. RetryBackoff: func(i int) time.Duration {
  18. // A simple exponential delay
  19. d := time.Duration(math.Exp2( float64(i))) * time.Second
  20. log.Printf( "Attempt: %d | Sleeping for %s...\n", i, d)
  21. return d
  22. },
  23. }
  24. log.SetFlags( 0)
  25. var (
  26. r map[ string] interface{}
  27. // wg sync.WaitGroup
  28. )
  29. es, err := elasticsearch.NewClient(cfg)
  30. log.Println(err)
  31. if err == nil {
  32. log.Println( "Successfully connected to Elasticsearch!")
  33. }
  34. // 1. Get cluster info
  35. //
  36. res, err := es.Info()
  37. if err != nil {
  38. log.Fatalf( "Error getting response: %s", err)
  39. }
  40. defer res.Body.Close()
  41. // Check response status
  42. if res.IsError() {
  43. log.Fatalf( "Error: %s", res.String())
  44. }
  45. // Deserialize the response into a map.
  46. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  47. log.Fatalf( "Error parsing the response body: %s", err)
  48. }
  49. }

运行上面的代码:

go run main.go

上面的代码运行的结果为:


  
  1. $ go run main. go
  2. < nil>
  3. Successfully connected to Elasticsearch!

它显示我们的连接到 Elasticsearch 是成功的。

我们接下来打印运行的结果:


  
  1. // Print client and server version numbers.
  2. log.Printf( "Client: %s", elasticsearch.Version)
  3. log.Printf( "Server: %s", r[ "version"].( map[ string] interface{})[ "number"])
  4. log.Println(strings.Repeat( "~", 37))

  
  1. $ go run main. go
  2. < nil>
  3. Successfully connected to Elasticsearch!
  4. Client: 8.7 .0-SNAPSHOT
  5. Server: 8.5 .3
  6. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

我们接下来运行如下的命令来下载 esapi 包:

go get github.com/elastic/go-elasticsearch/v8/esapi

main.go


  
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "log"
  7. "math"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/elastic/go-elasticsearch/v8"
  13. "github.com/elastic/go-elasticsearch/v8/esapi"
  14. )
  15. func main() {
  16. cfg := elasticsearch.Config{
  17. Addresses: [] string{
  18. "https://localhost:9200",
  19. },
  20. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  21. APIKey: "dTM5Tm40VUJ2OUFNdUFLUDNnRUU6NWQ2QzhTZmhURk82MUFsUFE0a2ltUQ==",
  22. RetryOnStatus: [] int{ 429, 502, 503, 504},
  23. RetryBackoff: func(i int) time.Duration {
  24. // A simple exponential delay
  25. d := time.Duration(math.Exp2( float64(i))) * time.Second
  26. log.Printf( "Attempt: %d | Sleeping for %s...\n", i, d)
  27. return d
  28. },
  29. }
  30. log.SetFlags( 0)
  31. var (
  32. r map[ string] interface{}
  33. // wg sync.WaitGroup
  34. )
  35. es, err := elasticsearch.NewClient(cfg)
  36. log.Println(err)
  37. if err == nil {
  38. log.Println( "Successfully connected to Elasticsearch!")
  39. }
  40. // 1. Get cluster info
  41. //
  42. res, err := es.Info()
  43. if err != nil {
  44. log.Fatalf( "Error getting response: %s", err)
  45. }
  46. defer res.Body.Close()
  47. // Check response status
  48. if res.IsError() {
  49. log.Fatalf( "Error: %s", res.String())
  50. }
  51. // Deserialize the response into a map.
  52. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  53. log.Fatalf( "Error parsing the response body: %s", err)
  54. }
  55. // Print client and server version numbers.
  56. log.Printf( "Client: %s", elasticsearch.Version)
  57. log.Printf( "Server: %s", r[ "version"].( map[ string] interface{})[ "number"])
  58. log.Println(strings.Repeat( "~", 37))
  59. var wg sync.WaitGroup
  60. for i, title := range [] string{ "Test One", "Test Two"} {
  61. wg.Add( 1)
  62. go func(i int, title string) {
  63. defer wg.Done()
  64. // Build the request body.
  65. data, err := json.Marshal( struct{ Title string }{Title: title})
  66. if err != nil {
  67. log.Fatalf( "Error marshaling document: %s", err)
  68. }
  69. // Set up the request object.
  70. req := esapi.IndexRequest{
  71. Index: "test",
  72. DocumentID: strconv.Itoa(i + 1),
  73. Body: bytes.NewReader(data),
  74. Refresh: "true",
  75. }
  76. // Perform the request with the client.
  77. res, err := req.Do(context.Background(), es)
  78. if err != nil {
  79. log.Fatalf( "Error getting response: %s", err)
  80. }
  81. defer res.Body.Close()
  82. if res.IsError() {
  83. log.Printf( "[%s] Error indexing document ID=%d", res.Status(), i+ 1)
  84. } else {
  85. // Deserialize the response into a map.
  86. var r map[ string] interface{}
  87. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  88. log.Printf( "Error parsing the response body: %s", err)
  89. } else {
  90. // Print the response status and indexed document version.
  91. log.Printf( "[%s] %s; version=%d", res.Status(), r[ "result"], int(r[ "_version"].( float64)))
  92. }
  93. }
  94. }(i, title)
  95. }
  96. wg.Wait()
  97. log.Println(strings.Repeat( "-", 37))
  98. }

运行上面的代码:


  
  1. <nil>
  2. Successfully connected to Elasticsearch!
  3. Client: 8.7.0-SNAPSHOT
  4. Server: 8.5.3
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. [201 Created] created; version=1
  7. [201 Created] created; version=1
  8. -------------------------------------

上面的代码在 Elasticsearch 中创建一个叫做 test 的索引,并向里面写入两个文档。我们可以在 Kibana 中进行查看:

GET test/_search?filter_path=**.hits

 接下来,我们添加如下的代码来进行搜索:


  
  1. // 3. Search for the indexed documents
  2. //
  3. // Build the request body.
  4. var buf bytes.Buffer
  5. query := map[ string] interface{}{
  6. "query": map[ string] interface{}{
  7. "match": map[ string] interface{}{
  8. "Title": "test",
  9. },
  10. },
  11. }
  12. if err := json.NewEncoder(&buf).Encode(query); err != nil {
  13. log.Fatalf( "Error encoding query: %s", err)
  14. }
  15. // Perform the search request.
  16. res, err = es.Search(
  17. es.Search.WithContext(context.Background()),
  18. es.Search.WithIndex( "test"),
  19. es.Search.WithBody(&buf),
  20. es.Search.WithTrackTotalHits( true),
  21. es.Search.WithPretty(),
  22. )
  23. if err != nil {
  24. log.Fatalf( "Error getting response: %s", err)
  25. }
  26. defer res.Body.Close()
  27. if res.IsError() {
  28. var e map[ string] interface{}
  29. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  30. log.Fatalf( "Error parsing the response body: %s", err)
  31. } else {
  32. // Print the response status and error information.
  33. log.Fatalf( "[%s] %s: %s",
  34. res.Status(),
  35. e[ "error"].( map[ string] interface{})[ "type"],
  36. e[ "error"].( map[ string] interface{})[ "reason"],
  37. )
  38. }
  39. }
  40. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  41. log.Fatalf( "Error parsing the response body: %s", err)
  42. }
  43. // Print the response status, number of results, and request duration.
  44. log.Printf(
  45. "[%s] %d hits; took: %dms",
  46. res.Status(),
  47. int(r[ "hits"].( map[ string] interface{})[ "total"].( map[ string] interface{})[ "value"].( float64)),
  48. int(r[ "took"].( float64)),
  49. )
  50. // Print the ID and document source for each hit.
  51. for _, hit := range r[ "hits"].( map[ string] interface{})[ "hits"].([] interface{}) {
  52. log.Printf( " * ID=%s, %s", hit.( map[ string] interface{})[ "_id"], hit.( map[ string] interface{})[ "_source"])
  53. }
  54. log.Println(strings.Repeat( "=", 37))

运行上面的结果:


  
  1. $ go run main.go
  2. <nil>
  3. Successfully connected to Elasticsearch!
  4. Client: 8.7.0-SNAPSHOT
  5. Server: 8.5.3
  6. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  7. [200 OK] updated; version=4
  8. [200 OK] updated; version=4
  9. -------------------------------------
  10. [200 OK] 2 hits; took: 0ms
  11. * ID=1, map[Title:Test One]
  12. * ID=2, map[Title:Test Two]
  13. =====================================

上面的搜索相当于如下的搜索:


  
  1. GET test/_search
  2. {
  3. "query": {
  4. "match": {
  5. "Title": "test"
  6. }
  7. }
  8. }

它显示的搜索结果为:


  
  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 2,
  13. "relation": "eq"
  14. },
  15. "max_score": 0.074107975,
  16. "hits": [
  17. {
  18. "_index": "test",
  19. "_id": "1",
  20. "_score": 0.074107975,
  21. "_source": {
  22. "Title": "Test One"
  23. }
  24. },
  25. {
  26. "_index": "test",
  27. "_id": "2",
  28. "_score": 0.074107975,
  29. "_source": {
  30. "Title": "Test Two"
  31. }
  32. }
  33. ]
  34. }
  35. }

最终的代码为:

main.go


  
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "log"
  7. "math"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/elastic/go-elasticsearch/v8"
  13. "github.com/elastic/go-elasticsearch/v8/esapi"
  14. )
  15. func main() {
  16. cfg := elasticsearch.Config{
  17. Addresses: [] string{
  18. "https://localhost:9200",
  19. },
  20. CertificateFingerprint: "d293735f339412c738c5a258fe950fb2fcdaa33f772365a948abbb332a7c58b4",
  21. APIKey: "dTM5Tm40VUJ2OUFNdUFLUDNnRUU6NWQ2QzhTZmhURk82MUFsUFE0a2ltUQ==",
  22. RetryOnStatus: [] int{ 429, 502, 503, 504},
  23. RetryBackoff: func(i int) time.Duration {
  24. // A simple exponential delay
  25. d := time.Duration(math.Exp2( float64(i))) * time.Second
  26. log.Printf( "Attempt: %d | Sleeping for %s...\n", i, d)
  27. return d
  28. },
  29. }
  30. log.SetFlags( 0)
  31. var (
  32. r map[ string] interface{}
  33. // wg sync.WaitGroup
  34. )
  35. es, err := elasticsearch.NewClient(cfg)
  36. log.Println(err)
  37. if err == nil {
  38. log.Println( "Successfully connected to Elasticsearch!")
  39. }
  40. // 1. Get cluster info
  41. //
  42. res, err := es.Info()
  43. if err != nil {
  44. log.Fatalf( "Error getting response: %s", err)
  45. }
  46. defer res.Body.Close()
  47. // Check response status
  48. if res.IsError() {
  49. log.Fatalf( "Error: %s", res.String())
  50. }
  51. // Deserialize the response into a map.
  52. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  53. log.Fatalf( "Error parsing the response body: %s", err)
  54. }
  55. // Print client and server version numbers.
  56. log.Printf( "Client: %s", elasticsearch.Version)
  57. log.Printf( "Server: %s", r[ "version"].( map[ string] interface{})[ "number"])
  58. log.Println(strings.Repeat( "~", 37))
  59. var wg sync.WaitGroup
  60. for i, title := range [] string{ "Test One", "Test Two"} {
  61. wg.Add( 1)
  62. go func(i int, title string) {
  63. defer wg.Done()
  64. // Build the request body.
  65. data, err := json.Marshal( struct{ Title string }{Title: title})
  66. if err != nil {
  67. log.Fatalf( "Error marshaling document: %s", err)
  68. }
  69. // Set up the request object.
  70. req := esapi.IndexRequest{
  71. Index: "test",
  72. DocumentID: strconv.Itoa(i + 1),
  73. Body: bytes.NewReader(data),
  74. Refresh: "true",
  75. }
  76. // Perform the request with the client.
  77. res, err := req.Do(context.Background(), es)
  78. if err != nil {
  79. log.Fatalf( "Error getting response: %s", err)
  80. }
  81. defer res.Body.Close()
  82. if res.IsError() {
  83. log.Printf( "[%s] Error indexing document ID=%d", res.Status(), i+ 1)
  84. } else {
  85. // Deserialize the response into a map.
  86. var r map[ string] interface{}
  87. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  88. log.Printf( "Error parsing the response body: %s", err)
  89. } else {
  90. // Print the response status and indexed document version.
  91. log.Printf( "[%s] %s; version=%d", res.Status(), r[ "result"], int(r[ "_version"].( float64)))
  92. }
  93. }
  94. }(i, title)
  95. }
  96. wg.Wait()
  97. log.Println(strings.Repeat( "-", 37))
  98. // 3. Search for the indexed documents
  99. //
  100. // Build the request body.
  101. var buf bytes.Buffer
  102. query := map[ string] interface{}{
  103. "query": map[ string] interface{}{
  104. "match": map[ string] interface{}{
  105. "Title": "test",
  106. },
  107. },
  108. }
  109. if err := json.NewEncoder(&buf).Encode(query); err != nil {
  110. log.Fatalf( "Error encoding query: %s", err)
  111. }
  112. // Perform the search request.
  113. res, err = es.Search(
  114. es.Search.WithContext(context.Background()),
  115. es.Search.WithIndex( "test"),
  116. es.Search.WithBody(&buf),
  117. es.Search.WithTrackTotalHits( true),
  118. es.Search.WithPretty(),
  119. )
  120. if err != nil {
  121. log.Fatalf( "Error getting response: %s", err)
  122. }
  123. defer res.Body.Close()
  124. if res.IsError() {
  125. var e map[ string] interface{}
  126. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  127. log.Fatalf( "Error parsing the response body: %s", err)
  128. } else {
  129. // Print the response status and error information.
  130. log.Fatalf( "[%s] %s: %s",
  131. res.Status(),
  132. e[ "error"].( map[ string] interface{})[ "type"],
  133. e[ "error"].( map[ string] interface{})[ "reason"],
  134. )
  135. }
  136. }
  137. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  138. log.Fatalf( "Error parsing the response body: %s", err)
  139. }
  140. // Print the response status, number of results, and request duration.
  141. log.Printf(
  142. "[%s] %d hits; took: %dms",
  143. res.Status(),
  144. int(r[ "hits"].( map[ string] interface{})[ "total"].( map[ string] interface{})[ "value"].( float64)),
  145. int(r[ "took"].( float64)),
  146. )
  147. // Print the ID and document source for each hit.
  148. for _, hit := range r[ "hits"].( map[ string] interface{})[ "hits"].([] interface{}) {
  149. log.Printf( " * ID=%s, %s", hit.( map[ string] interface{})[ "_id"], hit.( map[ string] interface{})[ "_source"])
  150. }
  151. log.Println(strings.Repeat( "=", 37))
  152. }

关于 7.x 的文章:Elasticsearch:Elasticsearch 开发入门 - Golang

更多例子:go-elasticsearch/_examples at main · elastic/go-elasticsearch · GitHub


转载:https://blog.csdn.net/UbuntuTouch/article/details/128633043
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场