栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka生产者消费者python版demo

kafka生产者消费者python版demo

1. 版本

Python 3.8.2

kafka-python 2.0.2

2. 生产者
import time
import json
from kafka import KafkaProducer

topic = 'sanford'
bootstrap_servers = 'localhost:9092'


def test_producer():
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             key_serializer=lambda k: json.dumps(k).encode(),
                             value_serializer=lambda m: json.dumps(m).encode())

    data = {'name': 'jay', 'timestamp': int(time.time()*1000)}

    try:
        future = producer.send(topic=topic, value=data)
        future.get(timeout=10)
    except Exception as e:
        print('生产者发送失败:{0}'.format(e))
    else:
        print('生产者发送成功')


if __name__ == '__main__':
    test_producer()
3. 消费者
auto_offset_reset说明: earliest从未消费的开始消费; latest从最新的开始消费
import json
from kafka import KafkaConsumer

topic = 'sanford'
group_id = 'demo_python'
bootstrap_servers = 'localhost:9092'
# auto_offset_reset = 'earliest'
auto_offset_reset = 'latest'


def test_consumer():
    try:
        consumer = KafkaConsumer(
            topic,
            group_id=group_id,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset=auto_offset_reset)
        for msg in consumer:
            print('data:{0}'.format(json.loads(msg.value)))
            consumer.commit()
    except Exception as e:
        print('消费者接收失败:{0}'.format(e))


if __name__ == '__main__':
    test_consumer()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654216.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号