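Elasticsearch: Using scroll to iterate over search results in the Java client – Elastic Stack 8.x
November 10, 2022 | by mebius
Standard from/size pagination works well when the documents you search rarely change. When you paginate over live data, however, the results can be unpredictable: documents may be repeated or skipped across pages. To work around this, Elasticsearch offers an extra query parameter: scroll. If you are not very familiar with paginating search results, please see my earlier article "Elasticsearch: Using the scroll API for better pagination of large amounts of data".
Preparing the data
To keep things simple in today's exercise, we use the following data: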
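To make "unpredictable results" concrete, here is an in-memory Java model. It is not Elasticsearch code, and the class PagingDemo and its methods are hypothetical. It assumes hits come back in a fixed sort order and that a new document that sorts first is indexed between the requests for page 1 and page 2. With offset paging, d2 is returned twice. Paging over a snapshot taken at the first request, which is roughly what a scroll context provides, avoids that:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model (not Elasticsearch code): offset paging over live data
// versus paging over a snapshot taken when the first page is requested.
final class PagingDemo {

    // Offset-based page: computed against whatever the data looks like right now.
    static List<String> page(List<String> data, int from, int size) {
        int end = Math.min(from + size, data.size());
        return from >= end ? List.of() : new ArrayList<>(data.subList(from, end));
    }

    // Page 1, then a write lands at the head of the sort order, then page 2.
    static List<String> offsetPagingWithConcurrentInsert() {
        List<String> live = new ArrayList<>(List.of("d1", "d2", "d3", "d4", "d5"));
        List<String> seen = new ArrayList<>(page(live, 0, 2)); // [d1, d2]
        live.add(0, "d0");                                     // concurrent insert
        seen.addAll(page(live, 2, 2));                         // [d2, d3]: d2 appears twice
        return seen;
    }

    // Same sequence, but both pages are read from a snapshot (assumed scroll-like behaviour).
    static List<String> snapshotPagingWithConcurrentInsert() {
        List<String> live = new ArrayList<>(List.of("d1", "d2", "d3", "d4", "d5"));
        List<String> snapshot = List.copyOf(live);             // frozen view of the data
        List<String> seen = new ArrayList<>(page(snapshot, 0, 2));
        live.add(0, "d0");                                     // does not affect the snapshot
        seen.addAll(page(snapshot, 2, 2));
        return seen;
    }
}
```

offsetPagingWithConcurrentInsert() returns [d1, d2, d2, d3] and snapshotPagingWithConcurrentInsert() returns [d1, d2, d3, d4]. A scroll behaves like the second case because its results reflect the index as it was when the initial search was made.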
POST _bulk
{ "index" : { "_index" : "twitter", "_id": 1} }
{"user":"双榆树-张三","message":"今儿天气不错啊,出去转转去","uid":2,"age":20,"city":"北京","province":"北京","country":"中国","address":"中国北京市海淀区","location":{"lat":"39.970718","lon":"116.325747"}}
{ "index" : { "_index" : "twitter", "_id": 2 }}
{"user":"东城区-老刘","message":"出发,下一站云南!","uid":3,"age":30,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区台基厂三条3号","location":{"lat":"39.904313","lon":"116.412754"}}
{ "index" : { "_index" : "twitter", "_id": 3} }
{"user":"东城区-李四","message":"happy birthday!","uid":4,"age":30,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区","location":{"lat":"39.893801","lon":"116.408986"}}
{ "index" : { "_index" : "twitter", "_id": 4} }
{"user":"朝阳区-老贾","message":"123,gogogo","uid":5,"age":35,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区建国门","location":{"lat":"39.718256","lon":"116.367910"}}
{ "index" : { "_index" : "twitter", "_id": 5} }
{"user":"朝阳区-老王","message":"Happy BirthDay My Friend!","uid":6,"age":50,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区国贸","location":{"lat":"39.918256","lon":"116.467910"}}
{ "index" : { "_index" : "twitter", "_id": 6} }
{"user":"虹桥-老吴","message":"好友来了都今天我生日,好友来了,什么 birthday happy 就成!","uid":7,"age":90,"city":"上海","province":"上海","country":"中国","address":"中国上海市闵行区","location":{"lat":"31.175927","lon":"121.383328"}}
Above, we index 6 documents into Elasticsearch. In this exercise I set the page size to 2 documents. We can run the following search:
GET twitter/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"city": "北京"
}
}
],
"filter": [
{
"range": {
"age": {
"gte": 0,
"lte": 100
}
}
}
]
}
},
"size": 2
}
The search above returns the first two hits:
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": 0.48232412,
"hits": [
{
"_index": "twitter",
"_id": "1",
"_score": 0.48232412,
"_source": {
"user": "双榆树-张三",
"message": "今儿天气不错啊,出去转转去",
"uid": 2,
"age": 20,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市海淀区"
}
},
{
"_index": "twitter",
"_id": "2",
"_score": 0.48232412,
"_source": {
"user": "东城区-老刘",
"message": "出发,下一站云南!",
"uid": 3,
"age": 30,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市东城区台基厂三条3号"
}
}
]
}
}
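The results above show that 5 documents match the search criteria. At 2 documents per page, that makes 3 pages. So how do we paginate through the search results? We can use the scroll parameter: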
GET twitter/_search?scroll=2m
{
"query": {
"bool": {
"must": [
{
"match": {
"city": "北京"
}
}
],
"filter": [
{
"range": {
"age": {
"gte": 0,
"lte": 100
}
}
}
]
}
},
"size": 2
}
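Here, 2m tells Elasticsearch to keep the search context alive for 2 minutes, which is the time allowed until the next scroll request. The request returns: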
{
"_scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFeHBZReU4zSnhXVlR5eW5WQW5Yb09RSHNR",
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": 0.48232412,
"hits": [
{
"_index": "twitter",
"_id": "1",
"_score": 0.48232412,
"_source": {
"user": "双榆树-张三",
"message": "今儿天气不错啊,出去转转去",
"uid": 2,
"age": 20,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市海淀区"
}
},
{
"_index": "twitter",
"_id": "2",
"_score": 0.48232412,
"_source": {
"user": "东城区-老刘",
"message": "出发,下一站云南!",
"uid": 3,
"age": 30,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市东城区台基厂三条3号"
}
}
]
}
}
As you can see, it returns the two hits of the first page together with a _scroll_id. We can use this _scroll_id to retrieve the second page of search results:
GET _search/scroll
{
"scroll": "2m",
"scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFeHBZReU4zSnhXVlR5eW5WQW5Yb09RSHNR"
}
The request above returns:
{
"_scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFeHBZReU4zSnhXVlR5eW5WQW5Yb09RSHNR",
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": 0.48232412,
"hits": [
{
"_index": "twitter",
"_id": "3",
"_score": 0.48232412,
"_source": {
"user": "东城区-李四",
"message": "happy birthday!",
"uid": 4,
"age": 30,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市东城区"
}
},
{
"_index": "twitter",
"_id": "4",
"_score": 0.48232412,
"_source": {
"user": "朝阳区-老贾",
"message": "123,gogogo",
"uid": 5,
"age": 35,
"city": "北京",
"province": "北京",
"country": "中国",
"address": "中国北京市朝阳区建国门"
}
}
]
}
}
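Both the initial search and the scroll request above pass a 2m keep-alive. The scroll API documentation describes this value as only needing to cover the processing of the previous batch, because every request that carries it sets a fresh expiry. The hypothetical Java class below models just that rule in memory. It is not an Elasticsearch API:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory model of a scroll keep-alive (not an Elasticsearch API):
// the initial search and every scroll request push the expiry to now + keepAlive.
final class ScrollKeepAlive {
    private Instant expiresAt;

    // The initial search opens the context and sets the first expiry.
    ScrollKeepAlive(Instant now, Duration keepAlive) {
        this.expiresAt = now.plus(keepAlive);
    }

    // A follow-up scroll request: rejected if the context already expired, otherwise renewed.
    boolean renew(Instant now, Duration keepAlive) {
        if (now.isAfter(expiresAt)) {
            return false; // the context is gone, so the scroll id is no longer usable
        }
        expiresAt = now.plus(keepAlive);
        return true;
    }

    Instant expiresAt() {
        return expiresAt;
    }
}
```

With a 2-minute keep-alive opened at t0, a scroll request at t0 + 90s succeeds and moves the expiry to t0 + 210s. The total scroll can therefore run far longer than 2 minutes, as long as no single gap between requests exceeds it.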
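We can keep passing the returned _scroll_id to fetch the subsequent results until the hits array in the response is empty.
Paginating with the Java client APIs
Next, let's build a Java application that paginates through the search results. To make the code easier to follow, I have uploaded the final project to GitHub: https://github.com/liu-xiao-guo/elasticsearchjava-scroll
First, we create a class named Twitter:
Twitter.java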
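The loop described above can be sketched against an in-memory stand-in, followed by releasing the context. On a real cluster, the clear scroll API (DELETE _search/scroll) does the releasing. FakeScrollServer and its methods are hypothetical names used only for illustration. The client-side pattern is what matters: always pass the most recent scroll id, stop at the first empty page, and clear the context in a finally block:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the scroll protocol (not the Elasticsearch API).
final class FakeScrollServer {
    static final class Page {
        final String scrollId;
        final List<String> hits;
        Page(String scrollId, List<String> hits) { this.scrollId = scrollId; this.hits = hits; }
    }

    private final Map<String, List<String>> snapshots = new HashMap<>(); // open contexts
    private final Map<String, Integer> cursors = new HashMap<>();        // next offset per context
    private final int size;                                               // hits per page

    FakeScrollServer(int size) { this.size = size; }

    // Like GET index/_search?scroll=...: snapshot the matches, return page 1 and a scroll id.
    Page search(List<String> matching) {
        String id = UUID.randomUUID().toString();
        snapshots.put(id, List.copyOf(matching));
        cursors.put(id, 0);
        return scroll(id);
    }

    // Like GET _search/scroll: return the next page of the snapshot (empty once exhausted).
    Page scroll(String id) {
        List<String> snap = snapshots.get(id);
        if (snap == null) throw new IllegalStateException("unknown or cleared scroll id");
        int from = cursors.get(id);
        int to = Math.min(from + size, snap.size());
        cursors.put(id, to);
        return new Page(id, snap.subList(from, to));
    }

    // Like DELETE _search/scroll: free the context.
    void clear(String id) {
        snapshots.remove(id);
        cursors.remove(id);
    }

    int openContexts() { return snapshots.size(); }

    // Client loop: follow the latest scroll id until a page comes back empty, then clear it.
    static List<List<String>> readAllPages(FakeScrollServer server, List<String> matching) {
        List<List<String>> pages = new ArrayList<>();
        Page page = server.search(matching);
        try {
            while (!page.hits.isEmpty()) {
                pages.add(page.hits);
                page = server.scroll(page.scrollId);
            }
        } finally {
            server.clear(page.scrollId);
        }
        return pages;
    }
}
```

With a page size of 2 and five matching ids, readAllPages returns three pages, [[1, 2], [3, 4], [5]], and leaves no open context behind. This mirrors the REST walkthrough above.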
public class Twitter {
private String user;
private long uid;
private String province;
private String message;
private String country;
private String city;
private long age;
private String address;
public Twitter() {
}
public Twitter(String user, long uid, String province, String message,
String country, String city, long age, String address) {
this.user = user;
this.uid = uid;
this.province = province;
this.message = message;
this.country = country;
this.city = city;
this.age = age;
this.address = address;
}
public String getUser() {
return user;
}
public long getUid() {
return uid;
}
public String getProvince() {
return province;
}
public String getMessage() {
return message;
}
public String getCountry() {
return country;
}
public String getCity() {
return city;
}
public long getAge() {
return age;
}
public String getAddress() {
return address;
}
public void setUser(String user) {
this.user = user;
}
public void setUid(long uid) {
this.uid = uid;
}
public void setProvince(String province) {
this.province = province;
}
public void setMessage(String message) {
this.message = message;
}
public void setCountry(String country) {
this.country = country;
}
public void setCity(String city) {
this.city = city;
}
public void setAge(long age) {
this.age = age;
}
public void setAddress(String address) {
this.address = address;
}
}
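This class corresponds to the twitter documents above.
Next, we connect to the Elasticsearch cluster. See my earlier article "Elasticsearch: Creating an HTTPS connection using a truststore in the Java client". Once connected, we can write the following code to paginate through the search results: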
ElasticsearchJava.java
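// Imports needed by this snippet (elasticsearch-java 8.x):
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.json.JsonData;

// `client` is the ElasticsearchClient created when connecting to the cluster.
final String INDEX_NAME = "twitter";
final Time keepAlive = Time.of(t -> t.time("2m"));   // keep the scroll context alive for 2 minutes

SearchRequest searchRequest = new SearchRequest.Builder()
        .index(INDEX_NAME)
        .query(q -> q.bool(b -> b
                .must(must -> must.match(m -> m.field("city").query("北京")))
                .filter(f -> f.range(r -> r.field("age").gte(JsonData.of(0)).lte(JsonData.of(100))))
        ))
        .size(2)              // 2 hits per page
        .scroll(keepAlive)    // open a scroll context
        .build();

// SearchResponse and ScrollResponse share the ResponseBody base type, so one variable holds both.
ResponseBody<Twitter> response = client.search(searchRequest, Twitter.class);
do {
    System.out.println("size: " + response.hits().hits().size());
    for (Hit<Twitter> hit : response.hits().hits()) {
        System.out.println("hit: " + hit.index() + ": " + hit.id());
    }
    final String scrollId = response.scrollId();   // always use the most recent scroll id
    System.out.println("scrollId: " + scrollId);
    response = client.scroll(s -> s.scrollId(scrollId).scroll(keepAlive), Twitter.class);
    System.out.println("=================================");
} while (response.hits().hits().size() != 0); // 0 hits mark the end of the scroll and the while loop.

// Release the scroll context instead of waiting for the keep-alive to expire.
final String lastScrollId = response.scrollId();
client.clearScroll(c -> c.scrollId(lastScrollId));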
Running the code above prints the following:
size: 2
hit: twitter: 1
hit: twitter: 2
scrollId: FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFAnxZReU4zSnhXVlR5eW5WQW5Yb09RSHNR
=================================
size: 2
hit: twitter: 3
hit: twitter: 4
scrollId: FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFAnxZReU4zSnhXVlR5eW5WQW5Yb09RSHNR
=================================
size: 1
hit: twitter: 5
scrollId: FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFi1rOUlBMFdGU2tLSS0yTlMyUkdRdUEAAAAAAAFAnxZReU4zSnhXVlR5eW5WQW5Yb09RSHNR
=================================
The output shows three pages, with 5 matching documents in total.
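The page count is the ceiling of the total hit count divided by the page size. With 5 hits and a size of 2, that gives 3 pages, and the last page holds a single hit, as in the output above. Below is a small hypothetical helper, not part of the Java client, that expresses this arithmetic. It assumes hits.total.relation is "eq". When the relation is "gte", the total is only a lower bound.

```java
// Hypothetical helper (not part of elasticsearch-java): page arithmetic for a known hit total.
final class PageMath {

    // Number of pages needed for `total` hits at `size` hits per page (ceiling division).
    static long pageCount(long total, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        return (total + size - 1) / size;
    }

    // Number of hits on the 1-based page `page`; 0 once past the last page.
    static long hitsOnPage(long total, int size, long page) {
        long from = (page - 1) * size;
        return Math.max(0, Math.min(size, total - from));
    }
}
```

pageCount(5, 2) returns 3, and hitsOnPage(5, 2, 3) returns 1. The scroll loop does not need these values, because it simply stops at the first empty page.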