因为 Kafka 需要用到 ZooKeeper(Kafka 3.0 之后可以使用 KRaft 模式,不再依赖 ZooKeeper),并且还要创建多个 Kafka 容器,这里为了方便,使用 docker-compose 来创建。
version: '3.8'

services:
  zookeeper:
    container_name: zk-0          # container name
    image: wurstmeister/zookeeper # image
    restart: always               # restart policy
    ports:                        # host:container port mapping
      # quoted so YAML never misreads the mapping as a number
      - "2181:2181"

  kafka-0:
    container_name: kafka-0
    image: wurstmeister/kafka # no official kafka image; this one is widely used
    restart: always
    depends_on:
      - zookeeper # create this service only after zookeeper is up
    ports:
      - "9090:9090"
    environment:
      # zookeeper connection string; the /kafka chroot keeps the tree tidy
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      # address the broker binds to inside the container
      KAFKA_LISTENERS: PLAINTEXT://:9090
      # address advertised to clients — use the container hostname, which is
      # resolvable on the network docker-compose creates automatically.
      # NOTE: do not also set KAFKA_ADVERTISED_HOST_NAME; the wurstmeister
      # image refuses to start when both variables are present.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-0:9090
      # broker id must be unique within the cluster (quoted: env values are strings)
      KAFKA_BROKER_ID: "0"

  kafka-1:
    container_name: kafka-1
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://:9091
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9091
      KAFKA_BROKER_ID: "1"

  kafka-2:
    container_name: kafka-2
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://:9092
      # fixed: the original advertised kafka-0 here (copy-paste error),
      # which made clients route kafka-2 traffic to the wrong broker
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_BROKER_ID: "2"
测试:
# 随便进入一个容器
docker exec -it kafka-0 /bin/bash
# 创建一个主题 qy,3个分区,3个副本
kafka-topics.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --create --topic qy --partitions 3 --replication-factor 3
# 往主题 qy 里面发送消息
kafka-console-producer.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --topic qy
>qyrzr
>ar
# 再随便进入一个容器
docker exec -it kafka-1 /bin/bash
# 查看主题 qy 的消息
kafka-console-consumer.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --from-beginning --topic qy
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
"time"
)
// Producer publishes a single message to the "qy" topic through a synchronous
// producer and prints the partition and offset the message landed on.
// Errors are printed and the function returns early; nothing is retried.
func Producer() {
	cfg := sarama.NewConfig()
	// Require acknowledgement from all in-sync replicas before SendMessage returns.
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	// A SyncProducer requires Return.Successes to be enabled.
	cfg.Producer.Return.Successes = true

	brokers := []string{
		":9090",
		":9091",
		":9092",
	}
	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		fmt.Println(err.Error(), "host error!")
		return
	}
	defer producer.Close()

	fmt.Println("--")

	message := &sarama.ProducerMessage{
		Topic: "qy",
		Value: sarama.StringEncoder("--ar--"),
	}
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("send success ", partition, offset)
}
// Consumer subscribes to every partition of the "qy" topic starting at the
// newest offset and prints each received message value. It blocks until all
// partition message streams are closed. Errors are printed and cause an early
// return.
func Consumer() {
	// nil config: sarama defaults are used.
	consumer, err := sarama.NewConsumer([]string{
		":9090",
		":9091",
		":9092",
	}, nil)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// Fix: the original never closed the consumer, leaking its broker
	// connections on every call.
	defer consumer.Close()

	partitions, err := consumer.Partitions("qy")
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("partitions = ", partitions)

	var wg sync.WaitGroup
	for _, partition := range partitions {
		// Read each partition from the newest offset, so only messages
		// produced after this point are seen.
		partitionConsumer, err := consumer.ConsumePartition("qy", partition, sarama.OffsetNewest)
		fmt.Println("partitionConsumer = ", partitionConsumer)
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// Fix: release the partition consumer once its message
			// channel is drained; the original leaked it.
			defer pc.Close()
			fmt.Println("In func")
			for msg := range pc.Messages() {
				fmt.Println(string(msg.Value))
			}
		}(partitionConsumer)
	}
	wg.Wait()
}
// main demonstrates the producer/consumer pair: the consumer is started first
// so it is already subscribed when the producer sends its message.
func main() {
	go Consumer()

	// Give the consumer time to connect and claim all partitions before
	// producing (OffsetNewest would otherwise miss the message).
	const consumerWarmup = 5 * time.Second
	time.Sleep(consumerWarmup)

	go Producer()

	// Keep the process alive long enough for the produce/consume round trip.
	time.Sleep(10 * time.Second)
}



