Fork me on GitHub

Kafka系列文章(第三篇 Kafka可视化管理界面)

目录

  • 背景
  • 第一部分 kafka-manager安装
  • 第二部分 kafka-manager配置
  • 第三部分 kafka-manager管理
  • 第四部分 总结
  • 参考文献及资料

背景

在Kafka的监控系统中有很多优秀的开源监控系统。比如Kafka-manager,open-faclcon,zabbix等主流监控工具均可直接监控kafka。Kafka集群性能监控可以从消息网络传输,消息传输流量,请求次数等指标来衡量集群性能。这些指标数据可以通过访问kafka集群的JMX接口获取。Kafka-manager工具由Yahoo研发的Kafka管理和监控工具,并在github上开源。

对于非加密Kafka集群配置Kafka manager,目前互联网也有大量的资料。而对于加密集群(特别是云端集群还配置了域名方式),参考材料较为匮乏。本文针对云端加密Kafka集群配置Kafka Manager进行详细介绍,供大家参考。

第一部分 kafka-manager安装

1.1 版本选择

版本使用cmak-3.0.0.0版本,依赖java11(使用openjdk-11+28_linux-x64_bin.tar.gz)。使用已经编译好的介质包cmak-3.0.0.0.zip。假设安装目录为/dmqs

1.2 介质部署

1.2.1 部署cmak

上传cmak-3.0.0.0.zip至安装目录/dmqs,使用命令解压:

1
f-itdw-4c8g-100g-11:/dmqs # unzip cmak-3.0.0.0.zip

1.2.2 部署java

上传openjdk-11+28_linux-x64_bin.tar.gz介质到/dmqs/cmak-3.0.0.0路径:

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # tar -zxvf openjdk-11+28_linux-x64_bin.tar.gz

重命名java路径名:

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # mv jdk11 jdk

1.3 配置文件准备

1.3.1 配置application.conf文件

备份文件并修改:

1
2
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp application.conf application.conf.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi application.conf

调整kafka-manager.zkhosts参数项的配置信息:

1
kafka-manager.zkhosts="84.10.228.50:2181,84.10.228.55:2181,84.10.228.56:2181"

1.3.2 加密集群配置jaas文件

如果是加密集群需要准备jaas文件,文件名为:kafka_server_jaas.conf

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # touch kafka_server_jaas.conf

文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

上面配置中KafkaClient为和kafka通信配置;Client为和zookeeper通信配置。

1.3.3 配置consumer.properties

首先备份:

1
2
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp consumer.properties consumer.properties.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi consumer.properties

配置文件调整为:

1
2
3
4
5
6
7
8
9
10
11
12
13
#security.protocol=PLAINTEXT
#key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
#value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeseriazer

bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=itdw123
ssl.keystore.password=itdw123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.key.password=itdw123
ssl.endpoint.identification.algorithm=

其中注释部分为源配置文件内容。

1.4 准备ca信任证书

对于已经配置为域名方式的Kafka集群需要配置域名信任证书。

创建InstallCert.java,java程序文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
* Copyright 2006 Sun Microsystems, Inc. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* - Neither the name of Sun Microsystems nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import java.io.*;
import java.net.URL;

import java.security.*;
import java.security.cert.*;

import javax.net.ssl.*;

