栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

mac 安装及操作kafka

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

mac 安装及操作kafka

首先对kafka的科普,这里不会讲了,不过这有一篇文章,我感觉用来入门还是挺好。

深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html​​​​​​​

操作系统:macOS12

mac M1系统,已经不支持用brew直接安装使用kafka了,所以需要到官网下载安装包。

当然,首先需要在电脑上配置好java环境,这个就不在这说了

在终端输入 java -version 显示类似如下信息,说明就配置好java环境了

openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Zulu 8.60.0.21-CA-macos-aarch64) (build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Zulu 8.60.0.21-CA-macos-aarch64) (build 25.322-b06, mixed mode)

其实完全按照kafka官网给的指导就能顺利操作成功

1. 先下载下来安装包,按照官网建议,选择第一行的链接。

然后解压,进入到安装包内,目录如下:

 

 2. 开启kafka环境:

        在此文件夹中打开终端,先开启的是zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

        然后再打开一个终端,这次开启的是kafka服务:

bin/kafka-server-start.sh config/server.properties

如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了

3. 安装python运行kafka的标准库

  官网上给的这一步,是基于命令行操作的kafka,在这我们要做的是利用python操作kafka。

  所以,接下来要先安装kafka的python包,如下:

pip install kafka-python

  这里需要说的是,我还没有找到可以像brew启动服务一样启动kafka服务,所以暂定以第2步这种比较麻烦的方式开启服务。当然如果读者有更好的,欢迎留言告诉我。

4. 一个kafka-python的demo

# kafka_demo.py

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

def producer_demo():
    # 假设生产的消息是json格式
    producer = KafkaProducer(  # 实例化一个生产者
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送消息
    for i in range(5):
        future = producer.send(
            'kafka_demo',
            key='count_num',  # 同一个key值,会被送至同一个分区
            value=str(i))
        print(f"send {i}")
        try:
            future.get(timeout=10) # 监控是否发送成功           
        except KafkaError as e:  # 发送失败抛出KafkaError
            print(e)
            
def consumer_demo():
    consumer = KafkaConsumer(  # 实例化一个消费者
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        

相信大家在其他地方已经学习了kafka的生产-消费模式,这里就不多说了。

代码中的两个方法要分开执行,可以进入python交互模式导入kafka_demo,然后依次执行两个方法。

待续.....

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753032.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号