Fork me on GitHub

Kafka系列文章(第五篇 Kafka安全集群)

目录

  • 背景
  • 第一部分 Kafka集群加密传输
  • 第二部分 Kafka集群权限认证
  • 第三部分 加密认证集群的客户端
  • 第四部分 加密认证集群的性能压测
  • 第五部分 总结
  • 参考文献及资料

背景

Kafka在0.9.0.0版本前没有安全机制功能。Kafka Client程序可以直接获取到Kafka集群元数据信息和Kafka Broker地址后,连接到Kafka集群,然后完全操作集群上的所有topic数据资源。另外集群节点间通讯、broker和zookeeper通讯、客户端和集群的网络层通信都是无加密模式。集群的数据存在极大的安全风险。

自0.9.0.0版本开始,Kafka社区逐步添加了较多功能用于提高Kafka群集的安全性。目前Kafka安全集群安全机制主要有三个方面的设置:通信加密(encryption)、身份认证(authentication)和授权(authorization)。

本文重点介绍生产安全集群的一种配置方案。数据通讯传输配置SSL,认证配置SASL,授权通过ACL接口命令来完成的,即:SSL+SASL/SCRAM+ACL

第一部分 Kafka集群加密传输

1.1 背景知识介绍

涉及的技术知识不做详细介绍。

1.1.1 密码学基础

加密算法分为两类:

  • 对称密钥算法(Symmetric Cryptography):数据加密和解密时使用相同的密钥。例如常用的DES就是对称加密算法。

  • 非对称密钥算法(Asymmetric Cryptography):数据加密和解密时使用不同的密钥,分为:公开的公钥(public key)和用户保存的私钥(private key),私钥和公钥在数学上是相关的。利用公钥(或私钥)加密的数据只能用相应的私钥(或公钥)才能解密。举一个例子:客户在银行网银上做一笔交易,首先向银行申请公钥,银行分发公钥给用户,用户使用公钥对请求数据进行加密。银行收到加密数据后通过银行侧保存的私钥进行解密处理,并处理后更新后台数据库。这个通讯过程中银行不需要通过互联网分发私钥。因此保证了私钥的安全。目前最常用的非对称加密算法是RSA算法。

    非对称密钥算法中,私钥来解密公钥加密的数据,公钥来解密私钥加密的数据。

两种加密算法的比较:

  • 对称密钥的强度和密钥长度成正比,但是解密效率和密钥长度成反比。另外私钥的分发存在安全风险。

  • 非对称加密保证了私钥的安全性,但是加密和解密的效率比对称加密低。

所以通常加密场景是两种密钥结合使用。使用数据主体使用对称秘钥算法,但是私钥的传输使用非对称算法在互联网环境分发非对称密钥。最常见的就是SSL/TLS。

1.1.2 CA数字证书

对于非对称密钥算法存在一个安全风险点,那就是公钥的分发存在中间人攻击。还是以客户和银行的通信为例(例子简单化处理)。客户和银行分别有自己的公钥和私钥,私钥各自保留本地。公钥通过互联网分发给对方。那么公钥就是有安全风险的。存在被黑客截取风险。客户向银行申请银行公钥,结果被黑客截取,黑客伪装成银行,返回给用户自己的黑客公钥,用户收到黑客公钥后,将信息加密发给黑客。黑客用黑客私钥进行解密,获取到真实信息。这时候黑客伪装成客户用相同的方法完成和银行的数据交互。这就是中间人攻击的案例。

所以非对称加密算法的公钥传输同样存在风险。当然如果使用原始的离线方式交换密钥是安全的,但是随着互联网通信的爆炸式增长,这是落后低效的。为了保证公钥的真实性和安全性,这时候我们引入第三个角色:公开密钥认证(Public key certificate,简称CA),又称数字证书(digital certificate)或身份证书(identity certificate)。

通常CA是一家第三方权威机构。负责管理和签发证书。整个实现原理也是非对称加密算法:

  • 机构将自己的公钥以及身份信息交给CA机构(安全的),CA使用自己的私钥对各机构的公钥进行加密。这个过程称为验签。输出的加密后的公钥及身份信息称为数字证书。
  • 当其他机构请求A机构公钥的时候,返回的是A机构的数字证书。其他机构可以使用CA的公钥对该数字证书中加密公钥进行解密获取A机构的通信公钥。

那么新得安全问题又来了,如何保证CA机构的公钥不被伪造?通常CA的公钥是集成在浏览器或者操作系统中,并且被很好的保护起来。

