目录
- 背景
- 第一部分实现原理
- 第二部分 实现源码
- 参考文献及资料
背景
业务上需要实现对Kafka的Topic中数据进行监控。业务正常下,Kafka生产者是持续生产数据的。如果一段时间出现Kafka中指定Topic没有新的数据,那么说明业务生产者可能出现异常。
第一部分 实现原理
1.1 Kafka生产者的写入原理
Kafka每个Topic在创建时,划分为若干个Partition。消息在写入的时候,分布式写入指定的多个Partitions中。对于每个Partition,消息是顺序写入的。Topic在创建时,每个Partition的Offset是0,当消息顺序写入后逐步累加Offset值。
Kafka中Offset变量定义是一个长整型(Long),这个值最大为:9223372036854775807。那么逐步累加会不会用完呢?多虑了哈,这是一个天文级别的数值哈。
1.2 监控原理
既然Offset记录了每个Topic的每个Partition的消息量,最朴素的方法就是监控这个值的变化来判断是否有新的数据写入。
编写语言我们选择Python,而且对于处理Offset这个超大数据,Python是天然支持。
那么可行性是没问题的。
第二部分 实现源码
1.1 Python Api 接口
与Kafka交互的Python包我们使用:Kafka-Python
。对于消费者有下面的函数方法:
end_offsets
(partitions)[source]
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
This method does not change the current consumer position of the partitions.
Note
This method may block indefinitely if the partition does not exist.
我们写一个简单的测试:
1 | from kafka import KafkaConsumer, TopicPartition |
接口比较简洁。案例中,我们创建了一个Topic,取名为test。一共2个分区,1个副本:
1 | Topic:test PartitionCount:2 ReplicationFactor:1 Configs: |
所以回显分别显示了目前Topic的两个Partition的Offset值。
另外我们还可以使用kafka-consumer-groups.sh
脚本查看一下目前某个group的offset情况:
1 | root@deeplearning:/data/kafka/kafka_2.12-2.1.0/bin# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --describe --group my-group |
其中回显:
- CURRENT-OFFSET,目前group-id名称为’my-group’的消费群组Offset;
- LOG-END-OFFSET,目前topic的的Offset,也就是我们的监控对象;
- LAG,这是LOG-END-OFFSET-CURRENT-OFFSET的差,即’my-group’群组还有多少消息未消费;
1.2 完整的代码
实现对多个Topic的监控:
1 | # -*- coding: utf-8 -*- |
输出:
1 | {'test': 4, 'testTopic': 0} |
参考文献及资料
1、kafka-python API
介绍,链接:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html