栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

python对接kafka

python对接kafka

python对接kafka有两个常用库:kafka-python,pykafka,前者github star较多,所以选用了前者。

生产者:

from kafka import KafkaProducer
import json
import datetime

topic = 'test'
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda m: json.dumps(m).encode("utf-8"))
# 参数bootstrap_servers:指定kafka连接地址
# 参数value_serializer:指定序列化的方式,我们定义json来序列化数据,当字典传入kafka时自动转换成bytes
# 用户密码登入参数
# security_protocol="SASL_PLAINTEXT"
# sasl_mechanism="PLAIN"
# sasl_plain_username="maple"
# sasl_plain_password="maple"

for i in range(2):
    data = {"num": i, "ts": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    producer.send(topic, data)

producer.close()

消费者:

import time

from kafka import KafkaConsumer
from multiprocessing import Process

def run(process, group_id, *topics):
    consumer = KafkaConsumer(*topics, bootstrap_servers=['127.0.0.1:9092'], group_id=group_id, auto_offset_reset="earliest")
    # 参数bootstrap_servers:指定kafka连接地址
    # 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各自消费相同的数据,互不影响
    # 参数auto_offset_reset:默认为latest表示offset设置为当前程序启动时的数据位置,earliest表示offset设置为0,在你的group_id第一次运行时,还没有offset的时候,给你设定初始offset。一旦group_id有了offset,那么此参数就不起作用了

    for msg in consumer:
        print(msg)
        recv = "%s:%d:%d: key=%s value=%s process=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value, process)
        print(recv)
        time.sleep(1)
        # consumer.close()


if __name__ == '__main__':
    p1 = Process(target=run, args=('进程1', 'group3', 'test'))
    # p2 = Process(target=run, args=('进程2', 'group1', 'test'))
    p1.start()
    # p2.start()
    print('主进程')

消费者在消费时,如果程序意外退出(如ctrl+c),offset未能及时保存,再次重启时会导致重复消费问题。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/443542.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号