Fork me on GitHub

PySpark和Kafka交互总结

目录

  • 背景

  • 第一部分 Receiver接口模式

  • 第二部分 Direct接口模式

  • 第三部分 PySpark和Kafka交互

  • 第四部分 任务提交

  • 参考文献及资料

背景

Apache Kafka项目是大数据处理中重要的消息引擎。Spark Streaming作为重要的流处理计算框架通常和Kafka结合使用。Spark Streaming(新旧版本)支持Kafka的编程模型有两种:Receiver模式和Direct模式。Direct模式在Apache Spark 1.3中最新引入,下面是版本演进过程(截止Spark 2.1)。

SparkStreamingHistory

本文主要介绍Spark Streaming和Kafka进行交互模式和PySpark实现。

0.0 Kafka的Consumer API

在正式介绍前,我们首先介绍一下Kafka Consumer API。Kafka在0.9版本前提供了两种版本的Consumer:高级版和简单版。

  • 高级版。消费者客户端不需要去管理偏移量(offset)状态,而是由Zookeeper管理。消费者中断重启后,可以根据上一次记录在Zookeeper中的offset状态信息,继续获取数据(默认为1分钟更新一次Zookeeper中的offset)。通俗的说就是全auto,最适合小白,哈哈。

  • 简单版。消费者客户端需要自行管理offset状态信息(持久化为文件、数据库或者内存等)。

两个客户端的主要差异就是:谁来负责管理offset状态。但是高级的不一定就好,需要具体架构场景具体分析,适合才是最好的,哈哈(扯远了)。

Kafka项目组在0.8.1版本开始重写生产者API,在0.9版本完成新版本API的发版。新版的Consumer API使用原生Java编写(居然不用scala了,啧啧)。当然新版本API也不依赖于Zookeeper。详细细节可以参考官方文章:《Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client》。

第一部分 Receiver模式

1.1 原始方式

最开始Kafka和Spark Streaming集成是通过Receiver。即在每个Spark Executor中实例化Apache Kafka”高级版” Consumer API,启动一个Receiver,实现异步消费Kafka消息,并存储在执行程序的内存中,同时更新在Zookeeper中偏移量(offset)。接着Spark Streaming Driver启动作业(jobs)处理已经接受到的缓存数据。

ReceiverOld

这种方式是朴素和高效的,但是不具备容错性,遇到故障或者Executor重启任务,均会导致缓存消息的丢失和offset状态的滞后。对于一些重要业务数据处理场景,这是无法容忍的架构缺陷。

1.2 预写日志(WAL)模式

为了保证容错性,最简单的架构思路就是”存储状态”。Spark项目组引入了预写日志机制(WAL,Write Ahead Logs)。主要思想是:每个Spark Executor中Receiver接受到的数据持久化到分布式文件系统(HDFS)中,数据只有完成持久化,Receiver才会发起更新Zookeeper中的偏移量状态。当发生故障或者Executor重启时,从持久化的预写日志中恢复数据。

ReceiverWAL

引入WAL后,保证了数据不会丢失。但是有这样的场景:Receiver接收到数据并持久化到WAL,但是系统在更新Zookeeper中相应的偏移量时出现故障,更新失败。当从故障中恢复后,由于offset滞后性,就会出现部分数据重复消费。

之所以出现这种场景,因为架构上无法保证状态信息在两个系统中保持数据一致(exactly-once)。为了避免这种情况,架构上需要只指定一个系统来维护这个状态信息。

Spark项目最后决定由Spark侧来管理offset状态,将offset信息持久化在Spark Streaming中(持久化在checkpoint中(hdfs文件系统)、数据库等,如果是client模式,可以存放在client本地文件系统等),当然与Kafka交互的API也要换成”简单版” Consumer API。这就是下面要介绍的Direct接口方式。

第二部分 Direct模式

2.1 Direct模式介绍

