栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hive UDF GDF UDTF编写与实现

Hive UDF GDF UDTF编写与实现

hive 自定义函数实现 UDF | GDF | UDTF 区别
  • UDF:一进一出(hive3 已经废除)
  • GDF:一进一出(hive3 支持的GDF)
  • UDTF:一进多出
业务前景

测试数据为:字符串(JSON数组格式 [{},{},{}] )

[{"title": "转让背书", "endorseName": "山东泰山钢铁集团有限公司", "endorseeName": "山东汶汇港物流有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-10"}, {"title": "转让背书", "endorseName": "山东汶汇港物流有限公司", "endorseeName": "山东泰通达物流有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-11"}]

通过hive自定义函数将json数组中的 endorseName和endorseeName取出后并去重输出,或者将二者取出后在hive中进行去重操作

输出格式:

-- UDF GDF输出格式:
山东泰山钢铁集团有限公司,山东汶汇港物流有限公司,山东汶汇港物流有限公司,山东泰通达物流有限公司
-- UDTF 输出格式
山东泰山钢铁集团有限公司
山东汶汇港物流有限公司
山东汶汇港物流有限公司
山东泰通达物流有限公司

实现思路:

  1. 将字符串转换为JSON数组进行处理
  2. 将字符串转换为对象集合的方式获取数据格式
前期准备
  1. 使用IDEA创建Maven项目
  2. 导入相应依赖包
  3. 创建相应的package并创建相应JAVA类
  4. 所有代码编写完成后将项目进行打包处理
创建项目

我的创建项目已经目录截图如下,以下格式仅供参考,需根据自己的需求进行整理:

导入maven


    4.0.0

    org.example
    domain_endorse
    1.0-SNAPSHOT
    
    
    
        UTF8
        3.1.2
    

    
        
        
            org.apache.hive
            hive-exec
            ${hive.version}
        

        
        
            com.alibaba
            fastjson
            1.2.47
        
        
        
            org.projectlombok
            lombok
            1.16.10
        
    
	
    
        
            
                maven-compiler-plugin
                2.3.2
                
                    1.8
                    1.8
                
            
            
                maven-assembly-plugin
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    

UDF函数实现

将字符串转换为集合对象来实现

package com.sddw.udf;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;

import java.util.*;

public class EndorseUDF extends UDF {

    // 对象集合
    @Data
    public static class EndorseObj{
        private String endorseName;
        private String endorseeName;
    }

    public String evaluate(String line) {
        // 定义set集合,接收背书企业信息,(去重)
        Set set = new HashSet();
        // 处理脏数据
        if (line == null || line == "") {
            return null;
        }
        // 判断数据的完整性,不完整数据剔除
        if (line.trim().startsWith("[{") && line.trim().endsWith("}]")) {
            try {
                // 将数据转换为对象集合
                List list = JSONObject.parseArray(line, EndorseObj.class);
                for (int i = 0; i < list.size(); i++) {
                    set.add(list.get(i).endorseeName);
                    set.add(list.get(i).endorseName);
                }

            
            } catch (
                    JSONException e) {
                e.printStackTrace();
            }
        } else {
            // 剔除不完整数据
            return null;
        }
        // 将set集合准换为字符串
        String endorsename = String.join(",", set);
        return endorsename;
    }

