目录
背景
第一部分 Receiver接口模式
第二部分 Direct接口模式
第三部分 PySpark和Kafka交互
第四部分 任务提交
参考文献及资料
背景
Apache Kafka项目是大数据处理中重要的消息引擎。Spark Streaming作为重要的流处理计算框架通常和Kafka结合使用。Spark Streaming(新旧版本)支持Kafka的编程模型有两种:Receiver模式和Direct模式。Direct模式在Apache Spark 1.3中最新引入,下面是版本演进过程(截止Spark 2.1)。
本文主要介绍Spark Streaming和Kafka进行交互模式和PySpark实现。
0.0 Kafka的Consumer API
在正式介绍前,我们首先介绍一下Kafka Consumer API。Kafka在0.9
版本前提供了两种版本的Consumer:高级版和简单版。
高级版。消费者客户端不需要去管理偏移量(offset)状态,而是由Zookeeper管理。消费者中断重启后,可以根据上一次记录在Zookeeper中的offset状态信息,继续获取数据(默认为1分钟更新一次Zookeeper中的offset)。通俗的说就是全auto,最适合小白,哈哈。
简单版。消费者客户端需要自行管理offset状态信息(持久化为文件、数据库或者内存等)。
两个客户端的主要差异就是:谁来负责管理offset状态。但是高级的不一定就好,需要具体架构场景具体分析,适合才是最好的,哈哈(扯远了)。
Kafka项目组在0.8.1
版本开始重写生产者API,在0.9
版本完成新版本API的发版。新版的Consumer API使用原生Java编写(居然不用scala了,啧啧)。当然新版本API也不依赖于Zookeeper。详细细节可以参考官方文章:《Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client》。
第一部分 Receiver模式
1.1 原始方式
最开始Kafka和Spark Streaming集成是通过Receiver。即在每个Spark Executor中实例化Apache Kafka”高级版” Consumer API,启动一个Receiver,实现异步消费Kafka消息,并存储在执行程序的内存中,同时更新在Zookeeper中偏移量(offset)。接着Spark Streaming Driver启动作业(jobs)处理已经接受到的缓存数据。
这种方式是朴素和高效的,但是不具备容错性,遇到故障或者Executor重启任务,均会导致缓存消息的丢失和offset状态的滞后。对于一些重要业务数据处理场景,这是无法容忍的架构缺陷。
1.2 预写日志(WAL)模式
为了保证容错性,最简单的架构思路就是”存储状态”。Spark项目组引入了预写日志机制(WAL,Write Ahead Logs)。主要思想是:每个Spark Executor中Receiver接受到的数据持久化到分布式文件系统(HDFS)中,数据只有完成持久化,Receiver才会发起更新Zookeeper中的偏移量状态。当发生故障或者Executor重启时,从持久化的预写日志中恢复数据。
引入WAL后,保证了数据不会丢失。但是有这样的场景:Receiver接收到数据并持久化到WAL,但是系统在更新Zookeeper中相应的偏移量时出现故障,更新失败。当从故障中恢复后,由于offset滞后性,就会出现部分数据重复消费。
之所以出现这种场景,因为架构上无法保证状态信息在两个系统中保持数据一致(exactly-once)。为了避免这种情况,架构上需要只指定一个系统来维护这个状态信息。
Spark项目最后决定由Spark侧来管理offset状态,将offset信息持久化在Spark Streaming中(持久化在checkpoint中(hdfs文件系统)、数据库等,如果是client模式,可以存放在client本地文件系统等),当然与Kafka交互的API也要换成”简单版” Consumer API。这就是下面要介绍的Direct接口方式。
第二部分 Direct模式
2.1 Direct模式介绍
Apache Spark 1.3中项目组引入了Direct接口方式。Direct方式抛弃了Receiver,采取周期性(Batch Intervel)获取Kafka中每个topic的所有partition中的最新offsets信息,然后根据参数spark.streaming.kafka.maxRatePerPartition
设置的速度来消费数据。这就避免了和Zookeeper中偏移量的不一致的问题。而且可以保证即使出现故障,每个记录仅仅被消费一次。
具体消费速度计算如下:
假设Spark window窗口设置为60s(60s拉取一次Kafka数据);Kafka中该Topic有3个Partitions;maxRatePerPartition设置为100。那么每次拉取的最大数据量为: 60* 3 * 100 条数据。
2.2 两种方式的比较
Direct模式相比Receiver模式的优势有:
- Receiver模式需要在内存中和预写日志中保存两份数据,如果数据量较大,特别是当任务的作业出现大量延迟(delay)的时候,会占用大量存储资源。而Direct模式只有在计算的时候才会去拉取Kafka侧数据,Kafka侧仍然要充当数据的缓冲角色(削峰填谷)。
- Direct模式中,Kafka中的partition与RDD中的partition是一一对应的(即一个KafkaRDDIterator对应一个 KafkaRDDPartition)并行读取Kafka数据,即天然利用了并发处理优势。而Receiver模式需要创建多个Receiver之后,可以利用union方法合并成一个Dstream的方式提高数据传输的并行度(后面程序实现将详细介绍)。
- Direct模式保证了流计算Spark Streaming和Kafka管道数据 at least once语义。
第三部分 PySpark和Kafka交互
3.1 PySpark中Receiver接口
3.1.1 接口说明
pyspark.streaming.kafka
文件中的KafkaUtils
类:
1 | def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, |
参数说明:
ssc:StreamingContext对象;
zkQuorum:Zookeeper集群地址,格式为:
hostname:port,hostname:port,..
,逗号隔开;groupId:消费者群组名;
topics:主题名,字典类型。例如:{“test”:1},其中1表示线程数量(core)。
kafkaParams:可以补充的Kafka其他参数,字典数据类型;如果非空,其他Kafka的参数设置失效。
storageLevel:RDD的存储级别,StorageLevel参数决定如何存储RDD。在Spark中,StorageLevel决定RDD是应该存储在内存中还是存储在磁盘上,或两者都存储。它还决定是否序列化RDD以及是否复制RDD分区。参数类型有:
1
2
3
4
5
6
7
8
9
10
11DISK_ONLY = StorageLevel(True,False,False,False,1)
DISK_ONLY_2 = StorageLevel(True,False,False,False,2)
MEMORY_AND_DISK = StorageLevel(True,True,False,False,1)
MEMORY_AND_DISK_2 = StorageLevel(True,True,False,False,2)
MEMORY_AND_DISK_SER = StorageLevel(True,True,False,False,1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True,True,False,False,2)
MEMORY_ONLY = StorageLevel(False,True,False,False,1)
MEMORY_ONLY_2 = StorageLevel(False,True,False,False,2)
MEMORY_ONLY_SER = StorageLevel(False,True,False,False,1)
MEMORY_ONLY_SER_2 = StorageLevel(False,True,False,False,2)
OFF_HEAP = StorageLevel(True,True,True,False,1)从命名规范也很容易理解,MEMORY是内存,DISK是磁盘,SER表示是否序列化,数字是副本数量。那么默认参数:MEMORY_AND_DISK_2,含义是:数据同时存储内存和磁盘,并且副本数量为2(存储在不同节点)。即开启了WAL机制。
3.1.2 案例介绍
下面是计算单词数的流处理任务代码案例:
1 | # -*- coding: utf-8 -*- |
关于
auto.offset.reset
参数补充说明一下。原生的API中这个参数有三个值:
- earliest:自动将偏移重置为最早的偏移量;
- latest(默认值):自动将偏移量重置为最新偏移量;
- none:如果consumer group中没有发现先前的偏移量,则抛出异常;
与Spark Streaming整合后,有两个参数:
- smallest:从头开始消费,等价于上面的 earliest;
- largest(默认值):从最新的开始消费,等价于上面的 latest;
在
spark-streaming-kafka-0-10
新客户端中,这个参数也有none值(offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)。程序具体判断逻辑是:如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费。如果不存在已经提交的offest时,使用参数auto.offset.reset
的值。当值为none时,topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
程序中我们将并发度设置为3,即实例化了3个DStreams,最后union合并。这个处理效果可以在作业的UI界面中看到差异:
补充UI界面。
https://www.cnblogs.com/juncaoit/p/9452333.html
3.2 PySpark中Direct接口
Python API在Spark 1.4中引入了此功能。
3.2.1 接口说明
pyspark.streaming.kafka
文件中的KafkaUtils
类:
1 | def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, |
参数说明:
ssc:StreamingContext对象;
topics:主题名,List数据类型(支持多个topic同时消费);
kafkaParams:Kafka参数,字典格式;
fromOffsets:offset状态信息(Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.)。如果没有指定,则使用参数:
{"auto.offset.reset": "largest"}
。
需要特别注意的是:
createDirectStream
在spark-streaming-kafka-0-8
下不支持group id模式,因为它是使用“简单版”Kafka API。后续spark-streaming-kafka-0-10
开始提供对group id模式的支持。在0.9.0.0中,引入了新的Java 客户端API,以替代旧的基于Scala的简单和高级API。Spark Streaming integration for Kafka 0.10
是使用新的API,支持group id参数。
3.2.2 案例介绍
下面的是接口的使用案例。
1 | # -*- coding: utf-8 -*- |
我们启动一个Kafka 生产者每秒发送一条信息hello world
,截取回显:
1 | test 1 4599722 4599724 2 |
回显中:
1 | test 1 4599724 4599748 24 分别对应下面的字段: |
上面的程序我们只是打印了offset的状态信息(参考官方案例),但是并没有对offset状态进行持久化处理。如果任务故障或终止,重新启动时候,任务会重新开始消费。在实际生产环境中,一些高可用数据处理场景,这是不可容忍的。offset状态数据需要进行高可用持久化处理。这就是接下来我们介绍的checkpoint机制。
3.3 checkpoint机制
3.3.1 持久化(persist)和Checkpoint机制
Spark中对于RDD的容错性是通过storageLevel
参数设置RDD存储级别(持久化级别)。persist()的默认参数为MEMORY_ONLY
(即storageLevel=MEMORY_ONLY
),即内存缓存,称为缓存(cache)。在这种存储级别下,Spark计算出的RDD结果将缓存在内存中,一旦计算任务中一个执行器(Executor)故障下宕,缓存在该执行器的RDD数据就会丢失,执行器重建后需要通过RDD依赖链重新计算。
例如Receiver模式,Pyspark中接口默认使用MEMORY_AND_DISK_2
。在该存储级别下,RDD除了缓存在内存还会持久化到磁盘(2份),当执行器失败可以从磁盘中加载持久化数据。但是一旦Spark的Driver故障下宕或者任务正常结束,计算的所有存储资源将被集群回收。
而Checkpoint机制将RDD持久化到HDFS文件系统,天然的利用了HDFS的分布式高可用文件系统特性。
3.3.2 Checkpoint机制的实现
前面的Receiver模式的例子中,我们开启了WAL机制。但是这种机制是执行器级别的高可用。这里我们提高高可用级别,增加checkpoint机制将RDD持久化到HDFS分布式文件系统中。这样即使Spark流任务的Driver重启依然能从checkpoint重启启动,继续消费数据。
1 | # -*- coding: utf-8 -*- |
但启动一个具有checkpoint机制的spark任务的时候。通过函数getOrCreate
实现:
StreamingContext.getOrCreate(checkpointDirectory, lambda: functionToCreateContext())
函数参数的具体含义就是:首先检查是否有checkpoint(即checkpointDirectory)。如果非空,StreamingContext从checkpointDirectory加载启动。如果没有执行函数functionToCreateContext()
创建(函数中已经声明了创建逻辑)。具体数据流参考下图:
3.4 自行管理offset状态
除了checkpoint机制,例如下面代码将offset信息持久化到本地文件系统。需要注意的是该持久方式需要任务为Client模式提交集群,否则保存在Drive中本地文件系统会被集群回收。
1 | #!/usr/bin/python |
上面的例子中,将offset信息以json格式持久化到文件系统中。实际生产中建议以二维表形式存储在Mysql等数据库中进行持久化(不再详细举例)。
java 保存在mysql的例子
https://www.jianshu.com/p/2369a020e604
https://blog.csdn.net/lxb1022/article/details/78041168
其他交互接口:
https://www.cnblogs.com/yanshw/p/11929180.html
1 | from kafka import SimpleClient |
第四部分 任务的提交
4.1 兼容性
Spark 针对 Kafka 的不同版本(主要还是以0.8
版本为重要分界线),提供了两种方案:spark-streaming-kafka-0-8
和 spark-streaming-kafka-0-10
,其主要区别如下:
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Broker Version(Kafka版本) | 0.8.2.1 or higher | 0.10.0 or higher |
Api Stability | Stable | Experimental |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit Api | No | Yes |
Dynamic Topic Subscription | No | Yes |
目前只有spark-streaming-kafka-0-8
方案是支持python语言的,所以我们提交任务是需要指定相应的依赖jar
包。高版本(spark-streaming-kafka-0-10
)的方案已经抛弃了对Receiver
方式的支持。
spark-streaming-kafka-0-8
接口的jar下载路径:spark-streaming-kafka_2.10-1.5.1.jar 和 spark-streaming-kafka-assembly_2.10-1.5.1.jar
如果出现包冲突,提交任务时添加参数
--conf spark.yarn.user.classpath.first=true
,这样设置后yarn中优先使用用户传上去的jar包,避免包冲突。
4.2 任务提交案例
与任何Spark应用程序一样,spark-submit
用于启动应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。
对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka_2.11
其及其依赖项打包到应用程序JAR中。确保spark-core_2.10
并spark-streaming_2.10
标记为provided
Spark安装中已存在的依赖项。然后使用spark-submit
启动应用程序。
对于缺少SBT / Maven项目管理的Python应用程序,spark-streaming-kafka_2.11
可以直接将其依赖项添加到spark-submit
使用中--packages
。那是,
1 | ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 ... |
另外,您也可以下载Maven构件的JAR spark-streaming-kafka-assembly
从 Maven仓库,并将其添加到spark-submit
用--jars
。
4.3 参数调优建议
Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。
- 合理的批处理时间(batchDuration):几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整。
- 合理的Kafka拉取量(maxRatePerPartition重要):对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time
- 缓存反复使用的Dstream(RDD):Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache,将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数
- 设置合理的GC:长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(edengeneration)、年轻代young generation)、老年代(oldgeneration)以及永久代(permanentgeneration),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVMGC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:–conf “spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC”
- 设置合理的CPU资源数:CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况
- 设置合理的parallelism:partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。
- 使用高性能的算子:(1)使用reduceByKey/aggregateByKey替代groupByKe(2)使用mapPartitions替代普通map(3) 使用foreachPartitions替代foreach(4) 使用filter之后进行coalesce操作5 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
- 使用Kryo优化序列化性能
主要有三个地方涉及到了序列化
在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):
// 创建SparkConf对象。 val conf = new SparkConf.setMaster(…).setAppName(…) //设置序列化器为KryoSerializer。
conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”) //注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
https://www.cnblogs.com/frankdeng/p/9308585.html
参考文献及资料
1、Improvements to Kafka integration of Spark Streaming,链接:https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
2、Source code for pyspark.streaming.kafka,链接:https://spark.apache.org/docs/2.2.0/api/python/_modules/pyspark/streaming/kafka.html
3、Introducing Spark Streaming,链接:https://engineering.billymob.com/introducing-spark-streaming-c1b8be36c775
4、Spark Streaming基于kafka的Direct详解,链接:https://blog.csdn.net/matrix_google/article/details/80033524
5、Enabling fault-tolerant processing in Spark Streaming,链接:https://docs.cloudera.com/runtime/7.0.2/developing-spark-applications/topics/spark-streaming-fault-tolerance.html
6、Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client,链接:https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
7、官网文档,链接:https://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
8、官网文档,链接:https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html