
Interacting with a Kerberos-Authenticated Kafka Cluster

Contents

  • Background
  • Part One
  • References

Background

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html

https://blog.csdn.net/justry_deng/article/details/88387898
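
The linked docs cover KafkaTemplate on the Spring side. As a minimal sketch of the same Kerberos setup (assuming spring-kafka and the Kafka client libraries are on the classpath, and that the JVM is started with -Djava.security.auth.login.config pointing at a JAAS file like the ones shown later; the object name is hypothetical, while the broker address and topic are taken from the examples below):

import java.util.{HashMap => JHashMap}
import org.apache.kafka.clients.producer.ProducerConfig
import org.springframework.kafka.core.{DefaultKafkaProducerFactory, KafkaTemplate}

object KerberosKafkaTemplateSketch {

  def main(args: Array[String]) {
    val configs = new JHashMap[String, AnyRef]()
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "rks253secure.hdp.local:6667")
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // Kerberos over an unencrypted channel (newer clients call this SASL_PLAINTEXT)
    configs.put("security.protocol", "SASL_PLAINTEXT")
    // must match serviceName in the JAAS KafkaClient section
    configs.put("sasl.kerberos.service.name", "kafka")

    val factory = new DefaultKafkaProducerFactory[String, String](configs)
    val template = new KafkaTemplate[String, String](factory)
    template.send("kafkatopic", "hello from a Kerberos-authenticated producer")
  }
}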

Spark Kafka Consumer in a secure (Kerberos) environment


SparkKafkaIntegration.md

Sample Application

using direct stream
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaConsumer2 {

  def main(args: Array[String]) {

    // TODO: print a log line identifying the authenticated user
    // Command-line arguments: broker list, consumer group, topic list, thread count
    val Array(brokerlist, group, topics, numThreads) = args

    val kafkaParams = Map(
      "bootstrap.servers" -> brokerlist,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> group,
      // HDP's name for Kerberos (SASL) over an unencrypted connection
      "security.protocol" -> "PLAINTEXTSASL",
      // old-consumer spelling of "earliest"
      "auto.offset.reset" -> "smallest"
    )

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(100))

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.split(",").toSet)

    // TODO: make the output path a variable
    kafkaStream.saveAsTextFiles("/tmp/streaming_output")
    ssc.start()
    ssc.awaitTermination()
  }
}
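
The direct stream has each executor read partitions straight from the brokers, with Spark tracking offsets itself; the receiver-based createStream variant shown next instead goes through the high-level consumer, which coordinates offsets via ZooKeeper.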
using createStream
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka_Word_Count {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("KafkaWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.driver.allowMultipleContexts", "true")

    val ssc = new StreamingContext(conf, Seconds(3))
    val groupID = "test"
    val numThreads = "2"
    val topic = "kafkatopic"
    // map each topic to the number of receiver threads consuming it
    val topicMap = topic.split(",").map((_, numThreads.toInt)).toMap

    val kafkaParams = Map[String, String](
      // the receiver-based consumer coordinates through ZooKeeper
      "zookeeper.connect" -> "rks253secure.hdp.local:2181",
      "group.id" -> groupID,
      "zookeeper.connection.timeout.ms" -> "10000",
      "security.protocol" -> "PLAINTEXTSASL"
    )

    // createStream returns (key, value) pairs; keep only the message value
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
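
Setting security.protocol to PLAINTEXTSASL only tells the client to authenticate with SASL; the JVM still has to be told where its Kerberos credentials come from, which is the job of the JAAS files below.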

kafka_jaas.conf (for spark local mode)

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
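
The JVM picks this file up through -Djava.security.auth.login.config (see the spark-submit command below). In local mode the driver and executors share one JVM on the submitting machine, so the KafkaClient section can simply reuse the ticket cache populated by kinit.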

kafka_jaas.conf (for spark yarn client mode)

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
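
Compared with the local-mode file, KafkaClient here authenticates from the keytab rather than the ticket cache: in yarn-client mode the executors run on cluster nodes where the submitting user has no ticket cache, so the keytab and JAAS file both have to be shipped with the job (which is what --files does in the command below).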

Spark Submit command

First obtain a Kerberos ticket as the kafka user; with the keytab and principal from the JAAS files above, that looks like:
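
kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/rks253secure.hdp.local@EXAMPLE.COM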

spark-submit \
  --files /etc/kafka/conf/kafka_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --class SparkKafkaConsumer2 \
  --master local[2] \
  /tmp/SparkKafkaSampleApp-1.0-SNAPSHOT-jar-with-dependencies.jar "rks253secure.hdp.local:6667" test kafkatopic 1
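
Here --files distributes the JAAS file and keytab to the executors, spark.executor.extraJavaOptions points each executor JVM at the JAAS file, and --driver-java-options does the same for the driver; the four trailing arguments are the broker list, consumer group, topic, and thread count read by SparkKafkaConsumer2.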

References

1.
