Fork me on GitHub

Spark和Elasticsearch交互实践总结

目录

  • 背景
  • 第一部分 环境依赖
  • 第二部分 交互接口
  • 第三部分 任务提交
  • 参考文献及资料

背景

为了更好的支持Spark应用和Elasticsearch交互,Elasticsearch官方推出了elasticsearch-hadoop项目。本文将详细介绍Spark Java应用和Elasticsearch的交互细节。

第一部分 环境依赖

1.1 配置Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-13_2.11</artifactId>
<version>6.8.2</version>
</dependency>

需要注意Spark版本和elasticsearch-hadoop版本的兼容性,参考版本对照表:

Spark Version Scala Version ES-Hadoop Artifact ID
1.0 - 1.2 2.10
1.0 - 1.2 2.11
1.3 - 1.6 2.10 elasticsearch-spark-13_2.10
1.3 - 1.6 2.11 elasticsearch-spark-13_2.11
2.0+ 2.10 elasticsearch-spark-20_2.10
2.0+ 2.11 elasticsearch-spark-20_2.11

1.2 Spark配置

关于elasticsearch集群的交互配置,定义在SparkConf中,例如下面的案例:

1
2
3
4
5
6
7
8
9
import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf().setAppName("JavaSpark").setMaster("local");
//config elasticsearch
sparkConf.set("es.nodes","192.168.31.3:9200");
sparkConf.set("es.port","9200");
sparkConf.set("es.index.auto.create","true");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  • es.nodes,集群节点;
  • es.port,服务端口;
  • es.index.auto.create,参数指定index是否自动创建;

其他配置参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html

第二部分 交互接口

2.1 自定义id的写入

在业务数据写入elasticsearch集群的时候,需要数据去重。这时候就需要自己指定元数据字段中的_idelasticsearch在处理_id相同的数据时,会覆盖写入。例如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashMap;
import static org.elasticsearch.spark.rdd.Metadata.ID;


try{
ArrayList<Tuple2<HashMap,HashMap>> metaList = new ArrayList<>();

for(int i=0;i<100;i++) {
HashMap<String, String> map = new HashMap<String, String>();
map.put("id", String.valueOf(i));
map.put("name", "one");

HashMap<Metadata, String> metamap = new HashMap<Metadata, String>();
metamap.put(ID, String.valueOf(i));

metaList.add(new Tuple2(metamap, map));
}

JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(metaList);
JavaEsSpark.saveToEsWithMeta(pairRdd,"spark/doc");

}catch (Exception e){
e.printStackTrace();
System.out.println("finish!");
jsc.stop();
}

例子中我们使用ArrayList<Tuple2<HashMap,HashMap>>数据结构来存储待写入的数据,然后构造RDD,最后使用JavaEsSpark.saveToEsWithMeta方法写入。需要注意这里构造的两个HashMap:

  • 数据HashMap,数据结构为:HashMap<String, String>,用于存储数据键值对。
  • 元数据HashMap,数据结构为:HashMap<Metadata, String>,用于存储元数据键值对。例如ID即为_id

其他类型读写可以参考官方网站:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

第三部分 任务提交

最后编译运行。主要是setMaster()指定运行方式,分为如下几种。

运行模式 说明
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] Run Spark locally with as many worker threads as logical cores on your machine.
spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
yarn Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
yarn-client Equivalent to yarn with --deploy-mode client, which is preferred to yarn-client
yarn-cluster Equivalent to yarn with --deploy-mode cluster, which is preferred to yarn-cluster

除了在eclipse、Intellij中运行local模式的任务,也可以打成jar包,使用spark-submit来进行任务提交。

参考文献及资料

1、 Apache Spark support,链接:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

2、elasticsearch-hadoop项目,链接:https://github.com/elastic/elasticsearch-hadoop

0%