package com.biaodian.flink.taskregister;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.function.register.*;
import com.biaodian.flink.keyby.RegisterKeyBy;
import com.biaodian.flink.keyby.RegisterSiteKeyBy;
import com.biaodian.flink.tool.CountOrProcessingTimeTrigger;
import com.biaodian.flink.tool.PropertiesTool;
import com.biaodian.flink.tool.PropertiesUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;
public class FlinkRegisterUser {

    /**
     * Flink streaming job that counts registered users.
     *
     * <p>Pipeline: Kafka source -> JSON-to-{@link RegisterDto} map -> "today only"
     * filter -> two keyed 1-day tumbling processing-time windows (one overall,
     * one per site), each aggregated and written out by its window function.
     *
     * <p>NOTE(review): the original used raw {@code DataStream}/{@code FlinkKafkaConsumer}
     * types (generics apparently lost in extraction); they are restored here. The
     * unused locals holding the two aggregated streams were dropped — the Flink
     * dataflow graph is built by the transformation calls themselves.
     */
    public static void main(String[] args) throws Exception {
        // 时区设置 — pin the JVM default zone so "today" and date strings are Beijing time
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println("当前时间=" + new Date());

        // 保存点路径 — checkpoint storage rooted at the classpath location
        String path = FlinkRegisterUser.class.getClassLoader().getResource("").getPath();
        // 检查点保存路径
        path = Constant.CHECKPOINT_PREFIX + path + Constant.CHECKPOINT_REGISTER_SUFFIX;
        System.out.println("保存点路径=" + path);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(Constant.ENABLE_CHECKPOINTING);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setMinPauseBetweenCheckpoints(Constant.MIN_PAUSE_BETWEEN_CHECKPOINTS);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain externalized checkpoints on cancel so the job can be restored manually.
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(path);

        // kafka配置信息
        String topicName = PropertiesTool.getProperty("kafka.register.topic");
        Properties properties = PropertiesUtil.getProperties();
        properties.setProperty("group.id", PropertiesTool.getProperty("kafka.register.topic.group-id"));
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);

        // kafka数据源
        DataStream<String> source = env.addSource(consumer).uid("kafka");
        // 数据转化 — raw JSON string to RegisterDto
        DataStream<RegisterDto> map = source.map(new RegisterMapFunction()).uid("map");
        // 数据过滤 只取今天的数据
        DataStream<RegisterDto> sourceData = map.filter(new RegisterFilterFunction()).uid("filter");

        // 开窗处理数据 platform pvType siteId
        // 1-day tumbling windows with a -8h offset: Flink aligns windows to UTC,
        // so the offset moves the boundary to midnight Beijing time (UTC+8).
        // The trigger fires every second OR every 1000 elements, whichever first.
        sourceData.keyBy(new RegisterKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterAggregateFunction(), new RegisterWindowFunction()).uid("register");
        // Same windowing, keyed per site.
        sourceData.keyBy(new RegisterSiteKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterSiteAggregateFunction(), new RegisterSiteWindowFunction()).uid("registerSite");
        // 添加到redis缓存中 siteId
        env.execute("registerUserNum");
    }
}
package com.biaodian.flink.function.register;
import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.constant.PlatFormEnum;
import com.biaodian.flink.constant.PvTypeEnum;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.common.functions.MapFunction;
public class RegisterMapFunction implements MapFunction<String, RegisterDto> {

    /**
     * Deserializes a raw Kafka JSON message into a {@link RegisterDto}.
     *
     * <p>Fixes vs. the original: the non-existent type {@code JSonObject} is
     * corrected to fastjson's {@code JSONObject} (and {@code getJSonObject} to
     * {@code getJSONObject}), the raw {@code MapFunction} is parameterized, and
     * the per-call scratch values are locals instead of instance fields — fields
     * here were needless mutable state on a serialized, reused operator instance.
     */
    @Override
    public RegisterDto map(String value) throws Exception {
        RegisterDto dto = new RegisterDto();
        JSONObject jsonObject = JSON.parseObject(value);
        JSONObject data = jsonObject.getJSONObject("data");
        // "yyyy-MM-dd" string of the record's creation time.
        String createAt = new DateTime(data.getDate("created_at").getTime()).toDateStr();
        dto.setSourcePlatform(jsonObject.getString("sourcePlatform"));
        dto.setPlatform(PlatFormEnum.namevalue(data.getString("platform")));
        // NOTE(review): pvType is derived from the same "platform" field as
        // setPlatform above — looks like a copy/paste slip; confirm whether a
        // dedicated pv-type field was intended.
        dto.setPvType(PvTypeEnum.namevalue(data.getString("platform")));
        dto.setSiteId(data.getString("application_merchant_id"));
        dto.setEventType(data.getString("event_type"));
        // e.g. "2023-01-02" -> 20230102; assumes Constant.UNDER_LINE matches the
        // separator produced by toDateStr() — TODO confirm the constant's value.
        dto.setStatisticsDateNum(Integer.parseInt(createAt.replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING)));
        dto.setNum(Constant.INTEGER_ONE);
        return dto;
    }
}
package com.biaodian.flink.function.register;
import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.common.functions.FilterFunction;
public class RegisterFilterFunction implements FilterFunction<RegisterDto> {

