栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于SpringBoot使用K stream实现数据埋点统计实时更新

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于SpringBoot使用K stream实现数据埋点统计实时更新

       今年刚上手一个项目,需求点是文章页面的点赞,收藏,评论和阅读的实时点击统计,并根据这个统计集成给后台,作为后台实现文章日更,时更和用户推送的一个数据埋点.

       首先用到的技术是

Kafka流式计算

        Kafka除了作为高性能的MQ以外,其实还有一个流式计算的功能,这个功能可以把代码运行中的数据状态以消息的形式发送给统计模块进行分流,聚合和统计输出,自动化完成数据的捕抓和收集;

        同时,这种功能需要在项目初期技术选型的时候就确定下来,避免对原代码造成侵入破坏.

       MQ采用的是消息队列的方式,所以完全不用担心高并发量的问题,或许唯一使您烦恼的可能是消息生产者和消息消费者之间各种自定的主题名字.

         实现思路:

1.数据埋点;

2.Springboot项目整合Kafka

3.Kafka及K Stream原始配置与简化;

4.Kafka Input

5.K Stream流式计算

6.Kafka Output

7.数据收集,处理,更新并返回

        具体步骤:

一.基础依赖环境(版本号根据自己项目需求确定)

        
            org.apache.kafka
            kafka-streams
        

二.yml中需要配置消息组和服务访问地址(我是用的是Linux系统)

kafka:
  group: ${spring.application.name}
  hosts: 192.168.66.133:9092

 三.Kafka及K Stream启动配置类----------->

1.KafkaStreamConfig

@Configuration
@EnableKafkaStreams //启用kafkastream
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {

    //最大消息的大小
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    //kafka所在服务器地址
    private String hosts;
    //kafka所在分组名称 给消费者使用 就是applicationName
    private String group;

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 消息副本数量
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
        return new KafkaStreamsConfiguration(props);
    }
}

 2.工厂注册类

@Component
public class KafkaStreamListenerFactory implements InitializingBean {

    Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);

    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory;//IOC容器本身

    
    @Override
    public void afterPropertiesSet() {
        Map map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
        for (String key : map.keySet()) {
            KafkaStreamListener k = map.get(key);
            KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
            String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;

            //将对象交给spring容器管理  

3.K Stream监听器接口

public interface KafkaStreamListener {

    // 流式处理的时候需要监听的主题是什么  INPUTTOPIC
    String listenerTopic();

    //流式处理完成之后继续发送到的主题是什么 OUTTOPIC
    String sendTopic();

    // 流式业务的对象处理逻辑
    T getService(T stream);

}

4.基本Bean

public class KafkaStreamProcessor {

    // 流构建器
    StreamsBuilder streamsBuilder;

    private String type;

    KafkaStreamListener listener;

    public KafkaStreamProcessor(StreamsBuilder streamsBuilder, KafkaStreamListener kafkaStreamListener) {
        this.streamsBuilder = streamsBuilder;
        this.listener = kafkaStreamListener;
        this.parseType();
        Assert.notNull(this.type, "Kafka Stream 监听器只支持kstream、ktable,当前类型是" + this.type);
    }

    
    public Object doAction() {
        if ("kstream".equals(this.type)) {
            KStream stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            stream = (KStream) listener.getService(stream);
            stream.to(listener.sendTopic());
            return stream;
        } else {
            KTable table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            table = (KTable) listener.getService(table);
            table.toStream().to(listener.sendTopic());
            return table;
        }
    }

    
    private void parseType() {
        Type[] types = listener.getClass().getGenericInterfaces();
        if (types != null) {
            for (int i = 0; i < types.length; i++) {
                if (types[i] instanceof ParameterizedType) {
                    ParameterizedType t = (ParameterizedType) types[i];
                    String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
                    if (name.contains("org.apache.kafka.streams.kstream.kstream") || name.contains("org.apache.kafka.streams.kstream.ktable")) {
                        this.type = name.substring(0, name.indexOf('<')).replace("org.apache.kafka.streams.kstream.", "").trim();
                        break;
                    }
                }
            }
        }
    }

}

5.消息生产者(数据入口)

@RestController
@RequestMapping("/sendstream")
public class StreamProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static final String INPUT_TOPIC = "input-stream-topic";

    @GetMapping
    public String sendstream(){
        for(int i=1;i<=20;i++){
            if(i%2==0){
                kafkaTemplate.send(INPUT_TOPIC,"0001"+i,"hello kafka");
            }else{
                kafkaTemplate.send(INPUT_TOPIC,"0002"+i,"hello stream");
            }
        }
        return "成功";
    }
}

6.消息消费者(K Stream数据出口)

@Component
public class StreamConsumer {

    private static final String OUT_TOPIC = "out-stream-topic";

    @KafkaListener(topics =OUT_TOPIC )
    public void handleMsg(ConsumerRecord consumerRecord){
        if(consumerRecord!=null){
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            System.out.println("--------"+key+":"+value);
        }
    }
}

四.数据埋点(简单的数据封装,由生产者发送到消费者)

五.K Stream--->K Table--->K Stream

@Component
public class HotArticleStreamHandler implements KafkaStreamListener> {
    @Override
    public String listenerTopic() {//输入主题
        return MQConstants.HOT_ARTICLE_INPUT_TOPIC;
    }

    @Override
    public String sendTopic() { //输出主题
        return MQConstants.HOT_ARTICLE_OUTPUT_TOPIC;
    }

    @Override
    public KStream getService(KStream stream) {

        
        KTable, Long> ktable = stream.flatMapValues(new ValueMapper>() {
            @Override
            public Iterable apply(String value) {
                UpdateArticleMsg articleMsg = JsonUtils.toBean(value, Msg.class);
                String tag = Msg;
                return Arrays.asList(tag);
            }
            // 搜集值
        }).map(new KeyValueMapper>() {

            @Override
            public KeyValue apply(String key, Object value) {
                return new KeyValue<>(value, value);
            }
        }).groupByKey()
           //设置聚合间隔时间
                .windowedBy(TimeWindows.of(Duration.ofSeconds(3)))
                .count(Materialized.as("count"));

        
        KStream kStream = ktable.toStream().map(new KeyValueMapper, Long, KeyValue>() {
            @Override
            public KeyValue apply(Windowed window, Long value) {
                String key = window.key().toString();  

                Msg msg = new Msg();

                switch (condition) {
                    case value1:
                        streamMsg.setmsg (value1);
                        break;
                    case value2:
                        streamMsg.setmsg (value2);
                        break;
                    case value3:
                        streamMsg.setmsg (value3);
                        break;
                    case value4:
                        streamMsg.setmsg (value4);
                        break;
                }

                return new KeyValue<>("", JsonUtils.toString(streamMsg));
            }
        });
        return kStream;
    }
}
 

六.数据收集持久化,实时更新功能可选用第三方定时工具定制; 

以上就是本篇文章的需求的实现思路和步骤,如果喜欢觉得有用的,请关注一下,下一篇的题材应该是自动化定时工具的介绍.

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号