栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

kafka多线程实现消费者实战

kafka多线程实现消费者实战

前言:KafkaProducer是线程安全的,但是KafkaConsumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。

所以KafkaConsumer在多线程的环境下,我们使用KafkaConsumer的实例要小心,应该每个消费数据的线程拥有自己的KafkaConsumer实例。

1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易保持对消息的顺序处理
2:消费者/处理者方式,创建一个线程池,在consumer拉取数据后,由线程池中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序

两种方式的区别:第一种注重消息消费顺序但消费速度不如第二种,第二种注重消费速度但不注重消费顺序。在实际项目种可跟据实际情况选择。

实战背景:项目需要通过kafka的方式接收PaaS平台的告警数据,每分钟的接受量在1000+,目前kafka消费者实例数和分区数都是3,但是由于接收到消息后处理逻辑较为复杂,在优化后有时仍存在消息积压,所以需要在kafka消费处加入多线程进行处理。

多线程处理代码如下,这里使用的上述方式2,因为本处逻辑对消息顺序并不关注:

Thread thread = new Thread(){
			   public void run(){


					KafkaConsumer consumer = null;
					while (true) {
						try {
							consumer = new KafkaConsumer<>(props);
							consumer.subscribe(Collections.singletonList(PaasConstant.topic));

							// 创建线程池
							ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 105, 10L, TimeUnit.SECONDS, new linkedBlockingQueue(1000));
							Integer index = 1;

							// 监听某个topic
							while (true) {
								logger.debug("--------start------监听kafka--topic----" + PaasConstant.topic);
								ConsumerRecords records = consumer.poll(1000);
								logger.debug("开始消费 " + PaasConstant.topic + "中的数据 ");
								for (ConsumerRecord record : records) {
									// 处理数据,发请求
									try {
										// 加入线程池操作
										pool.execute(new ThreadTaskCustomerKafka(record,index.toString()));
										index++;
										logger.debug("======线程"+index+"加入线程池======");

									
									} catch (Exception ex) {
										logger.debug(PaasConstant.topic + "   处理数据异常======" + record.value(), ex);
									}
								}
								Thread.sleep(300);
								logger.debug("--------准备下次监听kafka--topic----" + PaasConstant.topic);
							}

//							pool.shutdown();
						} catch (Exception e) {

							try {
								Thread.sleep(1000);
							} catch (InterruptedException e1) {
								logger.error(PaasConstant.topic + "===Thread.sleep(1000);===" , e1);
							}
							logger.error(PaasConstant.topic + "   处理数据异常======" , e);
						}
					}
				   
				   
				   
				   
			   }
			};
thread.start();

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278855.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号