I recently started on a project whose requirement was real-time click statistics for likes, favorites, comments, and reads on article pages. Those statistics are fed to the back end as instrumentation data for daily and hourly article ranking updates and for user push notifications.
The core technology is Kafka Streams. Beyond being a high-performance MQ, Kafka also offers stream processing: runtime data can be shipped as messages to a statistics module for splitting, aggregation, and statistical output, automating data capture and collection. This capability is best settled during initial technology selection, so that it does not have to be bolted invasively onto existing code.
Because everything rides on the message-queue model, high concurrency is nothing to worry about; perhaps the only thing that will bother you is keeping the various custom topic names consistent between message producers and consumers.
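One way to keep those names from drifting apart is a shared constants class. A minimal sketch (the topic string values here are placeholders of my own; the MQConstants class and constant names are the ones referenced by the handler later in this post):

// Shared topic-name constants so producers and consumers cannot disagree.
// The actual topic strings are placeholders; pick names that fit your project.
public class MQConstants {
    public static final String HOT_ARTICLE_INPUT_TOPIC  = "hot-article-input-topic";
    public static final String HOT_ARTICLE_OUTPUT_TOPIC = "hot-article-output-topic";

    private MQConstants() { }
}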
Implementation outline:
1. Data instrumentation;
2. Integrating Kafka into the Spring Boot project;
3. Base configuration of Kafka and Kafka Streams, simplified;
4. Kafka input;
5. Kafka Streams stream processing;
6. Kafka output;
7. Collecting, processing, updating, and returning the data.
Detailed steps:
I. Base dependencies (pick versions that match your own project)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
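The code below also uses KafkaTemplate, @KafkaListener, and @EnableKafkaStreams, which come from Spring for Apache Kafka, so the project will most likely also need:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>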
II. Configure the consumer group and broker address in application.yml (my broker runs on Linux):

kafka:
  group: ${spring.application.name}
  hosts: 192.168.66.133:9092
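Note that this custom kafka block is only read by the configuration class below. If you use KafkaTemplate and @KafkaListener as in the demo later, Spring Boot's spring-kafka auto-configuration reads its own property tree as well, along the lines of:

spring:
  kafka:
    bootstrap-servers: 192.168.66.133:9092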
III. Kafka and Kafka Streams startup configuration classes
1. KafkaStreamConfig
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafkaStreams // enable Kafka Streams support
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
    // maximum message size
    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    // address of the Kafka broker(s)
    private String hosts;
    // consumer group name; here simply the application name
    private String group;

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // replication factor for the internal topics Kafka Streams creates
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3 * MAX_MESSAGE_SIZE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
        return new KafkaStreamsConfiguration(props);
    }
}
2. Factory registration class (scans the container for every KafkaStreamListener bean and wires each one into the stream topology; the original snippet was cut off after the "hand over to Spring" comment, so the registerSingleton call that comment points at is filled in below):

import org.apache.kafka.streams.StreamsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class KafkaStreamListenerFactory implements InitializingBean {
    Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);

    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory; // the IoC container itself

    @Override
    public void afterPropertiesSet() {
        // find every KafkaStreamListener implementation in the container
        Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
        for (String key : map.keySet()) {
            KafkaStreamListener k = map.get(key);
            KafkaStreamProcessor processor =
                    new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class), k);
            String beanName = k.getClass().getSimpleName() + "AutoProcessor";
            // hand the built stream/table object over to the Spring container
            defaultListableBeanFactory.registerSingleton(beanName, processor.doAction());
            logger.info("registered stream processor bean: {}", beanName);
        }
    }
}
3. Kafka Streams listener interface

public interface KafkaStreamListener<T> {
    // the topic the stream processing listens on (the input topic)
    String listenerTopic();

    // the topic the result is forwarded to after stream processing (the output topic)
    String sendTopic();

    // the stream processing business logic
    T getService(T stream);
}
4. Core processor bean

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.util.Assert;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class KafkaStreamProcessor {
    // stream topology builder
    StreamsBuilder streamsBuilder;
    // "kstream" or "ktable", parsed from the listener's generic type
    private String type;
    KafkaStreamListener listener;

    public KafkaStreamProcessor(StreamsBuilder streamsBuilder, KafkaStreamListener kafkaStreamListener) {
        this.streamsBuilder = streamsBuilder;
        this.listener = kafkaStreamListener;
        this.parseType();
        Assert.notNull(this.type, "a Kafka Streams listener only supports KStream or KTable; current type is " + this.type);
    }

    public Object doAction() {
        if ("kstream".equals(this.type)) {
            KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            stream = (KStream) listener.getService(stream);
            stream.to(listener.sendTopic());
            return stream;
        } else {
            KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            table = (KTable) listener.getService(table);
            table.toStream().to(listener.sendTopic());
            return table;
        }
    }

    private void parseType() {
        // inspect the listener's generic interface to decide between KStream and KTable
        Type[] types = listener.getClass().getGenericInterfaces();
        if (types != null) {
            for (int i = 0; i < types.length; i++) {
                if (types[i] instanceof ParameterizedType) {
                    ParameterizedType t = (ParameterizedType) types[i];
                    String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
                    if (name.contains("org.apache.kafka.streams.kstream.kstream") || name.contains("org.apache.kafka.streams.kstream.ktable")) {
                        this.type = name.substring(0, name.indexOf('<')).replace("org.apache.kafka.streams.kstream.", "").trim();
                        break;
                    }
                }
            }
        }
    }
}
5. Message producer (the data entry point)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/sendstream")
public class StreamProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String INPUT_TOPIC = "input-stream-topic";

    @GetMapping
    public String sendstream() {
        // send 20 test messages, alternating between two key prefixes
        for (int i = 1; i <= 20; i++) {
            if (i % 2 == 0) {
                kafkaTemplate.send(INPUT_TOPIC, "0001" + i, "hello kafka");
            } else {
                kafkaTemplate.send(INPUT_TOPIC, "0002" + i, "hello stream");
            }
        }
        return "success";
    }
}
6. Message consumer (the Kafka Streams data exit)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class StreamConsumer {
    private static final String OUT_TOPIC = "out-stream-topic";

    @KafkaListener(topics = OUT_TOPIC)
    public void handleMsg(ConsumerRecord<String, String> consumerRecord) {
        if (consumerRecord != null) {
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            System.out.println("--------" + key + ":" + value);
        }
    }
}
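Nothing shown so far actually connects input-stream-topic to out-stream-topic, so on its own the demo consumer would stay silent. A minimal listener that bridges the two (the class name and the word-count logic are my own illustration, not part of the original project) might look like this:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Arrays;

// Hypothetical listener wiring the demo topics together: it counts words
// arriving on input-stream-topic over 10-second windows and forwards
// "word -> count" pairs to out-stream-topic.
@Component
public class DemoStreamListener implements KafkaStreamListener<KStream<String, String>> {
    @Override
    public String listenerTopic() {
        return "input-stream-topic";
    }

    @Override
    public String sendTopic() {
        return "out-stream-topic";
    }

    @Override
    public KStream<String, String> getService(KStream<String, String> stream) {
        return stream
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                .toStream()
                .map((windowedWord, count) -> new KeyValue<>(windowedWord.key(), count.toString()));
    }
}

With this in place, calling GET /sendstream should eventually make the console print windowed counts such as --------hello:20 (exact numbers depend on how the messages fall into windows).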
IV. Data instrumentation (a simple wrapper around the event data, sent by the producer and consumed downstream)
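The post leaves this step as a single line, so here is a sketch of what the instrumentation payload and the send call could look like. UpdateArticleMsg, MQConstants, and JsonUtils are the names used by the handler in step V; the fields, the enum, the service class, and JsonUtils.toJson are my assumptions:

// Hypothetical instrumentation message: which article, and which counter to bump.
public class UpdateArticleMsg {
    public enum UpdateType { LIKES, COLLECTION, COMMENT, VIEWS }

    private Long articleId;
    private UpdateType type;
    private int add; // +1 for a click, -1 for an undo

    public Long getArticleId() { return articleId; }
    public void setArticleId(Long articleId) { this.articleId = articleId; }
    public UpdateType getType() { return type; }
    public void setType(UpdateType type) { this.type = type; }
    public int getAdd() { return add; }
    public void setAdd(int add) { this.add = add; }
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical instrumentation point: called from the like/collect/comment/read
// endpoints, it wraps the event and drops it onto the input topic as JSON.
@Service
public class ArticleBehaviorService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void recordBehavior(Long articleId, UpdateArticleMsg.UpdateType type) {
        UpdateArticleMsg msg = new UpdateArticleMsg();
        msg.setArticleId(articleId);
        msg.setType(type);
        msg.setAdd(1);
        // JsonUtils.toJson is assumed to mirror the JsonUtils.toBean used in step V
        kafkaTemplate.send(MQConstants.HOT_ARTICLE_INPUT_TOPIC, JsonUtils.toJson(msg));
    }
}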
V. KStream ---> KTable ---> KStream

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Arrays;

@Component
public class HotArticleStreamHandler implements KafkaStreamListener<KStream<String, String>> {
    @Override
    public String listenerTopic() { // input topic
        return MQConstants.HOT_ARTICLE_INPUT_TOPIC;
    }

    @Override
    public String sendTopic() { // output topic
        return MQConstants.HOT_ARTICLE_OUTPUT_TOPIC;
    }

    @Override
    public KStream<String, String> getService(KStream<String, String> stream) {
        KTable<Windowed<String>, Long> ktable = stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                UpdateArticleMsg articleMsg = JsonUtils.toBean(value, UpdateArticleMsg.class);
                // turn the message into a countable tag, e.g. "articleId:LIKES"
                // (the exact tag format depends on your UpdateArticleMsg fields)
                String tag = articleMsg.getArticleId() + ":" + articleMsg.getType();
                return Arrays.asList(tag);
            }
            // collect the values
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<>(value, value);
            }
        }).groupByKey()
                // set the aggregation window
                .windowedBy(TimeWindows.of(Duration.ofSeconds(3)))
                .count(Materialized.as("count"));

        KStream<String, String> kStream = ktable.toStream().map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(Windowed<String> window, Long count) {
                // key = the tag that was counted, value = its count within this window
                return new KeyValue<>(window.key(), count.toString());
            }
        });
        return kStream;
    }
}
VI. Persist the collected data; the real-time update step can be handled by a third-party scheduling tool of your choice.
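As a sketch of what the collection side could look like (the topic constant comes from MQConstants above; the tag format and ArticleStatMapper are assumptions carried over from the earlier sketches):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical collector: reads windowed counts off the output topic and
// bumps the corresponding counter in the database.
@Component
public class HotArticleCollector {
    @Autowired
    private ArticleStatMapper articleStatMapper; // hypothetical persistence mapper

    @KafkaListener(topics = MQConstants.HOT_ARTICLE_OUTPUT_TOPIC)
    public void collect(ConsumerRecord<String, String> record) {
        // the key is the tag built in the handler, e.g. "articleId:LIKES"
        String[] parts = record.key().split(":");
        long articleId = Long.parseLong(parts[0]);
        long count = Long.parseLong(record.value());
        // hypothetical update, e.g. UPDATE article_stat SET likes = likes + count ...
        articleStatMapper.increase(articleId, parts[1], count);
    }
}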
That covers the approach and the step-by-step implementation for this requirement. If you found it useful, please follow; the next post should introduce automated scheduling tools.