Apache Spark 1.3中项目组引入了Direct接口方式。Direct方式抛弃了Receiver,采取周期性(Batch Intervel)获取Kafka中每个topic的所有partition中的最新offsets信息,然后根据参数spark.streaming.kafka.maxRatePerPartition设置的速度来消费数据。这就避免了和Zookeeper中偏移量的不一致的问题。而且可以保证即使出现故障,每个记录仅仅被消费一次。

具体消费速度计算如下:

假设Spark window窗口设置为60s(60s拉取一次Kafka数据);Kafka中该Topic有3个Partitions;maxRatePerPartition设置为100。那么每次拉取的最大数据量为: 60* 3 * 100 条数据。

Direct

2.2 两种方式的比较

Direct模式相比Receiver模式的优势有:

  • Receiver模式需要在内存中和预写日志中保存两份数据,如果数据量较大,特别是当任务的作业出现大量延迟(delay)的时候,会占用大量存储资源。而Direct模式只有在计算的时候才会去拉取Kafka侧数据,Kafka侧仍然要充当数据的缓冲角色(削峰填谷)。
  • Direct模式中,Kafka中的partition与RDD中的partition是一一对应的(即一个KafkaRDDIterator对应一个 KafkaRDDPartition)并行读取Kafka数据,即天然利用了并发处理优势。而Receiver模式需要创建多个Receiver之后,可以利用union方法合并成一个Dstream的方式提高数据传输的并行度(后面程序实现将详细介绍)。
  • Direct模式保证了流计算Spark Streaming和Kafka管道数据 at least once语义。

第三部分 PySpark和Kafka交互

3.1 PySpark中Receiver接口

3.1.1 接口说明

pyspark.streaming.kafka文件中的KafkaUtils类:

1
2
3
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder)

参数说明:

  • ssc:StreamingContext对象;

  • zkQuorum:Zookeeper集群地址,格式为:hostname:port,hostname:port,..,逗号隔开;

  • groupId:消费者群组名;

  • topics:主题名,字典类型。例如:{“test”:1},其中1表示线程数量(core)。

  • kafkaParams:可以补充的Kafka其他参数,字典数据类型;如果非空,其他Kafka的参数设置失效。

  • storageLevel:RDD的存储级别,StorageLevel参数决定如何存储RDD。在Spark中,StorageLevel决定RDD是应该存储在内存中还是存储在磁盘上,或两者都存储。它还决定是否序列化RDD以及是否复制RDD分区。参数类型有:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    DISK_ONLY = StorageLevel(True,False,False,False,1)
    DISK_ONLY_2 = StorageLevel(True,False,False,False,2)
    MEMORY_AND_DISK = StorageLevel(True,True,False,False,1)
    MEMORY_AND_DISK_2 = StorageLevel(True,True,False,False,2)
    MEMORY_AND_DISK_SER = StorageLevel(True,True,False,False,1)
    MEMORY_AND_DISK_SER_2 = StorageLevel(True,True,False,False,2)
    MEMORY_ONLY = StorageLevel(False,True,False,False,1)
    MEMORY_ONLY_2 = StorageLevel(False,True,False,False,2)
    MEMORY_ONLY_SER = StorageLevel(False,True,False,False,1)
    MEMORY_ONLY_SER_2 = StorageLevel(False,True,False,False,2)
    OFF_HEAP = StorageLevel(True,True,True,False,1)

    从命名规范也很容易理解,MEMORY是内存,DISK是磁盘,SER表示是否序列化,数字是副本数量。那么默认参数:MEMORY_AND_DISK_2,含义是:数据同时存储内存和磁盘,并且副本数量为2(存储在不同节点)。即开启了WAL机制。

3.1.2 案例介绍

下面是计算单词数的流处理任务代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

zkQuorum = "localhost:2181"
topics = "test"
groupid = "test"

if __name__ == "__main__":
SparkConf = SparkConf()
SparkConf.set("spark.streaming.kafka.maxRatePerPartition",5000)
# 开启WAL机制
#sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=SparkConf)
ssc = StreamingContext(sc, 20)

