以下给出一个参考示例,但具体的某些细节的处理还需要根据自己的业务自行抉择
实现思路,通过 consumer.wakeup() 方法让 poll 方法抛出异常,然后捕获异常,退出
《kafka权威指南》 第 64 页也有给出示例,有兴趣的可以找来看看
| // 消费线程 private Thread consumerThread; // 线程名称,自己设置下 private String threadName; private volatile boolean running = false; // 业务线程池,请根据自己的业务设置合理参数 private ExecutorService pool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, 120, TimeUnit.SECONDS, blockingDeque, NamedThreadFactory.create(THREAD_POOL_NAME_PREFIX + threadName + "-"), new CallerRunsPolicy() );; public synchronized void start() { // 此处省略 consumer 初始化的过程 consumer = null; consumer.subscribe(Lists.newArrayList("topic-aaa")); running = true; consumerThread = new Thread(() -> { while (running) { try { ConsumerRecords for (ConsumerRecord // 放入自己的业务线程池 pool.submit(new Worker(record, records)); } } catch (WakeupException exception) { // 捕获 WakeupException consumer 准备退出 LOG.info("WakeupException 准备关闭consumer"); // 跳出循环,关闭consumer break; } catch (Exception exception) { LOG.error("error cause by ", exception); } } consumer.close(); // 此处使用的 guava 的线程池关闭方法,也可以根据自己的需求去自定义实现线程池关闭逻辑,阻塞时间请根据业务情况自行设置 boolean isTerminated = MoreExecutors.shutdownAndAwaitTermination(pool, 1, TimeUnit.SECONDS); LOG.info("consumer closed ,poll terminated:{}", isTerminated); }); consumerThread.setName(threadName); consumerThread.start(); } public synchronized void close() { if (!running) { LOG.info("consumer 不是运行状态,不需要关闭,threadName[{}]", threadName); return; } Stopwatch stopwatch = Stopwatch.createStarted(); running = false; consumer.wakeup(); try { // 等待counsumer线程执行完成 consumerThread.join(); } catch (InterruptedException e) { LOG.error("consumerThread.join InterruptedException", e); } LOG.info("consumer线程关闭完成,threadName[{}],Spend time[{}]", threadName, stopwatch); } |



