
Kafka demo: automatically close the consumer once all messages are consumed


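// Assumption: JSONObject.parseObject / JSONArray.parseArray suggest Alibaba fastjson.
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
// Assumption: any StringUtils offering isEmpty(String) would do; commons-lang3 is used here.
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerTest {
    // Assumption: the original `log` was an SLF4J logger (for example via Lombok's @Slf4j).
    private static final Logger log = LoggerFactory.getLogger(ConsumerTest.class);
    public static volatile boolean isRunning = true;
    public static KafkaConsumer<String, String> consumer;
    // date (yyyy-MM-dd) -> event name -> count
    public static HashMap<String, HashMap<String, Integer>> eventDateMap = new HashMap<>();
    // distinct_id of every record whose type is not a "track" event
    public static ArrayList<String> userList = new ArrayList<>();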

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "All03");
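        // Auto-commit is off and nothing below commits offsets manually, so the
        // auto-commit interval has no effect and the group's position is never saved.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");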
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumer = new KafkaConsumer<>(props);
        String topic = "hndj";

        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (isRunning) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                if (!records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        String value = record.value();
                        JSONObject jsonObject = JSONObject.parseObject(value);
                        if (jsonObject == null) {
                            continue; // skip records whose payload did not parse to an object
                        }
                        String data = jsonObject.getString("data");
                        if (!StringUtils.isEmpty(data) && data.startsWith("[")) {
                            JSONArray objects = JSONArray.parseArray(data);
                            for (int i = 0; i < objects.size(); i++) {

                                JSONObject object = objects.getJSONObject(i);
                                String type = object.getString("type");

                                // A missing type falls through to the user branch instead of throwing
                                if (type != null && type.contains("track")) {
                                    // Convert the timestamp (read as epoch milliseconds by new Date) to yyyy-MM-dd
                                    Long time = object.getLong("time");
                                    String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date(time));

                                    String eventName = object.getString("event");
                                    // Create the per-date map on first sight, then add 1 to this event's count
                                    HashMap<String, Integer> eventMap =
                                            eventDateMap.computeIfAbsent(date, k -> new HashMap<>(4));
                                    eventMap.merge(eventName, 1, Integer::sum);
                                } else {
                                    String userId = object.getString("distinct_id");
                                    if (!StringUtils.isEmpty(userId)){
                                        userList.add(userId);
                                    }
                                }
                                object = null;
                            }
                        }
                    }
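                } else {
                    // An empty poll is treated as "everything consumed": stop the loop and
                    // let the finally block close the consumer exactly once.
                    isRunning = false;
                }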
            }
        } finally {
            close();
        }
        // Log the count of each event for every date
        for (Map.Entry<String, HashMap<String, Integer>> mapEntry : eventDateMap.entrySet()) {
            for (Map.Entry<String, Integer> entry : mapEntry.getValue().entrySet()) {
                log.info("date:{}--eventName:{}--eventCount:{}", mapEntry.getKey(), entry.getKey(), entry.getValue());
            }
        }
        // Log the total number of events for every date
        for (Map.Entry<String, HashMap<String, Integer>> mapEntry : eventDateMap.entrySet()) {
            int eventCount = 0;
            for (Integer count : mapEntry.getValue().values()) {
                eventCount += count;
            }
            log.info("date:{}--total events:{}", mapEntry.getKey(), eventCount);
        }
        log.info("total user records:{}", userList.size());
    }

    private static void close() {
        isRunning = false;
        if (consumer != null) {
            consumer.close();
        }
    }

}
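
The demo treats the first empty poll as the signal that everything has been consumed. An empty poll can also happen for other reasons (for instance, the timeout may elapse before any records arrive), so one possible refinement is to stop only after several empty polls in a row. The following is a minimal sketch of that idea under stated assumptions, not the original author's method: it uses only the JDK, a plain Supplier stands in for consumer.poll(...), and the names IdleStopSketch, drain and maxEmptyPolls are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class IdleStopSketch {

    /**
     * Calls poll repeatedly, appends every non-empty batch to sink, and stops
     * once maxEmptyPolls consecutive batches were empty.
     * Returns the total number of polls made.
     */
    static <T> int drain(Supplier<List<T>> poll, int maxEmptyPolls, List<T> sink) {
        int emptyStreak = 0;
        int polls = 0;
        while (emptyStreak < maxEmptyPolls) {
            List<T> batch = poll.get(); // stand-in for consumer.poll(timeout)
            polls++;
            if (batch.isEmpty()) {
                emptyStreak++;          // one more idle poll in a row
            } else {
                emptyStreak = 0;        // data arrived, reset the idle counter
                sink.addAll(batch);
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Simulated poll results: one record, an empty poll, then two more records.
        Iterator<List<String>> batches = List.of(
                List.of("a"), List.<String>of(), List.of("b", "c")).iterator();
        List<String> consumed = new ArrayList<>();
        int polls = drain(
                () -> batches.hasNext() ? batches.next() : List.<String>of(),
                2, consumed);
        // prints: [a, b, c] after 5 polls
        System.out.println(consumed + " after " + polls + " polls");
    }
}
```

In the demo itself, the same counter could replace the single isEmpty() check inside the while loop, with the consumer still closed in the finally block. A threshold of 1 reproduces the original behavior.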
