Elasticsearch: Creating an ingest pipeline processor from scratch
September 16, 2022 | by mebius
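In an earlier article, I described in detail how to use existing tooling to generate a basic ingest pipeline processor. In this article, I walk through a complete example using Elastic Stack 8.4.0. We will build a processor called sample. It takes the first letter of a field in the document, converts it to lowercase, and stores it in a field chosen by the user.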
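As a rough sketch of the transformation the sample processor is meant to perform, the snippet below uses plain Java with no Elasticsearch dependency. The class name InitialSketch and the method initialOf are made up for illustration, and Locale.ROOT is an assumption chosen to keep the lowercasing locale-independent.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper, not part of the plugin: shows only the string transformation.
final class InitialSketch {

    // Returns the lower-cased first character of the trimmed value,
    // or an empty Optional when there is nothing to extract.
    static Optional<String> initialOf(String value) {
        if (value == null) {
            return Optional.empty();
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(initialOf("xiaoguo").orElse("<none>")); // prints "x"
        System.out.println(initialOf("Liu").orElse("<none>"));     // prints "l"
    }
}
```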
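Installation
If you have not installed the Elastic Stack yet, refer to the following articles to install Elasticsearch and Kibana:
- How to install Elasticsearch on Linux, macOS, and Windows
- Elastic: Installing Elastic Stack 8.0 with Docker and getting started
- Kibana: How to install Kibana from the Elastic Stack on Linux, macOS, and Windows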
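Creating the plugin template
As described in the earlier article "Elasticsearch: Creating an Elasticsearch Ingest plugin", we can generate the project with elasticsearch-plugin-archetype. The following command creates a basic plugin template: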
mvn archetype:generate \
    -DarchetypeGroupId=org.codelibs \
    -DarchetypeArtifactId=elasticsearch-plugin-archetype \
    -DarchetypeVersion=6.6.0 \
    -DgroupId=com.liuxg \
    -DartifactId=elasticsearch-plugin \
    -Dversion=1.0-SNAPSHOT \
    -DpluginName=ingest
The command above creates a basic plugin template in a new directory called elasticsearch-plugin under the current directory. First, change into that directory:
$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ tree -L 8
.
├── pom.xml
└── src
    └── main
        ├── assemblies
        │   └── plugin.xml
        ├── java
        │   └── com
        │       └── liuxg
        │           ├── ingestPlugin.java
        │           └── rest
        │               └── RestingestAction.java
        └── plugin-metadata
            └── plugin-descriptor.properties
The command generated the skeleton of a basic REST handler plugin. That is not what we need, so we rename the files and rearrange the directories. The adjusted layout looks like this:
$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ tree -L 8
.
├── pom.xml
└── src
    └── main
        ├── assemblies
        │   └── plugin.xml
        ├── java
        │   └── com
        │       └── liuxg
        │           ├── ingest
        │           │   └── SampleProcessor.java
        │           └── plugin
        │               └── ingest
        │                   └── ingestPlugin.java
        └── plugin-metadata
            └── plugin-descriptor.properties
Next, we modify pom.xml:
pom.xml
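<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <name>elasticsearch-plugin</name>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.liuxg</groupId>
  <artifactId>elasticsearch-plugin</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <description>elasticsearch ingest plugin</description>
  <inceptionYear>2019</inceptionYear>
  <licenses>
    <license>
      <name>The Apache Software License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
      <distribution>repo</distribution>
    </license>
  </licenses>
  <properties>
    <elasticsearch.version>8.4.0</elasticsearch.version>
    <elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>
    <log4j.version>2.11.1</log4j.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.22.1</version>
        <configuration>
          <includes><include>**/*Tests.java</include></includes>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-source-plugin</artifactId>
        <version>3.0.1</version>
        <executions>
          <execution>
            <id>attach-sources</id>
            <goals><goal>jar</goal></goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <outputDirectory>${project.build.directory}/releases/</outputDirectory>
          <descriptors><descriptor>${basedir}/src/main/assemblies/plugin.xml</descriptor></descriptors>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>single</goal></goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>${elasticsearch.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>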
Above, we changed the following two lines:
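<elasticsearch.version>8.4.0</elasticsearch.version>
<elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>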
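elasticsearch.version must match the version of your Elastic Stack; otherwise the plugin cannot be installed. We also have to update elasticsearch.plugin.classname, because the path of the plugin class has changed.
Next, we modify ingestPlugin.java: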
ingestPlugin.java
package com.liuxg.plugin.ingest;

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import com.liuxg.ingest.SampleProcessor;

import java.util.Collections;
import java.util.Map;

public class ingestPlugin extends Plugin implements IngestPlugin {

    // Maps the processor type name ("sample") to the factory that builds it.
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(SampleProcessor.TYPE, new SampleProcessor.Factory());
    }
}
The code above registers the processor for this ingest pipeline plugin.
Next, we modify SampleProcessor.java:
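Conceptually, the map returned by getProcessors ties the type name used in a pipeline definition to the factory that builds the processor. The following is a simplified, self-contained sketch of that lookup idea. ProcessorFactory, SAMPLE_FACTORY, REGISTRY, and build are hypothetical stand-ins invented for this illustration; they are not Elasticsearch classes and do not reproduce how Elasticsearch resolves processors internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RegistrySketch {

    // Hypothetical stand-in for a processor factory: takes a config map and
    // returns a textual description of the processor it would build.
    interface ProcessorFactory {
        String create(Map<String, Object> config);
    }

    static final ProcessorFactory SAMPLE_FACTORY =
        config -> "sample(" + config.get("field") + " -> " + config.get("target_field") + ")";

    // Same shape as Collections.singletonMap(SampleProcessor.TYPE, new SampleProcessor.Factory()).
    static final Map<String, ProcessorFactory> REGISTRY =
        Collections.singletonMap("sample", SAMPLE_FACTORY);

    // Looks up the factory by type name and lets it build from the config.
    static String build(String type, Map<String, Object> config) {
        ProcessorFactory factory = REGISTRY.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("unknown processor type [" + type + "]");
        }
        return factory.create(config);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("field", "user");
        config.put("target_field", "user_initial");
        System.out.println(build("sample", config)); // prints "sample(user -> user_initial)"
    }
}
```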
SampleProcessor.java
package com.liuxg.ingest;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Locale;
import java.util.Map;

public final class SampleProcessor extends AbstractProcessor {

    public static final String TYPE = "sample";

    private final String field;
    private final String targetField;
    private final String defaultValue;
    private final boolean ignoreMissing;

    public SampleProcessor(String tag, String description, String field, String targetField,
                           boolean ignoreMissing, String defaultValue) {
        super(tag, description);
        this.field = field;
        this.targetField = targetField;
        this.ignoreMissing = ignoreMissing;
        this.defaultValue = defaultValue;
    }

    String getField() {
        return field;
    }

    String getTargetField() {
        return targetField;
    }

    String getDefaultField() {
        return defaultValue;
    }

    boolean isIgnoreMissing() {
        return ignoreMissing;
    }

    @Override
    public IngestDocument execute(IngestDocument document) {
        // A missing source field is either skipped silently or reported as an error.
        if (!document.hasField(field, true)) {
            if (ignoreMissing) {
                return document;
            } else {
                throw new IllegalArgumentException("field [" + field + "] not present as part of path [" + field + "]");
            }
        }
        // Refuse to overwrite a target field that already exists.
        if (document.hasField(targetField, true)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }
        Object value = document.getFieldValue(field, Object.class);
        if (value instanceof String) {
            String myValue = value.toString().trim();
            // Any non-empty string has a first character, including one-character values.
            if (!myValue.isEmpty()) {
                try {
                    document.setFieldValue(targetField, myValue.substring(0, 1).toLowerCase(Locale.getDefault()));
                } catch (Exception e) {
                    // The source field was never removed, so re-setting it is only a defensive step before rethrowing.
                    document.setFieldValue(field, value);
                    throw e;
                }
            }
        }
        return document;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {

        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
                                String description, Map<String, Object> config) throws Exception {
            // "field" and "target_field" are required; the other two options are optional.
            String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
            String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
            String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "defaultValue");
            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
            return new SampleProcessor(tag, description, field, targetField, ignoreMissing, defaultValue);
        }
    }
}
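Above, we set the processor's type name to sample, which is the name we will use in the test later on. The implementation extracts the first letter of a field and places it in a user-defined field.
Building
From the project root, run the following command to build the plugin: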
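The main branches of execute can be tried out without a running cluster. The sketch below approximates them on a plain Map instead of an IngestDocument. It assumes flat field names (no dotted paths), uses Locale.ROOT for lowercasing, and the names ExecuteSketch and apply are invented for this illustration rather than taken from Elasticsearch.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class ExecuteSketch {

    // Approximates the processor's control flow on a plain map standing in for a document.
    static Map<String, Object> apply(Map<String, Object> doc, String field,
                                     String targetField, boolean ignoreMissing) {
        if (!doc.containsKey(field)) {
            if (ignoreMissing) {
                return doc; // leave the document untouched
            }
            throw new IllegalArgumentException("field [" + field + "] not present");
        }
        if (doc.containsKey(targetField)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }
        Object value = doc.get(field);
        if (value instanceof String) {
            String trimmed = ((String) value).trim();
            if (!trimmed.isEmpty()) {
                doc.put(targetField, trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("user", "Liu");
        apply(doc, "user", "user_initial", false);
        System.out.println(doc.get("user_initial")); // prints "l"
    }
}
```

Non-string values fall through both checks and leave the document unchanged, which matches the instanceof guard in the processor.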
mvn clean install
$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ mvn clean install
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------------
[INFO] Building elasticsearch-plugin 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ elasticsearch-plugin ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ elasticsearch-plugin ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ elasticsearch-plugin ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /Users/liuxg/java/plugins/elasticsearch-plugin/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ elasticsearch-plugin ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ elasticsearch-plugin ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.22.1:test (default-test) @ elasticsearch-plugin ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ elasticsearch-plugin ---
[INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar
[INFO]
[INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ elasticsearch-plugin >>>
[INFO]
[INFO]
After a successful build, the installable file appears under the target directory:
$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ ls target/releases/
elasticsearch-plugin-1.0-SNAPSHOT.zip
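The elasticsearch-plugin-1.0-SNAPSHOT.zip file shown above is the plugin package we can install.
Installing and testing the plugin
Next, switch to the Elasticsearch installation directory and run the following commands: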
$ pwd
/Users/liuxg/elastic0/elasticsearch-8.4.0
$ bin/elasticsearch-plugin install file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
-> Installing file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
-> Downloading file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
[=================================================] 100%
-> Installed ingest
-> Please restart Elasticsearch to activate any plugins installed
$ ./bin/elasticsearch-plugin list
ingest
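The output shows that the ingest plugin was installed successfully. We now need to restart Elasticsearch. This step is essential!
Once Elasticsearch has restarted, open Kibana and run the following request to test the processor: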
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "This is a test for my custom pipeline",
    "processors": [
      {
        "sample": {
          "field": "user",
          "target_field": "user_initial"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "user": "xiaoguo"
      }
    },
    {
      "_source": {
        "user": "Liu"
      }
    }
  ]
}
In the test above, the user field takes the values xiaoguo and Liu. After the documents pass through the sample processor, the result is:
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "user_initial": "x",
          "user": "xiaoguo"
        },
        "_ingest": {
          "timestamp": "2022-09-08T04:00:48.922489Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "user_initial": "l",
          "user": "Liu"
        },
        "_ingest": {
          "timestamp": "2022-09-08T04:00:48.922514Z"
        }
      }
    }
  ]
}
Clearly, the first letter was extracted, converted to lowercase, and placed in the user_initial field we specified.
For convenience, I have put the final code in this repository: https://github.com/liu-xiao-guo/es-ingest-pipeline
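The keys accepted inside the "sample" object of the request are the ones the factory reads: field and target_field are required, while ignore_missing is optional and defaults to false. The sketch below illustrates that required-versus-optional handling on a plain Map. requiredString and optionalBoolean are hypothetical helpers written for this example; they are simplified stand-ins and not the ConfigurationUtils methods themselves.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigSketch {

    // Reads and consumes a required string option; a missing key is an error.
    static String requiredString(Map<String, Object> config, String key) {
        Object value = config.remove(key);
        if (value == null) {
            throw new IllegalArgumentException("[" + key + "] required property is missing");
        }
        if (!(value instanceof String)) {
            throw new IllegalArgumentException("[" + key + "] property is not a string");
        }
        return (String) value;
    }

    // Reads and consumes an optional boolean option, falling back to a default.
    static boolean optionalBoolean(Map<String, Object> config, String key, boolean defaultValue) {
        Object value = config.remove(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        throw new IllegalArgumentException("[" + key + "] property is not a boolean");
    }

    public static void main(String[] args) {
        // Same options as the "sample" object in the simulate request.
        Map<String, Object> config = new HashMap<>();
        config.put("field", "user");
        config.put("target_field", "user_initial");

        System.out.println(requiredString(config, "field"));                  // prints "user"
        System.out.println(requiredString(config, "target_field"));           // prints "user_initial"
        System.out.println(optionalBoolean(config, "ignore_missing", false)); // prints "false"
    }
}
```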