栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

# HIVE udf和udtf功能使用

# HIVE udf和udtf功能使用

	前言:之前在实习过程中,有一个业务需求,因为开始没有理解需求,所以走了很多弯路,最后修改代码逻辑的时候发现还有上游字段没有解析,需要些一个udtf来完成需求。之前虽然了解过udf和udtf等,但是没有实际写过,
最后差点任务延期,所以在使用hive过程中,udf和udtf也应该作为数据开发人员的基本功,在实际开发过程中也是经常需要使用的。

1、UDF:只对单行数值产生作用;继承UDF类,核心方法evaluate();

注意:evaluate()方法并不是唯一的:
这里我以一个实际方法为例来讲解:
public class DateBetween extends UDF {
	public boolean evaluate(String diffDates[],String inDate,int num) throws ParseException {
		if(diffDates == null) return false;
		for(String date:diffDates){
			int diff = diffDays(date,inDate);
			if(diff>0 && diff<=num) return true;
		}
		return false;
	}
	public boolean evaluate(String diffDate,String inDate,int num) throws ParseException {
		if(diffDate == null ||"".equals(diffDate))return false;
		String diffDates[] = diffDate.split(",");
		for(String date:diffDates){
			int diff = diffDays(date,inDate);
			if(diff>0 && diff<=num) return true;
		}
		return false;
	}
	public int diffDays(String dateStr1,String dateStr2) throws ParseException {
		if(dateStr1 == null || dateStr2 ==  null)return -1;
		SimpleDateFormat df = null;
		df = new SimpleDateFormat("yyyyMMdd");
		Date date1= df.parse(dateStr1);
		Date date2 = df.parse(dateStr2);
		long time = date1.getTime() - date2.getTime();
		if(time<=0){
			return -1;
		} else{
			return (int)(time/(24 * 60 * 60 * 1000));
		}
	}

	public static void main(String srgas[]) throws ParseException {
		DateBetween between = new DateBetween();
		String dateStr = "20211026,20211101";
		String dateStr2 = "20211030";
		String diffDates[] = dateStr.split(",");
		for(String date:diffDates){
			int diff = between.diffDays(date,dateStr2);
			System.out.println("diff:"+diff);
			if(diff>0 && diff<=7)
				System.out.println(1);
		}
	}
}
evaluate方法中定义参数及返回值,这也是udf_func(x)的返回值,可以看到,evaluate()是可以进行方法重载的。

2、udtf:输入一行输出多行;继承GenericUDTF类,重写initialize(返回输出行信息:列个数,类型), process, close三方法;

public class CouponRestrictionPoiLimitlist extends GenericUDTF{

    private static final String POI_SKU = "poi_sku_ID";

    private static Logger logger= LoggerFactory.getLogger(CouponRestrictionPoiLimitlist.class);
    static final Log LOG = LogFactory.getLog(CouponRestrictionPoiLimitlist.class.getName());

    private String[] obj = new String[1];

//初始化方法,主要作用是定义输出的格式、字段类型;
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

        ArrayList fieldNames = new ArrayList();
        ArrayList fieldOIs = new ArrayList();

//如果有多列需要输入和输出,那么同样也应该在这里定义多个对象,具体方法便是fieldNames和fieldOIs应该要对应添加想要输出的内容;
        fieldNames.add(POI_SKU);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

//一般来说,这里选用默认的输出结构即可;
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] objects) throws HiveException {

        try{
            //字段值;
            String jsonStr = objects[0].toString();

            logger.warn("####"+jsonStr);

            //提取JSON字段;
            Gson gson = new Gson();

            JSONArray jsonArray = gson.fromJson(jsonStr, new TypeToken() {}.getType());

            for (int i = 0; i < jsonArray.size(); i++) {

                JSONObject jsonObject = jsonArray.getJSONObject(i);
                //获取poiId字段;
                Integer poiId = jsonObject.getInteger("poiId");
                //获取limitList字段;
                List limitList = jsonObject.getObject("limitList", List.class);
                //拼接并输出poi_sku字段;
                for (int j = 0; j < limitList.size(); j++) {
                    Integer skuId = (int) Double.parseDouble(String.valueOf(limitList.get(j)));
                    String result=poiId + "_" + skuId;
                    obj[0] = result;
                    logger.warn("####result"+result);
//这里要注意,forward()中输出的是Object类型数据,但实际上这里不能直接输出单个Object对象,而应该是String[],Object数组等类型数据;
                    forward(obj);
                }
            }
        }catch(Exception e){
            e.printStackTrace();
            logger.error(e.getMessage(), e);
        }
    }

    @Override
    public void close() throws HiveException {

    }



    public static void main(String[] args) {

        String jsonStr = "[{"poiId":46,"limitList":[54183,54318,15501,54849]}]";

        //提取JSON字段;
        Gson gson = new Gson();

        JSONArray jsonArray = gson.fromJson(jsonStr, new TypeToken() {}.getType());

        for (int i = 0; i < jsonArray.size(); i++) {

            JSONObject jsonObject = jsonArray.getJSONObject(i);
            //获取poiId字段;
            Integer poiId = jsonObject.getInteger("poiId");
            //获取limitList字段;
            List limitList = jsonObject.getObject("limitList", List.class);
            //拼接并输出poi_sku字段;
            for (int j = 0; j < limitList.size(); j++) {
                Integer skuId = (int) Double.parseDouble(String.valueOf(limitList.get(j)));
                System.out.println((poiId + "_" + skuId));
            }
        }
    }
}

//输出结果为:
46_54183
46_54318
46_15501
46_54849

Process finished with exit code 0

3、udaf函数;这里我没有用过,以后有遇到会再行补充;主要运用得多的是udf和udtf函数;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389348.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号