    public static void main(String[] args) {
        String line = "[{"title": "转让背书", "endorseName": "山东泰山钢铁集团有限公司", "endorseeName": "山东汶汇港物流有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-10"}, {"title": "转让背书", "endorseName": "山东汶汇港物流有限公司", "endorseeName": "山东泰通达物流有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-11"}, {"title": "转让背书", "endorseName": "山东泰通达物流有限公司", "endorseeName": "山东维利达经贸有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-11"}, {"title": "转让背书", "endorseName": "山东维利达经贸有限公司", "endorseeName": "莱芜市合盛铸造材料经营部", "isTransfer": "可以转让", "endorseDate": "2021-06-11"}, {"title": "转让背书", "endorseName": "莱芜市合盛铸造材料经营部", "endorseeName": "山东六六六贸易有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-11"}, {"title": "转让背书", "endorseName": "山东六六六贸易有限公司", "endorseeName": "山东禾壮信息技术有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-21"}, {"title": "转让背书", "endorseName": "山东禾壮信息技术有限公司", "endorseeName": "山东纳凯建材有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-21"}, {"title": "转让背书", "endorseName": "山东纳凯建材有限公司", "endorseeName": "聊城信源集团有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-21"}, {"title": "转让背书", "endorseName": "聊城信源集团有限公司", "endorseeName": "苏州弗兰特环保科技有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-22"}, {"title": "转让背书", "endorseName": "苏州弗兰特环保科技有限公司", "endorseeName": "潍坊祥盛控制设备科技有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-30"}, {"title": "转让背书", "endorseName": "潍坊祥盛控制设备科技有限公司", "endorseeName": "德州陵城区陆达商贸有限公司", "isTransfer": "可以转让", "endorseDate": "2021-06-30"}]";
        String x = new EndorseUDF().evaluate(line);
        System.out.println(x);
    }
}
GDF实现

将字符串转换为集合对象来实现

package com.sddw.gdf;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONException;

import java.util.*;


public class EndorseGDF extends GenericUDF {

	@Data
	public static class EndorseObj{
		private String endorseName;
		private String endorseeName;
	}

	
	@Override
	public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
		// 判断输入参数的个数
		if (objectInspectors.length != 1) {
			throw new UDFArgumentLengthException("Input Args Length Error!!!");
		}
		// 判断输入参数的类型
		if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)) {
			throw new UDFArgumentTypeException(0, "Input Args Type Error!!!");
		}
		//函数本身返回值为string,需要返回string类型的鉴别器对象
		return PrimitiveObjectInspectorFactory.javaStringObjectInspector;

	}

	
	@Override
	public String evaluate(DeferredObject[] deferredObjects) throws HiveException {

		String line = deferredObjects[0].get().toString();

		// 定义set集合,接收背书企业信息,(去重)
		Set set = new HashSet();
		// 处理脏数据
		if (line == null || line.length() == 0) {
			return null;
		}
		// 判断数据的完整性,不完整数据剔除
		if (line.trim().startsWith("[{") && line.trim().endsWith("}]")) {

			try {
				// 将数据转换为对象集合
				List list = JSONObject.parseArray(line, EndorseObj.class);
				for (int i = 0; i < list.size(); i++) {
					set.add(list.get(i).endorseeName);
					set.add(list.get(i).endorseName);
				}
			} catch (
					JSONException e) {
				e.printStackTrace();
			}
		} else {
			// 剔除不完整数据
			return null;
		}
		// 将set集合准换为字符串
		String endorsename = String.join(",", set);
		return endorsename;
	}

	@Override
	public String getDisplayString(String[] strings) {
		return null;
	}
}

UDTF实现

通过将字符串转换为JSON数组的方式进行实现

package com.sddw.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;

public class EndorseUDTF extends GenericUDTF {
	
	@Override
	public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
		//1.获取传入的参数
		List inputFields = argOIs.getAllStructFieldRefs();
		//2.判断参数个数是否为一个?
		if (inputFields.size() != 1) {
			throw new UDFArgumentException("只需要一个参数");
		}
		//3.定义返回值名称和类型
		//返回的字段名
		List fieldNames = new ArrayList<>();
		fieldNames.add("endorsecorp");
		//返回的字段类型
		List fieldOIs = new ArrayList<>();
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
	}

	
	@Override
	public void process(Object[] objects) throws HiveException {
		//1.获取传入的数据
		String jsonArray = objects[0].toString();
		// 判断JSON数组的完整性,非完整的剔除
		if (jsonArray.startsWith("[{") && jsonArray.endsWith("}]")) {
			//2.将string转化为json数组
			JSONArray actions = new JSONArray(jsonArray);
			//3.循环取出json数组的元素,依次写出
			for (int i = 0; i < actions.length(); i++) {
				String[] result = new String[1];
				result[0] = actions.getString(i);

				JSONObject jsonObject = new JSONObject(result[0]);

				if (jsonObject.toString().contains("endorseName") && jsonObject.toString().contains("endorseeName")) {
					// 获取JSON中的数据
					String endorseName = jsonObject.getString("endorseName").replace("(", "(").replace(")", ")");
					String endorseeName = jsonObject.getString("endorseeName").replace("(", "(").replace(")", ")");
					// 将数据写入
					forward(endorseName);
					forward(endorseeName);
				}
			}
		}
	}

	@Override
	public void close() throws HiveException {

	}
}
项目打包

