数据文件:
用到的数据文件
链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w
提取码:2hmu
输入bean
package com.atguigu.networkflow_analysis.beans;
/**
 * Input bean: one user-behavior record with five fields (user id, item id,
 * category id, behavior name, timestamp).
 *
 * <p>The fields are public and mutable and a no-argument constructor is provided.
 * NOTE(review): presumably this is so Flink can treat the class as a POJO type;
 * confirm before tightening visibility. Other code in this file reads
 * {@code userId} directly, so the public fields are part of the interface.
 */
public class UserBehavior {
    public Long userId;
    public Long itemId;
    public Integer categoryId;
    public String behavior;
    /** Event timestamp; the jobs in this file multiply it by 1000 to obtain milliseconds. */
    public Long timestamp;

    /** Creates an empty record; all fields are {@code null} until set. */
    public UserBehavior() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId     id of the acting user
     * @param itemId     id of the item acted upon
     * @param categoryId id of the item's category
     * @param behavior   behavior name (the jobs in this file filter on {@code "pv"})
     * @param timestamp  event timestamp
     */
    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** Returns a debug representation listing every field; {@code behavior} is wrapped in single quotes. */
    @Override
    public String toString() {
        // '\'' is the escaped single-quote char literal; an unescaped ''' does not compile.
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
输出bean
package com.atguigu.networkflow_analysis.beans;
/**
 * Output bean: a count labelled by {@code url} for the window ending at {@code windowEnd}.
 *
 * <p>The jobs in this file reuse the {@code url} field as a generic label
 * (a partition key, {@code "pv_count"} or {@code "uv"}) rather than a real URL.
 */
public class PageViewCount {
    private String url;
    private Long windowEnd;
    private Long count;

    /** Creates an empty result; all fields are {@code null} until set. */
    public PageViewCount() {
    }

    /**
     * Creates a fully populated result.
     *
     * @param url       label of the counted entity
     * @param windowEnd end timestamp of the window the count belongs to
     * @param count     number of events counted
     */
    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() {
        return url;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    /** Returns a debug representation listing every field; {@code url} is wrapped in single quotes. */
    @Override
    public String toString() {
        // '\'' is the escaped single-quote char literal; an unescaped ''' does not compile.
        return "PageViewCount{" +
                "url='" + url + '\'' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
PageView(pv)统计
package com.atguigu.networkflow_analysis.Ahotpages;
import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.net.URL;
import java.util.Random;
public class PageView {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
URL resource = PageView.class.getResource("/UserBehavior.csv");//读取文件
DataStreamSource inputStream = env.readTextFile(resource.getPath());
DataStream dataStream=inputStream.map(
line ->{
String [] words=line.split(",");
return new UserBehavior(new Long(words[0]),new Long(words[1]),new Integer(words[2]),new String(words[3]),new Long(words[4]));
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor() { //升序
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {//获取事件时间戳,秒级转毫秒级
return userBehavior.getTimestamp()*1000L;
}
});
//分组聚合得到结果数据
SingleOutputStreamOperator pvCountStream = dataStream
.filter(data -> "pv".equals(data.getBehavior())) //过滤“pv”行为
.map(new MapFunction>() {//创建[1,4]随机key,让数据均匀遍布4个分区
@Override
public Tuple2 map(UserBehavior value) throws Exception {
Random random = new Random();
return new Tuple2<>(random.nextInt(4)+1,1L);
}
})
.keyBy(data -> data.f0)
.timeWindow(Time.minutes(60)) //每1小时更新一次窗口数据
.aggregate(new PvCountAgg(),new PvCountResult());
SingleOutputStreamOperator countPvResult = pvCountStream
.keyBy(PageViewCount::getWindowEnd)
.process(new TotalPvCount());
//输出并执行
countPvResult.print();
env.execute("hot items analysis");
}
//泛型1:输入类型 泛型2:聚合状态类型 泛型3:输出类型
public static class PvCountAgg implements AggregateFunction,Long,Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple2 value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
}
//参数1:输入类型 参数2:输出类型 参数3:keyBy的返回值键值对中value的类型 参数4: 窗口类型
public static class PvCountResult implements WindowFunction {
@Override
public void apply(Integer integer, TimeWindow window, Iterable iterable, Collector collector) throws Exception {
collector.collect(new PageViewCount(integer.toString(),window.getEnd(),iterable.iterator().next()));
}
}
//参数1:keyBy返回值类型 参数2:输入类型 参数3:输出类型
public static class TotalPvCount extends KeyedProcessFunction {
ValueState totalCountState; //保存当前总的count值
@Override
public void open(Configuration parameters) throws Exception {
totalCountState=getRuntimeContext().getState(new ValueStateDescriptor("total_count",Long.class,0L));
}
@Override
public void processElement(PageViewCount pageViewCount, Context context, Collector collector) throws Exception {
totalCountState.update(pageViewCount.getCount()+totalCountState.value()); //count致叠加
context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd());//注册定时器
}
@Override
public void onTimer(long timestamp, onTimerContext ctx, Collector out) throws Exception {
Long totalCount = totalCountState.value();
out.collect(new PageViewCount("pv_count",ctx.getCurrentKey(),totalCount));
//清空状态
totalCountState.clear();
}
@Override
public void close() throws Exception {
totalCountState.clear();
}
}
}
UniqueVisitor(uv)统计
package com.atguigu.networkflow_analysis.Ahotpages;
import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import java.net.URL;
public class UniqueVisitorVersion2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
URL resource = UniqueVisitorVersion1.class.getResource("/UserBehavior.csv");//读取文件
DataStreamSource inputStream = env.readTextFile(resource.getPath());
DataStream dataStream=inputStream.map(
line ->{
String [] words=line.split(",");
return new UserBehavior(new Long(words[0]),new Long(words[1]),new Integer(words[2]),new String(words[3]),new Long(words[4]));
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor() { //升序
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {//获取事件时间戳,秒级转毫秒级
return userBehavior.getTimestamp()*1000L;
}
});
//处理
SingleOutputStreamOperator uvCountStream = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.timeWindowAll(Time.minutes(60))
.trigger(new MyTrigger())
.process(new UvCountResultWithBloomFilter());
//执行
uvCountStream.print();
env.execute("统计uv");
}
public static class MyTrigger extends Trigger{
//每来一个元素触发一次
@Override
public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE_AND_PURGE; //即每来一条数据触发窗口计算又清空窗口数据
}
//在处理时间上自定义触发
@Override
public TriggerResult onProcessingTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;//什么都不干
}
//在事件时间上触发
@Override
public TriggerResult onEventTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;//什么都不干
}
//主要用来清除一些自定义状态
@Override
public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
}
}
public static class MyBloomFilter{
private Integer cap;//位图的大小,一般需要时2的倍数
public MyBloomFilter(Integer cap){
this.cap=cap;
}
public Long hashCode(String value,Integer seed){
Long result=1L;
for(int i=0;i {
private Jedis jedis;
private MyBloomFilter mybloomFilter;
@Override
public void open(Configuration parameters) throws Exception {
jedis=new Jedis("localhost",6379);
mybloomFilter=new MyBloomFilter(1<<29);//约为1亿位
}
@Override
public void process(Context context, Iterable iterable, Collector collector) throws Exception {
Long windowEnd = context.window().getEnd();
String bitMapKey = windowEnd.toString();
String countHashName="uv_count";
String countKey=windowEnd.toString();
//取出当前useId
Long userId = iterable.iterator().next().userId;
//计算出当前userId在位图中的offset
Long offset = mybloomFilter.hashCode(userId.toString(), 61);
//用redis的getbit命令,判断对应位置上的值
Boolean isExist = jedis.getbit(bitMapKey, offset);
if(!isExist){
//如果对应offset上没有值,则对应位置设为1
jedis.setbit(bitMapKey,offset,true);
//更新redis保存的count值
Long uvCount=0L;
String uvCountString = jedis.hget(countHashName, countKey);//获取count值
if(uvCountString !=null && !"".equals(uvCountString)){
uvCount = Long.valueOf(uvCountString);
}
jedis.hset(countHashName,countKey,String.valueOf(uvCount+1L));//设置新的count值
//输出
collector.collect(new PageViewCount("uv",windowEnd,uvCount+1L));
}
}
@Override
public void close() throws Exception {
jedis.close();
}
}
}
依赖
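<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.atguigu</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>HotItemsAnalysis</module>
        <module>NetworkFlowAnalysis</module>
        <module>KafkaDemo</module>
        <module>JavaReview</module>
    </modules>
    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>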