本文的基础环境可以参考flink 1.10.1 java版本wordcount演示 (nc + socket)
这里使用静态文件数据来模拟商户订单数据。程序接收到订单数据后,会先根据商户id进行分组,然后再按天进行统计。
1. 主程序代码
package com.demo.statis;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
/**
 * Flink job that reads shop orders from a text file, keys them by shop id and emits a running
 * per-shop, per-day order count and sales total for every incoming order.
 *
 * <p>Each input line is expected in the form {@code shopId,payTimeSeconds,orderPrice}, e.g.
 * {@code 001,1644479447,10}.
 */
public class ShopDayGMVStatis {

    /** How far (in milliseconds) an order may arrive out of order before it counts as late. */
    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;

    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // For local debugging with the web UI (requires org.apache.flink.configuration.Configuration):
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        // Event time: each record's timestamp is the order's pay time (see the assigner below).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read the raw order lines from a file.
        String inputPath = "data/shoporder.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);

        // Parse each line into a ShopOrder and attach event-time timestamps and watermarks.
        DataStream<ShopOrder> dataStream = inputDataSet
                .map(new MapFunction<String, ShopOrder>() {
                    @Override
                    public ShopOrder map(String line) throws Exception {
                        String[] splits = line.trim().split(",");
                        if (splits.length < 3) {
                            throw new IllegalArgumentException("Malformed order line: " + line);
                        }
                        long payTimeSeconds = Long.parseLong(splits[1].trim());
                        long orderPrice = Long.parseLong(splits[2].trim());
                        // The input holds epoch seconds; Flink timestamps are epoch milliseconds.
                        return new ShopOrder(splits[0].trim(), payTimeSeconds * 1000L, orderPrice);
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<ShopOrder>() {
                    // Highest timestamp seen so far. It is offset so that the first watermark
                    // (max - MAX_OUT_OF_ORDERNESS_MS) cannot underflow.
                    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

                    @Override
                    public long extractTimestamp(ShopOrder shopOrder, long previousElementTimestamp) {
                        long timestamp = shopOrder.getPayTime();
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        return timestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // Returning null here would mean no watermark is ever emitted, so event
                        // time would never advance. Trail the max timestamp by the allowed
                        // out-of-orderness instead.
                        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
                    }
                });

        // Group by shop id and accumulate per-day statistics.
        DataStream<ShopDayGmv> result = dataStream
                .keyBy(ShopOrder::getShopId)
                .process(new ShopDayKeyProcess());
        result.print("shop day statistics");
        env.execute("ShopDayGMVStatis");
    }
}
这里根据订单支付时间所对应的日期进行统计。注意:输入数据中的支付时间是秒级时间戳,读取后会乘以1000转换为毫秒,再作为事件时间使用。
2. 统计代码
package com.demo.statis;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Per-shop (keyed by shop id) process function that keeps a running order count and sales total
 * for each calendar day in GMT+8, and emits the updated totals for every incoming order.
 */
public class ShopDayKeyProcess extends KeyedProcessFunction<String, ShopOrder, ShopDayGmv> {

    /** Zone that decides which calendar day an order belongs to (GMT+8). */
    private static final ZoneOffset ZONE = ZoneOffset.ofHours(8);

    /** Length of one day in milliseconds. */
    private static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Offset of {@link #ZONE} from UTC in milliseconds (8 hours). */
    private static final long ZONE_OFFSET_MS = ZONE.getTotalSeconds() * 1000L;

    /** Formatter for the human-readable day key. It uses the same zone as the state key. */
    private static final DateTimeFormatter DAY_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZONE);

    /**
     * Per-shop state. The key is the day index plus the metric type; the value is the amount.
     * For example, key "18581Sales" holds the sales total of epoch day 18581 (2020-11-15, GMT+8).
     */
    private transient MapState<String, Long> dayAmount;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TTL policy: entries expire 3 days after they were created or last written, and expired
        // values are never returned. The RocksDB compaction-filter cleanup only concerns the
        // RocksDB state backend. Available TTL options differ slightly between Flink versions.
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(3))
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupInRocksdbCompactFilter(3000)
                .build();
        MapStateDescriptor<String, Long> amountStateDesc = new MapStateDescriptor<>("amountState",
                TypeInformation.of(String.class), TypeInformation.of(Long.class));
        // Attach the TTL policy to the state descriptor.
        amountStateDesc.enableTimeToLive(stateTtlConfig);
        dayAmount = getRuntimeContext().getMapState(amountStateDesc);
    }

    @Override
    public void processElement(ShopOrder value, Context context, Collector<ShopDayGmv> collector)
            throws Exception {
        long payTime = value.getPayTime();
        // payTime is in epoch milliseconds (the source multiplies the input seconds by 1000), so
        // the day index must be computed in milliseconds as well. Shifting by the GMT+8 offset
        // makes each day start at local midnight. The result equals the epoch day in GMT+8.
        long dateNum = Math.floorDiv(payTime + ZONE_OFFSET_MS, DAY_MS);
        String salesKey = dateNum + "Sales";
        String countKey = dateNum + "Count";

        // A missing entry means this is the shop's first order of that day.
        Long previousSales = dayAmount.get(salesKey);
        Long previousCount = dayAmount.get(countKey);
        long todaySales = (previousSales == null ? 0L : previousSales) + value.getOrderPrice();
        long todayCount = (previousCount == null ? 0L : previousCount) + 1L;
        dayAmount.put(salesKey, todaySales);
        dayAmount.put(countKey, todayCount);

        ShopDayGmv shopDayGmv = new ShopDayGmv();
        shopDayGmv.setShopId(value.getShopId());
        shopDayGmv.setTime(payTime);
        shopDayGmv.setTimeKey(timeFormat(payTime));
        shopDayGmv.setTodayCount(todayCount);
        shopDayGmv.setTodaySales(todaySales);
        collector.collect(shopDayGmv);
    }

    /**
     * Formats an epoch-millisecond timestamp as a {@code yyyy-MM-dd} date in GMT+8. This is the
     * same zone used for the per-day state key, so the two always agree regardless of the JVM's
     * default time zone.
     *
     * @param timeStamp epoch milliseconds
     * @return the calendar date in GMT+8, e.g. {@code 2022-02-10}
     */
    public static String timeFormat(long timeStamp) {
        return DAY_FORMAT.format(Instant.ofEpochMilli(timeStamp));
    }
}
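通过为状态设置TTL(存活时间),每个商户的每一项统计数据在创建或最后一次写入3天后即过期,过期数据不会再被读取;使用RocksDB状态后端时,过期数据还会在compaction过程中被清理。这样可以避免状态数据不断累积导致内存(或磁盘)被耗尽的问题。需要注意的是,Flink的状态TTL基于处理时间,而不是订单的事件时间。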
3. 辅助代码 ShopOrder
package com.demo.statis;
/**
 * A single shop order parsed from one input line.
 *
 * <p>Kept as a mutable bean with a public no-arg constructor and getters/setters. Presumably this
 * is so Flink can handle it as a POJO type; verify against the Flink version in use.
 */
public class ShopOrder {
    /** Identifier of the shop the order belongs to; the job keys its stream by this value. */
    private String shopId;
    /** Payment time in epoch milliseconds (the job's parser multiplies input seconds by 1000). */
    private Long payTime;
    /** Order amount. */
    private Long orderPrice;

    /** Creates an empty order (all fields {@code null}). */
    public ShopOrder() {
    }

    /**
     * Creates a fully populated order.
     *
     * @param shopId shop identifier
     * @param payTime payment time in epoch milliseconds
     * @param orderPrice order amount
     */
    public ShopOrder(String shopId, Long payTime, Long orderPrice) {
        this.shopId = shopId;
        this.payTime = payTime;
        this.orderPrice = orderPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public Long getPayTime() {
        return payTime;
    }

    public void setPayTime(Long payTime) {
        this.payTime = payTime;
    }

    public Long getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(Long orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        // '\'' is the escaped single-quote char literal; a bare ''' does not compile.
        return "ShopOrder{" +
                "shopId='" + shopId + '\'' +
                ", payTime=" + payTime +
                ", orderPrice=" + orderPrice +
                '}';
    }
}
4. 辅助代码ShopDayGmv
package com.demo.statis;
/**
 * Running per-shop, per-day statistics emitted after each order is processed.
 *
 * <p>Kept as a mutable bean with a public no-arg constructor and getters/setters. Presumably this
 * is so Flink can handle it as a POJO type; verify against the Flink version in use.
 */
public class ShopDayGmv {
    /** Shop identifier. */
    private String shopId;
    /** Payment time (epoch milliseconds) of the order that produced this update. */
    private Long time;
    /** Number of orders counted so far for this shop on this day. */
    private Long todayCount;
    /** Sales total accumulated so far for this shop on this day. */
    private Long todaySales;
    /** The day these totals belong to, formatted as {@code yyyy-MM-dd}. */
    private String timeKey;

    /** Creates an empty record (all fields {@code null}). */
    public ShopDayGmv() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param shopId shop identifier
     * @param time payment time of the triggering order, epoch milliseconds
     * @param todayCount order count so far for the day
     * @param todaySales sales total so far for the day
     * @param timeKey day label, {@code yyyy-MM-dd}
     */
    public ShopDayGmv(String shopId, Long time, Long todayCount, Long todaySales, String timeKey) {
        this.shopId = shopId;
        this.time = time;
        this.todayCount = todayCount;
        this.todaySales = todaySales;
        this.timeKey = timeKey;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public Long getTodayCount() {
        return todayCount;
    }

    public void setTodayCount(Long todayCount) {
        this.todayCount = todayCount;
    }

    public Long getTodaySales() {
        return todaySales;
    }

    public void setTodaySales(Long todaySales) {
        this.todaySales = todaySales;
    }

    public String getTimeKey() {
        return timeKey;
    }

    public void setTimeKey(String timeKey) {
        this.timeKey = timeKey;
    }

    @Override
    public String toString() {
        // '\'' is the escaped single-quote char literal; a bare ''' does not compile.
        return "ShopDayGmv{" +
                "shopId='" + shopId + '\'' +
                ", time=" + time +
                ", todayCount=" + todayCount +
                ", todaySales=" + todaySales +
                ", timeKey='" + timeKey + '\'' +
                '}';
    }
}
5. 测试数据
文件 data/shoporder.txt,每行一条订单,格式为:商户id,支付时间(秒级时间戳),订单金额
001,1644479447,10
001,1644479447,20
001,1644479445,10
001,1644479446,30
001,1644479447,20
001,1644479444,80
001,1644479445,10
002,1641801046,10
003,1641801056,30
004,1641801066,10
6. 执行程序
可以看到如下的输出结果:
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=1, todaySales=10, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=2, todaySales=30, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479445000, todayCount=3, todaySales=40, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479446000, todayCount=4, todaySales=70, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=5, todaySales=90, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479444000, todayCount=6, todaySales=170, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479445000, todayCount=7, todaySales=180, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='002', time=1641801046000, todayCount=1, todaySales=10, timeKey='2022-01-10'}
shop day statistics> ShopDayGmv{shopId='003', time=1641801056000, todayCount=1, todaySales=30, timeKey='2022-01-10'}
shop day statistics> ShopDayGmv{shopId='004', time=1641801066000, todayCount=1, todaySales=10, timeKey='2022-01-10'}
可以看出,程序已经按照商户编号和日期进行了统计汇总并输出。这里的测试只是输出到控制台,实际项目中可以根据项目情况将结果输出到Kafka、Redis或者数据库等。
可以看到,每输入一条订单数据,都会输出一条对应的统计结果。如果输入数据量比较大,输出的数据量也会相应增大,此时需要选择合适的输出方式。