项目完成并测试通过后,将项目进行打包上传到hive所在的集群中
项目打包完成后会产生两个文件,一个是含有maven,一个是不含maven依赖,如果我们只使用hive的依赖不涉及到其他的依赖建议直接上传不含maven依赖的jar包,如果涉及到其它maven依赖的,必须上传含有maven依赖的jar包

创建临时函数
add jar /opt/data/domain_endorse.jar;
create temporary function endorseudf as 'com.sddw.udf.EndorseUDF';
create temporary function endorsegdf as 'com.sddw.gdf.EndorseGDF';
create temporary function endorseudtf as 'com.sddw.udtf.EndorseUDTF';
-- 临时喊出退出shell终端后函数立马失效

drop function if exists endorsegdf;
drop function if exists endorseudf;
drop function if exists endorseudf;

-- 测试:
select endorseudtf('[{"title": "转让背书", "endorseName": "江苏普莱姆新材料有限公司", "endorseeName": "邯郸市邯山区润川贸易有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-24"}, {"title": "转让背书", "endorseName": "邯郸市邯山区润川贸易有限公司", "endorseeName": "宁波久营贸易有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "宁波久营贸易有限公司", "endorseeName": "嵊州市恒鑫金属制管有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "嵊州市恒鑫金属制管有限公司", "endorseeName": "建龙北满特殊钢有限责任公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "建龙北满特殊钢有限责任公司", "endorseeName": "无锡容大环境科技有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-31"}, {"title": "转让背书", "endorseName": "无锡容大环境科技有限公司", "endorseeName": "宜兴市清泰净化剂有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-19"}, {"title": "转让背书", "endorseName": "宜兴市清泰净化剂有限公司", "endorseeName": "上海碧源化学品有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-21"}, {"title": "转让背书", "endorseName": "上海碧源化学品有限公司", "endorseeName": "安徽巨成精细化工有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-22"}, {"title": "转让背书", "endorseName": "安徽巨成精细化工有限公司", "endorseeName": "爱森(如东)化工有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-26"}, {"title": "转让背书", "endorseName": "爱森(如东)化工有限公司", "endorseeName": "中国三冶集团有限公司", "isTransfer": "可转让", "endorseDate": "2021-03-19"}, {"title": "转让背书", "endorseName": "中国三冶集团有限公司", "endorseeName": "铁西区玫美工程机械租赁站", "isTransfer": "可转让", "endorseDate": "2021-03-25"}]');

创建永久函数
-- 注意:创建永久函数的时候需要进入到项目的数据库下,我的数据库为“sddw”,如果我们想要更换代码实现逻辑,只需要我们将HDFS文件系统(或者lib目录下的涉及到jar包文件)中的jar文件进行替换,退出hive终端并重新进入即可。

-- 第一种方式:将jar包放在hive的lib目录下(这样可能存在jar包冲突,如果maven不涉及到其他依赖的时候可以直接放入到lib目录下)
add jar /opt/module/hive-3.1.2/lib/domain_endorse.jar;
create function endorseudf as 'com.sddw.udf.EndorseUDF';
create function endorsegdf as 'com.sddw.gdf.EndorseGDF';
-- 前提:hive-env.sh
-- export HIVE_AUX_JARS_PATH=/opt/module/hive-3.1.2/lib

