栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

.net kafka(Kafka基础之.NetCore中简单使用Kafka(二))

.net kafka(Kafka基础之.NetCore中简单使用Kafka(二))

1、引用nuget包UtilsSharp.Kafka

UtilsSharp.Kafka是基于Confluent.Kafka 1.8.2封装的,封装后使用简单快捷。

2、 注册链接
        /// 
        /// 注册
        /// 
        public void Register()
        {
            #region 链接Kafka
            //kafka配置
            KafkaSetting kafkaSetting = new KafkaSetting
            {
                //集群的话以逗号隔开
                BootstrapServers = new[] { "192.168.0.56:9092" }
            };
            //注册kafka
            KafkaManager.Register(new KafkaClient(kafkaSetting));
            #endregion
         }
3、如果同一台机器连接另外一个kafka链接

如果你的程序要链接另外一个kafka链接,则要先定义Kafka2Helper:

public abstract class Kafka2Helper : KafkaHelper{}
        /// 
        /// 注册1
        /// 

        public void Register1()
        {
            #region 如果同一台机器连接另外一个kafka链接

            //kafka配置
            var kafka2Setting = new KafkaSetting
            {
                BootstrapServers = new[] { "192.168.0.57:9092" }
            };
            //注册kafka
            KafkaManager.Register(new KafkaClient(kafka2Setting));

            #endregion
        }
4、创建主题:
        /// 
        /// 创建主题
        /// 
        public void CreateTopic()
        {
            //创建主题
            if (!KafkaHelper.IsTopicExist(topic))
            {
                //ctResult为空则创建成功,否则创建失败
                var ctResult = KafkaHelper.CreateTopicsAsync(topic, 3).Result;
            }
            else
            {
                //获取已创建的主题信息
                var gtResult = KafkaHelper.GetTopic(topic);
            }

        }
5、消费者
        /// 
        /// 消费者
        /// 
        public void Consumer()
        {
            #region 消费者
            var group1 = "group.1";
            var group2 = "group.2";
            var group3 = "group.3";
            {
                var consumer = KafkaHelper.GetConsumer(group1);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 0 } }, result =>
                {
                    Console.WriteLine($"{group1} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }

            {
                var consumer = KafkaHelper.GetConsumer(group2);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 1 } }, result =>
                {
                    Console.WriteLine($"{group2} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }

            {
                var consumer = KafkaHelper.GetConsumer(group3);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 2 } }, result =>
                {
                    Console.WriteLine($"{group3} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }
            #endregion

        }
6、生产者
        /// 
        /// 生产者
        /// 
        public void Producer()
        {
            #region 生产者

            var producer = KafkaHelper.GetProducer();
            var index = 0;
            while (true)
            {
                Console.Write("请输入消息:");
                var line = Console.ReadLine();

                var partition = index % 3;
                producer.Send(topic, partition, "Test", line);
                index++;
            }

            #endregion

        }
7、完整Demo代码
using System;
using System.Collections.Generic;
using System.Text;
using Kafka;
using OptionConfig;

namespace TestDemoApp.Kafka
{
    public abstract class Kafka2Helper : KafkaHelper{}

    public class KafkaInit
    {

        public KafkaInit()
        {
            //注册
            Register();
            //创建主题
            CreateTopic();
            //消费者
            Consumer();
            //生产者
            Producer();
        }

        //主题
        string topic = "myTopic";
        
        /// 
        /// 注册
        /// 
        public void Register()
        {
            #region 链接Kafka
            //kafka配置
            KafkaSetting kafkaSetting = new KafkaSetting
            {
                //集群的话以逗号隔开
                BootstrapServers = new[] { "192.168.0.56:9092" }
            };
            //注册kafka
            KafkaManager.Register(new KafkaClient(kafkaSetting));
            #endregion
         }

        /// 
        /// 注册1
        /// 

        public void Register1()
        {
            #region 如果同一台机器连接另外一个kafka链接

            //kafka配置
            var kafka2Setting = new KafkaSetting
            {
                BootstrapServers = new[] { "192.168.0.57:9092" }
            };
            //注册kafka
            KafkaManager.Register(new KafkaClient(kafka2Setting));

            #endregion
        }

        /// 
        /// 创建主题
        /// 
        public void CreateTopic()
        {
            //创建主题
            if (!KafkaHelper.IsTopicExist(topic))
            {
                //ctResult为空则创建成功,否则创建失败
                var ctResult = KafkaHelper.CreateTopicsAsync(topic, 3).Result;
            }
            else
            {
                //获取已创建的主题信息
                var gtResult = KafkaHelper.GetTopic(topic);
            }

        }

        /// 
        /// 消费者
        /// 
        public void Consumer()
        {
            #region 消费者
            var group1 = "group.1";
            var group2 = "group.2";
            var group3 = "group.3";
            {
                var consumer = KafkaHelper.GetConsumer(group1);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 0 } }, result =>
                {
                    Console.WriteLine($"{group1} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }

            {
                var consumer = KafkaHelper.GetConsumer(group2);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 1 } }, result =>
                {
                    Console.WriteLine($"{group2} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }

            {
                var consumer = KafkaHelper.GetConsumer(group3);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new[] { new KafkaSubscriber() { Topic = topic, Partition = 2 } }, result =>
                {
                    Console.WriteLine($"{group3} recieve message:{result.Message}");
                    result.Commit();//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
                }).Wait();
            }
            #endregion

        }

        /// 
        /// 生产者
        /// 
        public void Producer()
        {
            #region 生产者

            var producer = KafkaHelper.GetProducer();
            var index = 0;
            while (true)
            {
                Console.Write("请输入消息:");
                var line = Console.ReadLine();

                var partition = index % 3;
                producer.Send(topic, partition, "Test", line);
                index++;
            }

            #endregion

        }
        
    }

}
8、运行结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771964.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号