当然CA证书还涉及更多的安全细节设计(Hash算法防篡改、信任链等大量细节),这里只是简单的介绍。详细介绍可以查看:维基(证书颁发机构

对于企业内部的应用系统就没必要花钱购买CA机构的证书服务了,可以自建 Root CA,自己给自己颁发证书,充当内网的CA机构。当然这时候客户端就需要导入CA的证书了(浏览器和操作系统没有自建的CA证书)。

1.1.3 SSL/TLS加密协议

SSL(Secure Sockets Layer)是一种安全协议,目的是为保障互联网上数据传输安全,利用数据加密技术,确保数据在网络上之传输过程中不会被截取。

从网络协议层看,SSL协议位于TCP/IP协议与应用层协议之间,为数据通讯提供安全支持。SSL协议自身可分为两层:

  • SSL记录协议(SSL Record Protocol):它建立在可靠的传输协议(如TCP)之上,为高层协议提供数据封装、压缩、加密等基本功能的支持。
  • SSL握手协议(SSL Handshake Protocol):它建立在SSL记录协议之上,用于在实际的数据传输开始前,通讯双方进行身份认证、协商加密算法、交换加密密钥等。例如HTTPS就是在HTTP应用层上增加了SSL加密协议支持(HTTP over SSL)。

TLS(Transport Layer Security,传输层安全协议),同样用于两个应用程序之间提供保密性和数据完整性。 TLS 1.0建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本,可以理解为SSL 3.1,即是SSL的升级版。TLS的主要目标是使SSL更安全,并使协议的规范更精确和完善。另外,TLS版本号也与SSL的不同(TLS的版本1.0使用的版本号为SSLv3.1)

SSL通过握手过程在client和server之间协商会话參数,并建立会话。一共有三种方式:

  • 仅仅验证server的SSL握手过程(单向SSL)
  • 验证server和client的SSL握手过程(双向SSL)
  • 恢复原有会话的SSL握手过程

第一种:单向SSL通信过程如下(SSL 客户端和SSL 服务端通信):

(1)SSL客户端向SSL服务端发起请求,请求信息包括SSL版本号、加密算法、密钥交换算法、MAC算法等信息;

(2)SSL服务端确定本次通话的SSL版本和加密套件后,将携带公钥信息的证书回给客户端。如果通话可从重用,还会返回会话ID;

(3)SSL服务端发送Server Hello Done消息。通知SSL客户端版本号和加密套件协商结束,开始进行密钥交换;

(4)SSL客户端对CA证书进行验证,证书合法则继续、不成功弹出选择页面;

(5)SSL客户端生产随机私有对称密钥key,并使用服务端公开密钥进行加密后,发给服务端;

(6)SSL服务端使用自己的私钥解密,获取对称密钥key;

(7)最后SSL客户端与SSL服务端将使用该对称密钥key进行加密通信。

第二种:单向认证,仅仅是客户端需要检验服务端证书是否是正确的。双向SSL和单向认证几乎一样,只是在客户端认证完服务器证书后,客户端会将自己的证书传给服务器。服务器验证通过后,才开始秘钥协商。

第三种:协商会话参数、建立会话的过程中,需要使用非对称密钥算法来加密密钥、验证通信对端的身份,计算量较大,占用了大量的系统资源。为了简化SSL握手过程,SSL允许重用已经协商过的会话。即可以重用会话ID。这就是第三种建立会话方式。

1.1.4 Openssl工具

对于企业内部(内部局域网)的应用系统通讯,如果需要CA证书服务,可以使用Openssl自建CA,并完成证书签发。

先说一下常用密钥类文件的规范:

  • 后缀名规范

    通常约定后缀含义:crt或者cert 表示证书, key表示私钥, req和csr表示请求文件。

  • 文件格式

    pem表示pem格式(经过加密的文本文件),der表示der格式(经过加密的二进制文件)。所有证书和私钥可以是pem,也可以是der格式,取决于需要。两个格式可以转换。

Openssl的配置文件(openssl.cnf)定义CA的默认参数,例如ubuntu系统中配置文件位置在/usr/lib/ssl/openssl.cnf。如果不适用默认参数需要在命令中重新指定。

  • CA证书的制作

    首先生成CA的私钥,使用下面的命令:

    1
    $ openssl genrsa -out private/ca.key.pem 2048

    private/ca.key.pem是CA私钥,格式为pem,长度(加密位数)为2048。

    前面密码学知识知道CA使用一对密钥的(私钥和公钥),并且两个密钥是数学相关的。公钥可以通过私钥算出来。

  • CA证书自签发

    参考命令如下:

    1
    $ openssl req -new -x509 -key private/ca.key.pem -out certs/ca.cert.pem

    certs/ca.cert.pem 即CA的自签证书。部署导入到客户端(例如浏览器)。

  • 用户证书签发

    用户证书的签发和CA自签相同,用户证书由CA私钥签发。用户需要提供请求文件。

    1
    $ openssl ca -in app.csr -out app.crt -days 365

    app.crt为签发的证书。部署在应用服务器上。

1.1.5 Keytool工具介绍

在密钥证书管理时,通常使用JAVA的Keytool工具程序。Keytool 是一个JAVA数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中,通常称为密钥库文件。文件的扩展名通常使用:jks,全名java key store file。

Keytool是一个Java数据证书的管理工具,所以节点需要配置JAVA_HOME环境变量。

这里列举了命令支持的参数含义及注意点(供后续使用查阅):

  • keystore 参数指定保存证书的文件(密钥库二进制文件)。密钥库文件包含证书的私钥,必须对其进行安全保存。
  • validity 参数指定密钥有效期,单位是天。默认为90天。
  • keyalg 参数指定密钥使用的加密算法(例如RSA,如果不指定默认采用DSA)。
  • keysize 参数指定密钥的长度。该参数是选项参数,默认长度是1024位。为了保证密钥安全强度,建议密码长度设置为2048位。
  • keypass 参数指定生成密钥的密码(私钥密码)。
  • storepass 指定密钥库的密码(获取keystore信息所需的密码)。另外密钥库创建后,要对其做任何修改都必须提供该密码,以便访问密钥库。
  • alias 参数指定密钥别名。每个密钥文件有一个唯一的别名,别名不区分大小写。
  • dname 参数指定证书拥有者信息。例如: “CN=名字与姓氏,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码”。
  • list 参数显示密钥库中的证书信息。keytool -list -v -keystore 指定keystore -storepass 密码
  • v 参数显示密钥库中的证书详细信息。
  • export 将别名指定的证书导出到文件。keytool -export -alias 需要导出的别名 -keystore 指定keystore -file 指定导出的证书位置及证书名称 -storepass 密码。
  • file 参数指定导出到文件的文件名。
  • delete 删除密钥库中某条目。keytool -delete -alias 指定需删除的别名 -keystore 指定keystore -storepass 密码
  • printcert 查看导出的证书信息。keytool -printcert -file yushan.crt
  • keypasswd 修改密钥库中指定条目口令。keytool -keypasswd -alias 需修改的别名 -keypass 旧密码 -new 新密码 -storepass keystore密码 -keystore sage
  • storepasswd 修改keystore口令。keytool -storepasswd -keystore e:/yushan.keystore(需修改口令的keystore) -storepass 123456(原始密码) -new newpasswd(新密码)
  • import 将已签名数字证书导入密钥库。keytool -import -alias 指定导入条目的别名 -keystore 指定keystore -file 需导入的证书

关于Keytool工具的详细介绍,可以参考oracle的官网

1.2 Kafka集群配置SSL加密

Apache Kafka允许客户端通过SSL连接。默认情况下,SSL是禁用的,可以根据需要打开。

1.2.1 集群环境准备

为了后文讲解方便,我们部署了Kafka集群(3节点)和Zookeeper集群(3节点)测试环境。其中zookeeper和kafka混合部署。

节点编号 hostname IP地址
1 kafka.app.node1 192.168.1.5
2 kafka.app.node2 192.168.1.6
3 kafka.app.node3 192.168.1.7

Kafka集群节点对外服务端口为:9092;Zookeeper集群节点对外服务端口为:2181。

1.2.2 配置主机名验证

从Kafka 2.0.0版开始,默认会为客户端连接以及broker之间的连接启用服务器的主机名验证(SSL端点识别算法),以防止中间人攻击。可以通过设置参数ssl.endpoint.identification.algorithm为空字符串来禁用服务器主机名验证。例如:

1
ssl.endpoint.identification.algorithm=

另外高版本支持不停集群服务下,进行动态配置,使用脚本kafka-configs.sh,参考命令如下:

1
bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

对于较旧的Kafka版本,ssl.endpoint.identification.algorithm默认情况下未定义,因此不会启用主机名验证。若该属性设置HTTPS,则启用主机名验证,例如:

1
ssl.endpoint.identification.algorithm=HTTPS

需要注意的是,一旦启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

  • 通用名称(CN,Common Name)

  • 主题备用名称(SAN,Subject Alternative Name)

两个字段都有效,但RFC-2818建议使用SAN。 SAN也更灵活,允许声明多个DNS条目。 另一个优点是,CN可以设置为更有意义的值用于授权。如要添加SAN字段,需要将以下参数-ext SAN = DNS:{FQDN}添加到keytool命令中,例如:

1
$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

更通俗一点讲,SSL 握手期间验证主机名时,它会检查服务器证书是否具有 SAN 集。如果检测到 SAN 集,那么只使用 SAN 集中的名称或 IP 地址。如果未检测到 SAN 集,那么只使用主题专有名称 (DN) 最重要的属性,通常是通用名称(CN)。将该值与客户端尝试连接的服务器启的主机名进行比较。如果它们相同,主机名验证成功,允许建立连接。

1.2.3 生成SSL密钥和证书(密钥库)

为了方便管理证书密钥,我们使用统一的路径保存。例如统一放在/usr/ca作为文件目录。

1
$ mkdir -p /usr/ca/{root,server,client,trust}

这里各文件夹的功能是:root:存储CA私钥和证书;server:存储服务端的私钥和证书;client:存储客户端私钥和证书;trust:存储信任库文件;

  • 节点1(kafka.app.node1)
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node1

其中dname参数的含义参考Keytool工具介绍,文件名为:server.keystore.jks,这是密钥库。

  • 节点2(kafka.app.node2)
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node2
  • 节点3(kafka.app.node3)
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node3

证书生成后可以通过下面的命令进行查询(需要输入密钥库管理密码,即keypass的参数):

1
$ keytool -list -v -keystore server.keystore.jks

1.2.4 创建Kafka集群CA证书

集群中每个服务节点都有一对公钥和私钥,以及用于标识该节点的证书。但这个证书是未签名的,存在中间者攻击的风险。所以需要证书颁发机构(CA)负责签署颁发证书,使用openssl工具实现。

同一个集群的所有节点共用一个CA证书,所以只需要在集群的一个节点(集群外节点均可)生成CA证书,然后分发给集群其他节点。例如在kafka.app.node1节点上创建CA证书,命令如下:

1
$ openssl req -new -x509 -keyout /usr/ca/root/ca.key.pem -out /usr/ca/root/ca.cert.pem -days 3650 -passout pass:app123 -subj "/C=cn/ST=shanghai/L=shanghai/O=org/OU=depart/CN=kafka.app.node1"

然后使用scp命令分发给其他节点:

1
2
$ scp /usr/ca/root/* root@kafka.app.node2:/usr/ca/root/
$ scp /usr/ca/root/* root@kafka.app.node3:/usr/ca/root/

生成两个文件,分别是私钥(ca.key.pem)和证书(ca.cert.pem),它用来签署其他证书。

1.2.5 集群服务节点签署证书

首先给集群各服务节点签发证书(即签名)。步骤如下:

  • 第一步 从密钥容器中提取和导出服务端证书(输出文件:server.cert-file,未签名)
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.itdw -certreq -file /usr/ca/server/server.cert-file -storepass app123
  • 第二步 给服务端证书签名(输出文件:server.cert-signed,已签名)
1
$ openssl x509 -req -CA /usr/ca/root/ca.cert.pem -CAkey /usr/ca/root/ca.key.pem -in /usr/ca/server/server.cert-file -out /usr/ca/server/server.cert-signed -days 365 -CAcreateserial -passin pass:app123
  • 第三步 将CA证书导入服务端密钥容器中
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123
  • 第四步 将已签名的证书导入密钥容器中
1
$ keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -import -file /usr/ca/server/server.cert-signed -storepass app123

需要注意集群上每个服务节点均需要签署。

1.2.6 生成服务端信任库

如果kafka集群中配置中的参数ssl.client.auth设置为: requestedrequired,需要为集群节点提供一个信任库,这个库中需要包含所有CA证书。

使用下面的命令将CA证书导入服务端信任库,输出为信任库文件:server.truststore.jks

1
$ keytool -keystore /usr/ca/trust/server.truststore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123

将CA证书导入服务端信任库,意味着信任该CA证书签名的所有证书。此属性称为信任链,在大型Kafka群集上部署SSL时特别有用。您可以使用单个CA对群集中的所有证书进行签名,并使所有计算机共享信任该CA的同一信任库。这样,所有计算机都可以对所有其他计算机进行身份验证。

1.2.7 配置Kafka Brokers

Kafka Broker节点支持侦听多个端口上的连接。在server.properties中配置,多个端口类型使用逗号分隔,我们以集群中kafka.app.node1为例:

1
listeners=SSL://kafka.app.node1:9092

代理端需要以下SSL配置

1
2
3
4
5
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123

其他可选配置设置:

  • ssl.client.auth(可选)

    参数控制SSL认证模式。默认参数值为requested,默认使用单向认证,即客户端认证Kafka brokers。此时,没有证书的客户端仍然可以连接集群。参数值为required,指定开启双向验证(2-way authentication)。Kafka服务器同时会验证客户端证书。生成集群建议开始双向认证。

  • ssl.cipher.suites(可选)

    密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。(默认为空列表)

  • ssl.enabled.protocols

    建议参数值为TLSv1.2,TLSv1.1,TLSv1。列出支持的SSL协议。生成环境不建议使用SSL,建议使用TLS。

  • ssl.keystore.typessl.truststore.type`

    文件格式:JKS

  • security.inter.broker.protocol参数

    kafka集群节点(brokers)之间启用SSL通讯,需要配置该配置参数为:SSL

最后我们总结合并一下所有的配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
listeners=SSL://kafka.app.node1:9092
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
security.inter.broker.protocol=SSL

1.2.8 初步验证

正常启动集群的Zookeeper集群,然后依次启动集群的所有节点。使用下面的命令检查:

1
$ openssl s_client -debug -connect kafka.app.node1:9092 -tls1

该命令检查服务器的密钥库和信任库是否正确设置。命令中tls1必须是集群配置参数ssl.enabled.protocols所支持的协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Certificate chain
(省略)
---
Server certificate
-----BEGIN CERTIFICATE-----
(省略)
-----END CERTIFICATE-----
subject=(省略)
issuer=(省略)
---
No client certificate CA names sent
---
SSL handshake has read 2029 bytes and written 264 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-DES-CBC3-SHA
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
Protocol : TLSv1
Cipher : ECDHE-RSA-DES-CBC3-SHA
Session-ID: 5E580D610AEB5DDD8BCD0D31E88180F45391109792CA3CDD1E861EB87C704261
Session-ID-ctx:
Master-Key: E544FF34B993B2C3B7F7CB28D8166213F8D3A9864A82247F6948E33B319CD1A8943127DDF9B528EA73435EBC73B0DD55
Key-Arg : None
Start Time: 1582828897
Timeout : 7200 (sec)
Verify return code: 7 (certificate signature failure)
---
(省略)

如果证书未显示或有其他错误消息,则说明设置不正确。

另外对于’OpenSSL 0.9.8j-fips 07 Jan 2009’版本的openssl版本,由于这个版本不能自己检测出ssl的版本。会报下面的错误信息。

1816:error:1408E0F4:SSL routines:SSL3_GET_MESSAGE:unexpected message:s3_both.c:463:

1.3 配置kafka客户端

kafka集群需要支持集群内外的客户端交互访问。安全集群的客户端同样需要进行相关安全配置。这里客户端指的是Console客户端。

1.3.1 签发客户端证书

类似集群内部服务端的证书签发步骤,客户端证书签发过程入下:

  • 生成客户端SSL密钥和证书,输出密钥容器:client.keystore.jks

    1
    2
    $ keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app
    .node1 -validity 365 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=dccsh,O=icbc,L=shanghai,S=shanghai,C=cn" -ext SAN=DNS:kafka.app.node1 -storepass app123
  • 从密钥容器中提取和导出客户端证书(输出文件:client.cert-file,未签名)

    1
    $ keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app.node1 -certreq -file /usr/ca/client/client.cert-file -storepass app123
  • 给客户端证书签名(输出文件:client.cert-signed,已签名)

    1
    $ openssl x509 -req -CA /usr/ca/root/ca.cert.pem -CAkey /usr/ca/root/ca.key.pem -in /usr/ca/client/client.cert-file -out /usr/ca/client/client.cert-signed -days 365 -CAcreateserial -passin pass:app123
  • 将CA证书导入客户端密钥容器中

    1
    $ keytool -keystore /usr/ca/client/client.keystore.jks -alias CARoot -import -file /usr/ca/root/client.cert-file -storepass app123
  • 将已签名的证书导入密钥容器中

    1
    $ keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app.node1 -import -file /usr/ca/client/client.cert-signed -storepass app123

1.3.2 生成客户端信任库

使用下面的命令将CA证书导入客户端信任库,输出为信任库文件:client.truststore.jks

1
$ keytool -keystore /usr/ca/trust/client.truststore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123

1.3.3 配置客户端

客户端的console-producer和console-consumer命令需要添加相关安全配置。

如果kafka集群不需要客户端身份验证,只需配置下面的配置:

1
2
3
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123

如果需要客户端身份验证,还需要补充下面的配置信息:

1
2
3
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

根据我们的要求和代理配置,可能还需要其他配置设置:

  1. ssl.provider(可选)。用于SSL连接的安全提供程序的名称。
  2. ssl.cipher.suites(可选)。密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。
  3. ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1。它应列出在代理方配置的至少一种协议
  4. ssl.truststore.type = JKS
  5. ssl.keystore.type = JKS

最后我们总结合并一下所有的配置参数(编辑文件名为:client-ssl.properties):

1
2
3
4
5
6
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

1.3.4 消费者生产者

使用console-producer的命令:

1
kafka-console-producer.sh --broker-list kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --producer.config client-ssl.properties

使用console-consumer的命令:

1
kafka-console-consumer.sh --bootstrap-server kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --new-consumer --consumer.config client-ssl.properties

这里testtopic名称,在只有SSL通信加密集群中,topic的创建、删除、生产、消费并没有权限管理,依然存在安全问题。所以kafka集群需要进一步配置权限管理。

第二部分 Kafka集群权限认证

Kafka集群的权限认证管理主要涉及:

  • 身份认证(Authentication)。对客户端与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。
  • 权限控制(Authorization)。实现对于消息级别的权限控制,客户端的读写操作进行Authorization(生产、消费)管理。

通俗的讲,身份认证解决的是证明你是谁,而权限控制解决的是你能干什么。在Kafka中身份认证和权限控制是两套独立的安全配置。

2.1 集群权限认证策略

Kafka从0.9.0.0版本后开始支持下面的SASL安全策略管理。这些安全功能为Kafka通信安全、多租户管理、集群云化提供了安全保障。截止目前Kafka 2.3版本,一共支持5种SASL方式。

验证方式 版本 说明
SASL/PLAIN 0.10.0.0 不能动态增加用户
SASL/SCRAM 0.10.2.0 可以动态增加用户。有两种方式:SASL/SCRAM-SHA-256 和SASL/SCRAM-SHA-512
SASL/GSSAPI 0.9.0.0 需要独立部署验证服务(即Kerberos服务)
SASL/OAUTHBEARER 2.0.0 需自己实现接口实现token的创建和验证,需要额外Oauth服务
SASL/Delegation Token 1.1.0 补充现有 SASL 机制的轻量级认证机制

对于生产环境,SASL/PLAIN方式有个缺点:只能在JAAS文件KafkaServer参数中配置用户,集群运行期间无法动态新增用户(需要重启重新加载JAAS文件),这对维护管理带来不便。而SASL/SCRAM方式,将认证数据存储在Zookeeper中,可以动态新增用户并分配权限。

SASL/GSSAPI方式需要依赖Kerberos服务。对于一些已经部署了集中式的Kerberos服务的大厂,只需要申请一个principal即可。如果生产Kerberos认证中出现TGT分发性能瓶颈,可以使用SASL/Delegation Token模式。使用 Kafka 提供的 API 去获取对应的 Delegation Token。Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证),减少性能压力。

同样SASL/OAUTHBEARER方式需要Oauth服务。

各种方式引入版本不同,使用依赖各有差异,需要结合自身业务特点选择合适的架构方式。

2.2 SASL/SCRAM策略配置介绍

SASL/SCRAM方式将身份认证和权限控制的凭证(credential)数据均存储在Zookeeper中,需要对Zookeeper进行安全配置。

2.2.1 Zookeeper集群侧配置

对Zookeeper集群中所有节点更新下面的策略后,重启集群生效。

  • 配置zoo.cfg文件

    文件尾部追加下面的配置:

    1
    2
    3
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
  • 新增zk_server_jaas.conf文件

    配置文件内容如下:

    1
    2
    3
    4
    5
    6
    Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
    };

    其中usernamepassword定义的用户和密钥,用于Zookeeper与Kafka集群进行认证。配置项user_admin="admin-secret" 中 admin为用户名,admin-secret为密码,用于Zookeeper集群外客户端和集群内进行认证。

  • 拷贝依赖包

    将kafka文件系统中kafka/libs目录下的jar包拷贝到zookeeper/lib目录。

    1
    2
    3
    4
    5
    kafka-clients-2.1.1.jar
    lz4-java-1.5.0.jar
    osgi-resource-locator-1.0.1.jar
    slf4j-api-1.7.25.jar
    snappy-java-1.1.7.2.jar

    若没有引入依赖包,启动时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。

  • 修改zookeeper启动参数

    修改bin/zkEnv.sh文件, 在文件尾追加下面的配置内容。该配置完成引入的包的加载。变量CLASSPATHSERVER_JVMFLAGS都会在Zookeeper启动时传给JVM虚拟机。

    下面的配置中$ZOOKEEPER_HOME是zookeeper的环境变量,如果没有配置,使用绝对路径即可。

    1
    2
    3
    4
    for i in $ZOOKEEPER_HOME/lib/*.jar; do
    CLASSPATH="$i:$CLASSPATH"
    done
    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zk_server_jaas.conf"

2.2.2 kafka集群侧配置

kafka集群中每一台节点均需要更新下面的配置。

  • 新增kafka_server_scram_jaas.conf文件(在config目录中)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
    # 自定义用户:
    # user_admin="admin-secret"
    # user_alice="alice-secret"
    # user_reader="reader-secret"
    # user_writer="writer-secret";
    };

其中配置usernamepassword为Kafka集群之间通讯的SCRAM凭证,用户名为admin,密码为admin-secret

配置中类似user_XXX格式的配置项为自定义用户。如果是SASL/PLAIN方式,用户只能在该文件中定义,不能动态新增。我们使用SASL/SCRAM方式,可以后续动态声明admin用户,不再此处进行配置。

  • 更新Kafka的配置文件server.properties(在config目录中):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    #SASL CONFIG
    listeners=SASL_SSL://kafka.app.node1:9092
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    #allow.everyone.if.no.acl.found=true
    super.users=User:admin
    #SSL CINFIG
    ssl.keystore.location=/usr/ca/server/server.keystore.jks
    ssl.keystore.password=app123
    ssl.key.password=app123
    ssl.truststore.location=/usr/ca/trust/server.truststore.jks
    ssl.truststore.password=app123
    ssl.client.auth=required
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.endpoint.identification.algorithm=HTTPS
    security.inter.broker.protocol=SASL_SSL

    需要注意参数allow.everyone.if.no.acl.found,如果开启参数开关,当客户端和集群交互时候未找到ACL策略时,允许所有类型的访问操作。建议该参数关闭(false)。

    参数security.inter.broker.protocol指定集群brokers之间的通讯协议。不加密协议有:SASL_SSL、SASL_PLAINTEXT、PLAINTEXT;加密协议有:SSL。为了提高节点之间的交互性能,内部网络环境建议使用非加密协议。这里使用加密的SASL_SSL协议。

    参数super.users指定了集群的超级用户为:admin。注意如果指定多个超级用户,每个用户使用分号隔开,例如:super.users=User:admin;User:alice

    参数sasl.enabled.mechanisms列出支持的认证方式。即可以支持多种。

    参数sasl.mechanism.inter.broker.protocol指定集群内部的认证方式。Kafka仅支持最小迭代次数为4096的强哈希函数SHA-256和SHA-512。所以有SCRAM-SHA-512和SCRAM-SHA-256两种方式。

  • 配置kafka启动环境变量(bin目录下面的kafka-run-class.sh

    为 Kafka 添加 java.security.auth.login.config 环境变量(配置文件路径)。并且在启动模式中添加KAFKA_SASL_OPTS

    1
    2
    3
    4
    5
    6
    7
    8
    # 截取配置文件片段:
    KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_server_scram_jaas.conf'
    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
    nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
    exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
    fi

2.2.3 SCRAM认证管理

在集群的配置文件kafka_server_scram_jaas.conf中,定义了集群内部的认证用户。对于客户端和集群之间认证可以使用kafka-configs.sh来动态创建。

  • 创建用户SCRAM凭证

    例如集群中的超级用户admin用户,使用下面的命令创建:

1
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

创建自定义普通用户alice

1
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
  • 查看SCARM凭证
1
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --describe --entity-type users --entity-name admin
  • 删除SCRAM凭证
1
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice

2.3 Kafka客户端配置

Kafka集群配置了认证,那么对于Console客户端访问集群自然需要配置认证信息。可集群节点内部通讯凭证的认知,同样需要定义JAAS文件。加入我们自定义了用户alice,JAAS文件名为:kafka_console_client_jaas.conf,配置内容如下:

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};

然后更新kafka-console-producer.sh脚本和kafka-console-consumer.sh脚本的启动参数。

1
2
3
4
5
# 文件截取更新部分:

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_write_jaas.conf"
fi

在配置SSL时候,我们新建了client-ssl.properties配置文件,作为Console客户端启动配置。在集群启用SASL_SSL后,我们同步更新如下:

1
2
3
4
5
6
security.protocol=SASL_SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

至此Console客户端已经配置完毕,但目前Console客户端还不能通过命令方式和集群进行交互,因为我们指定的用户对于集群的资源还没有任何权限。需要对用户进行集群资源的ACL控制设置,赋予相关权限。

2.4 ACL控制

Kafka权限资源包含Topic、Group、Cluster、TransactionalId(事务id),每个资源涉及的权限内容如下:

资源类型 权限类型
Topic Read,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All
Group Read,Describe,All
Cluster Create,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All
TransactionalId Describe,Write,All

对于常用类型进行说明:

权限 说明
Read 读取topic、group信息
Write 写topic、TransactionalId(存储在内部topic)
Delete 删除topic
Create 创建topic
ALTER 修改topic
Describe 获取topic、group、TransactionalId信息
ALL 所有权限

Kafka提供ACL管理脚本:kafka-acls.sh

2.4.1 更新脚本配置

认证数据均存储在Zookeeper集群中,需要和Zookeeper交互自然需要配置相关认证信息。

首先需要新建JAAS文件,文件名为:zk_client_jaas.conf。这里的用户已经在Zookeeper集群中进行定义。

1
2
3
4
5
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

最后更新kafka-acls.sh脚本:

1
2
3
4
5
# 截取更新部分

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/zk_client_jaas.conf"
fi

当然Kafka集群的配置文件中已经开启了ACL:

1
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

至此完成配置。

2.4.2 ACL配置

根据官网的介绍,ACL的格式如下:

“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”

参数含义描述如下:

  • principal:指定一个Kafka user;
  • operation:指定一个具体的操作类型,例如:Read, Write, Delete等;
  • Host:表示与集群交互的客户端IP地址,如果是通配符‘*’表示所有IP。目前不支持主机名(hostname)形式,只能是IP地址;
  • Resource:指定一种Kafka资源类型(共有4种类型);

    例如下面的ACL命令:

1
$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --add --allow-principal User:alice --allow-host '*' --operation ALL --topic test

赋权之后,用户alice对test具有全部权限,并且访问请求可以是来自任何IP的客户端。

常用参数的补充说明:

  • 对主机IP的限制参数,allow-host指定允许的IP,deny-host指定禁用IP;
  • 新增和删除一个赋权策略,分别使用:addremove

2.4.3 ACL策略查看

使用参数list参看ACL策略。例如:

1
$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --list --topic test-topic

该查看命令显示test-topic资源相关的所有ACL策略。

2.4.4 超级用户

kafka集群的配置文件server.properties中定义了超级用户(Super Users),超级用户不在ACL控制范围内,默认可以访问集群中所有资源,并具备所有权限。

2.5 权限认证数据访问

集群的认证数据存储在Zookeeper,可以通过Zookeeper的console客户端访问认证数据。

使用zookeeper自带的命令行客户端:

1
2
3
4
5
/dmqs/zookeeper/bin> ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
[zk: localhost:2181(CONNECTING) 0]

查看zookeeper中的数据:

1
2
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, consumers, config]

其中kafka-acl中存储相关权限认证数据。

1
2
[zk: localhost:2181(CONNECTED) 3] ls /kafka-acl
[Cluster, Topic]

可以查看其中的权限信息。

2.6 SSL和SASL的说明

SSL是传输层安全协议,是位于传输层(TCP/IP)和应用层(HTTP)的协议,SSL是对整个传输过程的加密,SSL是对客户端和服务器之间传输的所有数据进行加密。假如在配置的时候使用了SASL,但是没有使用SSL,那么除了账号密码外,所有的传输内容都是裸奔的。

所以生产集群采用SSL和SASL结合方式,即SSL_SASL方式。

第三部分 安全集群的客户端

3.1 开发语言类

3.1.1 Python客户端

目前市面上kafka的python API常用的有三种:

  • 第一种 kafka

该项目(项目地址:https://pypi.org/project/kafka/,最后版本为:`Released: Oct 8, 2017)是kafka-python的老项目,2017年后调整为kafka-python`项目。

  • 第二种 kafka-python

最新版本为2.0,首先从客户端的密钥库中导出CA证书。

1
$ keytool -exportcert -alias CARoot -keystore client.keystore.jks -rfc -file ca.cert.pem

生产者和消费者的案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
import time
#logging.basicConfig(level=logging.DEBUG)

try:
bootstrap_servers = 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092'
topic = "test"
sasl_mechanism = "SCRAM-SHA-512"
username = "alice"
password = "alice-secret"
security_protocol = "SASL_SSL"
# CA 证书路径
ssl_cafile = 'ca.cert.pem'
# SSL
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
context.load_verify_locations(ssl_cafile)
# 消费者
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism = sasl_mechanism,
sasl_plain_username = username,
sasl_plain_password = password
)
# 生产者
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
acks='all',
retries=1,
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=username,
sasl_plain_password=password
)
# 生产数据
for i in range(10):
producer.send(topic, bytes("测试",encoding='utf8'))
producer.flush()
# 消费数据
for msg in consumer:
print(msg)

except Exception as e:
print(e)

需要的注意的事项有:

  • Kafka集群启用主机名模式,所以应用程序运行节点的hosts文件需要配置Kafka集群节点的域名映射。

  • ssl_context参数为包装套接字连接的预配置SSLContext。如果非None,将忽略所有其他ssl_ *配置。

  • 主机名验证问题。如果证书中域名和主机名不匹配,客户端侧需要配置需要调整如下:

    1
    2
    ssl_ctx.check_hostname=False
    ssl_ctx.verify_mode = CERT_NONE
  • 如果不提前预配置SSLContext,还需要客户端的证书。

    1
    $ keytool -exportcert -alias localhost -keystore client.keystore.jks -rfc -file client.cert.pem

    生产者的参数需要添加:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    ssl_certfile = "client.cert.pem"
    ssl_cafile = "ca.cert.pem"
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
    api_version=(0, 10),
    acks='all',
    retries=1,
    security_protocol=security_protocol,
    ssl_context=context,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=password,
    ssl_check_hostname=False,
    ssl_certfile=ssl_certfile,
    ssl_cafile=ssl_cafile)
  • 第三种 confluent-kafka

confluent-kafka包由confluent公司开源,主要是对C/C++客户端包(librdkafka)的封装。案例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
from confluent_kafka import Producer

# 回调函数
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print(’Message delivery failed: {}‘.format(err))
else:
print(‘Message delivered to {} [{}]‘.format(msg.topic(), msg.partition()))

if __name__ == ‘__main__‘:
producerConfing = {"bootstrap.servers": 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092',
"security.protocol": 'SASL_SSL',
"sasl.mechanisms": 'SCRAM-SHA-256',
"sasl.username": 'alice',
"sasl.password": 'alice-secret',
"ssl.ca.location": 'ca.cert.pem'
}
ProducerTest = Producer(producerConfing)

ProducerTest.poll(0)
ProducerTest.produce(‘testTopic‘, ‘confluent kafka test‘.encode(‘utf-8‘),callback=delivery_report)
ProducerTest.flush()

3.1.2 Go客户端

我们的Go语言中常用的Kafka的客户端包有:

1
2
3
4
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/ksuid"

其中最常用的是sarama,案例参考github项目

3.1.3 Java客户端

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.kafka.security;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Properties;
import java.util.Random;


public class KafkaProducerWithSASL_SSL {
private static final String KAFKA_TOPIC = "topsec";
private static final String BOOTSTRAP_SERVER = "docker31:9092";
private static final String[] strs = new String[]{"zhao", "qian", "sun", "li", "zhou", "wu", "zheng", "wang", "feng", "chen"};
private static final Random r = new Random();

public static void main(String[] args) {
try {
producer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static void producer() throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
//SASL_SSL加密
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
// SSL用户认证
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
//SASL用户认证
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

Producer<String, String> producer = new KafkaProducer<>(props);
while (true) {
producer.send(new ProducerRecord<>(KAFKA_TOPIC, strs[r.nextInt(10)],strs[r.nextInt(10)]));
Thread.sleep(2000);
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.topsec.kafka.security;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Collections;
import java.util.Properties;


public class KafkaConsumerWithSASLAndSSL {
private static final String KAFKA_TOPIC = "topsec";
private static final String BOOTSTRAP_SERVER = "docker31:9092";
public static void main(String[] args) {
consumer();
}

private static void consumer() {
Properties props = new Properties();
//SASL_SSL加密配置
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
//SSL身份验证配置
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
//SASL身份验证
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
record.offset(),
record.key(),
record.value(),
record.partition());
}
}
}
}

3.2 组件类

3.2.1 Console客户端

客户端节点部署kafka项目,在bin目录下面我们已经更新了kafka-console-consumer.shkafka-console-producer.sh两个脚本。并分别新增了加密访问的配置文件consumer.configproducer.config

命令案例参考:1.3.4 章节内容。

3.2.2 Flume客户端

目前Flume项目官网项目文档介绍支持下面三种方式:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证;
  • SASL_SSL - 有数据加密的 Kerberos 或明文认证;
  • SSL - 基于TLS的加密,可选的身份验证;

事实上对于SASL/SCRAM方式Flume也是支持的。具体配置如下(以Flume1.9版本为例):

3.2.2.1 第一步 新增jaas配置文件

在Flume的conf配置目录下面新增flume_jaas.conf文件,文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
3.2.2.2 第二步 更新flume-env.sh文件

Flume的conf配置目录下面flume-env.sh文件添加JAVA_OPTS配置更新:

1
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/dmqs/apache-flume-1.9.0-bin/conf/flume_jaas.conf"

其中路径为第一步中新增的flume_jaas.conf文件路径。

3.2.2.3 测试案例(sinks)

我们使用一个简单的案例来测试,Flume的source为监控文件尾写入,Flume的sinks为加密kafka集群。具体配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /dmqs/apache-flume-1.9.0-bin/data/flume/flume.log
a1.sources.r1.shell = /bin/bash -c

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = kafka.itdw.node1:9093
a1.sinks.k1.kafka.topic = flume
a1.sinks.k1.kafka.flumeBatchSize = 15000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1000
a1.sinks.k1.kafka.producer.security.protocol=SASL_SSL
a1.sinks.c1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.c1.kafka.producer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"
####
a1.sinks.k1.kafka.producer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sinks.k1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.k1.kafka.producer.ssl.truststore.password=app123
a1.sinks.k1.kafka.producer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sinks.k1.kafka.producer.ssl.keystore.password=app123
a1.sinks.k1.kafka.producer.ssl.key.password=app123
a1.sinks.k1.kafka.producer.timeout.ms = 100
a1.sinks.k1.batchSize=15000
a1.sinks.k1.batchDurationMillis=2000

配置保存为flume-sink-auth-kafka.conf,为了检查输出结果使用下面命令启动(在bin目录中):

1
./flume-ng agent --conf ../conf --conf-file ../conf/flume-sink-auth-kafka.conf --name a1 -Dflume.root.logger=INFO,console

向文件尾部追加信息:

1
echo "test" >> /dmqs/apache-flume-1.9.0-bin/data/flume/flume.log

然后使用消费者客户端查看数据是否写入kafka的flume主题中。

3.2.2.3 测试案例(source)

同样可以也可以将加密Kafka作为Flume的source,配置案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = kafka.app .node1:9093,kafka.app.node2:9093,kafka.app.node3:9093
a1.sources.r1.kafka.topics = flume
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.kafka.consumer.timeout.ms = 2000
a1.sources.r1.batchSize=150
a1.sources.r1.batchDurationMillis=1000
#####
a1.sources.r1.kafka.consumer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.ssl.truststore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sources.r1.kafka.consumer.ssl.keystore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.key.password=itdw123
a1.sources.r1.kafka.consumer.security.protocol=SASL_SSL
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000

# sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /dmqs/apache-flume-1.9.0-bin/data/flume
a1.sinks.k1.sink.serializer = TEXT

案例中将加密Kafka中flume主题中的数据汇入到指定目录的文件中。

3.2.3 Logstash客户端

Logstash和Kafka交互使用Kafka output plugin插件实现。其中配置文件中output部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
output {
kafka {
id => "kafkaSLL_SASL"
codec => "json"
ssl_endpoint_identification_algorithm => ""
bootstrap_servers => "kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092"
jaas_path =>"/etc/logstash/certificates/kafka_servcer_jaas.conf"
ssl_keystore_location => "/etc/logstash/certificates/client.keystore.jks"
ssl_keystore_password => "app123"
ssl_keystore_type => "JKS"
ssl_truststore_location => "/etc/logstash/certificates/client.truststore.jks"
ssl_truststore_password => "app123"
ssl_truststore_type => "JKS"
sasl_mechanism => "SCRAM-SHA-512"
security_protocol => "SASL_SSL"
topic_id => "test"
}
}

参考:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

第四部分 加密认证集群的性能压测

集群启用SSL后,数据交互中需要加密、解密。kafka集群的I/O性能会降低。我们使用Kafka自带的压侧工具对集群加密前和加密后性能进行评测。

4.1生产者压力测试

客户端写入参数配置为acks=all(即Topic中Leader和fellow副本均写入成功)。每条消息大小为1M(消息体小吞吐量会大一些)。另外测试客户端为集群内部节点,忽略了数据网络传输的性能消耗。

4.1.1不加密集群

1
./kafka-consumer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all

测试结果:

1
1500000 records sent, 38538.615693 records/sec (37.64 MB/sec), 748.44 ms avg latency, 5485.00 ms max latency, 227 ms 50th, 3194 ms 95th, 3789 ms 99th, 3992 ms 99.9th.

4.1.2加密集群

1
./kafka-producer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all --producer.config producer.config

测试结果:

1
1500000 records sent, 16901.027582 records/sec (16.50 MB/sec), 1713.43 ms avg latency, 9345.00 ms max latency, 72 ms 50th, 1283 ms 95th, 2067 ms 99th, 2217 ms 99.9th.

4.2 压侧结论

加密改造前,生产者的吞吐量为3.8w 条/秒,改造后1.7W 条/秒。整体吞吐性能降低50%左右,数据的加密、解密导致吞吐量性能降低。平均时延也增加了一倍多(改造前700ms,改造后1700ms)。在实际生产中可参考这个性能折扣基线配置集群资源。

参考文献及资料

1、Kafka官网对安全类功能介绍,链接:http://kafka.apache.org/documentation/#security

2、Kafka ACLs in Practice – User Authentication and Authorization,链接:https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/

3、维基百科(数字证书),链接:https://zh.wikipedia.org/wiki/公開金鑰認證

4、SSL技术白皮书,链接:https://blog.51cto.com/xuding/1732723

5、Kafka权限管理,链接:https://www.jianshu.com/p/09129c9f4c80

5、Flume文档,链接:https://flume.apache.org/FlumeUserGuide.html#kafka-sinkorg/FlumeUserGuide.html#kafka-sink

本文标题:Kafka系列文章(第五篇 Kafka安全集群)

文章作者:rong xiang

发布时间:2020年03月02日 - 13:03

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/e97c75b5/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%