栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

go包sarama 连接kafka

go包sarama 连接kafka

package resource

import (
	"context"
	"fmt"

	"ms_web/configs"

	"github.com/Shopify/sarama"
)

type Kafka struct{
	client sarama.SyncProducer
	Topic string
}

func NewKafka(ctx context.Context, conf *configs.Config) (*Kafka, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要 leader和follow都确认
	//config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个 partition(sarama.NewRoundRobinPartitioner轮询
	config.Producer.Return.Successes = true // 成功交付的消息将在success_channel返回
	client, err := sarama.NewSyncProducer(conf.Kafka.Brokers, config)
	if err != nil {
		return nil, fmt.Errorf("NewKafka.NewSyncProducer err:%s", err.Error())
	}

	return &Kafka{
		client: client,
		Topic: conf.Kafka.Topic,
	}, nil
}

func (k *Kafka) Close() {
	if k.client != nil {
		k.client.Close()
	}
}

func (k *Kafka) ProducerMessage(data []byte) error{
	msg := &sarama.ProducerMessage{
		Topic: k.Topic,
		Value: sarama.ByteEncoder(data),
	}

	_, _, err := k.client.SendMessage(msg)
	return err
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774299.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号