目录
- 背景
- 第一部分 Kafka集群加密传输
- 第二部分 Kafka集群权限认证
- 第三部分 加密认证集群的客户端
- 第四部分 加密认证集群的性能压测
- 第五部分 总结
- 参考文献及资料
背景
Kafka在0.9.0.0版本前没有安全机制功能。Kafka Client程序可以直接获取到Kafka集群元数据信息和Kafka Broker地址后,连接到Kafka集群,然后完全操作集群上的所有topic数据资源。另外集群节点间通讯、broker和zookeeper通讯、客户端和集群的网络层通信都是无加密模式。集群的数据存在极大的安全风险。
自0.9.0.0版本开始,Kafka社区逐步添加了较多功能用于提高Kafka群集的安全性。目前Kafka安全集群安全机制主要有三个方面的设置:通信加密(encryption)、身份认证(authentication)和授权(authorization)。
本文重点介绍生产安全集群的一种配置方案。数据通讯传输配置SSL,认证配置SASL,授权通过ACL接口命令来完成的,即:SSL+SASL/SCRAM+ACL。
第一部分 Kafka集群加密传输
1.1 背景知识介绍
涉及的技术知识不做详细介绍。
1.1.1 密码学基础
加密算法分为两类:
对称密钥算法(Symmetric Cryptography):数据加密和解密时使用相同的密钥。例如常用的DES就是对称加密算法。
非对称密钥算法(Asymmetric Cryptography):数据加密和解密时使用不同的密钥,分为:公开的公钥(public key)和用户保存的私钥(private key),私钥和公钥在数学上是相关的。利用公钥(或私钥)加密的数据只能用相应的私钥(或公钥)才能解密。举一个例子:客户在银行网银上做一笔交易,首先向银行申请公钥,银行分发公钥给用户,用户使用公钥对请求数据进行加密。银行收到加密数据后通过银行侧保存的私钥进行解密处理,并处理后更新后台数据库。这个通讯过程中银行不需要通过互联网分发私钥。因此保证了私钥的安全。目前最常用的非对称加密算法是RSA算法。
非对称密钥算法中,私钥来解密公钥加密的数据,公钥来解密私钥加密的数据。
两种加密算法的比较:
对称密钥的强度和密钥长度成正比,但是解密效率和密钥长度成反比。另外私钥的分发存在安全风险。
非对称加密保证了私钥的安全性,但是加密和解密的效率比对称加密低。
所以通常加密场景是两种密钥结合使用。使用数据主体使用对称秘钥算法,但是私钥的传输使用非对称算法在互联网环境分发非对称密钥。最常见的就是SSL/TLS。
1.1.2 CA数字证书
对于非对称密钥算法存在一个安全风险点,那就是公钥的分发存在中间人攻击。还是以客户和银行的通信为例(例子简单化处理)。客户和银行分别有自己的公钥和私钥,私钥各自保留本地。公钥通过互联网分发给对方。那么公钥就是有安全风险的。存在被黑客截取风险。客户向银行申请银行公钥,结果被黑客截取,黑客伪装成银行,返回给用户自己的黑客公钥,用户收到黑客公钥后,将信息加密发给黑客。黑客用黑客私钥进行解密,获取到真实信息。这时候黑客伪装成客户用相同的方法完成和银行的数据交互。这就是中间人攻击的案例。
所以非对称加密算法的公钥传输同样存在风险。当然如果使用原始的离线方式交换密钥是安全的,但是随着互联网通信的爆炸式增长,这是落后低效的。为了保证公钥的真实性和安全性,这时候我们引入第三个角色:公开密钥认证(Public key certificate,简称CA),又称数字证书(digital certificate)或身份证书(identity certificate)。
通常CA是一家第三方权威机构。负责管理和签发证书。整个实现原理也是非对称加密算法:
- 机构将自己的公钥以及身份信息交给CA机构(安全的),CA使用自己的私钥对各机构的公钥进行加密。这个过程称为验签。输出的加密后的公钥及身份信息称为数字证书。
- 当其他机构请求A机构公钥的时候,返回的是A机构的数字证书。其他机构可以使用CA的公钥对该数字证书中加密公钥进行解密获取A机构的通信公钥。
那么新得安全问题又来了,如何保证CA机构的公钥不被伪造?通常CA的公钥是集成在浏览器或者操作系统中,并且被很好的保护起来。
当然CA证书还涉及更多的安全细节设计(Hash算法防篡改、信任链等大量细节),这里只是简单的介绍。详细介绍可以查看:维基(证书颁发机构)
对于企业内部的应用系统就没必要花钱购买CA机构的证书服务了,可以自建 Root CA,自己给自己颁发证书,充当内网的CA机构。当然这时候客户端就需要导入CA的证书了(浏览器和操作系统没有自建的CA证书)。
1.1.3 SSL/TLS加密协议
SSL(Secure Sockets Layer)是一种安全协议,目的是为保障互联网上数据传输安全,利用数据加密技术,确保数据在网络上之传输过程中不会被截取。
从网络协议层看,SSL协议位于TCP/IP协议与应用层协议之间,为数据通讯提供安全支持。SSL协议自身可分为两层:
- SSL记录协议(SSL Record Protocol):它建立在可靠的传输协议(如TCP)之上,为高层协议提供数据封装、压缩、加密等基本功能的支持。
- SSL握手协议(SSL Handshake Protocol):它建立在SSL记录协议之上,用于在实际的数据传输开始前,通讯双方进行身份认证、协商加密算法、交换加密密钥等。例如HTTPS就是在HTTP应用层上增加了SSL加密协议支持(HTTP over SSL)。
TLS(Transport Layer Security,传输层安全协议),同样用于两个应用程序之间提供保密性和数据完整性。 TLS 1.0建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本,可以理解为SSL 3.1,即是SSL的升级版。TLS的主要目标是使SSL更安全,并使协议的规范更精确和完善。另外,TLS版本号也与SSL的不同(TLS的版本1.0使用的版本号为SSLv3.1)
SSL通过握手过程在client和server之间协商会话參数,并建立会话。一共有三种方式:
- 仅仅验证server的SSL握手过程(单向SSL)
- 验证server和client的SSL握手过程(双向SSL)
- 恢复原有会话的SSL握手过程
第一种:单向SSL通信过程如下(SSL 客户端和SSL 服务端通信):
(1)SSL客户端向SSL服务端发起请求,请求信息包括SSL版本号、加密算法、密钥交换算法、MAC算法等信息;
(2)SSL服务端确定本次通话的SSL版本和加密套件后,将携带公钥信息的证书回给客户端。如果通话可从重用,还会返回会话ID;
(3)SSL服务端发送Server Hello Done消息。通知SSL客户端版本号和加密套件协商结束,开始进行密钥交换;
(4)SSL客户端对CA证书进行验证,证书合法则继续、不成功弹出选择页面;
(5)SSL客户端生产随机私有对称密钥key,并使用服务端公开密钥进行加密后,发给服务端;
(6)SSL服务端使用自己的私钥解密,获取对称密钥key;
(7)最后SSL客户端与SSL服务端将使用该对称密钥key进行加密通信。
第二种:单向认证,仅仅是客户端需要检验服务端证书是否是正确的。双向SSL和单向认证几乎一样,只是在客户端认证完服务器证书后,客户端会将自己的证书传给服务器。服务器验证通过后,才开始秘钥协商。
第三种:协商会话参数、建立会话的过程中,需要使用非对称密钥算法来加密密钥、验证通信对端的身份,计算量较大,占用了大量的系统资源。为了简化SSL握手过程,SSL允许重用已经协商过的会话。即可以重用会话ID。这就是第三种建立会话方式。
1.1.4 Openssl工具
对于企业内部(内部局域网)的应用系统通讯,如果需要CA证书服务,可以使用Openssl自建CA,并完成证书签发。
先说一下常用密钥类文件的规范:
后缀名规范
通常约定后缀含义:crt或者cert 表示证书, key表示私钥, req和csr表示请求文件。
文件格式
pem表示pem格式(经过加密的文本文件),der表示der格式(经过加密的二进制文件)。所有证书和私钥可以是pem,也可以是der格式,取决于需要。两个格式可以转换。
Openssl的配置文件(openssl.cnf
)定义CA的默认参数,例如ubuntu
系统中配置文件位置在/usr/lib/ssl/openssl.cnf
。如果不适用默认参数需要在命令中重新指定。
CA证书的制作
首先生成CA的私钥,使用下面的命令:
1
openssl genrsa -out private/ca.key.pem 2048
private/ca.key.pem
是CA私钥,格式为pem,长度(加密位数)为2048。前面密码学知识知道CA使用一对密钥的(私钥和公钥),并且两个密钥是数学相关的。公钥可以通过私钥算出来。
CA证书自签发
参考命令如下:
1
openssl req -new -x509 -key private/ca.key.pem -out certs/ca.cert.pem
certs/ca.cert.pem
即CA的自签证书。部署导入到客户端(例如浏览器)。用户证书签发
用户证书的签发和CA自签相同,用户证书由CA私钥签发。用户需要提供请求文件。
1
openssl ca -in app.csr -out app.crt -days 365
app.crt
为签发的证书。部署在应用服务器上。
1.1.5 Keytool工具介绍
在密钥证书管理时,通常使用JAVA的Keytool工具程序。Keytool 是一个JAVA数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中,通常称为密钥库文件。文件的扩展名通常使用:jks,全名java key store file。
Keytool是一个Java数据证书的管理工具,所以节点需要配置JAVA_HOME环境变量。
这里列举了命令支持的参数含义及注意点(供后续使用查阅):
- keystore 参数指定保存证书的文件(密钥库二进制文件)。密钥库文件包含证书的私钥,必须对其进行安全保存。
- validity 参数指定密钥有效期,单位是天。默认为90天。
- keyalg 参数指定密钥使用的加密算法(例如RSA,如果不指定默认采用DSA)。
- keysize 参数指定密钥的长度。该参数是选项参数,默认长度是1024位。为了保证密钥安全强度,建议密码长度设置为2048位。
- keypass 参数指定生成密钥的密码(私钥密码)。
- storepass 指定密钥库的密码(获取keystore信息所需的密码)。另外密钥库创建后,要对其做任何修改都必须提供该密码,以便访问密钥库。
- alias 参数指定密钥别名。每个密钥文件有一个唯一的别名,别名不区分大小写。
- dname 参数指定证书拥有者信息。例如: “CN=名字与姓氏,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码”。
- list 参数显示密钥库中的证书信息。keytool -list -v -keystore 指定keystore -storepass 密码
- v 参数显示密钥库中的证书详细信息。
- export 将别名指定的证书导出到文件。keytool -export -alias 需要导出的别名 -keystore 指定keystore -file 指定导出的证书位置及证书名称 -storepass 密码。
- file 参数指定导出到文件的文件名。
- delete 删除密钥库中某条目。keytool -delete -alias 指定需删除的别名 -keystore 指定keystore -storepass 密码
- printcert 查看导出的证书信息。keytool -printcert -file yushan.crt
- keypasswd 修改密钥库中指定条目口令。keytool -keypasswd -alias 需修改的别名 -keypass 旧密码 -new 新密码 -storepass keystore密码 -keystore sage
- storepasswd 修改keystore口令。keytool -storepasswd -keystore e:/yushan.keystore(需修改口令的keystore) -storepass 123456(原始密码) -new newpasswd(新密码)
- import 将已签名数字证书导入密钥库。keytool -import -alias 指定导入条目的别名 -keystore 指定keystore -file 需导入的证书
关于Keytool工具的详细介绍,可以参考oracle的官网。
1.2 Kafka集群配置SSL加密
Apache Kafka允许客户端通过SSL连接。默认情况下,SSL是禁用的,可以根据需要打开。
1.2.1 集群环境准备
为了后文讲解方便,我们部署了Kafka集群(3节点)和Zookeeper集群(3节点)测试环境。其中zookeeper和kafka混合部署。
节点编号 | hostname | IP地址 |
---|---|---|
1 | kafka.app.node1 | 192.168.1.5 |
2 | kafka.app.node2 | 192.168.1.6 |
3 | kafka.app.node3 | 192.168.1.7 |
Kafka集群节点对外服务端口为:9092;Zookeeper集群节点对外服务端口为:2181。
1.2.2 配置主机名验证
从Kafka 2.0.0版开始,默认会为客户端连接以及broker之间的连接启用服务器的主机名验证(SSL端点识别算法),以防止中间人攻击。可以通过设置参数ssl.endpoint.identification.algorithm
为空字符串来禁用服务器主机名验证。例如:
1 | ssl.endpoint.identification.algorithm= |
另外高版本支持不停集群服务下,进行动态配置,使用脚本kafka-configs.sh
,参考命令如下:
1 | bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm=" |
对于较旧的Kafka版本,ssl.endpoint.identification.algorithm
默认情况下未定义,因此不会启用主机名验证。若该属性设置HTTPS
,则启用主机名验证,例如:
1 | ssl.endpoint.identification.algorithm=HTTPS |
需要注意的是,一旦启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
通用名称(CN,Common Name)
主题备用名称(SAN,Subject Alternative Name)
两个字段都有效,但RFC-2818建议使用SAN。 SAN也更灵活,允许声明多个DNS条目。 另一个优点是,CN可以设置为更有意义的值用于授权。如要添加SAN字段,需要将以下参数-ext SAN = DNS:{FQDN}
添加到keytool
命令中,例如:
1 | keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN} |
更通俗一点讲,SSL 握手期间验证主机名时,它会检查服务器证书是否具有 SAN 集。如果检测到 SAN 集,那么只使用 SAN 集中的名称或 IP 地址。如果未检测到 SAN 集,那么只使用主题专有名称 (DN) 最重要的属性,通常是通用名称(CN)。将该值与客户端尝试连接的服务器启的主机名进行比较。如果它们相同,主机名验证成功,允许建立连接。
1.2.3 生成SSL密钥和证书(密钥库)
为了方便管理证书密钥,我们使用统一的路径保存。例如统一放在/usr/ca
作为文件目录。
1 | mkdir -p /usr/ca/{root,server,client,trust} |
这里各文件夹的功能是:root:存储CA私钥和证书;server:存储服务端的私钥和证书;client:存储客户端私钥和证书;trust:存储信任库文件;
- 节点1(kafka.app.node1)
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node1 |
其中dname
参数的含义参考Keytool
工具介绍,文件名为:server.keystore.jks
,这是密钥库。
- 节点2(kafka.app.node2)
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node2 |
- 节点3(kafka.app.node3)
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node3 |
证书生成后可以通过下面的命令进行查询(需要输入密钥库管理密码,即keypass
的参数):
1 | keytool -list -v -keystore server.keystore.jks |
1.2.4 创建Kafka集群CA证书
集群中每个服务节点都有一对公钥和私钥,以及用于标识该节点的证书。但这个证书是未签名的,存在中间者攻击的风险。所以需要证书颁发机构(CA)负责签署颁发证书,使用openssl
工具实现。
同一个集群的所有节点共用一个CA证书,所以只需要在集群的一个节点(集群外节点均可)生成CA证书,然后分发给集群其他节点。例如在kafka.app.node1
节点上创建CA证书,命令如下:
1 | openssl req -new -x509 -keyout /usr/ca/root/ca.key.pem -out /usr/ca/root/ca.cert.pem -days 3650 -passout pass:app123 -subj "/C=cn/ST=shanghai/L=shanghai/O=org/OU=depart/CN=kafka.app.node1" |
然后使用scp
命令分发给其他节点:
1 | scp /usr/ca/root/* root@kafka.app.node2:/usr/ca/root/ |
生成两个文件,分别是私钥(ca.key.pem)和证书(ca.cert.pem),它用来签署其他证书。
1.2.5 集群服务节点签署证书
首先给集群各服务节点签发证书(即签名)。步骤如下:
- 第一步 从密钥容器中提取和导出服务端证书(输出文件:server.cert-file,未签名)
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.itdw -certreq -file /usr/ca/server/server.cert-file -storepass app123 |
- 第二步 给服务端证书签名(输出文件:server.cert-signed,已签名)
1 | openssl x509 -req -CA /usr/ca/root/ca.cert.pem -CAkey /usr/ca/root/ca.key.pem -in /usr/ca/server/server.cert-file -out /usr/ca/server/server.cert-signed -days 365 -CAcreateserial -passin pass:app123 |
- 第三步 将CA证书导入服务端密钥容器中
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123 |
- 第四步 将已签名的证书导入密钥容器中
1 | keytool -keystore /usr/ca/server/server.keystore.jks -alias kafka.app -import -file /usr/ca/server/server.cert-signed -storepass app123 |
需要注意集群上每个服务节点均需要签署。
1.2.6 生成服务端信任库
如果kafka集群中配置中的参数ssl.client.auth
设置为: requested
或required
,需要为集群节点提供一个信任库,这个库中需要包含所有CA证书。
使用下面的命令将CA证书导入服务端信任库,输出为信任库文件:server.truststore.jks
1 | keytool -keystore /usr/ca/trust/server.truststore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123 |
将CA证书导入服务端信任库,意味着信任该CA证书签名的所有证书。此属性称为信任链,在大型Kafka群集上部署SSL时特别有用。您可以使用单个CA对群集中的所有证书进行签名,并使所有计算机共享信任该CA的同一信任库。这样,所有计算机都可以对所有其他计算机进行身份验证。
1.2.7 配置Kafka Brokers
Kafka Broker节点支持侦听多个端口上的连接。在server.properties中配置,多个端口类型使用逗号分隔,我们以集群中kafka.app.node1
为例:
1 | listeners=SSL://kafka.app.node1:9092 |
代理端需要以下SSL配置
1 | ssl.keystore.location=/usr/ca/server/server.keystore.jks |
其他可选配置设置:
ssl.client.auth
(可选)参数控制SSL认证模式。默认参数值为
requested
,默认使用单向认证,即客户端认证Kafka brokers。此时,没有证书的客户端仍然可以连接集群。参数值为required
,指定开启双向验证(2-way authentication)。Kafka服务器同时会验证客户端证书。生成集群建议开始双向认证。ssl.cipher.suites
(可选)密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。(默认为空列表)
ssl.enabled.protocols
建议参数值为
TLSv1.2,TLSv1.1,TLSv1
。列出支持的SSL协议。生成环境不建议使用SSL,建议使用TLS。ssl.keystore.type
和ssl.truststore.type`
文件格式:
JKS
security.inter.broker.protocol
参数kafka集群节点(brokers)之间启用
SSL
通讯,需要配置该配置参数为:SSL
。
最后我们总结合并一下所有的配置参数:
1 | listeners=SSL://kafka.app.node1:9092 |
1.2.8 初步验证
正常启动集群的Zookeeper集群,然后依次启动集群的所有节点。使用下面的命令检查:
1 | openssl s_client -debug -connect kafka.app.node1:9092 -tls1 |
该命令检查服务器的密钥库和信任库是否正确设置。命令中tls1
必须是集群配置参数ssl.enabled.protocols
所支持的协议。
1 | Certificate chain |
如果证书未显示或有其他错误消息,则说明设置不正确。
另外对于’OpenSSL 0.9.8j-fips 07 Jan 2009’版本的openssl版本,由于这个版本不能自己检测出ssl的版本。会报下面的错误信息。
1816:error:1408E0F4:SSL routines:SSL3_GET_MESSAGE:unexpected message:s3_both.c:463:
1.3 配置kafka客户端
kafka集群需要支持集群内外的客户端交互访问。安全集群的客户端同样需要进行相关安全配置。这里客户端指的是Console客户端。
1.3.1 签发客户端证书
类似集群内部服务端的证书签发步骤,客户端证书签发过程入下:
生成客户端SSL密钥和证书,输出密钥容器:
client.keystore.jks
1
2keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app
.node1 -validity 365 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=dccsh,O=icbc,L=shanghai,S=shanghai,C=cn" -ext SAN=DNS:kafka.app.node1 -storepass app123从密钥容器中提取和导出客户端证书(输出文件:client.cert-file,未签名)
1
keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app.node1 -certreq -file /usr/ca/client/client.cert-file -storepass app123
给客户端证书签名(输出文件:client.cert-signed,已签名)
1
openssl x509 -req -CA /usr/ca/root/ca.cert.pem -CAkey /usr/ca/root/ca.key.pem -in /usr/ca/client/client.cert-file -out /usr/ca/client/client.cert-signed -days 365 -CAcreateserial -passin pass:app123
将CA证书导入客户端密钥容器中
1
keytool -keystore /usr/ca/client/client.keystore.jks -alias CARoot -import -file /usr/ca/root/client.cert-file -storepass app123
将已签名的证书导入密钥容器中
1
keytool -keystore /usr/ca/client/client.keystore.jks -alias kafka.app.node1 -import -file /usr/ca/client/client.cert-signed -storepass app123
1.3.2 生成客户端信任库
使用下面的命令将CA证书导入客户端信任库,输出为信任库文件:client.truststore.jks
1 | keytool -keystore /usr/ca/trust/client.truststore.jks -alias CARoot -import -file /usr/ca/root/ca.cert.pem -storepass app123 |
1.3.3 配置客户端
客户端的console-producer和console-consumer命令需要添加相关安全配置。
如果kafka集群不需要客户端身份验证,只需配置下面的配置:
1 | security.protocol=SSL |
如果需要客户端身份验证,还需要补充下面的配置信息:
1 | ssl.keystore.location=/usr/ca/client/client.keystore.jks |
根据我们的要求和代理配置,可能还需要其他配置设置:
- ssl.provider(可选)。用于SSL连接的安全提供程序的名称。
- ssl.cipher.suites(可选)。密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。
- ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1。它应列出在代理方配置的至少一种协议
- ssl.truststore.type = JKS
- ssl.keystore.type = JKS
最后我们总结合并一下所有的配置参数(编辑文件名为:client-ssl.properties
):
1 | security.protocol=SSL |
1.3.4 消费者生产者
使用console-producer的命令:
1 | kafka-console-producer.sh --broker-list kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --producer.config client-ssl.properties |
使用console-consumer的命令:
1 | kafka-console-consumer.sh --bootstrap-server kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --new-consumer --consumer.config client-ssl.properties |
这里test
为topic
名称,在只有SSL通信加密集群中,topic的创建、删除、生产、消费并没有权限管理,依然存在安全问题。所以kafka集群需要进一步配置权限管理。
第二部分 Kafka集群权限认证
Kafka集群的权限认证管理主要涉及:
- 身份认证(Authentication)。对客户端与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。
- 权限控制(Authorization)。实现对于消息级别的权限控制,客户端的读写操作进行Authorization(生产、消费)管理。
通俗的讲,身份认证解决的是证明你是谁,而权限控制解决的是你能干什么。在Kafka中身份认证和权限控制是两套独立的安全配置。
2.1 集群权限认证策略
Kafka从0.9.0.0版本后开始支持下面的SASL安全策略管理。这些安全功能为Kafka通信安全、多租户管理、集群云化提供了安全保障。截止目前Kafka 2.3版本,一共支持5种SASL方式。
验证方式 | 版本 | 说明 |
---|---|---|
SASL/PLAIN | 0.10.0.0 | 不能动态增加用户 |
SASL/SCRAM | 0.10.2.0 | 可以动态增加用户。有两种方式:SASL/SCRAM-SHA-256 和SASL/SCRAM-SHA-512 |
SASL/GSSAPI | 0.9.0.0 | 需要独立部署验证服务(即Kerberos服务) |
SASL/OAUTHBEARER | 2.0.0 | 需自己实现接口实现token的创建和验证,需要额外Oauth服务 |
SASL/Delegation Token | 1.1.0 | 补充现有 SASL 机制的轻量级认证机制 |
对于生产环境,SASL/PLAIN方式有个缺点:只能在JAAS文件KafkaServer参数中配置用户,集群运行期间无法动态新增用户(需要重启重新加载JAAS文件),这对维护管理带来不便。而SASL/SCRAM方式,将认证数据存储在Zookeeper中,可以动态新增用户并分配权限。
SASL/GSSAPI方式需要依赖Kerberos服务。对于一些已经部署了集中式的Kerberos服务的大厂,只需要申请一个principal即可。如果生产Kerberos认证中出现TGT分发性能瓶颈,可以使用SASL/Delegation Token模式。使用 Kafka 提供的 API 去获取对应的 Delegation Token。Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证),减少性能压力。
同样SASL/OAUTHBEARER方式需要Oauth服务。
各种方式引入版本不同,使用依赖各有差异,需要结合自身业务特点选择合适的架构方式。
2.2 SASL/SCRAM策略配置介绍
SASL/SCRAM方式将身份认证和权限控制的凭证(credential)数据均存储在Zookeeper中,需要对Zookeeper进行安全配置。
2.2.1 Zookeeper集群侧配置
对Zookeeper集群中所有节点更新下面的策略后,重启集群生效。
配置
zoo.cfg
文件文件尾部追加下面的配置:
1
2
3authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000新增
zk_server_jaas.conf
文件配置文件内容如下:
1
2
3
4
5
6Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};其中
username
和password
定义的用户和密钥,用于Zookeeper与Kafka集群进行认证。配置项user_admin="admin-secret"
中 admin为用户名,admin-secret为密码,用于Zookeeper集群外客户端和集群内进行认证。拷贝依赖包
将kafka文件系统中
kafka/libs
目录下的jar包拷贝到zookeeper/lib
目录。1
2
3
4
5kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar若没有引入依赖包,启动时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。
修改zookeeper启动参数
修改
bin/zkEnv.sh
文件, 在文件尾追加下面的配置内容。该配置完成引入的包的加载。变量CLASSPATH
和SERVER_JVMFLAGS
都会在Zookeeper启动时传给JVM
虚拟机。下面的配置中
$ZOOKEEPER_HOME
是zookeeper的环境变量,如果没有配置,使用绝对路径即可。1
2
3
4for i in $ZOOKEEPER_HOME/lib/*.jar; do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zk_server_jaas.conf"
2.2.2 kafka集群侧配置
kafka集群中每一台节点均需要更新下面的配置。
新增
kafka_server_scram_jaas.conf
文件(在config
目录中)1
2
3
4
5
6
7
8
9
10KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
自定义用户:
user_admin="admin-secret"
user_alice="alice-secret"
user_reader="reader-secret"
user_writer="writer-secret";
};
其中配置username
和password
为Kafka集群之间通讯的SCRAM凭证,用户名为admin
,密码为admin-secret
。
配置中类似user_XXX
格式的配置项为自定义用户。如果是SASL/PLAIN方式,用户只能在该文件中定义,不能动态新增。我们使用SASL/SCRAM方式,可以后续动态声明admin用户,不再此处进行配置。
更新Kafka的配置文件
server.properties
(在config
目录中):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19SASL CONFIG
listeners=SASL_SSL://kafka.app.node1:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin
SSL CINFIG
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
security.inter.broker.protocol=SASL_SSL需要注意参数
allow.everyone.if.no.acl.found
,如果开启参数开关,当客户端和集群交互时候未找到ACL策略时,允许所有类型的访问操作。建议该参数关闭(false)。参数
security.inter.broker.protocol
指定集群brokers之间的通讯协议。不加密协议有:SASL_SSL、SASL_PLAINTEXT、PLAINTEXT;加密协议有:SSL。为了提高节点之间的交互性能,内部网络环境建议使用非加密协议。这里使用加密的SASL_SSL
协议。参数
super.users
指定了集群的超级用户为:admin
。注意如果指定多个超级用户,每个用户使用分号隔开,例如:super.users=User:admin;User:alice
参数
sasl.enabled.mechanisms
列出支持的认证方式。即可以支持多种。参数
sasl.mechanism.inter.broker.protocol
指定集群内部的认证方式。Kafka仅支持最小迭代次数为4096的强哈希函数SHA-256和SHA-512。所以有SCRAM-SHA-512和SCRAM-SHA-256两种方式。配置kafka启动环境变量(
bin
目录下面的kafka-run-class.sh
)为 Kafka 添加 java.security.auth.login.config 环境变量(配置文件路径)。并且在启动模式中添加
KAFKA_SASL_OPTS
。1
2
3
4
5
6
7
8截取配置文件片段:
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_server_scram_jaas.conf'
Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
2.2.3 SCRAM认证管理
在集群的配置文件kafka_server_scram_jaas.conf
中,定义了集群内部的认证用户。对于客户端和集群之间认证可以使用kafka-configs.sh
来动态创建。
创建用户SCRAM凭证
例如集群中的超级用户
admin
用户,使用下面的命令创建:
1 | kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin |
创建自定义普通用户alice
。
1 | kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice |
- 查看SCARM凭证
1 | kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --describe --entity-type users --entity-name admin |
- 删除SCRAM凭证
1 | kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice |
2.3 Kafka客户端配置
Kafka集群配置了认证,那么对于Console客户端访问集群自然需要配置认证信息。可集群节点内部通讯凭证的认知,同样需要定义JAAS文件。加入我们自定义了用户alice
,JAAS文件名为:kafka_console_client_jaas.conf
,配置内容如下:
1 | KafkaClient { |
然后更新kafka-console-producer.sh
脚本和kafka-console-consumer.sh
脚本的启动参数。
1 | 文件截取更新部分: |
在配置SSL时候,我们新建了client-ssl.properties
配置文件,作为Console客户端启动配置。在集群启用SASL_SSL
后,我们同步更新如下:
1 | security.protocol=SASL_SSL |
至此Console客户端已经配置完毕,但目前Console客户端还不能通过命令方式和集群进行交互,因为我们指定的用户对于集群的资源还没有任何权限。需要对用户进行集群资源的ACL控制设置,赋予相关权限。
2.4 ACL控制
Kafka权限资源包含Topic、Group、Cluster、TransactionalId(事务id),每个资源涉及的权限内容如下:
资源类型 | 权限类型 |
---|---|
Topic | Read,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All |
Group | Read,Describe,All |
Cluster | Create,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All |
TransactionalId | Describe,Write,All |
对于常用类型进行说明:
权限 | 说明 |
---|---|
Read | 读取topic、group信息 |
Write | 写topic、TransactionalId(存储在内部topic) |
Delete | 删除topic |
Create | 创建topic |
ALTER | 修改topic |
Describe | 获取topic、group、TransactionalId信息 |
ALL | 所有权限 |
Kafka提供ACL管理脚本:kafka-acls.sh
。
2.4.1 更新脚本配置
认证数据均存储在Zookeeper集群中,需要和Zookeeper交互自然需要配置相关认证信息。
首先需要新建JAAS文件,文件名为:zk_client_jaas.conf
。这里的用户已经在Zookeeper集群中进行定义。
1 | Client { |
最后更新kafka-acls.sh
脚本:
1 | 截取更新部分 |
当然Kafka集群的配置文件中已经开启了ACL:
1 | authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer |
至此完成配置。
2.4.2 ACL配置
根据官网的介绍,ACL的格式如下:
“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”
参数含义描述如下:
- principal:指定一个Kafka user;
- operation:指定一个具体的操作类型,例如:Read, Write, Delete等;
- Host:表示与集群交互的客户端IP地址,如果是通配符‘*’表示所有IP。目前不支持主机名(hostname)形式,只能是IP地址;
Resource:指定一种Kafka资源类型(共有4种类型);
例如下面的ACL命令:
1 | sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --add --allow-principal User:alice --allow-host '*' --operation ALL --topic test |
赋权之后,用户alice对test具有全部权限,并且访问请求可以是来自任何IP的客户端。
常用参数的补充说明:
- 对主机IP的限制参数,
allow-host
指定允许的IP,deny-host
指定禁用IP; - 新增和删除一个赋权策略,分别使用:
add
和remove
2.4.3 ACL策略查看
使用参数list
参看ACL策略。例如:
1 | sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --list --topic test-topic |
该查看命令显示test-topic
资源相关的所有ACL策略。
2.4.4 超级用户
kafka集群的配置文件server.properties
中定义了超级用户(Super Users),超级用户不在ACL控制范围内,默认可以访问集群中所有资源,并具备所有权限。
2.5 权限认证数据访问
集群的认证数据存储在Zookeeper,可以通过Zookeeper的console客户端访问认证数据。
使用zookeeper自带的命令行客户端:
1 | /dmqs/zookeeper/bin> ./zkCli.sh |
查看zookeeper中的数据:
1 | [zk: localhost:2181(CONNECTED) 1] ls / |
其中kafka-acl
中存储相关权限认证数据。
1 | [zk: localhost:2181(CONNECTED) 3] ls /kafka-acl |
可以查看其中的权限信息。
2.6 SSL和SASL的说明
SSL是传输层安全协议,是位于传输层(TCP/IP)和应用层(HTTP)的协议,SSL是对整个传输过程的加密,SSL是对客户端和服务器之间传输的所有数据进行加密。假如在配置的时候使用了SASL,但是没有使用SSL,那么除了账号密码外,所有的传输内容都是裸奔的。
所以生产集群采用SSL和SASL结合方式,即SSL_SASL方式。
第三部分 安全集群的客户端
3.1 开发语言类
3.1.1 Python客户端
目前市面上kafka的python API常用的有三种:
- 第一种 kafka
该项目(项目地址:https://pypi.org/project/kafka/,最后版本为:`Released: Oct 8, 2017)是
kafka-python的老项目,2017年后调整为
kafka-python`项目。
- 第二种 kafka-python
最新版本为2.0,首先从客户端的密钥库中导出CA证书。
1 | keytool -exportcert -alias CARoot -keystore client.keystore.jks -rfc -file ca.cert.pem |
生产者和消费者的案例如下:
1 | # -*- coding: utf-8 -*- |
需要的注意的事项有:
Kafka集群启用主机名模式,所以应用程序运行节点的hosts文件需要配置Kafka集群节点的域名映射。
ssl_context参数为包装套接字连接的预配置SSLContext。如果非None,将忽略所有其他ssl_ *配置。
主机名验证问题。如果证书中域名和主机名不匹配,客户端侧需要配置需要调整如下:
1
2ssl_ctx.check_hostname=False
ssl_ctx.verify_mode = CERT_NONE如果不提前预配置SSLContext,还需要客户端的证书。
1
keytool -exportcert -alias localhost -keystore client.keystore.jks -rfc -file client.cert.pem
生产者的参数需要添加:
1
2
3
4
5
6
7
8
9
10
11
12
13
14ssl_certfile = "client.cert.pem"
ssl_cafile = "ca.cert.pem"
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
acks='all',
retries=1,
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=username,
sasl_plain_password=password,
ssl_check_hostname=False,
ssl_certfile=ssl_certfile,
ssl_cafile=ssl_cafile)第三种 confluent-kafka
confluent-kafka包由confluent公司开源,主要是对C/C++客户端包(librdkafka)的封装。案例代码如下:
1 | # -*- coding: utf-8 -*- |
3.1.2 Go客户端
我们的Go语言中常用的Kafka的客户端包有:
1 | "github.com/Shopify/sarama" |
其中最常用的是sarama
,案例参考github项目。
3.1.3 Java客户端
生产者:
1 | package com.kafka.security; |
消费者:
1 | package com.topsec.kafka.security; |
3.2 组件类
3.2.1 Console客户端
客户端节点部署kafka项目,在bin目录下面我们已经更新了kafka-console-consumer.sh
和kafka-console-producer.sh
两个脚本。并分别新增了加密访问的配置文件consumer.config
和producer.config
。
命令案例参考:1.3.4 章节内容。
3.2.2 Flume客户端
目前Flume项目官网项目文档介绍支持下面三种方式:
- SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证;
- SASL_SSL - 有数据加密的 Kerberos 或明文认证;
- SSL - 基于TLS的加密,可选的身份验证;
事实上对于SASL/SCRAM方式Flume也是支持的。具体配置如下(以Flume1.9版本为例):
3.2.2.1 第一步 新增jaas配置文件
在Flume的conf配置目录下面新增flume_jaas.conf文件,文件内容:
1 | Server { |
3.2.2.2 第二步 更新flume-env.sh文件
Flume的conf配置目录下面flume-env.sh
文件添加JAVA_OPTS
配置更新:
1 | JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/dmqs/apache-flume-1.9.0-bin/conf/flume_jaas.conf" |
其中路径为第一步中新增的flume_jaas.conf
文件路径。
3.2.2.3 测试案例(sinks)
我们使用一个简单的案例来测试,Flume的source为监控文件尾写入,Flume的sinks为加密kafka集群。具体配置如下:
1 | define |
配置保存为flume-sink-auth-kafka.conf
,为了检查输出结果使用下面命令启动(在bin目录中):
1 | ./flume-ng agent --conf ../conf --conf-file ../conf/flume-sink-auth-kafka.conf --name a1 -Dflume.root.logger=INFO,console |
向文件尾部追加信息:
1 | echo "test" >> /dmqs/apache-flume-1.9.0-bin/data/flume/flume.log |
然后使用消费者客户端查看数据是否写入kafka的flume主题中。
3.2.2.3 测试案例(source)
同样可以也可以将加密Kafka作为Flume的source,配置案例如下:
1 | define |
案例中将加密Kafka中flume主题中的数据汇入到指定目录的文件中。
3.2.3 Logstash客户端
Logstash和Kafka交互使用Kafka output plugin
插件实现。其中配置文件中output部分如下:
1 | output { |
参考:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
第四部分 加密认证集群的性能压测
集群启用SSL后,数据交互中需要加密、解密。kafka集群的I/O性能会降低。我们使用Kafka自带的压侧工具对集群加密前和加密后性能进行评测。
4.1生产者压力测试
客户端写入参数配置为acks=all
(即Topic中Leader和fellow副本均写入成功)。每条消息大小为1M(消息体小吞吐量会大一些)。另外测试客户端为集群内部节点,忽略了数据网络传输的性能消耗。
4.1.1不加密集群
1 | ./kafka-consumer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all |
测试结果:
1 | 1500000 records sent, 38538.615693 records/sec (37.64 MB/sec), 748.44 ms avg latency, 5485.00 ms max latency, 227 ms 50th, 3194 ms 95th, 3789 ms 99th, 3992 ms 99.9th. |
4.1.2加密集群
1 | ./kafka-producer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all --producer.config producer.config |
测试结果:
1 | 1500000 records sent, 16901.027582 records/sec (16.50 MB/sec), 1713.43 ms avg latency, 9345.00 ms max latency, 72 ms 50th, 1283 ms 95th, 2067 ms 99th, 2217 ms 99.9th. |
4.2 压侧结论
加密改造前,生产者的吞吐量为3.8w 条/秒,改造后1.7W 条/秒。整体吞吐性能降低50%左右,数据的加密、解密导致吞吐量性能降低。平均时延也增加了一倍多(改造前700ms,改造后1700ms)。在实际生产中可参考这个性能折扣基线配置集群资源。
参考文献及资料
1、Kafka官网对安全类功能介绍,链接:http://kafka.apache.org/documentation/#security
2、Kafka ACLs in Practice – User Authentication and Authorization,链接:https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
3、维基百科(数字证书),链接:https://zh.wikipedia.org/wiki/公開金鑰認證
4、SSL技术白皮书,链接:https://blog.51cto.com/xuding/1732723
5、Kafka权限管理,链接:https://www.jianshu.com/p/09129c9f4c80
5、Flume文档,链接:https://flume.apache.org/FlumeUserGuide.html#kafka-sinkorg/FlumeUserGuide.html#kafka-sink