栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka-client 优雅关停

kafka-client 优雅关停

以下给出一个参考示例,但具体的某些细节的处理还需要根据自己的业务自行抉择

实现思路,通过 consumer.wakeup() 方法让  poll 方法抛出异常,然后捕获异常,退出

《kafka权威指南》 第 64 页也有给出示例,有兴趣的可以找来看看

// 消费线程

private Thread consumerThread;

// 线程名称,自己设置下

private String threadName;

private volatile boolean running = false;

// 业务线程池,请根据自己的业务设置合理参数

private ExecutorService pool = new ThreadPoolExecutor(

    Runtime.getRuntime().availableProcessors() * 2,

    Runtime.getRuntime().availableProcessors() * 2, 120, TimeUnit.SECONDS,

    blockingDeque, NamedThreadFactory.create(THREAD_POOL_NAME_PREFIX + threadName + "-"),

    new CallerRunsPolicy()

);;

public synchronized void start() {

  // 此处省略 consumer 初始化的过程

  consumer = null;

  consumer.subscribe(Lists.newArrayList("topic-aaa"));

  running = true;

  consumerThread = new Thread(() -> {

    while (running) {

      try {

        ConsumerRecords records = consumer.poll(500);

        for (ConsumerRecord record : records) {

          // 放入自己的业务线程池

          pool.submit(new Worker(record, records));

        }

      } catch (WakeupException exception) {

        // 捕获 WakeupException consumer 准备退出

        LOG.info("WakeupException 准备关闭consumer");

        // 跳出循环,关闭consumer

        break;

      } catch (Exception exception) {

        LOG.error("error cause by ", exception);

      }

    }

    consumer.close();

    // 此处使用的 guava 的线程池关闭方法,也可以根据自己的需求去自定义实现线程池关闭逻辑,阻塞时间请根据业务情况自行设置

    boolean isTerminated = MoreExecutors.shutdownAndAwaitTermination(pool, 1, TimeUnit.SECONDS);

    LOG.info("consumer closed ,poll terminated:{}", isTerminated);

  });

  consumerThread.setName(threadName);

  consumerThread.start();

}

public synchronized void close() {

  if (!running) {

    LOG.info("consumer 不是运行状态,不需要关闭,threadName[{}]", threadName);

    return;

  }

  Stopwatch stopwatch = Stopwatch.createStarted();

  running = false;

  consumer.wakeup();

  try {

    // 等待counsumer线程执行完成

    consumerThread.join();

  } catch (InterruptedException e) {

    LOG.error("consumerThread.join InterruptedException", e);

  }

  LOG.info("consumer线程关闭完成,threadName[{}],Spend time[{}]", threadName, stopwatch);

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711816.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号