Requirement: compute each user's movement trajectory. For example, given:
| uid | location_id | time |
|---|---|---|
| 001 | 1 | 2021-10-01 12:21:21 |
| 001 | 2 | 2021-10-01 12:22:21 |
| 002 | 1 | 2021-10-01 09:23:21 |
| 001 | 1 | 2021-10-01 11:24:21 |
| 001 | 1 | 2021-10-01 12:25:21 |
| 001 | 2 | 2021-10-01 12:26:21 |
| 001 | 2 | 2021-10-01 01:26:21 |
| 003 | 2 | 2021-10-01 12:26:21 |
| 003 | 2 | 2021-10-01 12:27:21 |
Expected result:
| st | location_id | uid | et |
|---|---|---|---|
| 2021-10-01 01:26:21 | 2 | 001 | 2021-10-01 01:26:21 |
| 2021-10-01 11:24:21 | 1 | 001 | 2021-10-01 12:21:21 |
| 2021-10-01 12:22:21 | 2 | 001 | 2021-10-01 12:22:21 |
| 2021-10-01 12:25:21 | 1 | 001 | 2021-10-01 12:25:21 |
| 2021-10-01 12:26:21 | 2 | 001 | 2021-10-01 12:26:21 |
| 2021-10-01 09:23:21 | 1 | 002 | 2021-10-01 09:23:21 |
| 2021-10-01 12:26:21 | 2 | 003 | 2021-10-01 12:27:21 |
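Analysis:
First, group the rows by uid and sort each group by time. Then compare every row with the previous one: as long as location_id stays the same, the rows belong to one stay; when location_id changes, the preceding run of consecutive rows is merged into a single output row. st and et are the start time and end time of that run.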
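The grouping-and-merging steps above can be sketched in plain Java, without the Hive UDAF plumbing. This is a minimal illustration under stated assumptions, not the author's evaluator: the class `TrackSketch` and its nested `Point` and `Segment` types are hypothetical, and the sketch assumes timestamps are fixed-width `yyyy-MM-dd HH:mm:ss` strings, so sorting them as strings is the same as sorting them by time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical plain-Java sketch of the trajectory segmentation for one uid. */
public class TrackSketch {

    /** One input row of a single uid: location_id plus time. */
    static final class Point {
        final String locationId;
        final String time;

        Point(String locationId, String time) {
            this.locationId = locationId;
            this.time = time;
        }
    }

    /** One output row: st, location_id, et. */
    static final class Segment {
        final String st;
        final String locationId;
        String et;

        Segment(String st, String locationId, String et) {
            this.st = st;
            this.locationId = locationId;
            this.et = et;
        }

        @Override
        public String toString() {
            return st + " | " + locationId + " | " + et;
        }
    }

    /** Sorts one uid's rows by time and merges consecutive rows at the same location. */
    static List<Segment> track(List<Point> points) {
        List<Point> sorted = new ArrayList<>(points);
        // Assumption: fixed-width "yyyy-MM-dd HH:mm:ss" strings sort chronologically.
        sorted.sort(Comparator.comparing((Point p) -> p.time));

        List<Segment> result = new ArrayList<>();
        Segment current = null;
        for (Point p : sorted) {
            if (current != null && current.locationId.equals(p.locationId)) {
                // Same location as the previous row: extend the current run.
                current.et = p.time;
            } else {
                // Location changed (or first row): start a new run.
                current = new Segment(p.time, p.locationId, p.time);
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The uid 001 rows from the example input, in their original (unsorted) order.
        List<Point> uid001 = new ArrayList<>();
        uid001.add(new Point("1", "2021-10-01 12:21:21"));
        uid001.add(new Point("2", "2021-10-01 12:22:21"));
        uid001.add(new Point("1", "2021-10-01 11:24:21"));
        uid001.add(new Point("1", "2021-10-01 12:25:21"));
        uid001.add(new Point("2", "2021-10-01 12:26:21"));
        uid001.add(new Point("2", "2021-10-01 01:26:21"));
        for (Segment s : track(uid001)) {
            System.out.println(s);
        }
    }
}
```

Run on the uid 001 rows, `main` prints the same five segments listed for uid 001 in the result table, in the form `st | location_id | et`.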
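Note:
Inside a group, the data is held in a List, and all rows of a group are gathered on the reduce side, so data volume matters: a single group must not grow too large. For trajectory data, one uid usually has relatively few rows, so a UDAF is a workable solution.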
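To make the memory concern concrete, here is a plain-Java sketch (no Hive dependency) of how a List-based aggregation buffer flows through the phases of a Hive `GenericUDAFEvaluator`: `iterate` and `terminatePartial` on the map side, `merge` and `terminate` on the reduce side. The class `ListBufferSketch`, its `Buffer` type, and the split of rows across two map tasks are hypothetical; only the phase names mirror Hive's evaluator methods.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of a List-based aggregation buffer across UDAF phases. */
public class ListBufferSketch {

    /** Aggregation buffer: every [location_id, time] pair seen for one group. */
    static final class Buffer {
        final List<String[]> rows = new ArrayList<>();
    }

    /** Map side: append one input row to the buffer. */
    static void iterate(Buffer buf, String locationId, String time) {
        buf.rows.add(new String[] {locationId, time});
    }

    /** Map side: the partial result is simply a copy of the collected rows. */
    static List<String[]> terminatePartial(Buffer buf) {
        return new ArrayList<>(buf.rows);
    }

    /** Reduce side: partial lists from all map tasks are concatenated into one buffer. */
    static void merge(Buffer buf, List<String[]> partial) {
        buf.rows.addAll(partial);
    }

    /** Reduce side: the whole group is now in memory; sort it by time before segmenting. */
    static List<String[]> terminate(Buffer buf) {
        List<String[]> sorted = new ArrayList<>(buf.rows);
        sorted.sort(Comparator.comparing((String[] r) -> r[1]));
        return sorted;
    }

    public static void main(String[] args) {
        // Two hypothetical map tasks, each seeing part of uid 001's rows.
        Buffer map1 = new Buffer();
        iterate(map1, "1", "2021-10-01 12:21:21");
        iterate(map1, "2", "2021-10-01 12:22:21");
        Buffer map2 = new Buffer();
        iterate(map2, "2", "2021-10-01 01:26:21");

        // The reducer receives both partial lists and holds all of them at once.
        Buffer reducer = new Buffer();
        merge(reducer, terminatePartial(map1));
        merge(reducer, terminatePartial(map2));
        for (String[] r : terminate(reducer)) {
            System.out.println(r[1] + " " + r[0]);
        }
    }
}
```

Because `merge` only concatenates lists, the reducer's buffer holds every row of the group before `terminate` can sort it, so memory use grows linearly with the size of the largest group.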
pom.xml
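```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
```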
```java
package com.yzz.udaf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

@Description(name = "track", value = "_FUNC_(x,y,z,...) - computes the trajectory")
public class TrackUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // At least two arguments are needed: the field to compare and the time.
        if (parameters.length < 2) {
            throw new SemanticException("At least 2 arguments are required. Argument 1: the field to compare, "
                    + "e.g. the location id of a trajectory point; argument 2: the time; "
                    + "arguments 3..n: fields to keep");
        }
        // Collect the qualified type name of each argument.
        List<String> fieldsNames = new ArrayList<>(parameters.length);
        for (TypeInfo typeInfo : parameters) {
            fieldsNames.add(typeInfo.getQualifiedName());
        }
        return new TrackUDAFEvaluator();
    }

    // Aggregation buffer holding the rows of one group.
    public static class TrackBuffer implements GenericUDAFEvaluator.AggregationBuffer {
        List
```
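Package, upload, and register in Hive
- Package the project into a jar
- Upload the jar to HDFS
- Add it to Hive: `add jar hdfs://nameservice/xxx`
- Create a temporary function: `create temporary function track as "com.yzz.udaf.TrackUDAF";`
- Run: `select track(data_time,location_id,tag) from test.dw group by tag;`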



