Elasticsearch: Creating an Index and Writing Data with Java
October 21, 2021 | by mebius
In my earlier article "Elasticsearch: Java usage examples", I described how to create an index in a Java application and write data into it. In today's example, I focus on how to deliberately create an index that matches our requirements, and I introduce several common ways of writing data.
Installation
We first refer to the following articles to install the Elasticsearch and Kibana that we need:
- How to install Elasticsearch on Linux, MacOS, and Windows
- Kibana: How to install Kibana from the Elastic Stack on Linux, MacOS, and Windows
For the exercises below, our Elasticsearch is reachable at http://localhost:9200.
In addition, for Elastic Stack 7.15 and later, it is strongly recommended to secure Elasticsearch. You can refer to the article "Elasticsearch: Setting up Elastic account security". The password of my cluster's superuser elastic is password.
Creating the Java application
We create a Java application with our favorite IDE. In this example, I will create a Maven application:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.liuxg</groupId>
    <artifactId>Elasticsearch-Java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elastic.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elastic.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elastic.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <elastic.version>7.15.0</elastic.version>
    </properties>
</project>
Within this project, we create a class called ElasticsearchJava:
ElasticsearchJava.java
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
public class ElasticsearchJava {
private static RestHighLevelClient client = null;
private static synchronized RestHighLevelClient makeConnection() {
final BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
basicCredentialsProvider
.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "password"));
if (client == null) {
client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(basicCredentialsProvider);
}
})
);
}
return client;
}
public static void main(String[] args) {
client = makeConnection();
}
}
Above, we created a connection to Elasticsearch. Note that we used the superuser elastic and its password. If we have not set a password for our cluster, we can connect much more simply with the following code:
private static synchronized RestHighLevelClient makeConnection() {
    if (client == null) {
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
    return client;
}
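After making a connection, it is useful to verify that the cluster is actually reachable, and to release the client when the application exits. Below is a minimal sketch using the high-level client's ping() and close() methods; it assumes the client variable from the code above, plus imports for java.io.IOException and org.elasticsearch.client.RequestOptions:

public static void main(String[] args) throws IOException {
    client = makeConnection();
    // ping() returns true when the cluster answers with the given credentials
    boolean reachable = client.ping(RequestOptions.DEFAULT);
    System.out.println("cluster reachable: " + reachable);
    // Release the underlying HTTP resources when we are done
    client.close();
}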
Next, following the official Elastic documentation "Create Index API", we use the following code:
// Additional imports needed for this snippet:
// import java.io.IOException;
// import org.elasticsearch.client.RequestOptions;
// import org.elasticsearch.client.indices.CreateIndexRequest;
// import org.elasticsearch.client.indices.CreateIndexResponse;
// import org.elasticsearch.common.settings.Settings;

public static void main(String[] args) throws IOException {
    client = makeConnection();

    // Create an index named "employees" with explicit settings
    CreateIndexRequest request = new CreateIndexRequest("employees");
    request.settings(Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 0)
    );
    CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
    System.out.println("response id: " + createIndexResponse.index());
}
Above, we establish a connection and create an index called employees. Note that an index name may not contain uppercase letters; otherwise an error results. We set number_of_shards to 1 and number_of_replicas to 0. The output of the code above is:
response id: employees
We can use Kibana to check the result; it shows that the employees index has been created successfully.
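If you prefer to verify this from Java instead of Kibana, a minimal sketch using the high-level client's exists API looks as follows (GetIndexRequest comes from org.elasticsearch.client.indices):

// import org.elasticsearch.client.indices.GetIndexRequest;
GetIndexRequest existsRequest = new GetIndexRequest("employees");
boolean exists = client.indices().exists(existsRequest, RequestOptions.DEFAULT);
System.out.println("employees index exists: " + exists);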
You may also want to create a mapping for the index; that can be done with the following code:
// Also needs: import org.elasticsearch.common.xcontent.XContentType;

public static void main(String[] args) throws IOException {
    client = makeConnection();

    String mappings = "{\n" +
            "  \"properties\": {\n" +
            "    \"id\": {\n" +
            "      \"type\": \"keyword\"\n" +
            "    },\n" +
            "    \"name\": {\n" +
            "      \"type\": \"text\"\n" +
            "    }\n" +
            "  }\n" +
            "}";

    System.out.println("mapping is as follows: ");
    System.out.println(mappings);

    try {
        CreateIndexRequest request = new CreateIndexRequest("employees");
        request.settings(Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 0)
        );
        // Attach the mapping as a JSON string
        request.mapping(mappings, XContentType.JSON);
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("response id: " + createIndexResponse.index());
    } catch (Exception e) {
        // The index may already exist from the previous run; ignore the error here
        // e.printStackTrace();
    }
}
The output of running the code above is:
mapping is as follows:
{
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text"
}
}
}
response id: employees
We can view it in Kibana:
GET employees/_mapping
The result of the command above is:
{
"employees" : {
"mappings" : {
"properties" : {
"id" : {
"type" : "keyword"
},
"name" : {
"type" : "text"
}
}
}
}
}
The code above is equivalent to the following command in Kibana:
PUT employees
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text"
}
}
}
}
The command above creates an index called employees and defines its settings and mappings.
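Instead of assembling the mapping as a JSON string, the high-level client also accepts an XContentBuilder. Below is a sketch of the same mapping built that way; it assumes the CreateIndexRequest variable request from the Java code above:

// import org.elasticsearch.common.xcontent.XContentBuilder;
// import org.elasticsearch.common.xcontent.XContentFactory;
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
mappingBuilder.startObject();
{
    mappingBuilder.startObject("properties");
    {
        mappingBuilder.startObject("id");
        {
            mappingBuilder.field("type", "keyword");
        }
        mappingBuilder.endObject();
        mappingBuilder.startObject("name");
        {
            mappingBuilder.field("type", "text");
        }
        mappingBuilder.endObject();
    }
    mappingBuilder.endObject();
}
mappingBuilder.endObject();
// Attach the builder-built mapping instead of the JSON string
request.mapping(mappingBuilder);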
Next, following another document, "Index API", we write documents into the index we have already created:
// Method 1: Write a document into the employees index
IndexRequest request = new IndexRequest("employees");
request.id("1");
String jsonString = "{" +
        "\"id\":\"1\"," +
        "\"name\":\"liuxg\"" +
        "}";
request.source(jsonString, XContentType.JSON);
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
System.out.println("response id: "+indexResponse.getId());
System.out.println("response name: "+indexResponse.getResult().name());
The code above writes a document into the existing employees index; the document is written as JSON. If the employees index did not exist beforehand, this API would create it automatically and write the document into it, but the resulting settings and mappings might not be what we want, since the system would apply its defaults.
The code above is similar to the following command in Kibana:
PUT employees/_doc/1
{
"id": "1",
"name": "liuxg"
}
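If you want to re-run these examples from a clean state, you can drop the index first. Here is a minimal sketch using the high-level client's delete index API (DeleteIndexRequest comes from org.elasticsearch.action.admin.indices.delete, AcknowledgedResponse from org.elasticsearch.action.support.master):

DeleteIndexRequest deleteRequest = new DeleteIndexRequest("employees");
AcknowledgedResponse deleteResponse = client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println("index deleted: " + deleteResponse.isAcknowledged());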
We recompile and run our code:
mapping is as follows:
{
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text"
}
}
}
response id: 1
response name: CREATED
After running it, we can check in Kibana as follows:
GET employees/_search
The command above shows the following result:
{
"took" : 7,
"timed_out" : false,
"_shards" : {
"total" : 1,
"succestgcodesful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : "1",
"name" : "liuxg"
}
}
]
}
}
We can see that the document has been written correctly.
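We can also read the document back from Java. A minimal sketch using the high-level client's get API (GetRequest and GetResponse come from org.elasticsearch.action.get):

GetRequest getRequest = new GetRequest("employees", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
if (getResponse.isExists()) {
    // Print the original JSON source of the document
    System.out.println("document 1: " + getResponse.getSourceAsString());
}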
Next, we introduce another way of writing:
// Method 2: Write documents into employees index
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("id", "2");
jsonMap.put("name", "Nancy");
IndexRequest indexRequest = new IndexRequest("employees")
.id("2").source(jsonMap);
IndexResponse indexResponse2 = client.index(indexRequest, RequestOptions.DEFAULT);
System.out.println("response id: "+indexResponse2.getId());
System.out.println("response name: "+indexResponse2.getResult().name());
Run the code:
mapping is as follows:
{
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text"
}
}
}
response id: 1
response name: UPDATED
response id: 2
response name: CREATED
We check again in Kibana:
GET employees/_search
The command above shows the following result:
{
"took" : 165,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : "1",
"name" : "liuxg"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"name" : "Nancy",
"id" : "2"
}
}
]
}
}
We can see that document 2 has been written correctly.
Following the official documentation, there is yet another way to write:
// Method 3: Write documents into employees index
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("id", "3");
builder.field("name", "Jason");
}
builder.endObject();
IndexRequest indexRequest3 = new IndexRequest("employees")
.id("3").source(builder);
IndexResponse indexResponse3 = client.index(indexRequest3, RequestOptions.DEFAULT);
System.out.println("response id: "+indexResponse3.getId());
System.out.println("response name: "+indexResponse3.getResult().name());
There is also:
// Method 4: Write documents into employees index
IndexRequest indexRequest4 = new IndexRequest("employees")
.id("4")
.source("id", "4",
"name", "Mark");
IndexResponse indexResponse4 = client.index(indexRequest4, RequestOptions.DEFAULT);
System.out.println("response id: "+indexResponse4.getId());
System.out.println("response name: "+indexResponse4.getResult().name());
Run the code above; we can then check in Kibana:
"hits" : [
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : "1",
"name" : "liuxg"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"name" : "Nancy",
"id" : "2"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"id" : "3",
"name" : "Jason"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"id" : "4",
"name" : "Mark"
}
}
]
Finally, we create a Java class called Employee:
Employee.java
public class Employee {
private String id;
private String name;
public Employee(String id, String name) {
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public String getName() {
return name;
}
public void setId(String id) {
this.id = id;
}
public void setName(String name) {
this.name = name;
}
}
Be sure to implement the setters and getters in this class; Jackson relies on them when serializing the object.
We then write a document with the following code:
// Method 5: Write documents into employees index
Employee employee = new Employee("5", "Martin");
IndexRequest indexRequest5 = new IndexRequest("employees");
indexRequest5.id("5");
// Serialize the POJO to JSON with Jackson's ObjectMapper
indexRequest5.source(new ObjectMapper().writeValueAsString(employee), XContentType.JSON);
IndexResponse indexResponse5 = client.index(indexRequest5, RequestOptions.DEFAULT);
System.out.println("response id: " + indexResponse5.getId());
System.out.println("response name: " + indexResponse5.getResult().name());
Rerun the code and check in Kibana:
GET employees/_search
We can see the following result:
"hits" : [
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : "1",
"name" : "liuxg"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"name" : "Nancy",
"id" : "2"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"id" : "3",
"name" : "Jason"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"id" : "4",
"name" : "Mark"
}
},
{
"_index" : "employees",
"_type" : "_doc",
"_id" : "5",
"_score" : 1.0,
"_source" : {
"id" : "5",
"name" : "Martin"
}
}
]
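Going the other way, the same Jackson ObjectMapper can turn search hits back into Employee objects. A minimal sketch (SearchRequest, SearchResponse, and SearchHit come from org.elasticsearch.action.search and org.elasticsearch.search; note that Jackson needs a no-argument constructor added to Employee for deserialization):

SearchRequest searchRequest = new SearchRequest("employees");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
ObjectMapper mapper = new ObjectMapper();
for (SearchHit hit : searchResponse.getHits().getHits()) {
    // Deserialize each hit's _source JSON back into an Employee
    Employee e = mapper.readValue(hit.getSourceAsString(), Employee.class);
    System.out.println(e.getId() + " -> " + e.getName());
}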
To make it easier to study, I have put the source code on GitHub: https://github.com/liu-xiao-guo/Elasticsearch-java