栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

C# 如何给kafka消息配置优先级按序消费

C# 如何给kafka消息配置优先级按序消费

        顾名思义kafka消息主题是没有优先级的配置,没办法配置消费顺序的,所有我们需要想办法给kafka消息配置消费顺序,如何做呢?

下面我给大家举个简单优先级事例,比如高中低三个顺序消费消息

        首先要定义三个消息主题,分别是高、中、低是哪个主题,英文分别是high、medium、low

这个时候他们还是没有先后消费的本领,下面需要我们赋予他们这个顺序级别思路如下

消费顺序,高》中》低,高和中都是可以插队来进行消费的,下面分析C#代码

先建立三个线程

        AutoResetEvent low = new AutoResetEvent(false);   //低的关卡

        AutoResetEvent medium= new AutoResetEvent(false); //中的关卡

        AutoResetEvent high= new AutoResetEvent(false); //高的关卡

        Dictionary list= new Dictionary();

        list.Add("low",low);

        list.Add("medium",medium);

        list.Add("high",high);

//定义消费低主题线程

Thread lowThread = new Thread(new ParameterizedThreadStart(new LowServer().Work));

lowThread.Stat(list);

//定义消费中主题线程

Thread mediumThread = new Thread(new ParameterizedThreadStart(new MediumServer().Work));

lowThread.Stat(list);

//定义消费高主题线程

Thread highThread = new Thread(new ParameterizedThreadStart(new HighServer().Work));

lowThread.Stat(list);

三个消费任务

class LowServer{
 
    public void Work(object list){
        Dictionary locks = list as Dictionary;
        AutoResetEvent medium = locks["medium"];
        AutoResetEvent high= locks["high"];
        var consumer = new ConsumerBuilder("kafka配置")
                       .SetValueDeserializer(new 序列化类())
                       .Build();
        consumer.Subscribe("low");//订阅主题low 优先级:低
        while(true){
              if(highSign){//全局变量高主题是否有消息
                  //是
                 high.WaitOne();//等待高主题消息消费完
              } 
              if(mediumSign){//全局变量中主题是否有消息
                  //是
                 medium.WaitOne();//等待中主题消息消费完
              } //最后才轮到自己低主题的
            //然后获取消息工作就是了
    
        }

    }

}
class MediumServer{
 
    public void Work(object list){
        Dictionary locks = list as Dictionary;
        AutoResetEvent medium = locks["medium"];
        AutoResetEvent high= locks["high"];
        var consumer = new ConsumerBuilder("kafka配置")
                       .SetValueDeserializer(new 序列化类())
                       .Build();
        consumer.Subscribe("medium");//订阅主题low 优先级:低
        while(true){
              if(highSign){//全局变量高主题是否有消息
                  //是
                 high.WaitOne();//等待高主题消息消费完
              } 
             //高主题消费完然后才轮到自己中主题的消费

            //然后获取消息工作就是了
            ConsumerResult tempMsg=consumer.Consume();//拉取消息
             if(tempMsg.IsPartitionEOF){//是否消息消费完成
                //是,然后告诉低主题,我做完了,你做吧
                mediumSign=false;
                medium.Set();
             }
            //工作
        }

    }

}
class HighServer{
 
    public void Work(object list){
        Dictionary locks = list as Dictionary;
        AutoResetEvent high= locks["high"];
        var consumer = new ConsumerBuilder("kafka配置")
                       .SetValueDeserializer(new 序列化类())
                       .Build();
        consumer.Subscribe("high");//订阅主题low 优先级:低
        while(true){
             //不需要等到我就是第一
            //然后获取消息工作就是了
            ConsumerResult tempMsg=consumer.Consume();//拉取消息
             if(tempMsg.IsPartitionEOF){//是否消息消费完成
                //是,然后告诉中低主题,我做完了,你做吧
                highSign=false;
                high.Set();
             }
            //工作
        }

    }

}

最后就完成了,消息没有先后,但是线程执行有先后,如此思路,即可解决,支持分布式

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/632848.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号