Fork me on GitHub

监控Kafka的Topic数据

目录

  • 背景
  • 第一部分实现原理
  • 第二部分 实现源码
  • 参考文献及资料

背景

业务上需要实现对Kafka的Topic中数据进行监控。业务正常下,Kafka生产者是持续生产数据的。如果一段时间出现Kafka中指定Topic没有新的数据,那么说明业务生产者可能出现异常。

第一部分 实现原理

1.1 Kafka生产者的写入原理

Kafka每个Topic在创建时,划分为若干个Partition。消息在写入的时候,分布式写入指定的多个Partitions中。对于每个Partition,消息是顺序写入的。Topic在创建时,每个Partition的Offset是0,当消息顺序写入后逐步累加Offset值。

Kafka中Offset变量定义是一个长整型(Long),这个值最大为:9223372036854775807。那么逐步累加会不会用完呢?多虑了哈,这是一个天文级别的数值哈。

1.2 监控原理

既然Offset记录了每个Topic的每个Partition的消息量,最朴素的方法就是监控这个值的变化来判断是否有新的数据写入。

编写语言我们选择Python,而且对于处理Offset这个超大数据,Python是天然支持。

那么可行性是没问题的。

第二部分 实现源码

1.1 Python Api 接口

与Kafka交互的Python包我们使用:Kafka-Python。对于消费者有下面的函数方法:

end_offsets(partitions)[source]

Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

This method does not change the current consumer position of the partitions.

Note

This method may block indefinitely if the partition does not exist.

Parameters: partitions (list) – List of TopicPartition instances to fetch offsets for.
Returns: int}: The end offsets for the given partitions.
Return type: {TopicPartition
Raises: UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.KafkaTimeoutError – If fetch failed in request_timeout_ms

我们写一个简单的测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from kafka import KafkaConsumer, TopicPartition
# Kafka配置
BOOTSTRAP="192.168.1.1:9092"
TOPIC="test"
# 定义消费者
consumer = KafkaConsumer(bootstrap_servers=[BOOTSTRAP])
# 获取指定Topic的Partitions
PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
PARTITIONS.append(TopicPartition(TOPIC, partition))
# 获取Offset信息
partitions = consumer.end_offsets(PARTITIONS)
print(partitions)
#{TopicPartition(topic='test', partition=0): 4759, TopicPartition(topic='test', partition=1): 4823}

接口比较简洁。案例中,我们创建了一个Topic,取名为test。一共2个分区,1个副本:

1
2
3
Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0

所以回显分别显示了目前Topic的两个Partition的Offset值。

另外我们还可以使用kafka-consumer-groups.sh脚本查看一下目前某个group的offset情况:

1
2
3
4
5
6
root@deeplearning:/data/kafka/kafka_2.12-2.1.0/bin# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --describe --group my-group
Consumer group 'my-group' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 1 4827 4835 8 - - -
test 0 4763 4772 9 - - -

其中回显:

  • CURRENT-OFFSET,目前group-id名称为’my-group’的消费群组Offset;
  • LOG-END-OFFSET,目前topic的的Offset,也就是我们的监控对象;
  • LAG,这是LOG-END-OFFSET-CURRENT-OFFSET的差,即’my-group’群组还有多少消息未消费;

1.2 完整的代码

实现对多个Topic的监控:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# -*- coding: utf-8 -*-
"""
Copyright 2020 RongXiang.
Licensed under the terms of the Apache 2.0 license.
Please see LICENSE file in the project root for terms.
"""
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
import time
import logging

# Kafka 配置
BOOTSTRAP = ["192.168.31.3:9092"]
MONITORTOPIC = ["test", "testTopic"]

def monitoroffset(bootstrap, topicList):
try:
consumer = KafkaConsumer(bootstrap_servers=bootstrap)
topicoffset = {}
for topic in topicList:
PARTITIONS = []
for partition in consumer.partitions_for_topic(topic):
PARTITIONS.append(TopicPartition(topic, partition))
partitions = consumer.end_offsets(PARTITIONS)
print(partitions)
topicoffset[topic] = sum([partitions[item] for item in partitions])
consumer.close()
return topicoffset
except Exception as e:
print(e)

def diffdict(dictFirst, dictEnd):
try:
diffdict = {}
for item in dictFirst:
diffdict[item] = dictEnd[item]-dictFirst[item]
return diffdict
except Exception as e:
print(e)

def sendMessage():
pass

if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
firstOffset = monitoroffset(BOOTSTRAP, MONITORTOPIC)
time.sleep(300)
secondOffset = monitoroffset(BOOTSTRAP, MONITORTOPIC)
print(diffdict(firstOffset, secondOffset))

输出:

1
{'test': 4, 'testTopic': 0}

参考文献及资料

1、kafka-python API介绍,链接:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

本文标题:监控Kafka的Topic数据

文章作者:rong xiang

发布时间:2020年10月05日 - 13:10

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/99157561/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%