栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka生产者——python Api发送

kafka生产者——python Api发送

kafka生产者——python Api使用

一、环境简介二、测试pykafka 生产者API

2.1 生产者测试简单发送2.2 生产者异步发送测试 三、测试kafka-python 生产者API

一、环境简介

机器:13-inch, M1, 2020
编码:pycharm
环境:python3.6.15,kafka2.5
kafka包:pykafka2.8.0 ; kafka-python2.0.2
如果你也是以上的环境那么该案例对你有一定的参考意义!

以下内容为参考:博主文章实践的内容

二、测试pykafka 生产者API 2.1 生产者测试简单发送

1)命令行启动一个消费者

2)执行代码

#!/bin/env python
from pykafka import KafkaClient
host = 'lsl101:9092,lsl102:9092,lsl103:9092'
client = KafkaClient(hosts = host)
topic = client.topics["demo"]
with topic.get_sync_producer() as producer:
    for i in range(100):
        producer.produce(('test message ' + str(i ** 2)).encode())

运行截图如下:

2.2 生产者异步发送测试
#!/bin/env python
from pykafka import KafkaClient


def send_to_kafka(topic_name, msg):
    kafka_host = 'lsl101:9092,lsl102:9092,lsl103:9092'

    if not kafka_host:
        raise Exception('Unable to get Kafka host address')

    client = KafkaClient(hosts=kafka_host)
    topic = client.topics[topic_name]

    with topic.get_producer(sync=False, delivery_reports=True) as producer:
        producer.produce(msg.encode())
        msg, exc = producer.get_delivery_report(block=True)
        if exc is not None:
            print("Failed to deliver msg {}: {}".format(msg.partition_key, repr(exc)))
            raise exc


send_to_kafka("demo", "test test")

print("success!")

程序执行情况:

消费者接受情况:

三、测试kafka-python 生产者API

1)命令行启动一个消费者

2) 测试代码

import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['lsl101:9092', 'lsl102:9092', 'lsl103:9092'])
# Assign a topic
topic = 'demo'


def test():
    print('begin')
    n = 1
    try:
        while (n <= 100):
            # send方法是异步方法,其被调用时会将记录发送到待处理记录的缓冲区,并立刻返回
            producer.send(topic, str(n).encode())
            print("send" + str(n))
            n += 1
            time.sleep(0.5)
    except KafkaError as e:
        print(e)
    finally:
        producer.close()
        print('done')


if __name__ == '__main__':
    test()
    执行
    程序执行:

    命令行消费者:
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784816.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号