public class InstallCert {

public static void main(String[] args) throws Exception {
String host;
int port;
char[] passphrase;
if ((args.length == 1) || (args.length == 2)) {
String[] c = args[0].split(":");
host = c[0];
port = (c.length == 1) ? 443 : Integer.parseInt(c[1]);
String p = (args.length == 1) ? "changeit" : args[1];
passphrase = p.toCharArray();
} else {
System.out.println("Usage: java InstallCert <host>[:port] [passphrase]");
return;
}

File file = new File("jssecacerts");
if (file.isFile() == false) {
char SEP = File.separatorChar;
File dir = new File(System.getProperty("java.home") + SEP
+ "lib" + SEP + "security");
file = new File(dir, "jssecacerts");
if (file.isFile() == false) {
file = new File(dir, "cacerts");
}
}
System.out.println("Loading KeyStore " + file + "...");
InputStream in = new FileInputStream(file);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(in, passphrase);
in.close();

SSLContext context = SSLContext.getInstance("TLS");
TrustManagerFactory tmf =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks);
X509TrustManager defaultTrustManager = (X509TrustManager)tmf.getTrustManagers()[0];
SavingTrustManager tm = new SavingTrustManager(defaultTrustManager);
context.init(null, new TrustManager[] {tm}, null);
SSLSocketFactory factory = context.getSocketFactory();

System.out.println("Opening connection to " + host + ":" + port + "...");
SSLSocket socket = (SSLSocket)factory.createSocket(host, port);
socket.setSoTimeout(10000);
try {
System.out.println("Starting SSL handshake...");
socket.startHandshake();
socket.close();
System.out.println();
System.out.println("No errors, certificate is already trusted");
} catch (SSLException e) {
System.out.println();
e.printStackTrace(System.out);
}

X509Certificate[] chain = tm.chain;
if (chain == null) {
System.out.println("Could not obtain server certificate chain");
return;
}

BufferedReader reader =
new BufferedReader(new InputStreamReader(System.in));

System.out.println();
System.out.println("Server sent " + chain.length + " certificate(s):");
System.out.println();
MessageDigest sha1 = MessageDigest.getInstance("SHA1");
MessageDigest md5 = MessageDigest.getInstance("MD5");
for (int i = 0; i < chain.length; i++) {
X509Certificate cert = chain[i];
System.out.println
(" " + (i + 1) + " Subject " + cert.getSubjectDN());
System.out.println(" Issuer " + cert.getIssuerDN());
sha1.update(cert.getEncoded());
System.out.println(" sha1 " + toHexString(sha1.digest()));
md5.update(cert.getEncoded());
System.out.println(" md5 " + toHexString(md5.digest()));
System.out.println();
}

System.out.println("Enter certificate to add to trusted keystore or 'q' to quit: [1]");
String line = reader.readLine().trim();
int k;
try {
k = (line.length() == 0) ? 0 : Integer.parseInt(line) - 1;
} catch (NumberFormatException e) {
System.out.println("KeyStore not changed");
return;
}

X509Certificate cert = chain[k];
String alias = host + "-" + (k + 1);
ks.setCertificateEntry(alias, cert);

OutputStream out = new FileOutputStream("jssecacerts");
ks.store(out, passphrase);
out.close();

System.out.println();
System.out.println(cert);
System.out.println();
System.out.println
("Added certificate to keystore 'jssecacerts' using alias '"
+ alias + "'");
}

private static final char[] HEXDIGITS = "0123456789abcdef".toCharArray();

private static String toHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 3);
for (int b : bytes) {
b &= 0xff;
sb.append(HEXDIGITS[b >> 4]);
sb.append(HEXDIGITS[b & 15]);
sb.append(' ');
}
return sb.toString();
}

private static class SavingTrustManager implements X509TrustManager {

private final X509TrustManager tm;
private X509Certificate[] chain;

SavingTrustManager(X509TrustManager tm) {
this.tm = tm;
}

public X509Certificate[] getAcceptedIssuers() {
throw new UnsupportedOperationException();
}

public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
throw new UnsupportedOperationException();
}

public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
this.chain = chain;
tm.checkServerTrusted(chain, authType);
}
}
}

上传至目的目录,并编译:

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # /dmqs/cmak-3.0.0.0/jdk/bin/javac InstallCert.java

编译后生成下面的文件:

1
2
3
4
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # ll
-rw-r--r-- 1 dmqs dmqs 975 May 28 02:23 InstallCert$SavingTrustManager.class
-rw-r--r-- 1 dmqs dmqs 6126 May 28 02:23 InstallCert.class
-rw-r--r-- 1 dmqs dmqs 6884 May 28 02:21 InstallCert.java

添加域名(kafka集群配置为域名方式)到jssecacerts文件中:

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # /dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node1:9093

这时在当前目录就生成了jssecacerts文件。如果集群是多节点,需要将其他节点域名信息追加到这个文件中。执行命令即为:

1
2
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # /dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node2:9093
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # /dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node3:9093

这样就生成了集群所有的节点域名的信任证书。

最后将jssecacerts文件拷贝至jdk/lib/security

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # cp jssecacerts jdk/lib/security

完成所有配置的准备。

1.5 服务启动

完成配置文件准备后,使用下面的命令启动Kafka-manager服务:

1
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/bin # ./cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 > /dev/null 2>&1 &

