package resource
import (
"context"
"fmt"
"ms_web/configs"
"github.com/Shopify/sarama"
)
// Kafka bundles a sarama synchronous producer with the topic that
// messages are published to.
type Kafka struct {
	// client is the underlying synchronous producer; it is used by
	// ProducerMessage and released by Close.
	client sarama.SyncProducer
	// Topic is the Kafka topic every produced message is sent to.
	Topic string
}
// NewKafka creates a synchronous sarama producer for the brokers listed in
// conf.Kafka.Brokers and returns it wrapped in a Kafka whose Topic is
// conf.Kafka.Topic.
//
// ctx is currently unused by this function.
//
// It returns an error when conf is nil or when the producer cannot be
// created; the producer error is wrapped so callers can inspect it with
// errors.Is / errors.As.
func NewKafka(ctx context.Context, conf *configs.Config) (*Kafka, error) {
	// Guard against a nil config instead of panicking on conf.Kafka below.
	if conf == nil {
		return nil, fmt.Errorf("NewKafka: nil config")
	}

	config := sarama.NewConfig()
	// After sending, both the leader and the followers must acknowledge the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a partition at random (sarama.NewRoundRobinPartitioner for round-robin)
	// Successfully delivered messages are reported back on the success channel.
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer(conf.Kafka.Brokers, config)
	if err != nil {
		// %w yields the same message text as before while keeping the
		// underlying error in the chain.
		return nil, fmt.Errorf("NewKafka.NewSyncProducer err:%w", err)
	}
	return &Kafka{
		client: client,
		Topic:  conf.Kafka.Topic,
	}, nil
}
// Close shuts down the underlying producer when one is set; it does nothing
// if the client is nil. The error returned by the producer's Close is
// discarded because this method has no error result.
func (k *Kafka) Close() {
	if k.client == nil {
		return
	}
	// Best-effort shutdown: the result is intentionally ignored.
	_ = k.client.Close()
}
// ProducerMessage sends data as the value of a single message to k.Topic
// using the synchronous producer and returns the error from SendMessage.
// The partition and offset reported by the producer are not exposed.
func (k *Kafka) ProducerMessage(data []byte) error {
	_, _, sendErr := k.client.SendMessage(&sarama.ProducerMessage{
		Topic: k.Topic,
		Value: sarama.ByteEncoder(data),
	})
	return sendErr
}