# 偏移量模式:largest、smallest、none,默认largest
kafkaParams = {"auto.offset.reset": "smallest"}
# 提高并行度
numStreams = 3
kafkaStreams = [KafkaUtils.createStream(ssc, zkQuorum, groupid, {topics:1}, kafkaParams) for _ in range (numStreams)]
unifiedStream = ssc.union(*kafkaStreams)
# 统计单词数
lines = unifiedStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
# 打印
counts.pprint()
ssc.start()
ssc.awaitTermination()

关于auto.offset.reset参数补充说明一下。原生的API中这个参数有三个值:

  1. earliest:自动将偏移重置为最早的偏移量;
  2. latest(默认值):自动将偏移量重置为最新偏移量;
  3. none:如果consumer group中没有发现先前的偏移量,则抛出异常;

与Spark Streaming整合后,有两个参数:

  1. smallest:从头开始消费,等价于上面的 earliest;
  2. largest(默认值):从最新的开始消费,等价于上面的 latest;

spark-streaming-kafka-0-10新客户端中,这个参数也有none值(offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)。程序具体判断逻辑是:如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费。如果不存在已经提交的offest时,使用参数auto.offset.reset的值。当值为none时,topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

程序中我们将并发度设置为3,即实例化了3个DStreams,最后union合并。这个处理效果可以在作业的UI界面中看到差异:

补充UI界面。

https://www.cnblogs.com/juncaoit/p/9452333.html

3.2 PySpark中Direct接口

Python API在Spark 1.4中引入了此功能。

3.2.1 接口说明

pyspark.streaming.kafka文件中的KafkaUtils类:

1
2
3
def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
messageHandler=None):

参数说明:

  • ssc:StreamingContext对象;

  • topics:主题名,List数据类型(支持多个topic同时消费);

  • kafkaParams:Kafka参数,字典格式;

  • fromOffsets:offset状态信息(Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.)。如果没有指定,则使用参数:{"auto.offset.reset": "largest"}

需要特别注意的是:createDirectStreamspark-streaming-kafka-0-8下不支持group id模式,因为它是使用“简单版”Kafka API。后续spark-streaming-kafka-0-10开始提供对group id模式的支持。在0.9.0.0中,引入了新的Java 客户端API,以替代旧的基于Scala的简单和高级API。Spark Streaming integration for Kafka 0.10是使用新的API,支持group id参数。

3.2.2 案例介绍

下面的是接口的使用案例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

brokers = "localhost:9092"
topic = "test"

offsetRanges = []

def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print(o.topic,o.partition,o.fromOffset,o.untilOffset,o.untilOffset-o.fromOffset)

if __name__ == "__main__":
conf = SparkConf().set("spark.streaming.kafka.maxRatePerPartition", 5000)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount",conf=conf)
ssc = StreamingContext(sc, 60)

kafkaStreams = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": brokers})
lines = kafkaStreams.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(' ')) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
kafkaStreams.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
counts.pprint()
ssc.start()
ssc.awaitTermination()

我们启动一个Kafka 生产者每秒发送一条信息hello world,截取回显:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
test 1 4599722 4599724 2 
test 0 4596564 4596567 3
test 2 4605282 4605282 0
-------------------------------------------
Time: 2020-06-19 16:22:00
-------------------------------------------
('world', 5)
('hello', 5)

test 1 4599724 4599748 24
test 0 4596567 4596586 19
test 2 4605282 4605299 17
-------------------------------------------
Time: 2020-06-19 16:23:00
-------------------------------------------
('world', 60)
('hello', 60)

回显中:

1
2
test 1 4599724 4599748 24 分别对应下面的字段:
o.topic(主题名) o.partition(分区名) o.fromOffset(起始位移),o.untilOffset(终止位移),o.untilOffset-o.fromOffset(终止位移和起始位移的差)

上面的程序我们只是打印了offset的状态信息(参考官方案例),但是并没有对offset状态进行持久化处理。如果任务故障或终止,重新启动时候,任务会重新开始消费。在实际生产环境中,一些高可用数据处理场景,这是不可容忍的。offset状态数据需要进行高可用持久化处理。这就是接下来我们介绍的checkpoint机制。