-- 第二种方式:将jar包上传到HDFS 文件系统(jar包一定包含Maven依赖)
create function endorseudf as 'com.sddw.udf.EndorseUDF' using jar 'hdfs:///sddw/function/domain_endorse.jar'
create function endorsegdf as 'com.sddw.gdf.EndorseGDF' using jar 'hdfs:///sddw/function/domain_endorse.jar'
create function endorseudtf as 'com.sddw.udtf.EndorseUDTF' using jar 'hdfs:///sddw/function/domain_endorse.jar'

-- 查看创建的函数
show functions like '*endorse*' ;

-- 删除函数
drop function if exists sddw.endorseudf;
drop function if exists sddw.endorsegdf;
drop function if exists sddw.endorseudtf;
-- 测试:
select sddw.endorseudtf('[{"title": "转让背书", "endorseName": "江苏普莱姆新材料有限公司", "endorseeName": "邯郸市邯山区润川贸易有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-24"}, {"title": "转让背书", "endorseName": "邯郸市邯山区润川贸易有限公司", "endorseeName": "宁波久营贸易有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "宁波久营贸易有限公司", "endorseeName": "嵊州市恒鑫金属制管有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "嵊州市恒鑫金属制管有限公司", "endorseeName": "建龙北满特殊钢有限责任公司", "isTransfer": "可转让", "endorseDate": "2020-12-25"}, {"title": "转让背书", "endorseName": "建龙北满特殊钢有限责任公司", "endorseeName": "无锡容大环境科技有限公司", "isTransfer": "可转让", "endorseDate": "2020-12-31"}, {"title": "转让背书", "endorseName": "无锡容大环境科技有限公司", "endorseeName": "宜兴市清泰净化剂有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-19"}, {"title": "转让背书", "endorseName": "宜兴市清泰净化剂有限公司", "endorseeName": "上海碧源化学品有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-21"}, {"title": "转让背书", "endorseName": "上海碧源化学品有限公司", "endorseeName": "安徽巨成精细化工有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-22"}, {"title": "转让背书", "endorseName": "安徽巨成精细化工有限公司", "endorseeName": "爱森(如东)化工有限公司", "isTransfer": "可转让", "endorseDate": "2021-01-26"}, {"title": "转让背书", "endorseName": "爱森(如东)化工有限公司", "endorseeName": "中国三冶集团有限公司", "isTransfer": "可转让", "endorseDate": "2021-03-19"}, {"title": "转让背书", "endorseName": "中国三冶集团有限公司", "endorseeName": "铁西区玫美工程机械租赁站", "isTransfer": "可转让", "endorseDate": "2021-03-25"}]');

-- 输出:
上海碧源化学品有限公司,中国三冶集团有限公司,无锡容大环境科技有限公司,江苏普莱姆新材料有限公司,建龙北满特殊钢有限责任公司,铁西区玫美工程机械租赁站,邯郸市邯山区润川贸易有限公司,宁波久营贸易有限公司,爱森(如东)化工有限公司,宜兴市清泰净化剂有限公司,安徽巨成精细化工有限公司,嵊州市恒鑫金属制管有限公司
UDF|GDF数据行转列
-- 要想UDF、GDF也实现UDTF这种输出,需要对该数据进行行转列处理
select 
corp_name
from 
(
select '上海碧源化学品有限公司,中国三冶集团有限公司,无锡容大环境科技有限公司,江苏普莱姆新材料有限公司,建龙北满特殊钢有限责任公司,铁西区玫美工程机械租赁站,邯郸市邯山区润川贸易有限公司,宁波久营贸易有限公司,爱森(如东)化工有限公司,宜兴市清泰净化剂有限公司,安徽巨成精细化工有限公司,嵊州市恒鑫金属制管有限公司' as endorse_coms 
)t1 lateral view explode(split(endorse_coms,',')) num as corp_name;
END

到这里,hive的UDF、GDF、UDTF均实现该方法

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487149.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号