    /**
     * Keeps only records whose statistics date equals today's date (in the JVM
     * default zone, pinned to Asia/Shanghai by the job's main method).
     *
     * <p>Fixes vs. the original: the raw {@code FilterFunction} is parameterized
     * and the scratch value is a local instead of a mutable instance field.
     */
    @Override
    public boolean filter(RegisterDto value) throws Exception {
        // e.g. "2023-01-02" -> 20230102, mirroring RegisterMapFunction's encoding.
        Integer today = Integer.parseInt(
                DateUtil.date().toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING));
        // Integer.equals keeps the original's null-safe semantics: a null
        // statisticsDateNum is filtered out rather than throwing on unboxing.
        return today.equals(value.getStatisticsDateNum());
    }
}
package com.biaodian.flink.keyby;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple6;
// NOTE(review): the generic parameters were lost during extraction —
// "KeySelector>" is not valid Java. This was presumably
// KeySelector<RegisterDto, Tuple6<...>> with component types matching the six
// getters below (their declared types are not visible in this chunk); restore
// the exact signature from version control before building.
public class RegisterKeyBy implements KeySelector> {
// Key = (statisticsDateNum, sourcePlatform, platform, pvType, siteId, eventType):
// one counter per day/platform/pv-type/site/event combination.
@Override
public Tuple6 getKey(RegisterDto value) throws Exception {
return new Tuple6<>(value.getStatisticsDateNum(), value.getSourcePlatform(), value.getPlatform(), value.getPvType(), value.getSiteId(), value.getEventType());
}
}
package com.biaodian.flink.tool;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class CountOrProcessingTimeTrigger extends Trigger
package com.biaodian.flink.function.register;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.tool.RedisUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.redisson.api.RBloomFilter;
import java.util.concurrent.TimeUnit;
// NOTE(review): truncated in this chunk — AggregateFunction lost its type parameters
// and the class body ends mid-declaration at "private RBloomFilter". The imports
// above (RedisUtil, RBloomFilter, TimeUnit) suggest it deduplicates registrations
// via a Redis bloom filter, but that cannot be confirmed from what is visible here;
// restore the full class from version control.
public class RegisterAggregateFunction implements AggregateFunction {
private RBloomFilter
package com.biaodian.flink.function.register;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// NOTE(review): the generic parameters were stripped during extraction —
// "WindowFunction, TimeWindow>" is not valid Java. This was presumably
// WindowFunction<IN, OUT, Tuple6<...>, TimeWindow> with IN matching
// RegisterAggregateFunction's output; restore the exact signature from version control.
public class RegisterWindowFunction implements WindowFunction, TimeWindow> {
@Override
// NOTE(review): apply() is empty in this chunk — either it is genuinely a no-op
// (with all output work done in the aggregate function) or its contents were lost
// in extraction; verify against version control.
public void apply(Tuple6 tuple6, TimeWindow window, Iterable input, Collector out) throws Exception {
}
}
package com.biaodian.flink.function.register;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.tool.RedisUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.redisson.api.RBloomFilter;
import java.util.concurrent.TimeUnit;
// NOTE(review): truncated in this chunk — AggregateFunction lost its type parameters
// and the class body ends mid-declaration at "private RBloomFilter". Mirrors
// RegisterAggregateFunction but keyed per site (see RegisterSiteKeyBy); the imports
// suggest Redis bloom-filter deduplication, unconfirmed from this chunk. Restore the
// full class from version control.
public class RegisterSiteAggregateFunction implements AggregateFunction {
private RBloomFilter
package com.biaodian.flink.function.register;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// NOTE(review): the generic parameters were stripped during extraction —
// "WindowFunction, TimeWindow>" is not valid Java. This was presumably
// WindowFunction<IN, OUT, Tuple4<...>, TimeWindow> with IN matching
// RegisterSiteAggregateFunction's output; restore the exact signature from version control.
public class RegisterSiteWindowFunction implements WindowFunction, TimeWindow> {
@Override
// NOTE(review): apply() is empty in this chunk — either a genuine no-op (output
// handled in the aggregate function) or content lost in extraction; verify.
public void apply(Tuple4 tuple4, TimeWindow window, Iterable input, Collector out) throws Exception {
}
}
package com.biaodian.flink.tool;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class PropertiesTool {

    /**
     * Merged configuration: the base {@code application.properties} overlaid with
     * the profile-specific file selected by {@code spring.profiles.active}
     * ("dev" -> application-dev.properties, anything else -> application-prod.properties).
     */
    private static final Properties properties = new Properties();

    static {
        // Fixes vs. the original: the streams were never closed (leak — the first
        // stream was also overwritten before the second load), and a missing
        // classpath resource made properties.load(null) throw a NullPointerException
        // inside static init, surfacing as ExceptionInInitializerError.
        loadResource("application.properties");
        String active = properties.getProperty("spring.profiles.active");
        loadResource("dev".equals(active) ? "application-dev.properties" : "application-prod.properties");
    }

    private PropertiesTool() {}

    /**
     * Loads the named classpath resource into {@link #properties}.
     * A missing resource is skipped silently; an I/O failure is logged, matching
     * the original's best-effort behavior.
     */
    private static void loadResource(String resource) {
        try (InputStream inputStream = PropertiesTool.class.getClassLoader().getResourceAsStream(resource)) {
            if (inputStream != null) {
                properties.load(inputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the configured value for {@code key}, or {@code null} if absent.
     */
    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}