一、环境简介二、测试pykafka 生产者API
2.1 生产者测试简单发送2.2 生产者异步发送测试 三、测试kafka-python 生产者API
一、环境简介机器:13-inch, M1, 2020
编码:pycharm
环境:python3.6.15,kafka2.5
kafka包:pykafka2.8.0 ; kafka-python2.0.2
如果你也是以上的环境那么该案例对你有一定的参考意义!
以下内容为参考:博主文章实践的内容
二、测试pykafka 生产者API 2.1 生产者测试简单发送1)命令行启动一个消费者
2)执行代码
#!/bin/env python
from pykafka import KafkaClient
host = 'lsl101:9092,lsl102:9092,lsl103:9092'
client = KafkaClient(hosts = host)
topic = client.topics["demo"]
with topic.get_sync_producer() as producer:
for i in range(100):
producer.produce(('test message ' + str(i ** 2)).encode())
运行截图如下:
#!/bin/env python
from pykafka import KafkaClient
def send_to_kafka(topic_name, msg):
kafka_host = 'lsl101:9092,lsl102:9092,lsl103:9092'
if not kafka_host:
raise Exception('Unable to get Kafka host address')
client = KafkaClient(hosts=kafka_host)
topic = client.topics[topic_name]
with topic.get_producer(sync=False, delivery_reports=True) as producer:
producer.produce(msg.encode())
msg, exc = producer.get_delivery_report(block=True)
if exc is not None:
print("Failed to deliver msg {}: {}".format(msg.partition_key, repr(exc)))
raise exc
send_to_kafka("demo", "test test")
print("success!")
程序执行情况:
消费者接受情况:
1)命令行启动一个消费者
2) 测试代码
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['lsl101:9092', 'lsl102:9092', 'lsl103:9092'])
# Assign a topic
topic = 'demo'
def test():
print('begin')
n = 1
try:
while (n <= 100):
# send方法是异步方法,其被调用时会将记录发送到待处理记录的缓冲区,并立刻返回
producer.send(topic, str(n).encode())
print("send" + str(n))
n += 1
time.sleep(0.5)
except KafkaError as e:
print(e)
finally:
producer.close()
print('done')
if __name__ == '__main__':
test()
- 执行
程序执行:
命令行消费者:



