栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 生产者和消费者

Kafka 生产者和消费者

node2:/root/sbin/kafka#cat kafka_produce.py 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json


class Kafka_producer(object):
    """Wrapper around ``KafkaProducer`` that publishes JSON payloads to one topic.

    Attributes:
        kafkaHost: Broker host given to the constructor.
        kafkaPort: Broker port given to the constructor.
        kafkatopic: Topic every message is sent to.
        producer: The underlying ``KafkaProducer`` instance.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create a producer connected to ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic that ``sendjsondata`` publishes to.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort,
            )
        )

    def sendjsondata(self, params):
        """Serialize ``params`` as JSON and publish it to ``self.kafkatopic``.

        The JSON text is encoded as UTF-8 before sending. Any ``KafkaError``
        raised while sending, flushing, or resolving the delivery result is
        printed rather than propagated (best-effort send).

        Args:
            params: Any object accepted by ``json.dumps``.

        Returns:
            None.
        """
        try:
            payload = json.dumps(params).encode('utf-8')
            future = self.producer.send(self.kafkatopic, payload)
            self.producer.flush()
            # send() is asynchronous: a delivery failure is stored on the
            # returned future and only surfaces when the result is requested.
            future.get(timeout=10)
        except KafkaError as e:
            print(e)


# Demo run: build a producer for the test broker/topic and publish one string.
producer = Kafka_producer(
    kafkahost="192.168.137.3",
    kafkaport=9092,
    kafkatopic="topic_20211214",
)
# Alternative sample payload, left disabled:
# producer.sendjsondata('a1111111111')
producer.sendjsondata(params='z9999999999999')
node2:/root/sbin/kafka#cat cosumer1.py 
#!/usr/bin/env python
# coding=utf-8
from kafka import *
from  kafka import KafkaConsumer
import datetime,time
import json
def get_kafka_reviews(bootstrap_servers, topics):
    """Subscribe to ``topics`` and print every message the consumer yields.

    The consumer joins group ``'con01'``, starts from the latest offset when
    no committed offset exists, and auto-commits offsets.

    Args:
        bootstrap_servers: Broker address string such as ``'host:port'``; it
            is passed to ``KafkaConsumer`` wrapped in a one-element list.
        topics: A single topic name, or a list/tuple/set of topic names.

    Returns:
        None. Messages are only printed, not collected.
        NOTE(review): with the consumer settings used here iteration
        presumably blocks waiting for new messages, so this may never
        return -- confirm against the kafka-python ``consumer_timeout_ms``
        default.
    """
    # ``(topics)`` is not a tuple; normalize a bare topic name explicitly so
    # subscribe() always receives a collection of names.
    if not isinstance(topics, (list, tuple, set)):
        topics = [topics]

    consumer = KafkaConsumer(
        bootstrap_servers=[bootstrap_servers],
        group_id='con01',
        auto_offset_reset='latest',
        enable_auto_commit=True,
    )
    try:
        consumer.subscribe(topics=topics)  # subscribe to the topics to consume
        for message in consumer:
            # Each item is printed whole; message.value holds the payload
            # (presumably the JSON bytes written by the producer script).
            print(message)
    finally:
        # Release the consumer's connections even if iteration is interrupted.
        consumer.close()

# Run the consumer against the test broker/topic. get_kafka_reviews has no
# return statement, so this prints None if the consume loop ever finishes.
print(get_kafka_reviews('192.168.137.3:9092', 'topic_20211214'))

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/662900.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号