
Hive UDAF: Computing a User Trajectory

Requirement: compute each user's movement trajectory. For example, given:

uid  location_id  time
001  1            2021-10-01 12:21:21
001  2            2021-10-01 12:22:21
002  1            2021-10-01 09:23:21
001  1            2021-10-01 11:24:21
001  1            2021-10-01 12:25:21
001  2            2021-10-01 12:26:21
001  2            2021-10-01 01:26:21
003  2            2021-10-01 12:26:21
003  2            2021-10-01 12:27:21

Result:

st                   location_id  uid  et
2021-10-01 01:26:21  2            001  2021-10-01 01:26:21
2021-10-01 11:24:21  1            001  2021-10-01 12:21:21
2021-10-01 12:22:21  2            001  2021-10-01 12:22:21
2021-10-01 12:25:21  1            001  2021-10-01 12:25:21
2021-10-01 12:26:21  2            001  2021-10-01 12:26:21
2021-10-01 09:23:21  1            002  2021-10-01 09:23:21
2021-10-01 12:26:21  2            003  2021-10-01 12:27:21

Analysis:
Group by uid and sort each group by time. Compare each record with the previous one: whenever location_id changes, merge the preceding run of records into a single record. st and et are the start and end times of that run.
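The run-merging step described above can be sketched in plain Java, independent of Hive. This is only an illustrative sketch: the class and method names are made up, and input rows are assumed to be "time|location_id|uid" strings already sorted by time within one uid group, matching the example tables.

```java
import java.util.ArrayList;
import java.util.List;

public class TrackMergeSketch {
    // Collapses consecutive rows with the same location_id into a single
    // "st|location_id|uid|et" record. Input rows are "time|location_id|uid",
    // already sorted by time within one uid group.
    static List<String> mergeRuns(List<String> sorted) {
        List<String> out = new ArrayList<>();
        String runStart = null, runLoc = null, runUid = null, lastTime = null;
        for (String row : sorted) {
            String[] f = row.split("\\|");   // "|" is a regex metacharacter: escape it
            String time = f[0], loc = f[1], uid = f[2];
            if (runStart == null) {
                // First row of the group starts the first run.
                runStart = time; runLoc = loc; runUid = uid;
            } else if (!loc.equals(runLoc)) {
                // location_id changed: close the run with the previous time as et.
                out.add(runStart + "|" + runLoc + "|" + runUid + "|" + lastTime);
                runStart = time; runLoc = loc; runUid = uid;
            }
            lastTime = time;
        }
        if (runStart != null) {
            out.add(runStart + "|" + runLoc + "|" + runUid + "|" + lastTime);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("2021-10-01 11:24:21|1|001");
        rows.add("2021-10-01 12:21:21|1|001");
        rows.add("2021-10-01 12:22:21|2|001");
        for (String r : mergeRuns(rows)) {
            System.out.println(r);
        }
    }
}
```

On the three sample rows this produces the two merged records "2021-10-01 11:24:21|1|001|2021-10-01 12:21:21" and "2021-10-01 12:22:21|2|001|2021-10-01 12:22:21", matching the corresponding rows of the result table.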

Note:
Within a group, the data is held in a List, and all of a group's data is gathered on the reduce side, so the data volume per group matters: a single group must not be too large. For trajectory data, one uid usually does not have many records, so a UDAF is a workable solution here.

Code

pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>


package com.yzz.udaf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.util.ArrayList;
import java.util.List;


@Description(name = "track", value = "_FUNC_(time, location_id, ...) - computes movement trajectories")
public class TrackUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length < 2) {
            throw new SemanticException("At least 2 arguments are required. "
                    + "Argument 1: the time field, used for ordering; "
                    + "argument 2: the field to compare, e.g. the location_id of a trajectory point; "
                    + "arguments 3..n: additional fields to keep.");
        }
        return new TrackUDAFEvaluator();
    }

    public static class TrackBuffer implements GenericUDAFEvaluator.AggregationBuffer {
        List<Object> data = new ArrayList<>();

        // Collapses consecutive records with the same location_id into one
        // "st|location_id|...|et" record. Assumes data is already sorted by time.
        List<String> merge() {
            List<String> newList = new ArrayList<>();
            StringBuilder first = null;
            String lastTime = null;
            String lastLocation = null;
            for (Object obj : data) {
                String s = obj.toString();
                // "|" is a regex metacharacter, so it must be escaped
                String[] split = s.split("\\|");
                String time = split[0];
                String location = split[1];
                if (first == null) {
                    first = new StringBuilder(s);
                } else if (!location.equals(lastLocation)) {
                    // The run ends: close it with the previous record's time as et.
                    first.append("|").append(lastTime);
                    newList.add(first.toString());
                    first = new StringBuilder(s);
                }
                lastTime = time;
                lastLocation = location;
            }
            if (first != null) {
                first.append("|").append(lastTime);
                newList.add(first.toString());
            }
            return newList;
        }
    }


    public static class TrackUDAFEvaluator extends GenericUDAFEvaluator {

        // Input inspectors for the raw columns (PARTIAL1 / COMPLETE)
        private ObjectInspector[] mapOrCompleteOIs;
        // Input inspector for the partial list (PARTIAL2 / FINAL)
        private ListObjectInspector partial2OrFinalOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // INPUT
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // Map side, or map-only aggregation: the raw column values
                mapOrCompleteOIs = parameters;
            } else {
                // The remaining modes, PARTIAL2 and FINAL, receive a list
                // produced by terminatePartial()
                partial2OrFinalOI = (ListObjectInspector) parameters[0];
            }
            // OUTPUT: a list of strings in every mode
            return ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new TrackBuffer();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((TrackBuffer) agg).data.clear();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            TrackBuffer trackBuffer = (TrackBuffer) agg;
            // Join the row's fields into a single "time|location_id|..." string
            StringBuilder sb = new StringBuilder();
            for (int i = 0, len = parameters.length; i < len; i++) {
                String data = PrimitiveObjectInspectorUtils.getString(
                        parameters[i], (PrimitiveObjectInspector) mapOrCompleteOIs[i]);
                sb.append(data);
                if (i != len - 1) {
                    sb.append("|");
                }
            }
            trackBuffer.data.add(sb.toString());
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((TrackBuffer) agg).data;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            TrackBuffer trackBuffer = (TrackBuffer) agg;
            trackBuffer.data.addAll(partial2OrFinalOI.getList(partial));
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            // Reduce side: sort by the string prefix (the time), then merge the runs
            TrackBuffer trackBuffer = (TrackBuffer) agg;
            trackBuffer.data.sort((o1, o2) -> o1.toString().compareTo(o2.toString()));
            return trackBuffer.merge();
        }
    }
}
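One detail worth calling out: the buffer rows are joined with "|", and String.split takes a regular expression, in which "|" means alternation. Splitting on an unescaped pipe therefore matches at every position and breaks the row into single characters. A small standalone sketch (the class name is made up for illustration):

```java
import java.util.regex.Pattern;

public class SplitPipeDemo {
    public static void main(String[] args) {
        String row = "2021-10-01 12:21:21|1|001";

        // Unescaped "|" is regex alternation between two empty branches,
        // so it matches everywhere and yields one element per character.
        String[] wrong = row.split("|");
        System.out.println(wrong.length);       // far more than 3

        // Escaping the pipe splits on the literal "|" character.
        String[] right = row.split("\\|");
        System.out.println(right.length);       // 3

        // Pattern.quote is an equivalent, more explicit alternative.
        String[] alsoRight = row.split(Pattern.quote("|"));
        System.out.println(alsoRight.length);   // 3
    }
}
```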

 
Package, upload, and register in Hive
  • Package the jar
  • Upload it to HDFS
  • Register it in Hive: add jar hdfs://nameservice/xxx
  • Create a temporary function: create temporary function track as 'com.yzz.udaf.TrackUDAF';
  • Run it: select track(data_time, location_id, tag) from test.dw group by tag;
References

https://blog.csdn.net/weixin_39469127/article/details/89766266
https://blog.csdn.net/kent7306/article/details/50110067
