Elasticsearch: Apache Spark big data integration

August 25, 2022   |   by mebius

Elasticsearch has become a common component in big data architectures because it offers several features:

  • It lets you search large amounts of data quickly.
  • For common aggregation operations, it provides real-time analytics on big data.
  • Using Elasticsearch aggregations is easier than using Spark aggregations.
  • If you need to move to a fast data solution, starting from a subset of documents returned by a query is faster than a full rescan of all the data.

The most common big data software for processing data today is Apache Spark (http://spark.apache.org/), which is considered the evolution of the now-dated Hadoop MapReduce, moving processing from disk to memory.
In this article, we will see how to integrate Elasticsearch with Spark, both for writing and for reading data.

Installing Spark

To use Apache Spark, we need to install it. The process is very easy, because its requirements are not those of a traditional Hadoop deployment, which needs Apache ZooKeeper and the Hadoop Distributed File System (HDFS). Apache Spark can work in a standalone single-node installation, similar to Elasticsearch.

To install Apache Spark, we will perform the following steps:

1) Download the binary distribution from https://spark.apache.org/downloads.html. For general usage, I suggest you download the standard build with the following request:

wget https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

2) Now we can extract the Spark distribution with tar, as follows:

tar xzvf spark-3.3.0-bin-hadoop3.tgz

3) Now we can verify that Apache Spark works correctly by running one of the bundled examples, as follows:

$ cd spark-3.3.0-bin-hadoop3
$ ./bin/run-example SparkPi 10


If the example runs and prints its result (an estimate of Pi), our installation was successful.

We can then start the Spark shell:

./bin/spark-shell


Now we can type commands into the shell and have them executed on the cluster.
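For example, here is a quick sanity check you can paste into the shell. This tiny job is my own illustration and not part of the original walkthrough; the sc SparkContext is created for you by the shell:

// Distribute a small range across the local "cluster" and run a simple job on it.
val rdd = sc.parallelize(1 to 100)
println(rdd.filter(_ % 2 == 0).count())   // prints 50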

Installing Elasticsearch and Kibana

If you have not yet installed your own Elasticsearch and Kibana, please refer to my earlier installation articles.

In today's demonstration I will use the latest Elastic Stack 8.3.2. To keep the demo simple, we can choose not to enable HTTPS access when installing Elasticsearch. For this, follow the section "如何配置 Elasticsearch 只带有基本安全" in the earlier article "Elastic Stack 8.0 安装 – 保护你的 Elastic Stack 现在比以往任何时候都简单". Once Elasticsearch and Kibana are installed, we only need a username and password to access them. For convenience, the password of the superuser elastic is set to password.

Ingesting data into Elasticsearch with Apache Spark

Now that we have installed both Apache Spark and Elasticsearch, we can configure Spark to work with Elasticsearch and write some data into it.

1) We need to download the elasticsearch-hadoop package, which contains the Elasticsearch Spark connector .jar, as follows:

wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/8.3.2/elasticsearch-hadoop-8.3.2.zip
tar xzf elasticsearch-hadoop-8.3.2.zip

Alternatively, you can download the elasticsearch-hadoop package as follows:

wget -c https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.3.2.zip
tar xzf elasticsearch-hadoop-8.3.2.zip

2) A quick way to make Elasticsearch available to the Spark shell is to copy the required elasticsearch-hadoop file into Spark's jars directory. The file that must be copied is elasticsearch-spark-20_2.11-8.3.2.jar.

$ pwd
/Users/liuxg/java/spark/spark-3.3.0-bin-hadoop3/jars
$ ls elasticsearch-spark-20_2.11-8.3.2.jar 
elasticsearch-spark-20_2.11-8.3.2.jar

From the jar name we can see that the connector was built against Scala 2.11. Keep this in mind for the IDE-based development environment below.

To store data in Elasticsearch with Apache Spark, we will perform the following steps:

1) In the Spark root directory, start the Spark shell with the Elasticsearch configuration applied by running the following command:

./bin/spark-shell \
   --conf spark.es.index.auto.create=true \
   --conf spark.es.net.http.auth.user=$ES_USER \
   --conf spark.es.net.http.auth.pass=$ES_PASSWORD


ES_USER and ES_PASSWORD are environment variables holding the credentials of the Elasticsearch cluster.

2) Before using the Elasticsearch-specific Resilient Distributed Dataset (RDD) methods, we import the Elasticsearch Spark implicits, as follows:

import org.elasticsearch.spark._

3) We will create two documents to be indexed, as follows:

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "SanFran")

4) Now we can create an RDD and save the documents into Elasticsearch, as follows:

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark")


Back in Kibana, we can check the result:

GET spark/_search


From the output we can see that the two documents have been successfully written into Elasticsearch.

How does this work?

Storing documents in Elasticsearch via Spark is very simple. After the Spark shell starts, an sc variable containing the SparkContext is available in the shell context. If we need to pass values to the underlying Elasticsearch configuration, we have to set them on the Spark shell command line.
There are several configuration settings (when passed on the command line, add the spark. prefix); the most commonly used are listed here, followed by a short sketch of setting them programmatically:

  • es.index.auto.create: used to create the index if it does not already exist.
  • es.nodes: used to define the list of nodes to connect to (default localhost).
  • es.port: used to define the HTTP Elasticsearch port to connect to (default 9200).
  • es.ingest.pipeline: used to define the ingest pipeline to use (default none).
  • es.mapping.id: used to define a field from which the ID value is extracted (default none).
  • es.mapping.parent: used to define a field from which the parent value is extracted (default none).
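As a sketch only, the same keys can also be set on a SparkConf when you build your own application instead of using the shell; in that case no spark. prefix is needed. The application name EsWriteDemo and the credential values below are my own illustration, not something from the original text:

import org.apache.spark.{SparkConf, SparkContext}

// Build a SparkContext with the elasticsearch-hadoop settings applied directly.
val conf = new SparkConf()
  .setAppName("EsWriteDemo")
  .setMaster("local[*]")
  .set("es.index.auto.create", "true")   // create the target index if missing
  .set("es.nodes", "localhost")          // Elasticsearch node(s) to connect to
  .set("es.port", "9200")                // HTTP port
  .set("es.net.http.auth.user", "elastic")
  .set("es.net.http.auth.pass", "password")

val sc = new SparkContext(conf)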

A simple document can be defined as a Map[String, AnyRef], and documents can be indexed through an RDD (a special Spark abstraction over a collection). Through the implicits available in org.elasticsearch.spark, the RDD gains a new method called saveToEs, which lets you define the index (or index/document pair) to use for indexing:

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark")

Writing data with meta

Ingesting data with plain maps is only suitable for simple jobs. The best practice in Spark is to use case classes, which give you fast serialization and make it possible to manage complex type checking. During indexing, providing a custom ID can also be very handy. Below, we will see how to cover these cases.

To store data in Elasticsearch with Apache Spark, we will perform the following steps:

1) In the Spark root directory, start the Spark shell with the Elasticsearch configuration applied by running the following command:

./bin/spark-shell \
   --conf spark.es.index.auto.create=true \
   --conf spark.es.net.http.auth.user=$ES_USER \
   --conf spark.es.net.http.auth.pass=$ES_PASSWORD

2) We will import the required classes, as follows:

import org.elasticsearch.spark.rdd.EsSpark

3) We will create the case class Person, as follows:

case class Person(username:String, name:String, age:Int)

4) We will create two documents to be indexed, as follows:

val persons = Seq(Person("bob", "Bob",19), Person("susan","Susan",21))

5) Now we can create the RDD, as follows:

val rdd=sc.makeRDD(persons)

6) We can index them using EsSpark, as follows:

EsSpark.saveToEs(rdd, "spark2", Map("es.mapping.id" -> "username"))
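As an alternative sketch (my own addition, not part of the original steps), the connector also exposes saveToEsWithMeta on pair RDDs, where the key of each pair becomes the document ID; with it, the same IDs could be supplied without the es.mapping.id setting:

import org.elasticsearch.spark._

// Pair each Person with the ID we want Elasticsearch to use (the username),
// then save the pairs; the key of each tuple is used as the document _id.
val pairs = sc.makeRDD(persons.map(p => (p.username, p)))
pairs.saveToEsWithMeta("spark2")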


Back in Kibana, we can check the result:

GET spark2/_search


From the output we can see that two documents have been successfully written into Elasticsearch, and their IDs are the username values from Person.

Writing to Elasticsearch from an IDE

In this exercise we will use an IDE. You can pick whichever IDE you prefer; I will use IntelliJ IDEA, for which you need the Scala plugin installed. Let's create a project called SparkDemo. Its build.sbt is as follows:

build.sbt

name := "SparkDemo"

version := "0.1"

scalaVersion := "2.11.12"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"

Note the scalaVersion of 2.11.12 above. As mentioned earlier, elasticsearch-spark-20 is currently built against Scala 2.11, so we pick a matching Scala release. We need the spark-core and spark-sql packages, which we can look up at https://mvnrepository.com/artifact/org.apache.spark:


On mvnrepository we can find the desired version of the spark-core dependency; in the same way, we can find the dependency configuration for spark-sql.

To be able to access Elasticsearch, we can also attach the previously downloaded elasticsearch-spark-20_2.11-8.3.2.jar directly to the project in the IDE as a library.
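Alternatively, you could let sbt resolve the connector from Maven Central instead of attaching the jar by hand; the coordinates below are my assumption based on the jar name we copied earlier:

// Assumed alternative: resolve the Scala 2.11 connector from Maven Central
// instead of adding elasticsearch-spark-20_2.11-8.3.2.jar to the project manually.
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "8.3.2"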


Next, we create the following Scala file:

SparkDemo.scala

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object SparkDemo {

  def main(args: Array[String]): Unit = {
    SparkDemo.writeToIndex()
  }

  def writeToIndex(): Unit = {

    val spark = SparkSession
      .builder()
      .appName("WriteToES")
      .master("local[*]")
      .config("spark.es.nodes","localhost")
      .config("spark.es.port","9200")
      .config("spark.es.nodes.wan.only","true") // Needed for ES on AWS
      .config("spark.es.net.http.auth.user", "elastic")
      .config("spark.es.net.http.auth.pass", "password")
      .getOrCreate()

    import spark.implicits._

    val indexDocuments = Seq (
      AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
      AlbumIndex("Boston",1976,"Boston"),
      AlbumIndex("Fleetwood Mac", 1979,"Tusk")
    ).toDF

    indexDocuments.saveToEs("albumindex")
  }
}

case class AlbumIndex(artist:String, yearOfRelease:Int, albumName: String)

Note that above we use password as the password of the elastic user; you need to adjust this to match your own configuration. Run the code, and once it finishes we can check the result in Kibana:

GET albumindex/_search


Essentially, this code produces exactly the same result as the command-line approach above; the documents are written through the AlbumIndex case class.

Writing a JSON file into Elasticsearch

Next, we create the following JSON file:

$ pwd
/Users/liuxg/java/spark
$ cat sample_json
[ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ]

As shown above, this is a very simple JSON file. We now rewrite the SparkDemo.scala file we wrote earlier:

SparkDemo.scala

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object SparkDemo {

  def main(args: Array[String]): Unit = {

    // Configuration
    val spark = SparkSession
      .builder()
      .appName("WriteJSONToES")
      .master("local[*]")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.net.http.auth.user", "elastic")
      .config("spark.es.net.http.auth.pass", "password")
      .getOrCreate()

    // Create dataframe
    val frame = spark.read.json("/Users/liuxg/java/spark/sample_json")

    // Write to ES with index name in lower case
    frame.saveToEs("dataframejsonindex")
  }
}

Run the application and check the result in Kibana:

GET dataframejsonindex/_search


As shown above, seven documents have been successfully written into Elasticsearch.
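One detail worth noting: by default spark.read.json parses the JSON Lines layout, which is why the sample file above keeps the whole array on a single line. If your file were pretty-printed across several lines, a hedged variation of the reader above would enable the multiLine option:

    // Parse a pretty-printed (multi-line) JSON document instead of JSON Lines.
    val frame = spark.read
      .option("multiLine", "true")
      .json("/Users/liuxg/java/spark/sample_json")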

Writing a CSV file into Elasticsearch

In the same way, we can also write a CSV file into Elasticsearch. We first create the following CSV file:

cities.csv

LatD, LatM, LatS, NS, LonD, LonM, LonS, EW, City, State
41,    5,   59, "N",     80,   39,    0, "W", "Youngstown", OH
42,   52,   48, "N",     97,   23,   23, "W", "Yankton", SD
46,   35,   59, "N",    120,   30,   36, "W", "Yakima", WA
42,   16,   12, "N",     71,   48,    0, "W", "Worcester", MA
43,   37,   48, "N",     89,   46,   11, "W", "Wisconsin Dells", WI
36,    5,   59, "N",     80,   15,    0, "W", "Winston-Salem", NC
49,   52,   48, "N",     97,    9,    0, "W", "Winnipeg", MB
39,   11,   23, "N",     78,    9,   36, "W", "Winchester", VA
34,   14,   24, "N",     77,   55,   11, "W", "Wilmington", NC
39,   45,    0, "N",     75,   33,    0, "W", "Wilmington", DE
48,    9,    0, "N",    103,   37,   12, "W", "Williston", ND
41,   15,    0, "N",     77,    0,    0, "W", "Williamsport", PA
37,   40,   48, "N",     82,   16,   47, "W", "Williamson", WV
33,   54,    0, "N",     98,   29,   23, "W", "Wichita Falls", TX
37,   41,   23, "N",     97,   20,   23, "W", "Wichita", KS
40,    4,   11, "N",     80,   43,   12, "W", "Wheeling", WV
26,   43,   11, "N",     80,    3,    0, "W", "West Palm Beach", FL
47,   25,   11, "N",    120,   19,   11, "W", "Wenatchee", WA
41,   25,   11, "N",    122,   23,   23, "W", "Weed", CA
31,   13,   11, "N",     82,   20,   59, "W", "Waycross", GA
44,   57,   35, "N",     89,   38,   23, "W", "Wausau", WI
42,   21,   36, "N",     87,   49,   48, "W", "Waukegan", IL
44,   54,    0, "N",     97,    6,   36, "W", "Watertown", SD
43,   58,   47, "N",     75,   55,   11, "W", "Watertown", NY
42,   30,    0, "N",     92,   20,   23, "W", "Waterloo", IA
41,   32,   59, "N",     73,    3,    0, "W", "Waterbury", CT
38,   53,   23, "N",     77,    1,   47, "W", "Washington", DC
41,   50,   59, "N",     79,    8,   23, "W", "Warren", PA
46,    4,   11, "N",    118,   19,   48, "W", "Walla Walla", WA
31,   32,   59, "N",     97,    8,   23, "W", "Waco", TX
38,   40,   48, "N",     87,   31,   47, "W", "Vincennes", IN
28,   48,   35, "N",     97,    0,   36, "W", "Victoria", TX
32,   20,   59, "N",     90,   52,   47, "W", "Vicksburg", MS
49,   16,   12, "N",    123,    7,   12, "W", "Vancouver", BC
46,   55,   11, "N",     98,    0,   36, "W", "Valley City", ND
30,   49,   47, "N",     83,   16,   47, "W", "Valdosta", GA
43,    6,   36, "N",     75,   13,   48, "W", "Utica", NY
39,   54,    0, "N",     79,   43,   48, "W", "Uniontown", PA
32,   20,   59, "N",     95,   18,    0, "W", "Tyler", TX
42,   33,   36, "N",    114,   28,   12, "W", "Twin Falls", ID
33,   12,   35, "N",     87,   34,   11, "W", "Tuscaloosa", AL
34,   15,   35, "N",     88,   42,   35, "W", "Tupelo", MS
36,    9,   35, "N",     95,   54,   36, "W", "Tulsa", OK
32,   13,   12, "N",    110,   58,   12, "W", "Tucson", AZ
37,   10,   11, "N",    104,   30,   36, "W", "Trinidad", CO
40,   13,   47, "N",     74,   46,   11, "W", "Trenton", NJ
44,   45,   35, "N",     85,   37,   47, "W", "Traverse City", MI
43,   39,    0, "N",     79,   22,   47, "W", "Toronto", ON
39,    2,   59, "N",     95,   40,   11, "W", "Topeka", KS
41,   39,    0, "N",     83,   32,   24, "W", "Toledo", OH
33,   25,   48, "N",     94,    3,    0, "W", "Texarkana", TX
39,   28,   12, "N",     87,   24,   36, "W", "Terre Haute", IN
27,   57,    0, "N",     82,   26,   59, "W", "Tampa", FL
30,   27,    0, "N",     84,   16,   47, "W", "Tallahassee", FL
47,   14,   24, "N",    122,   25,   48, "W", "Tacoma", WA
43,    2,   59, "N",     76,    9,    0, "W", "Syracuse", NY
32,   35,   59, "N",     82,   20,   23, "W", "Swainsboro", GA
33,   55,   11, "N",     80,   20,   59, "W", "Sumter", SC
40,   59,   24, "N",     75,   11,   24, "W", "Stroudsburg", PA
37,   57,   35, "N",    121,   17,   24, "W", "Stockton", CA
44,   31,   12, "N",     89,   34,   11, "W", "Stevens Point", WI
40,   21,   36, "N",     80,   37,   12, "W", "Steubenville", OH
40,   37,   11, "N",    103,   13,   12, "W", "Sterling", CO
38,    9,    0, "N",     79,    4,   11, "W", "Staunton", VA
39,   55,   11, "N",     83,   48,   35, "W", "Springfield", OH
37,   13,   12, "N",     93,   17,   24, "W", "Springfield", MO
42,    5,   59, "N",     72,   35,   23, "W", "Springfield", MA
39,   47,   59, "N",     89,   39,    0, "W", "Springfield", IL
47,   40,   11, "N",    117,   24,   36, "W", "Spokane", WA
41,   40,   48, "N",     86,   15,    0, "W", "South Bend", IN
43,   32,   24, "N",     96,   43,   48, "W", "Sioux Falls", SD
42,   29,   24, "N",     96,   23,   23, "W", "Sioux City", IA
32,   30,   35, "N",     93,   45,    0, "W", "Shreveport", LA
33,   38,   23, "N",     96,   36,   36, "W", "Sherman", TX
44,   47,   59, "N",    106,   57,   35, "W", "Sheridan", WY
35,   13,   47, "N",     96,   40,   48, "W", "Seminole", OK
32,   25,   11, "N",     87,    1,   11, "W", "Selma", AL
38,   42,   35, "N",     93,   13,   48, "W", "Sedalia", MO
47,   35,   59, "N",    122,   19,   48, "W", "Seattle", WA
41,   24,   35, "N",     75,   40,   11, "W", "Scranton", PA
41,   52,   11, "N",    103,   39,   36, "W", "Scottsbluff", NB
42,   49,   11, "N",     73,   56,   59, "W", "Schenectady", NY
32,    4,   48, "N",     81,    5,   23, "W", "Savannah", GA
46,   29,   24, "N",     84,   20,   59, "W", "Sault Sainte Marie", MI
27,   20,   24, "N",     82,   31,   47, "W", "Sarasota", FL
38,   26,   23, "N",    122,   43,   12, "W", "Santa Rosa", CA
35,   40,   48, "N",    105,   56,   59, "W", "Santa Fe", NM
34,   25,   11, "N",    119,   41,   59, "W", "Santa Barbara", CA
33,   45,   35, "N",    117,   52,   12, "W", "Santa Ana", CA
37,   20,   24, "N",    121,   52,   47, "W", "San Jose", CA
37,   46,   47, "N",    122,   25,   11, "W", "San Francisco", CA
41,   27,    0, "N",     82,   42,   35, "W", "Sandusky", OH
32,   42,   35, "N",    117,    9,    0, "W", "San Diego", CA
34,    6,   36, "N",    117,   18,   35, "W", "San Bernardino", CA
29,   25,   12, "N",     98,   30,    0, "W", "San Antonio", TX
31,   27,   35, "N",    100,   26,   24, "W", "San Angelo", TX
40,   45,   35, "N",    111,   52,   47, "W", "Salt Lake City", UT
38,   22,   11, "N",     75,   35,   59, "W", "Salisbury", MD
36,   40,   11, "N",    121,   39,    0, "W", "Salinas", CA
38,   50,   24, "N",     97,   36,   36, "W", "Salina", KS
38,   31,   47, "N",    106,    0,    0, "W", "Salida", CO
44,   56,   23, "N",    123,    1,   47, "W", "Salem", OR
44,   57,    0, "N",     93,    5,   59, "W", "Saint Paul", MN
38,   37,   11, "N",     90,   11,   24, "W", "Saint Louis", MO
39,   46,   12, "N",     94,   50,   23, "W", "Saint Joseph", MO
42,    5,   59, "N",     86,   28,   48, "W", "Saint Joseph", MI
44,   25,   11, "N",     72,    1,   11, "W", "Saint Johnsbury", VT
45,   34,   11, "N",     94,   10,   11, "W", "Saint Cloud", MN
29,   53,   23, "N",     81,   19,   11, "W", "Saint Augustine", FL
43,   25,   48, "N",     83,   56,   24, "W", "Saginaw", MI
38,   35,   24, "N",    121,   29,   23, "W", "Sacramento", CA
43,   36,   36, "N",     72,   58,   12, "W", "Rutland", VT
33,   24,    0, "N",    104,   31,   47, "W", "Roswell", NM
35,   56,   23, "N",     77,   48,    0, "W", "Rocky Mount", NC
41,   35,   24, "N",    109,   13,   48, "W", "Rock Springs", WY
42,   16,   12, "N",     89,    5,   59, "W", "Rockford", IL
43,    9,   35, "N",     77,   36,   36, "W", "Rochester", NY
44,    1,   12, "N",     92,   27,   35, "W", "Rochester", MN
37,   16,   12, "N",     79,   56,   24, "W", "Roanoke", VA
37,   32,   24, "N",     77,   26,   59, "W", "Richmond", VA
39,   49,   48, "N",     84,   53,   23, "W", "Richmond", IN
38,   46,   12, "N",    112,    5,   23, "W", "Richfield", UT
45,   38,   23, "N",     89,   25,   11, "W", "Rhinelander", WI
39,   31,   12, "N",    119,   48,   35, "W", "Reno", NV
50,   25,   11, "N",    104,   39,    0, "W", "Regina", SA
40,   10,   48, "N",    122,   14,   23, "W", "Red Bluff", CA
40,   19,   48, "N",     75,   55,   48, "W", "Reading", PA
41,    9,   35, "N",     81,   14,   23, "W", "Ravenna", OH 

We modify the SparkDemo.scala file once more:

SparkDemo.scala

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object SparkDemo {

  def main(args: Array[String]): Unit = {

    // Configuration
    val spark = SparkSession
      .builder()
      .appName("WriteCSVToES")
      .master("local[*]")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.net.http.auth.user", "elastic")
      .config("spark.es.net.http.auth.pass", "password")
      .getOrCreate()

    // Create dataframe
    val frame = spark.read.option("header", "true").csv("/Users/liuxg/java/spark/cities.csv")

    // Write to ES with index name in lower case
    frame.saveToEs("dataframecsvindex")
  }
}


Checking the index in Kibana shows that the CSV file has been written successfully; in total, 128 documents were indexed.
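By default, spark.read.csv keeps every column as a string, so the numeric latitude/longitude columns are indexed as text. A hedged refinement of the reader above (whether you want inferred types depends on the mapping you are aiming for) would be:

    // Let Spark sample the file and infer int/double types for the numeric columns.
    val frame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/Users/liuxg/java/spark/cities.csv")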

Exporting data from Elasticsearch with Apache Spark

We first create an index called twitter in Kibana with the following command:

PUT twitter
{
  "mappings": {
    "properties": {
      "DOB": {
        "type": "date"
      },
      "address": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "age": {
        "type": "long"
      },
      "city": {
        "type": "keyword"
      },
      "country": {
        "type": "keyword"
      },
      "message": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "province": {
        "type": "keyword"
      },
      "uid": {
        "type": "long"
      },
      "user": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

Then we use the bulk API to write some data:

POST _bulk
{"index":{"_index":"twitter","_id":1}}
{"user":"张三","message":"今儿天气不错啊,出去转转去","uid":2,"age":20,"city":"北京","province":"北京","country":"中国","address":"中国北京市海淀区","DOB": "1999-04-01"}
{"index":{"_index":"twitter","_id":2}}
{"user":"老刘","message":"出发,下一站云南!","uid":3,"age":22,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区台基厂三条3号", "DOB": "1997-04-01"}
{"index":{"_index":"twitter","_id":3}}
{"user":"李四","message":"happy birthday!","uid":4,"age":25,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区","DOB": "1994-04-01"}
{"index":{"_index":"twitter","_id":4}}
{"user":"老贾","message":"123,gogogo","uid":5,"age":30,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区建国门", "DOB": "1989-04-01"}
{"index":{"_index":"twitter","_id":5}}
{"user":"老王","message":"Happy BirthDay My Friend!","uid":6,"age":26,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区国贸","DOB": "1993-04-01"}
{"index":{"_index":"twitter","_id":6}}
{"user":"老吴","message":"好友来了都今天我生日,好友来了,什么 birthday happy 就成!","uid":7,"age":28,"city":"上海","province":"上海","country":"中国","address":"中国上海市闵行区", "DOB": "1991-04-01"}

We now have six documents of data. We rewrite our SparkDemo.scala once more:

SparkDemo.scala

import org.apache.spark.sql.SparkSession

object SparkDemo {

  def main(args: Array[String]): Unit = {

    // Configuration
    val spark = SparkSession
      .builder()
      .appName("ExportESIndex")
      .master("local[*]")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.net.http.auth.user", "elastic")
      .config("spark.es.net.http.auth.pass", "password")
      .getOrCreate()

    val reader = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val df = reader.load("twitter")
    println("No of records: " + df.count())

    df.write.format("csv")
      .option("header", true)
      .mode("overwrite")
      .save("file:///Users/liuxg/tmp/samples_download")

    println("Job completed!")
  }
}

Rerun the application, then check the generated files on disk:

$ pwd
/Users/liuxg/tmp/samples_download
$ ls
_SUCCESS
part-00000-b8a5faee-2a0d-40c8-b25c-f4a5f23fba09-c000.csv
$ cat part-00000-b8a5faee-2a0d-40c8-b25c-f4a5f23fba09-c000.csv 
DOB,address,age,city,country,message,province,uid,user
1999-04-01T00:00:00.000+08:00,中国北京市海淀区,20,北京,中国,今儿天气不错啊,出去转转去,北京,2,张三
1997-04-01T00:00:00.000+08:00,中国北京市东城区台基厂三条3号,22,北京,中国,出发,下一站云南!,北京,3,老刘
1994-04-01T00:00:00.000+08:00,中国北京市东城区,25,北京,中国,happy birthday!,北京,4,李四
1989-04-01T00:00:00.000+08:00,中国北京市朝阳区建国门,30,北京,中国,"123,gogogo",北京,5,老贾
1993-04-01T00:00:00.000+08:00,中国北京市朝阳区国贸,26,北京,中国,Happy BirthDay My Friend!,北京,6,老王
1991-04-01T00:00:00.000+08:00,中国上海市闵行区,28,上海,中国,"好友来了都今天我生日,好友来了,什么 birthday happy 就成!",上海,7,老吴
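If you only need part of an index, filters applied on the DataFrame are pushed down to Elasticsearch by the connector, so only matching documents are fetched. A minimal sketch, reusing the spark and reader values from the code above (the index and column names are the ones from this example; the filter itself is my own illustration):

    import spark.implicits._  // enables the $"column" syntax

    // Only documents with age >= 25 are requested from Elasticsearch;
    // the connector translates the filter into an Elasticsearch query.
    val adults = reader.load("twitter")
      .filter($"age" >= 25)
      .select("user", "age", "city")

    adults.show()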