3.3 checkpoint机制

3.3.1 持久化(persist)和Checkpoint机制

Spark中对于RDD的容错性是通过storageLevel参数设置RDD存储级别(持久化级别)。persist()的默认参数为MEMORY_ONLY(即storageLevel=MEMORY_ONLY),即内存缓存,称为缓存(cache)。在这种存储级别下,Spark计算出的RDD结果将缓存在内存中,一旦计算任务中一个执行器(Executor)故障下宕,缓存在该执行器的RDD数据就会丢失,执行器重建后需要通过RDD依赖链重新计算。

例如Receiver模式,Pyspark中接口默认使用MEMORY_AND_DISK_2。在该存储级别下,RDD除了缓存在内存还会持久化到磁盘(2份),当执行器失败可以从磁盘中加载持久化数据。但是一旦Spark的Driver故障下宕或者任务正常结束,计算的所有存储资源将被集群回收。

而Checkpoint机制将RDD持久化到HDFS文件系统,天然的利用了HDFS的分布式高可用文件系统特性。

3.3.2 Checkpoint机制的实现

前面的Receiver模式的例子中,我们开启了WAL机制。但是这种机制是执行器级别的高可用。这里我们提高高可用级别,增加checkpoint机制将RDD持久化到HDFS分布式文件系统中。这样即使Spark流任务的Driver重启依然能从checkpoint重启启动,继续消费数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# -*- coding: utf-8 -*-

import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# 定义checkpoint
checkpointDirectory = "hdfs://user/python/kafka/checkpoint"

zkQuorum = "localhost:2181"
topics = "test"
groupid = "test"

def functionToCreateContext():
sparkConf = SparkConf()
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sc = SparkContext(appName="PythonStreamingKafkaWordCount",conf=sparkConf)
ssc = StreamingContext(sc, 60)

global zkQuorum
global topics
kafkaStreams = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kafkaStreams.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc

if __name__ == "__main__":
zkQuorum = "localhost:2181"
topics = "test"
groupid = "test"
#如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首 #次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好 #DStream数据流。
ssc = StreamingContext.getOrCreate(checkpointDirectory, lambda: functionToCreateContext())
ssc.start()
ssc.awaitTermination()

但启动一个具有checkpoint机制的spark任务的时候。通过函数getOrCreate实现:

StreamingContext.getOrCreate(checkpointDirectory, lambda: functionToCreateContext())

函数参数的具体含义就是:首先检查是否有checkpoint(即checkpointDirectory)。如果非空,StreamingContext从checkpointDirectory加载启动。如果没有执行函数functionToCreateContext()创建(函数中已经声明了创建逻辑)。具体数据流参考下图:

spark-streaming-checkpoint-in-apache-spark-1

3.4 自行管理offset状态

除了checkpoint机制,例如下面代码将offset信息持久化到本地文件系统。需要注意的是该持久方式需要任务为Client模式提交集群,否则保存在Drive中本地文件系统会被集群回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/python
# coding=utf-8
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import time
import os
import json
broker_list = "xxxx"
topic_name = "xxxx"
timer = 60
offsetRanges = []

def store_offset_ranges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd

def save_offset_ranges(rdd):
root_path = os.path.dirname(os.path.realpath(__file__))
record_path = os.path.join(root_path, "offset.txt")
data = dict()
f = open(record_path, "w")
for o in offsetRanges:
data = {"topic": o.topic, "partition": o.partition, "fromOffset": o.fromOffset, "untilOffset": o.untilOffset}
f.write(json.dumps(data))
f.close()

def deal_data(rdd):
data = rdd.collect()
for d in data:
# do something
pass

def save_by_spark_streaming():
# 定义本地文件系统
root_path = os.path.dirname(os.path.realpath(__file__))
record_path = os.path.join(root_path, "offset.txt")
# from_offsets = {}
# 获取已有的offset,没有记录文件时则用默认值即最大值
if os.path.exists(record_path):
f = open(record_path, "r")
offset_data = json.loads(f.read())
f.close()
if offset_data["topic"] != topic_name:
raise Exception("the topic name in offset.txt is incorrect")

