[Learning Flink] A Brief Look at flink-training

Contents

Official exercises
    Dataset description
        Schema of Taxi Ride Events
Project structure
    common
    ride-cleansing
        RideCleansingSolution
    hourly-tips
        HourlyTipsSolution
    rides-and-fares
        RidesAndFaresSolution
    long-ride-alerts
        LongRidesSolution

Official exercises

Dataset description

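The New York City Taxi & Limousine Commission provides a public dataset about taxi rides in New York City from 2009 to 2015. We use a modified subset of this data to generate streams of events about taxi rides. You should have downloaded these files in the steps above.

Schema of Taxi Ride Events

Our taxi dataset contains information about individual taxi rides in New York City. Each ride is represented by two events: a trip start event and a trip end event. Each event consists of eleven fields: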

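rideId         : Long      // a unique ID for each ride
taxiId         : Long      // a unique ID for each taxi
driverId       : Long      // a unique ID for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of the ride
endTime        : DateTime  // the end time of the ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // the number of passengers on the ride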

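This schema is implemented by the TaxiRide class.
The dataset contains records with invalid or missing coordinate information (longitude and latitude are both 0.0).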
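To make the "both 0.0" rule concrete, here is a minimal sketch of a check for such records. The class and method names are hypothetical and are not part of flink-training; the sample values are taken from the output shown later in this post.

```java
// Hypothetical helper, not part of flink-training.
final class CoordinateCheck {

    /** Returns true when a (longitude, latitude) pair is the 0.0/0.0 placeholder. */
    static boolean isMissing(float lon, float lat) {
        return lon == 0.0f && lat == 0.0f;
    }

    /** A ride event is usable only if both its start and end positions are present. */
    static boolean hasValidCoordinates(float startLon, float startLat, float endLon, float endLat) {
        return !isMissing(startLon, startLat) && !isMissing(endLon, endLat);
    }

    public static void main(String[] args) {
        // A ride with both positions present.
        System.out.println(hasValidCoordinates(-73.98502f, 40.76364f, -73.9217f, 40.743343f)); // true
        // A ride whose start position is missing.
        System.out.println(hasValidCoordinates(0.0f, 0.0f, -73.95784f, 40.675404f)); // false
    }
}
```

A check like this would typically run before any geographic filtering, since a 0.0/0.0 position is not a real location in New York.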

There is also a related dataset containing taxi fare data, with the following fields:

rideId         : Long      // a unique ID for each ride
taxiId         : Long      // a unique ID for each taxi
driverId       : Long      // a unique ID for each driver
startTime      : DateTime  // the start time of the ride
paymentType    : String    // payment type: CSH (cash) or CRD (card)
tip            : Float     // the tip for this ride
tolls          : Float     // the tolls for this ride
totalFare      : Float     // the total fare collected

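This schema is implemented by the TaxiFare class.
Note: after obtaining the datasets there is no need to decompress them. Place the compressed files in a directory of your choice and update the static fields PATH_TO_RIDE_DATA and PATH_TO_FARE_DATA in the ExerciseBase class to match.

Dataset downloads: nycTaxiRides.gz, nycTaxiFares.gz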

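Project structure

Code source: https://github.com/apache/flink-training/tree/release-1.10

The flink-training project contains five sub-projects:

- common
- ride-cleansing
- rides-and-fares
- hourly-tips
- long-ride-alerts

common

This submodule mainly contains:

- The taxi ride and taxi fare classes, together with the corresponding TaxiRideSource and TaxiFareSource.
- The base class ExerciseBase, which holds a few simple helpers such as the dataset paths and the job parallelism.
- GeoUtils, which provides utility methods for working with locations (longitude and latitude) in the DataStream exercises.
- MissingSolutionException, the exception thrown when a solution is missing.
- RideCountExample, which counts the rides per driver and serves as a simple example to verify that the project starts successfully.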


ride-cleansing

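This exercise corresponds to the "Intro to the DataStream API" tutorial.

It mainly consists of RideCleansingExercise and RideCleansingSolution. The difference is that RideCleansingSolution implements the filter, while RideCleansingExercise only throws a MissingSolutionException.

RideCleansingSolution

The task of this exercise is to filter a data stream of taxi ride records, keeping only the rides that both start and end within New York City. The resulting stream should be printed.

Parameter input: the path to the input data file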

package org.apache.flink.training.solutions.ridecleansing;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.GeoUtils;

public class RideCleansingSolution extends ExerciseBase {

	public static void main(String[] args) throws Exception {

		ParameterTool params = ParameterTool.fromArgs(args);
		final String input = params.get("input", PATH_TO_RIDE_DATA);

		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(ExerciseBase.parallelism);

		// start the data generator
		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

		DataStream<TaxiRide> filteredRides = rides
				// keep only those rides that both start and end in NYC
				.filter(new NYCFilter());

		// print the filtered stream
		printOrTest(filteredRides);

		// run the cleansing pipeline
		env.execute("Taxi Ride Cleansing");
	}

	// keeps a ride only if its start and end positions are both inside NYC
	public static class NYCFilter implements FilterFunction<TaxiRide> {
		@Override
		public boolean filter(TaxiRide taxiRide) {
			return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
					GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
		}
	}
}

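The execution flow, as illustrated above:

1. A stream execution environment is created and its parallelism is set to 4.
2. addSource adds the TaxiRideSource as the stream source.
3. filter keeps only the ride records located in New York.
4. printOrTest adds the filtered result to a sink and prints the TaxiRide records.
5. execute runs the job.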
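The filter step relies on GeoUtils.isInNYC. As an illustration of what such a test can look like, here is a minimal bounding-box sketch in plain Java. The class name and the four boundary constants are assumptions made for this example; the actual values and logic live in GeoUtils and may differ.

```java
// Hypothetical stand-in for a geographic filter; not the GeoUtils implementation.
final class NycBoundingBox {
    // Assumed bounding box around New York City (illustrative values only).
    static final float LON_WEST = -74.05f;
    static final float LON_EAST = -73.7f;
    static final float LAT_SOUTH = 40.5f;
    static final float LAT_NORTH = 41.0f;

    /** True when the position lies inside the assumed box. */
    static boolean isInBox(float lon, float lat) {
        return lon >= LON_WEST && lon <= LON_EAST
                && lat >= LAT_SOUTH && lat <= LAT_NORTH;
    }

    /** Mirrors the shape of NYCFilter: both the start and the end must be inside. */
    static boolean keepRide(float startLon, float startLat, float endLon, float endLat) {
        return isInBox(startLon, startLat) && isInBox(endLon, endLat);
    }

    public static void main(String[] args) {
        System.out.println(keepRide(-73.98502f, 40.76364f, -73.9217f, 40.743343f)); // true
        System.out.println(keepRide(0.0f, 0.0f, 0.0f, 0.0f));                       // false
    }
}
```

Note that a ride with missing (0.0/0.0) coordinates falls outside any such box, so the same filter also drops those records.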

Output

2> 160240,START,2013-01-01 08:09:17,1970-01-01 00:00:00,-73.98502,40.76364,-73.9217,40.743343,1,2013010976,2013013178
4> 160175,START,2013-01-01 08:09:00,1970-01-01 00:00:00,-73.98134,40.72515,-74.006805,40.730034,1,2013002250,2013011576
3> 160209,START,2013-01-01 08:09:00,1970-01-01 00:00:00,-74.004196,40.75183,-73.943405,40.815296,1,2013010930,2013012572
2> 159459,END,2013-01-01 08:01:35,2013-01-01 08:09:09,-73.86216,40.76514,-73.96182,40.769604,1,2013004363,2013011978
4> 159121,END,2013-01-01 07:58:00,2013-01-01 08:09:00,-73.995094,40.769707,-73.95765,40.800457,3,2013005495,2013013406

3> 160233,START,2013-01-01 08:09:12,1970-01-01 00:00:00,-73.98917,40.731537,-73.994804,40.750256,1,2013007271,2013013903
3> 160233,END,2013-01-01 08:09:12,2013-01-01 08:17:25,-73.98917,40.731537,-73.994804,40.750256,1,2013007271,2013013903

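The output shows that the rides within New York City were selected. At first glance the timestamps look suspicious (1970? dirty data?), but the 1970-01-01 00:00:00 value only appears as the end time of START events, which suggests it is a placeholder for an end time that is not yet known.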
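The value 1970-01-01 00:00:00 is what epoch millisecond 0 looks like when formatted in UTC, which is consistent with an unset end time. The sketch below only demonstrates that formatting fact with java.time; that TaxiRide stores exactly epoch 0 for START events is an inference from the output above, and the class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of flink-training.
final class EpochPlaceholder {
    // Same textual layout as the timestamps printed above, fixed to UTC.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    static String format(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    /** True when a printed timestamp equals the formatted epoch origin. */
    static boolean isPlaceholder(String printedTime) {
        return format(0L).equals(printedTime);
    }

    public static void main(String[] args) {
        System.out.println(format(0L));                            // 1970-01-01 00:00:00
        System.out.println(isPlaceholder("1970-01-01 00:00:00")); // true
        System.out.println(isPlaceholder("2013-01-01 08:09:09")); // false
    }
}
```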

hourly-tips

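This exercise corresponds to the "Streaming Analytics" tutorial.

HourlyTipsSolution

The task is to first compute the total tips collected by each driver in each hour, and then, from that stream, find the highest tip total in each hour.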

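import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;

public class HourlyTipsSolution extends ExerciseBase {

	public static void main(String[] args) throws Exception {

		// read parameters
		ParameterTool params = ParameterTool.fromArgs(args);
		final String input = params.get("input", ExerciseBase.PATH_TO_FARE_DATA);

		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(ExerciseBase.parallelism);

		// start the data generator
		DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));

		// compute tips per hour for each driver
		DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
				.keyBy((TaxiFare fare) -> fare.driverId)
				.window(TumblingEventTimeWindows.of(Time.hours(1)))
				.process(new AddTips());

		// find the highest tip total across all drivers in each hour
		DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
				.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
				.maxBy(2);

//		You should explore how this alternative behaves. In what ways is the same as,
//		and different from, the solution above (using a windowAll)?

// 		DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
// 			.keyBy(0)
// 			.maxBy(2);

		printOrTest(hourlyMax);

		// execute the transformation pipeline
		env.execute("Hourly Tips (java)");
	}

	// sums the tips of one driver within one window and
	// emits (window end time, driverId, sum of tips)
	public static class AddTips extends ProcessWindowFunction<
			TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
		@Override
		public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
			float sumOfTips = 0F;
			for (TaxiFare f : fares) {
				sumOfTips += f.tip;
			}
			out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
		}
	}
}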

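The execution flow, as illustrated above:

1. A stream execution environment is created, its parallelism is set to 4, and event time is configured.
2. addSource adds the TaxiFareSource as the stream source.
3. The stream is keyed by driver ID and a tumbling window of one hour computes each driver's tips per hour. AddTips emits a Tuple3 holding the end time of the hourly window, the driver ID, and the sum of tips.
4. The result of step 3 is windowed again to find the highest tip total per hour with maxBy(2), where 2 is the index of the Tuple3 field to compare.
5. printOrTest adds the resulting stream to a sink and prints the hourlyMax records.
6. execute runs the job.

Output
The output consists of Tuple3 objects: the end time of the hourly window, the driver ID, and the tip total.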

2> (1357308000000,2013014526,81.98)
3> (1357311600000,2013026978,33.9)
4> (1357315200000,2013018152,74.91)
1> (1357318800000,2013004219,20.2)
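The first field of each tuple is the window end in epoch milliseconds. The sketch below shows, under the assumption of one-hour tumbling windows aligned to the epoch with no offset, how a timestamp maps to its window and how a window end can be rendered as a readable UTC time. The class and method names are hypothetical.

```java
import java.time.Instant;

// Hypothetical helper illustrating one-hour tumbling windows aligned to the epoch.
final class HourlyWindow {
    static final long HOUR_MS = 60 * 60 * 1000L;

    /** Start (inclusive) of the one-hour window that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, HOUR_MS);
    }

    /** End (exclusive) of that window; this is the value reported in the tuples. */
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + HOUR_MS;
    }

    public static void main(String[] args) {
        long fareTime = 1357306200000L; // 2013-01-04T13:30:00Z
        System.out.println(windowEnd(fareTime));                       // 1357308000000
        System.out.println(Instant.ofEpochMilli(windowEnd(fareTime))); // 2013-01-04T14:00:00Z
    }
}
```

Read this way, the first output line above says that within the hour ending at 2013-01-04 14:00 UTC, driver 2013014526 collected the highest tip total, 81.98.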

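What is the difference between window and windowAll? In short, window is applied to a keyed stream and evaluates the windows of each key independently and in parallel, whereas windowAll is applied to a non-keyed stream and gathers all elements of a time slot into a single window, so it runs with a parallelism of 1.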
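One way to build intuition for the two operators is to simulate them without Flink. The following sketch is only an analogy in plain Java collections: the first method groups by window and driver (like keyBy followed by window), the second picks one winner per window across all drivers (like windowAll followed by maxBy). The Fare class and all method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogy; it does not use or reproduce Flink's windowing runtime.
final class WindowVsWindowAll {
    static final long HOUR_MS = 3_600_000L;

    /** Hypothetical stand-in for TaxiFare with only the fields needed here. */
    static final class Fare {
        final long driverId;
        final long timestampMs;
        final float tip;

        Fare(long driverId, long timestampMs, float tip) {
            this.driverId = driverId;
            this.timestampMs = timestampMs;
            this.tip = tip;
        }
    }

    /** Like keyBy(driverId).window(1h): one sum per (window end, driver). */
    static Map<Long, Map<Long, Float>> tipsPerDriverPerHour(List<Fare> fares) {
        Map<Long, Map<Long, Float>> sums = new TreeMap<>();
        for (Fare f : fares) {
            long windowEnd = f.timestampMs - f.timestampMs % HOUR_MS + HOUR_MS;
            sums.computeIfAbsent(windowEnd, k -> new HashMap<>())
                    .merge(f.driverId, f.tip, Float::sum);
        }
        return sums;
    }

    /** Like windowAll(1h).maxBy(2): one winning driver per window end. */
    static Map<Long, Long> topDriverPerHour(Map<Long, Map<Long, Float>> sums) {
        Map<Long, Long> winners = new TreeMap<>();
        for (Map.Entry<Long, Map<Long, Float>> window : sums.entrySet()) {
            long bestDriver = -1L;
            float bestTips = Float.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Float> driver : window.getValue().entrySet()) {
                if (driver.getValue() > bestTips) {
                    bestTips = driver.getValue();
                    bestDriver = driver.getKey();
                }
            }
            winners.put(window.getKey(), bestDriver);
        }
        return winners;
    }

    public static void main(String[] args) {
        List<Fare> fares = Arrays.asList(
                new Fare(1L, 0L, 2f), new Fare(1L, 1_000L, 3f),
                new Fare(2L, 2_000L, 4f), new Fare(2L, HOUR_MS + 1, 1f));
        Map<Long, Map<Long, Float>> sums = tipsPerDriverPerHour(fares);
        System.out.println(sums);                   // per-driver sums for each window
        System.out.println(topDriverPerHour(sums)); // {3600000=1, 7200000=2}
    }
}
```

The per-driver step can be split across parallel workers because each driver's sums are independent; the per-hour maximum needs to see every driver's result for that hour in one place.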

rides-and-fares

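This exercise corresponds to the "Data Pipelines & ETL" tutorial.

RidesAndFaresSolution

The goal of this exercise is to enrich the TaxiRides with fare information, making the data richer and more complete.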

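import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;

public class RidesAndFaresSolution extends ExerciseBase {

	public static void main(String[] args) throws Exception {

		ParameterTool params = ParameterTool.fromArgs(args);
		final String ridesFile = params.get("rides", PATH_TO_RIDE_DATA);
		final String faresFile = params.get("fares", PATH_TO_FARE_DATA);

		final int delay = 60;					// at most 60 seconds of delay
		final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second

		// Set up streaming execution environment, including Web UI and REST endpoint.
		// Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
		// using the State Processor API.

		Configuration conf = new Configuration();
		conf.setString("state.backend", "filesystem");
		// local directories on the author's machine; adjust them to your environment
		// (backslashes must be escaped inside Java string literals)
		conf.setString("state.savepoints.dir", "file:\\code\\flink\\training-data\\savepoints");
		conf.setString("state.checkpoints.dir", "file:\\code\\flink\\training-data\\checkpoints");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
		env.setParallelism(ExerciseBase.parallelism);

		// checkpoint every 10 seconds and keep the checkpoint files when the job is cancelled
		env.enableCheckpointing(10000L);
		CheckpointConfig config = env.getCheckpointConfig();
		config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

		DataStream<TaxiRide> rides = env
				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
				.filter((TaxiRide ride) -> ride.isStart)
				.keyBy(ride -> ride.rideId);

		DataStream<TaxiFare> fares = env
				.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
				.keyBy(fare -> fare.rideId);

		// Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
		DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
				.connect(fares)
				.flatMap(new EnrichmentFunction())
				.uid("enrichment");

		printOrTest(enrichedRides);

		env.execute("Join Rides with Fares (java RichCoFlatMap)");
	}

	public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
		// keyed, managed state
		private ValueState<TaxiRide> rideState;
		private ValueState<TaxiFare> fareState;

		@Override
		public void open(Configuration config) {
			rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
			fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
		}

		// called for each ride: emit the pair if the fare is already stored, otherwise store the ride
		@Override
		public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
			TaxiFare fare = fareState.value();
			if (fare != null) {
				fareState.clear();
				out.collect(Tuple2.of(ride, fare));
			} else {
				rideState.update(ride);
			}
		}

		// called for each fare: emit the pair if the ride is already stored, otherwise store the fare
		@Override
		public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
			TaxiRide ride = rideState.value();
			if (ride != null) {
				rideState.clear();
				out.collect(Tuple2.of(ride, fare));
			} else {
				fareState.update(fare);
			}
		}
	}
}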

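The execution flow, as illustrated above:

1. A stream execution environment is created and its parallelism is set to 4. Unlike the previous exercises, servingSpeedFactor is 1800 here, so 30 minutes worth of events are served every second.
2. Because a stateful transformation is used, a Configuration sets the state backend and the savepoint and checkpoint directories. A checkpoint is taken every 10 seconds. With RETAIN_ON_CANCELLATION, the checkpoint files of a cancelled job are kept and have to be cleaned up manually.
3. addSource adds the TaxiRideSource and the TaxiFareSource as the rides and fares streams. Both streams are keyed by ride ID, and rides is filtered down to START events.
4. EnrichmentFunction is the processing function: elements of rides go through flatMap1 and elements of fares go through flatMap2. Using the ride ID as the key, each START ride is paired with its fare to form a stream of tuples.
5. printOrTest adds the resulting stream to a sink and prints the enrichedRides records.
6. execute runs the job.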
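The pairing logic of EnrichmentFunction can be summarized as "whichever side arrives first waits for the other". The sketch below replays that idea in plain Java, with two maps standing in for the keyed ValueState. All names are hypothetical, rides and fares are reduced to strings, and nothing here involves Flink's state or checkpointing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy of a two-input keyed join; not Flink code.
final class RideFareJoinSketch {
    // One buffered element per ride ID and side, like one ValueState per key.
    private final Map<Long, String> pendingRides = new HashMap<>();
    private final Map<Long, String> pendingFares = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Counterpart of flatMap1: a ride arrives. */
    void onRide(long rideId, String ride) {
        String fare = pendingFares.remove(rideId);
        if (fare != null) {
            output.add(ride + " + " + fare); // the fare was waiting: emit the pair
        } else {
            pendingRides.put(rideId, ride);  // wait for the fare
        }
    }

    /** Counterpart of flatMap2: a fare arrives. */
    void onFare(long rideId, String fare) {
        String ride = pendingRides.remove(rideId);
        if (ride != null) {
            output.add(ride + " + " + fare); // the ride was waiting: emit the pair
        } else {
            pendingFares.put(rideId, fare);  // wait for the ride
        }
    }

    List<String> output() {
        return output;
    }

    /** Number of elements still waiting for their partner. */
    int buffered() {
        return pendingRides.size() + pendingFares.size();
    }

    public static void main(String[] args) {
        RideFareJoinSketch join = new RideFareJoinSketch();
        join.onFare(1L, "fare-1"); // the fare arrives first and is buffered
        join.onRide(1L, "ride-1"); // the matching ride arrives: the pair is emitted
        join.onRide(2L, "ride-2"); // no fare yet: stays buffered
        System.out.println(join.output());   // [ride-1 + fare-1]
        System.out.println(join.buffered()); // 1
    }
}
```

The last line of the example hints at a property of this design: an element whose partner never arrives stays buffered indefinitely.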

Output

2> (1494598,START,2013-01-04 15:48:25,1970-01-01 00:00:00,0.0,0.0,-73.95784,40.675404,1,2013007827,2013007823,1494598,2013007827,2013007823,2013-01-04 15:48:25,CRD,4.0,0.0,42.0)
3> (1494966,START,2013-01-04 15:49:20,1970-01-01 00:00:00,-73.99782,40.740948,-73.99345,40.731056,1,2013004745,2013016201,1494966,2013004745,2013016201,2013-01-04 15:49:20,CSH,0.0,0.0,6.0)
2> (1494659,START,2013-01-04 15:48:42,1970-01-01 00:00:00,-73.98507,40.728317,-74.009056,40.716156,1,2013012349,2013025722,1494659,2013012349,2013025722,2013-01-04 15:48:42,CRD,2.7,0.0,16.2)
3> (1494813,START,2013-01-04 15:49:00,1970-01-01 00:00:00,-73.99216,40.7503,-73.98552,40.75631,1,2013000782,2013014855,1494813,2013000782,2013014855,2013-01-04 15:49:00,CSH,0.0,0.0,7.5)

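Checking the state storage directories afterwards confirmed that the state files were not deleted when the job was cancelled; they have to be removed manually.

Open question: what exactly is stored in the files under each subdirectory of savepoints and checkpoints?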

long-ride-alerts

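This exercise corresponds to the "Event-driven Applications" tutorial.

LongRidesSolution

The goal of this exercise is to emit the START event of every taxi ride that has not been matched by an END event within its first 2 hours.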

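import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;

public class LongRidesSolution extends ExerciseBase {

	public static void main(String[] args) throws Exception {

		ParameterTool params = ParameterTool.fromArgs(args);
		final String input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA);

		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(ExerciseBase.parallelism);

		// start the data generator
		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

		DataStream<TaxiRide> longRides = rides
				.keyBy(r -> r.rideId)
				.process(new MatchFunction());

		printOrTest(longRides);

		env.execute("Long Taxi Rides");
	}

	private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
		// keyed, managed state
		// holds an END event if the ride has ended, otherwise a START event
		private ValueState<TaxiRide> rideState;

		@Override
		public void open(Configuration config) {
			ValueStateDescriptor<TaxiRide> startDescriptor =
					new ValueStateDescriptor<>("saved ride", TaxiRide.class);
			rideState = getRuntimeContext().getState(startDescriptor);
		}

		@Override
		public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
			TimerService timerService = context.timerService();

			if (ride.isStart) {
				// the matching END might have arrived first; don't overwrite it
				if (rideState.value() == null) {
					rideState.update(ride);
				}
			} else {
				rideState.update(ride);
			}

			// fire 2 hours (in event time) after this event's timestamp
			timerService.registerEventTimeTimer(ride.getEventTime() + 120 * 60 * 1000);
		}

		@Override
		public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
			// if only the START event is stored, no END arrived in time: emit an alert
			TaxiRide savedRide = rideState.value();
			if (savedRide != null && savedRide.isStart) {
				out.collect(savedRide);
			}
			rideState.clear();
		}
	}

}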

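The execution flow, as illustrated above:

1. A stream execution environment is created, its parallelism is set to 4, and event time is configured.
2. addSource adds the TaxiRideSource as the stream source.
3. The stream is keyed by ride ID, and the actual logic lives in MatchFunction, which extends KeyedProcessFunction and overrides open, processElement and onTimer:
   - open is called on initialization. It creates the ValueStateDescriptor for TaxiRide and obtains the corresponding value state.
   - processElement is called whenever a TaxiRide event arrives. It updates the state of the ride and registers an event-time timer that expires 2 hours after the event's timestamp.
   - onTimer is the callback for the timer registered above; it is invoked once the watermark reaches the timer's timestamp. It emits the rides (represented by their START event) for which no END event has been received after more than 2 hours.
4. printOrTest adds the resulting stream to a sink and prints the longRides records.
5. execute runs the job.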
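The interplay between stored state and timers can be replayed without Flink. The sketch below is a plain-Java analogy: a map stands in for the keyed state, a sorted map stands in for the timer service, and advanceWatermark fires every timer whose timestamp the watermark has reached. All names are hypothetical, and each ride is reduced to its ID and a START/END flag.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogy of state plus event-time timers; not Flink code.
final class LongRideTimerSketch {
    static final long TWO_HOURS_MS = 120 * 60 * 1000L;

    // rideId -> true if the stored event is a START, false if it is an END
    private final Map<Long, Boolean> savedIsStart = new HashMap<>();
    // timer timestamp -> ride IDs whose timer fires at that time
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>();
    private final List<Long> alerts = new ArrayList<>();

    /** Counterpart of processElement. */
    void onEvent(long rideId, boolean isStart, long eventTimeMs) {
        if (isStart) {
            // the matching END might have arrived first; do not overwrite it
            savedIsStart.putIfAbsent(rideId, true);
        } else {
            savedIsStart.put(rideId, false);
        }
        timers.computeIfAbsent(eventTimeMs + TWO_HOURS_MS, t -> new ArrayList<>()).add(rideId);
    }

    /** Fires all timers up to the watermark; each firing is the counterpart of onTimer. */
    void advanceWatermark(long watermarkMs) {
        while (!timers.isEmpty() && timers.firstKey() <= watermarkMs) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                Boolean isStart = savedIsStart.remove(rideId); // read the state, then clear it
                if (isStart != null && isStart) {
                    alerts.add(rideId); // only a START is stored: long ride
                }
            }
        }
    }

    List<Long> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        LongRideTimerSketch sketch = new LongRideTimerSketch();
        sketch.onEvent(1L, true, 0L);              // ride 1 starts and never ends
        sketch.onEvent(2L, true, 0L);              // ride 2 starts ...
        sketch.onEvent(2L, false, 30 * 60 * 1000L); // ... and ends after 30 minutes
        sketch.advanceWatermark(TWO_HOURS_MS);
        System.out.println(sketch.alerts());       // [1]
    }
}
```

In the example, ride 2 produces no alert because its END event replaced the stored START before the first timer fired; the later timer registered by the END event then finds no state and does nothing.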

Output

3> 2758,START,2013-01-01 00:10:13,1970-01-01 00:00:00,-73.98849,40.725166,-73.989006,40.763557,1,2013002682,2013002679
2> 7575,START,2013-01-01 00:20:23,1970-01-01 00:00:00,-74.002426,40.73445,-74.0148,40.716736,1,2013001908,2013001905
2> 22131,START,2013-01-01 00:47:03,1970-01-01 00:00:00,-73.97784,40.72598,-73.926346,40.74442,1,2013008502,2013008498
1> 25473,START,2013-01-01 00:53:10,1970-01-01 00:00:00,-73.98471,40.778183,-73.98471,40.778183,1,2013007595,2013007591
1> 29907,START,2013-01-01 01:01:15,1970-01-01 00:00:00,-73.96685,40.77239,-73.918274,40.84052,1,2013007187,2013007183
3> 30796,START,2013-01-01 01:03:00,1970-01-01 00:00:00,-73.99605,40.72438,-73.99827,40.729496,6,2013002159,2013002156
1> 33459,START,2013-01-01 01:07:47,1970-01-01 00:00:00,0.0,0.0,0.0,0.0,1,2013009337,2013009334
4> 36822,START,2013-01-01 01:14:00,1970-01-01 00:00:00,-73.95057,40.779404,-73.98082,40.77466,1,2013009669,2013009666

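The 1970 timestamps turn out to be intentional rather than dirty data: a START event has no end time yet, so its endTime holds a placeholder, and a START that never receives a matching END is exactly the scenario this exercise is meant to detect.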