其中参数命令说明如下:

  • 参数-java-home指定服务启动的java依赖环境目录;

  • 参数-Djava.security.auth.login.config指定和kafka和zookeeper交互的jaas文件路径;

  • 参数-Dapplication.home指定了应用的主目录;
  • 参数-Dhttp.port=8888指定了应用的监听端口,默认9000;
  • 参数-Dconfig.file=../conf/application.conf指定了应用的应用配置文件;

启动后应用目录下面生成logs目录,作为日志存放目录。启动命令不指定端口的情况下,默认监听9000端口。

1.6 自动化脚本

为了提高服务运维管理,对服务启停进行自动化管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/bin/bash -e

RETVAL=0
cmak="/dmqs/cmak-3.0.0.0/bin/cmak"
start() {
$cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 >/dev/null 2>&1 &
RETVAL=$?
[ $RETVAL -eq 0 ] && echo "Start Kafka Manager Success!" ||echo "Start Kafka Manager failed!"
return $RETVAL
}

stop() {
CMAKPID=$(ps -ef|grep cmak|grep -v grep| awk '{print $2}')
if [[ -a /dmqs/cmak-3.0.0.0/RUNNING_PID ]]
then
rm /dmqs/cmak-3.0.0.0/RUNNING_PID && echo -e "\n已删除文件:RUNNING_PID\n" && kill -9 $CMAKPID >/dev/null 2>&1 &
RETVAL=$?
else
kill -9 $CMAKPID >/dev/null 2>&1 &
RETVAL=$?
fi;
[ $? -eq 0 ] && echo "Stop Kafka Manager Success!" ||echo "Stop Kafka Manager failed!"
return $RETVAL
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)

sh $0 stop
sh $0 start
;;
*)
echo "Format error!"
echo $"Usage: $0 {start|stop|restart}"
exit 1
;;
esac
exit $RETVAL

对于启动命令,可以自定义修改。

第二部分 kafka-manager配置

2.1 创建新集群管理

创建新的管理集群,需要填入下面的信息:

  • Cluster Name

    集群名称;

  • Cluster Zookeeper Hosts

    配置kafka集群背后的zookeeper集群的信息。例如:192.168.1.1:2181;

  • Kafka Version

    Kafka的版本信息;

  • Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)

    是否启用集群的监控组件。

  • Security Protocol

    安全协议。目前支持:SSL、SASL_PLAINTEXT、SASL_SSL、PLAINTEXT

  • SASL Mechanism (only applies to SASL based security)

    SASL的权限管理协议:DEFAULT、PLAIN、GSSAPI、SCRAM-SHA-256、SCRAM-SHA-512

  • SASL JAAS Config (only applies to SASL based security)

    SASL的用户配置信息。例如:

    1
    org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";

    需要注意的是配置以分号结束,否则会报错。

第三部分 kafka-manager管理

Kafka Manager服务启动后,默认监听9000端口,所以服务URL地址为:http://102.168.1.1:9000。目前组件支持的管理功能有:

  • 管理多个集群
  • 轻松检查集群状态(主题,使用者,偏移量,代理,副本分发,分区分发)
  • 运行首选副本选择
  • 生成带有选项的分区分配,以选择要使用的代理
  • 运行分区的重新分配(基于生成的分配)
  • 使用可选的主题配置创建主题(0.8.1.1与0.8.2+具有不同的配置)
  • 删除主题(仅在0.8.2+上受支持,并记住在代理配置中设置delete.topic.enable = true)
  • 现在,主题列表指示标记为删除的主题(仅在0.8.2+上受支持)
  • 批量生成多个主题的分区分配,并可以选择要使用的代理
  • 批量运行分区的多个主题的重新分配
  • 将分区添加到现有主题
  • 更新现有主题的配置
  • (可选)为代理级别和主题级别的度量启用JMX轮询。
  • (可选)过滤出在Zookeeper中没有id / owner /&offsets /目录的使用者。

对于具体的组件使用,可以参文献中的[3]。

参考文献及资料

1、kafka-manager项目地址,链接:https://github.com/yahoo/kafka-manager

2、kafka-manager项目下载地址,链接:https://blog.wolfogre.com/posts/kafka-manager-download/

3、Apache Kafka集群管理工具CMAK(Cluster Manager for Apache Kafka)从安装启动到配置使用,链接:http://www.luyixian.cn/news_show_324464.aspx

0%