栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KafkaConsumer is not safe for multi-threaded access

KafkaConsumer is not safe for multi-threaded access

情景重现
线程A,创建kafka消费者对象,对数据进行消费等操作
线程B 停止 kafka消费者对象

解决如下
public class KafkaAsyncForward implements Runnable {
    private static final AtomicBoolean RUNNING = new AtomicBoolean(true);

    private final List topicList;
    private final KafkaConsumer kafkaConsumer;

    public KafkaAsyncForward(Map consumerParams, List topicList) {
        this.kafkaConsumer = new KafkaConsumer<>(consumerParams);
        this.topicList = topicList;
    }

    @Override
    public void run() {
        try {
            kafkaConsumer.subscribe(topicList);
            while (RUNNING.get()) {
                ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord record : records) {
                    String topic = record.topic();
                    String data = record.value();
                    // ...
                }
            }
        } finally {
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
        }
    }

    public KafkaConsumer getKafkaConsumer() {
        return kafkaConsumer;
    }

    public void stop() {
        try {
            RUNNING.set(false);
        } catch (Exception e) {
            log.error("关闭异常", e);
        }
    }
}

// 错误方式
public class ErrorTest {
    public static void main(String[] args) {
        KafkaAsyncForward kafkaAsyncForward=new KafkaAsyncForward(null,null);
        new Thread(kafkaAsyncForward).start(); // 线程A
        kafkaAsyncForward.getKafkaConsumer().close(); // 主线程 直接关闭kafka
    }
}

// 正确方式
public class TrueTest {
    public static void main(String[] args) {
        KafkaAsyncForward kafkaAsyncForward=new KafkaAsyncForward(null,null);
        new Thread(kafkaAsyncForward).start();  // 线程A
        kafkaAsyncForward.stop();  // 主线程,结束消费的循环,让 线程A 自己执行finally语句关闭kafka
    }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758296.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号