栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

Docker配置Kafka集群

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Docker配置Kafka集群

使用docker-compose创建kafka集群

因为 kafka 需要用到 zookeeper(3.0之后就可以使用 kraft 而不用 zookeeper 了),并且还要创建多个 kafka 容器,这里为了方便使用 docker-compose 去创建。

version: '3.8'
services:
  zookeeper:
    container_name: zk-0 # 容器名称
    image: wurstmeister/zookeeper # 镜像
    restart: always # 重启策略
    ports: # 端口映射
      - 2181:2181
  kafka-0:
    container_name: kafka-0
    image: wurstmeister/kafka # kafka没有官方镜像,用了个star多的
    restart: always
    depends_on:
      - zookeeper # zookeeper创建完之后才会创建当前service
    ports:
      - 9090:9090
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka-0 # 广播主机名称,因为docker-compose会自动创建docker网络,所以可以不用ip
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka # zookeeper连接地址 /kafka 方便目录树操作
      KAFKA_LISTENERS: PLAINTEXT://:9090 # kafka启动的ip:port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9090 # kafka广播地址ip:port
      KAFKA_BROKER_ID: 0 # brokerId,必须保证集群中的Id都不相同
  kafka-1:
    container_name: kafka-1
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - 9091:9091
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka-1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://:9091
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9091
      KAFKA_BROKER_ID: 1
  kafka-2:
    container_name: kafka-2
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka-0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_BROKER_ID: 2
测试:
# 随便进入一个容器
docker exec -it kafka-0 /bin/bash
# 创建一个主题 qy ,3个分区,3个副本
kafka-topics.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --create --topic qy --partitions 3 --replication-factor 3
# 往主题 qy 里面发送消息
kafka-console-producer.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --topic qy
>qyrzr
>ar
# 再随便进入一个容器
docker exec -it kafka-1 /bin/bash
# 查看主题 qy 的消息
kafka-console-consumer.sh --bootstrap-server kafka-0:9090,kafka-1:9091,kafka-2:9092 --from-beginning --topic qy


golang操作kafka
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
	"time"
)

func Producer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	msg := &sarama.ProducerMessage{}
	msg.Topic = "qy"
	msg.Value = sarama.StringEncoder("--ar--")
	producer, err := sarama.NewSyncProducer([]string{
		":9090",
		":9091",
		":9092",
	}, config)
	if err != nil {
		fmt.Println(err.Error(), "host error!")
		return
	}
	defer producer.Close()
	fmt.Println("--")
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("send success ", partition, offset)
}

func Consumer() {
	consumer, err := sarama.NewConsumer([]string{
		":9090",
		":9091",
		":9092",
	}, nil)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	partitions, err := consumer.Partitions("qy")
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("partitions = ", partitions)
	var wg sync.WaitGroup
	for _, partition := range partitions {
		partitionConsumer, err := consumer.ConsumePartition("qy", partition, sarama.OffsetNewest)
		fmt.Println("partitionConsumer = ", partitionConsumer)
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		wg.Add(1)
		go func(partitionConsumer sarama.PartitionConsumer) {
			fmt.Println("In func")
			defer wg.Done()
			for msg := range partitionConsumer.Messages() {
				fmt.Println(string(msg.Value))
			}
		}(partitionConsumer)
	}
	wg.Wait()
}

func main() {
	go Consumer()
	time.Sleep(5 * time.Second)
	go Producer()
	time.Sleep(10 * time.Second)
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746758.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号