1. 使用 SSL 设置 Kafka 实例2. 提取密钥
2.1 提取客户端证书2.2 提取客户端密钥2.3 提取 CARoot 证书 3. kafka-python创建连接4. pykafka创建连接
1. 使用 SSL 设置 Kafka 实例
Kafka 可以通过 SSL 加密与消息消费者和生产者的连接。可以在不同的地方找到有关如何设置的说明。比如 Confluence 平台文档(Confluence 平台可以理解为围绕 Kafka 的复杂包装器/生态系统)或 Apache Kafka 文档。这些说明基于keytooljava 实用程序来生成和签署 SSL 证书。如果我们想在 Python 中使用这些证书,我们必须提取凭证。
以下 Python 包允许连接到 Kafka:
pykafka:https://github.com/Parsely/pykafkakafka-python:https://github.com/dpkp/kafka-pythonconfluent-kafka-python:https://github.com/confluentinc/confluent-kafka-python
但是,如果想从 Python 建立 SSL 连接,首先必须确保在 JKS 容器中拥有所需的证书和私钥,如上面的说明中所述。有两种方法可以从 Python 访问证书:
将 JKS 容器中的证书和密钥导出为 PEM 格式以在内部使用它们kafka-python使用例如pyjks包直接在 Python 中导入证书和密钥
下面使用的就是第一种方法。
2. 提取密钥配置 Apache Kafka 实例后,将有两个 JKS 容器:kafka.client.keystore.jks和kafka.client.truststore.jks。第一个包含已签名的客户端证书、其私钥和用于对其进行签名的 “CARoot” 证书。第二个包含用于签署客户端证书和密钥的证书。因此,我们需要的一切都包含在kafka.client.keystore.jks文件中。要了解其内容的概述,可以调用:
keytool -list -rfc -keystore kafka.client.keystore.jks2.1 提取客户端证书
首先,我们将提取客户端证书:
keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file certificate.pem
需要注意的是,上面命令的参数别名-alias可以通过下面的命令来查看:
keytool -list -rfc -keystore client.keystore.jks2.2 提取客户端密钥
接下来我们将提取客户端密钥。但是keytool不直接支持这一点,所以我们必须先将密钥库转换为pkcs12格式,然后从中提取私钥:
keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias caroot -destkeystore cert_and_key.p12 -deststoretype PKCS12
生成 p12 文件后,使用下面的命令将密钥打印到 STDOUT,从那里可以将其复制并粘贴到key.pem中(确保复制到 --BEGIN PRIVATE KEY-- 和 --END PRIVATE KEY-- 之间的行)。
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes
但是,我在执行上面的命令后,怎么都找不到打印的信息,最后没办法,在https://www.openssl.net.cn/docs/249.html中查阅了下 OpenSSL 的命令,将结果打印在终端,从终端复制到key.pem文件中:
openssl pkcs12 -in cert_and_key.p12 -nodes
然而事实上,在最后的使用中,我根本没用到key.pem文件,直接添加了对应的 password 即可。。。。
2.3 提取 CARoot 证书最后我们将提取 CARoot 证书:
keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem3. kafka-python创建连接
现在我们有了三个文件certificate.pem、key.pem、CARoot.pem。kafka-python它们可以作为消费者和生产者的构造函数的参数传递:
from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer(bootstrap_servers='my.server.com',
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem')
producer = KafkaProducer(bootstrap_servers='my.server.com',
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem')
# Write hello world to test topic
producer.send("test", bytes("Hello World"))
producer.flush()
# Read and print all messages from test topic
consumer.assign([TopicPartition(TOPIC, 0)])
consumer.seek_to_beginning(TopicPartition(TOPIC, 0))
for msg in consumer:
print(msg)
4. pykafka创建连接
以类似的方式,还可以将这些文件作为参数传递给pykafka:
from pykafka import KafkaClient, SslConfig
config = SslConfig(cafile='CARoot.pem',
certfile='certificate.pem',
keyfile='key.pem')
client = KafkaClient(hosts='my.server.com',
ssl_config=config)
topic = client.topics["test"]
# Write hello world to test topic
with topic.get_sync_producer() as producer:
producer.produce('Hello World')
# Print all messages from test topic
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print('{} {}'.format(message.offset, message.value))
现在我们可以建立 SSL 连接并使用kafka-python or pykafka包编写自己的消费者和生产者了。
部分内容源自于:http://maximilianchrist.com/python/databases/2016/08/13/connect-to-apache-kafka-from-python-using-ssl.html



