目录
UDF
GenericUDF
Java开发转了大数据,竟然被拉去做了非结构的ETL抽取,真的是比做后端伤脑筋,没有可借鉴的框架,只能根据数据进行抽取,第一份大数据实习,写完抽取代码后,需要写成UDF和UDTF进行使用。
简单意思:
UDF: 一对一,输入一笔数据输出一笔数据
UDTF:一对多,输入一笔数据输出多笔数据 (接受0个或多个输入然后产生多列或多行输出。)
UDAF:多对一,输入多笔数据输出一笔数据
记录一下UDF和GenericUDF的区别:
UDF属于基础的UDF:
简单的udf实现很简单,只需要继承udf,然后实现evaluate()方法就行了。evaluate()允许重载。
UDF
对于自定义函数现在需要进行总结一下:
pom文件:主要为打包文件:
4.0.0 org.example UDF1.0-SNAPSHOT jar org.apache.spark spark-core_2.112.3.0 provided org.apache.spark spark-hive_2.112.3.0 provided src compile maven-compiler-plugin 3.5.1 1.8 1.8 UTF-8 maven-assembly-plugin jar-with-dependencies make-assembly package single
package com.demo;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
@Description(
name = "wordUDF",
value = "_FUNC_(String word) - Returns result",
extended ="Example:\n > SELECt _FUNC_(\'你好\') FROM src LIMIT 1;\n \'2022新年快乐:你好\'""
)
public class WordSingleUDF extends UDF {
public String evaluate(String args) {
return "2022新年快乐:"+args;
}
public static void main(String[] args) {
System.out.println(new WordSingleUDF().evaluate("你好"));
}
}
进行打包上传:
1. add jar /home/zhaohai.li/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar
2. create temporary function udf_word as 'com.demo.WordSingleUDF';
3. select udf_word('hello')
显示:
2022新年快乐:hello
GenericUDF
这个函数需要进行实现多个方法
GenericUDF的有点 可以处理复杂的数据类型,所以它能处理更为复杂的数据类型场景。
在进行继承GenericUDF 时需要进行实现三个方法:
必须实现的函数: ObjectInspector initialize(ObjectInspector[] arguments) //初始化操作,在函数进行初始化的时候会执行,其他时间不执行Object evaluate(DeferredObject[] arguments) //进行业务计算逻辑,处理具体的数据String getDisplayString(String[] children)//进行函数描述结果的显示,只有当函数执行一场才会显示其余的函数:
configure(MapredContext context) //在函数初始化之前,进行设置mapContext
package main.java.com.demo;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.Date;
public class WordUDF extends GenericUDF {
private static int mapTasks = 0;
private static String init = "";
private transient ArrayList ret = new ArrayList();
@Override
public void configure(MapredContext context) {
System.out.println(new Date() + "configure mapredContext");
if (null != context) {
//从jobConf中获取map数
mapTasks = context.getJobConf().getNumMapTasks();
}
System.out.println(new Date() + "######## mapTasks [" + mapTasks + "] ..");
}
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
System.out.println("1. init start:udfName" + this.getUdfName() + new Date());
//初始化文件系统,可以在这里初始化读取文件等
init = "init";
//定义函数的返回类型为java的List
ObjectInspector returnOI = PrimitiveObjectInspectorFactory
.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
}
@Override
public Object evaluate(DeferredObject[] args) throws HiveException {
System.out.println("2. deal with the data process " + new Date());
ret.clear();
if(args.length < 1) return ret;
//获取第一个参数
String str = args[0].get().toString();
String[] s = str.split(",",-1);
for(String word : s) {
ret.add(word);
}
return ret;
}
@Override
public String getDisplayString(String[] strings) {
return "Usage: Lxw1234GenericUDF(String str)";
}
public static void main(String[] args) {
}
}
UDTF
package main.java.com.demo;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
public class MyUDTF extends GenericUDTF {
private ArrayList outList = new ArrayList<>();
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//1.定义输出数据的列名和类型
List fieldNames = new ArrayList<>();
List fieldOIs = new ArrayList<>();
//2.添加输出数据的列名和类型
fieldNames.add("lineToWord");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
//1.获取原始数据
String arg = args[0].toString();
//2.获取数据传入的第二个参数,此处为分隔符
String splitKey = args[1].toString();
//3.将原始数据按照传入的分隔符进行切分
String[] fields = arg.split(splitKey);
//4.遍历切分后的结果,并写出
for (String field : fields) {
//集合为复用的,首先清空集合
outList.clear();
//将每一个单词添加至集合
outList.add(field);
//将集合内容写出
forward(outList);
}
}
@Override
public void close() throws HiveException {
}
}
UDAF
UDAF已经失效 需要去实现 implement
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 或者 extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver能看到实际上
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver也是实现的org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2
那我们直接进行继承父类 AbstractGenericUDAFResolver (自己可做选择)
要先了解UDAF的四个阶段,定义在GenericUDAFevaluator的Mode枚举中:
COMPLETE:如果mapreduce只有map而没有reduce,就会进入这个阶段;
PARTIAL1:正常mapreduce的map阶段;
PARTIAL2:正常mapreduce的combiner阶段;
FINAL:正常mapreduce的reduce阶段;