topic_partion = TopicAndPartition(offset_data["topic"], offset_data["partition"])
from_offsets = {topic_partion: long(offset_data["untilOffset"])}
# 注意设置起始offset时的方法
print("start from offsets: %s" % from_offsets)

sc = SparkContext(appName="Realtime-Analytics-Engine")
ssc = StreamingContext(sc, int(timer))

kafkaStreams = KafkaUtils.createDirectStream(ssc=ssc, topics=[topic_name], fromOffsets=from_offsets,kafkaParams={"metadata.broker.list": broker_list})
kafkaStreams.foreachRDD(lambda rec: deal_data(rec))
kafkaStreams.transform(store_offset_ranges).foreachRDD(save_offset_ranges)

ssc.start()
ssc.awaitTermination()
#ssc.stop()

if __name__ == '__main__':
save_by_spark_streaming()

上面的例子中,将offset信息以json格式持久化到文件系统中。实际生产中建议以二维表形式存储在Mysql等数据库中进行持久化(不再详细举例)。

java 保存在mysql的例子

https://www.jianshu.com/p/2369a020e604

https://blog.csdn.net/lxb1022/article/details/78041168

其他交互接口:

https://www.cnblogs.com/yanshw/p/11929180.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from kafka import SimpleClient
from kafka.structs import OffsetRequestPayload

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
from pyspark import SparkContext

from algo_core.utils.hbase_util import HappyBaseUtil # 自己封装的happybase包

def save_offsets(topic_name, group_id, offset_ranges, hbase_table_name):
happybase_util = HappyBaseUtil()
for offset in offset_ranges:
happybase_util.put(hbase_table_name, topic_name+"_"+group_id, {"offsets:"+str(offset.partition): str(offset.untilOffset)})

def get_last_committed_offsets(topic_name, group_id, hbase_table_name):
# client = SimpleClient('localhost:9092')
client = SimpleClient(["xxxxx:9092","xxxxx:9092","xxxxx:9092"])
# 获取zookeeper中kafka topic的partition
topic_partition_ids = client.get_partition_ids_for_topic(topic_name)

happybase_util = HappyBaseUtil()
# 获取hbase存放的kafka topic的partition
partition_offset_values = happybase_util.get_row(hbase_table_name, row=topic_name+"_"+group_id)

if len(partition_offset_values) == 0:
# 第一次运行处理
partitions = client.topic_partitions[topic_name]
offset_requests = [OffsetRequestPayload(topic_name, p, -1, 1) for p in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)
offsets = dict((TopicAndPartition(topic_name, r.partition), r.offsets[0]) for r in offsets_responses)

elif len(partition_offset_values) < len(topic_partition_ids):
# 如果hbase中partition个数小于zookeeper中partition的个数,说明有新增的partition,新增的partition偏移量设为0
offsets = dict((TopicAndPartition(topic_name, int(k.decode("utf-8").split(":")[1])), int(v))
for k, v in partition_offset_values.items())
extra_partitions = dict((TopicAndPartition(topic_name, i), 0)
for i in range(len(topic_partition_ids), len(partition_offset_values)))
offsets.update(extra_partitions)
else:
offsets = dict((TopicAndPartition(topic_name, int(k.decode("utf-8").split(":")[1])), int(v))
for k, v in partition_offset_values.items())

return offsets


if __name__ == "__main__":
sc = SparkContext(appName="test")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 5)
# kafka_params = {"metadata.broker.list": "localhost:9092"}
kafka_params = {"metadata.broker.list": "xxxxx:9092,xxxxx:9092,xxxxx:9092"}

# fromOffset = get_last_committed_offsets("test", "test-id", "stream_kafka_offsets")
fromOffset = get_last_committed_offsets("mytopic", "test-group-2", "stream_kafka_offsets")

# kafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafka_params, fromOffsets=fromOffset)
kafkaStream = KafkaUtils.createDirectStream(ssc, ["mytopic"], kafka_params, fromOffsets=fromOffset)

def inner_func(rdd):
rdd.foreach(lambda x: print(x))
save_offsets("mytopic", "test-group-2", rdd.offsetRanges(),"stream_kafka_offsets")

kafkaStream.foreachRDD(inner_func)

ssc.start()
ssc.awaitTermination()

第四部分 任务的提交

4.1 兼容性

Spark 针对 Kafka 的不同版本(主要还是以0.8版本为重要分界线),提供了两种方案:spark-streaming-kafka-0-8spark-streaming-kafka-0-10,其主要区别如下:

spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Broker Version(Kafka版本) 0.8.2.1 or higher 0.10.0 or higher
Api Stability Stable Experimental
Language Support Scala, Java, Python Scala, Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit Api No Yes
Dynamic Topic Subscription No Yes

目前只有spark-streaming-kafka-0-8方案是支持python语言的,所以我们提交任务是需要指定相应的依赖jar包。高版本(spark-streaming-kafka-0-10)的方案已经抛弃了对Receiver方式的支持。

spark-streaming-kafka-0-8接口的jar下载路径:

spark-streaming-kafka_2.10-1.5.1.jarspark-streaming-kafka-assembly_2.10-1.5.1.jar

如果出现包冲突,提交任务时添加参数--conf spark.yarn.user.classpath.first=true,这样设置后yarn中优先使用用户传上去的jar包,避免包冲突。

4.2 任务提交案例

与任何Spark应用程序一样,spark-submit用于启动应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。

对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka_2.11其及其依赖项打包到应用程序JAR中。确保spark-core_2.10spark-streaming_2.10标记为providedSpark安装中已存在的依赖项。然后使用spark-submit启动应用程序。

对于缺少SBT / Maven项目管理的Python应用程序,spark-streaming-kafka_2.11可以直接将其依赖项添加到spark-submit使用中--packages。那是,

1
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 ...

另外,您也可以下载Maven构件的JAR spark-streaming-kafka-assemblyMaven仓库,并将其添加到spark-submit--jars

4.3 参数调优建议

Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

  • 合理的批处理时间(batchDuration):几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整。
  • 合理的Kafka拉取量(maxRatePerPartition重要):对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time
  • 缓存反复使用的Dstream(RDD):Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache,将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数
  • 设置合理的GC:长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(edengeneration)、年轻代young generation)、老年代(oldgeneration)以及永久代(permanentgeneration),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVMGC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:–conf “spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC”
  • 设置合理的CPU资源数:CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况
  • 设置合理的parallelism:partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。
  • 使用高性能的算子:(1)使用reduceByKey/aggregateByKey替代groupByKe(2)使用mapPartitions替代普通map(3) 使用foreachPartitions替代foreach(4) 使用filter之后进行coalesce操作5 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
  • 使用Kryo优化序列化性能
    主要有三个地方涉及到了序列化
    在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
    将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
    使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。 val conf = new SparkConf.setMaster(…).setAppName(…) //设置序列化器为KryoSerializer。
conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”) //注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

https://www.cnblogs.com/frankdeng/p/9308585.html

参考文献及资料

1、Improvements to Kafka integration of Spark Streaming,链接:https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

2、Source code for pyspark.streaming.kafka,链接:https://spark.apache.org/docs/2.2.0/api/python/_modules/pyspark/streaming/kafka.html

3、Introducing Spark Streaming,链接:https://engineering.billymob.com/introducing-spark-streaming-c1b8be36c775

4、Spark Streaming基于kafka的Direct详解,链接:https://blog.csdn.net/matrix_google/article/details/80033524

5、Enabling fault-tolerant processing in Spark Streaming,链接:https://docs.cloudera.com/runtime/7.0.2/developing-spark-applications/topics/spark-streaming-fault-tolerance.html

6、Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client,链接:https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

7、官网文档,链接:https://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html

8、官网文档,链接:https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html

0%