首先去下载:Kafka下载地址
备选地址:https://www.lanzouw.com/i4b8mxgrafg
密码:204o
准备好jdk环境,zk若无Kafka自带无须担忧。
说明
从蓝奏云上下载的朋友请注意,由于上传的时候蓝奏云不支持tgz后缀的格式,所以我把它改成了tar包,但不影响,下载下来之后以tar包方式解压即可。
解压并配置
tar -xzf kafka_2.13-3.0.0.tgz cd kafka_2.13-3.0.0 # 修改配置文件 vim config/server.properties
配置文件内容
# 该 broker id,针对每一个实例需要设置唯一值 broker.id=0 # 配置所在机器的 ip 和 端口,笔者这里安装在本地的 listeners=PLAINTEXT://127.0.0.1:9092 # 每个 topic 默认的分区数量 num.partitions=2 # zk 地址 zookeeper.connect=127.0.0.1:2181 # 日志文件存储目录,多个可以用逗号分隔 log.dirs=/tmp/kafka-logs启动zk
进入kafka的包的bin目录下
./zookeeper-server-start.sh ../config/zookeeper.properties
不要用ctrl c打断,切记!!!
启动kafka此时还在bin目录下
./kafka-server-start.sh ../config/server.properties常用命令和测试
此时在kafka的安装目录下
# 创建主题 # --tipic 后面指定 topic 的名称为 testTopic # --partitions 指定分区数量 # --replication-factor 指定副本集数量 # --bootstrap-server kafka 地址 # 直接运行 kafka-topics.sh 不带任何参数,会有提示信息出来,也就是帮助列表,选项有哪些,含义是什么 bin/kafka-topics.sh --create --topic testTopic --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092 Created topic testTopic. # 查看 topic 列表 bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list testTopic # 发送一条数据 # 回车一条是发送一条消息,可以用 ctrl+c 终止这个程序 # 需要注意的是,如果你这条消息不回车,就直接 ctrl+c 终止了,那么就不算发出去了 bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic testTopic >hello word >wo event # 接受数据:可以重新开启一个命令行 # --from-beginning 表示从头开始消费消息,也就是说,你重复使用这条命令,就可以重复看到这些消息,这个和 rabbitMQ 是有一点区别的 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testTopic --from-beginning hello word 1
删除 topic
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic testTopic遇到的问题
关于zk
1.启动过程ctrl c了,他自带的默认不是以后台进程开启的,所以不能打断
2.启动命令都要带上配置文件参数
关于kafka
1.启动的时候报 commit_memory(0x00000000c0000000, 1073741824, 0) failed
百度之后发现是服务器内存太小了,我用的是轻量服务器,2G的,他默认启动要占1G,这肯定不行
vim kafka-server-start.sh
降低启动内存大小.
如:export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
kafka.common.KafkaException: Socket server failed to bind to xx:9092: Cannot assign requested address
该情况下的虚拟机对外ip[暴露的ip]和真实ip[ifconfig显示的ip]可能只是映射关系,用户访问对外ip时,OpenStack会转发到对应的真实ip实现访问。但此时如果 Kafka server.properties配置中的listeners=PLAINTEXT://10.20.1.153:9092中的ip配置为[对外ip]的时候无法启动,因为socket无法绑定监听,报如下错误
kafka.common.KafkaException: Socket server failed to bind to 10.20.1.154:9092: Cannot assign requested address
解决方法也很简单,listeners=PLAINTEXT://10.20.1.153:9092中的ip改为真实ip[ifconfig中显示的ip]即可,其他使用时正常使用对外ip即可,跟真实ip就没有关系了。
上面的说法也不能说是错的,Kafka监听的不能直接是公网的IP(我自己本人是这么认为的,不知道准确不准确),他需要做一个内外网的一个映射~
listeners=PRIVATE://0.0.0.0:9092,PUBLIC://0.0.0.0:9093 advertised.listeners=PRIVATE://192.168.16.4:9092,PUBLIC://180.77.248.246:9093 listener.security.protocol.map=PRIVATE:PLAINTEXT,PUBLIC:PLAINTEXT inter.broker.listener.name=PRIVATE



