Fork me on GitHub

Flink链接Kafka总结

目录

  • 背景

  • 第一部分 su命令

  • 第二部分 susu -区别

  • 参考文献及资料

背景

Flink提供了Kafka connector用于消费/生产Apache Kafka topic的数据。Flink的Kafka consumer集成了checkpoint机制以提供精确一次的处理语义。在具体的实现过程中,Flink不依赖于Kafka内置的消费组位移管理,而是在内部自行记录和维护consumer的位移。

第一部分 依赖准备

用户在使用时需要根据Kafka版本来选择相应的connector,如下表所示:

Maven依赖 支持的最低Flink版本 Kafka客户端类名 说明
flink-connector-kafka-0.8_2.10 1.0.0 FlinkKafkaConsumer08、FlinkKafkaProducer08 使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在内部会提交位移到Zookeeper
flink-connector-kafka-0.9_2.10 1.0.0 FlinkKafkaConsumer09、FlinkKafkaProducer09 使用Kafka新版本consumer
flink-connector-kafka-0.10_2.10 1.2.0 FlinkKafkaConsumer010、FlinkKafkaProducer010 支持使用Kafka 0.10.0.0版本新引入的内置时间戳信息

maven依赖配置:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

第二部分 消费者

消费者的构造

Flink kafka connector使用的consumer取决于用户使用的是老版本consumer还是新版本consumer,新旧两个版本对应的connector类名是不同的,分别是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它们都支持同时消费多个topic。

该Connector的构造函数包含以下几个字段:

  1. 待消费的topic列表
  2. key/value解序列化器,用于将字节数组形式的Kafka消息解序列化回对象
  3. Kafka consumer的属性对象,常用的consumer属性包括:bootstrap.servers(新版本consumer专用)、zookeeper.connect(旧版本consumer专用)和group.id
1
2
3
4
5
6
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

序列化

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。

为了方便使用,Flink 提供了以下几种 schemas:

  1. TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema) 基于 Flink 的 TypeInformation 创建 schema。 如果该数据的读和写都发生在 Flink 中,那么这将是非常有用的。此 schema 是其他通用序列化方法的高性能 Flink 替代方案。

  2. JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)将序列化的 JSON 转化为 ObjectNode 对象,可以使用 objectNode.get("field").as(Int/String/...)() 来访问某个字段。 KeyValue objectNode 包含一个含所有字段的 key 和 values 字段,以及一个可选的”metadata”字段,可以访问到消息的 offset、partition、topic 等信息。

  3. AvroDeserializationSchema 使用静态提供的 schema 读取 Avro 格式的序列化数据。 它能够从 Avro 生成的类(AvroDeserializationSchema.forSpecific(...))中推断出 schema,或者可以与 GenericRecords 一起使用手动提供的 schema(用 AvroDeserializationSchema.forGeneric(...))。此反序列化 schema 要求序列化记录不能包含嵌入式架构!

    • 此模式还有一个版本,可以在 Confluent Schema Registry 中查找编写器的 schema(用于编写记录的 schema)。
    • 使用这些反序列化 schema 记录将读取从 schema 注册表检索到的 schema 转换为静态提供的 schema(或者通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(...)ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。

要使用此反序列化 schema 必须添加以下依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.13.0</version>
</dependency>

当遇到因一些原因而无法反序列化的损坏消息时,反序列化 schema 会返回 null,以允许 Flink Kafka 消费者悄悄地跳过损坏的消息。请注意,由于 consumer 的容错能力(请参阅下面的部分以获取更多详细信息),在损坏的消息上失败作业将使 consumer 尝试再次反序列化消息。因此,如果反序列化仍然失败,则 consumer 将在该损坏的消息上进入不间断重启和失败的循环。

容错性

伴随着启用 Flink 的 checkpointing 后,Flink Kafka Consumer 将使用 topic 中的记录,并以一致的方式定期检查其所有 Kafka offset 和其他算子的状态。如果 Job 失败,Flink 会将流式程序恢复到最新 checkpoint 的状态,并从存储在 checkpoint 中的 offset 开始重新消费 Kafka 中的消息。

因此,设置 checkpoint 的间隔定义了程序在发生故障时最多需要返回多少。

为了使 Kafka Consumer 支持容错,需要在 执行环境 中启用拓扑的 checkpointing。

如果未启用 checkpoint,那么 Kafka consumer 将定期向 Zookeeper 提交 offset。

第二部分 生产者

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

参考文献及资料

1、Su Command in Linux (Switch User),链接:https://linuxize.com/post/su-command-in-linux/

本文标题:Flink链接Kafka总结

文章作者:rong xiang

发布时间:2021年06月14日 - 12:06

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/52ad7019/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%