- 0.编程思想得到升华
- 0. hive中用到的对象
- 1.UDF
- 2.UDAF
- 2.UDTF
- 3.UDTF升级版
- 4.创建临时函数
- 5.创建永久函数
0. hive中用到的对象
- 当写这个框架的自定义类时,大都要继承框架的某些类
- 框架中的对象基本都是再次包装过的,基本不能直接 new 出来,可以考虑在对象名后面加上 factory(工厂)
- 当学习新的知识时,看源码是一个非常好的方式,虽然现在还是看得有点懵逼,不过可以先理解大致流程.
- 实现类时,编写简单或自己用的函数,只需要实现官方规定的方法即可。一般抽象方法都会有提示.
1.UDF
- Cloneable: 可克隆的 是一个接口
- ObjectInspector: obj对象鉴别器 是一个接口 继承自 Cloneable
- ObjectInspectorFactory: obj对象鉴别器工厂 是一个final class 最终类 用于生产对象
- PrimitiveObjectInspectorFactory: 基本类型 obj对象鉴别器工厂 是一个final class 最终类 与 java类型对接
- StructObjectInspector : 结构体obj对象鉴别器 是一个实现类 实现了 ObjectInspector 接口 是一个抽象类 干啥的没解析注释看不懂!!!
一进一出
package review.myudf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 * GenericUDF that returns the character length of its single argument
 * (any primitive type; the value is converted via toString()).
 *
 * Usage: get_length_str(col) -> int
 */
public class GetLengthStr extends GenericUDF {

    /**
     * Validates the argument list and declares the return type.
     *
     * @param arguments object inspectors for the call-site arguments
     * @return a Java-int object inspector (the UDF returns an int)
     * @throws UDFArgumentException if the argument count or type is wrong
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. Check the argument count.
        //    BUG FIX: the null check must come BEFORE arguments.length is read;
        //    the original order (length first) would throw a raw NPE for a null
        //    array instead of the intended UDFArgumentLengthException.
        if (arguments == null || arguments.length != 1) {
            throw new UDFArgumentLengthException("参数数量只能是一个");
        }
        // 2. Only primitive (scalar) categories are accepted.
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "参数类型异常");
        }
        // 3. Declare the result type: a plain Java int.
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    /**
     * Computes the length of the argument's string form.
     *
     * @param arguments lazily-evaluated argument holders (exactly one)
     * @return the string length, or 0 for a SQL NULL input
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // 1. Materialize the single argument.
        Object str = arguments[0].get();
        // 2. SQL NULL comes through as a Java null (not the string "null").
        //    NOTE(review): Hive's built-in length() returns NULL here, not 0 —
        //    returning 0 is this UDF's deliberate choice; confirm it is wanted.
        if (str == null) {
            return 0;
        }
        // 3. Length of the stringified value.
        return str.toString().length();
    }

    /**
     * Human-readable form shown in EXPLAIN plans.
     * BUG FIX: the original returned "", which hides the call in query plans;
     * use the standard helper as Hive's built-in UDFs do (e.g. GenericUDFAbs).
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("get_length_str", children);
    }
}
对比源码:
// Verbatim copy of Hive's built-in ABS() UDF, kept here for comparison with
// the hand-written GetLengthStr above. Note the pattern: validate in
// initialize(), cache converters/inspectors in transient fields, reuse
// Writable result objects in evaluate() to avoid per-row allocation.
public class GenericUDFAbs extends GenericUDF {
// transient: object inspectors are re-created on each task, not serialized.
private transient PrimitiveCategory inputType;
// Reused result holders — one per possible output type — so evaluate()
// allocates nothing per row.
private final DoubleWritable resultDouble = new DoubleWritable();
private final LongWritable resultLong = new LongWritable();
private final IntWritable resultInt = new IntWritable();
private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
private transient PrimitiveObjectInspector argumentOI;
private transient Converter inputConverter;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// Exactly one argument, and it must be a primitive category.
if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"ABS() requires 1 argument, got " + arguments.length);
}
if (arguments[0].getCategory() != Category.PRIMITIVE) {
throw new UDFArgumentException(
"ABS only takes primitive types, got " + arguments[0].getTypeName());
}
argumentOI = (PrimitiveObjectInspector) arguments[0];
inputType = argumentOI.getPrimitiveCategory();
ObjectInspector outputOI = null;
// Pick the narrowest writable output type that can hold abs(input),
// and cache a converter from the actual input OI to that type.
switch (inputType) {
case SHORT:
case BYTE:
case INT:
inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.writableIntObjectInspector);
outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
break;
case LONG:
inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.writableLongObjectInspector);
outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
break;
case FLOAT:
case STRING:
case DOUBLE:
// Strings are coerced through double, matching Hive's numeric rules.
inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
break;
case DECIMAL:
// Decimal keeps the input's precision/scale in the output type.
outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
outputOI);
break;
default:
throw new UDFArgumentException(
"ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
}
return outputOI;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
Object valObject = arguments[0].get();
// SQL NULL in -> SQL NULL out (contrast with GetLengthStr returning 0).
if (valObject == null) {
return null;
}
switch (inputType) {
case SHORT:
case BYTE:
case INT:
valObject = inputConverter.convert(valObject);
resultInt.set(Math.abs(((IntWritable) valObject).get()));
return resultInt;
case LONG:
valObject = inputConverter.convert(valObject);
resultLong.set(Math.abs(((LongWritable) valObject).get()));
return resultLong;
case FLOAT:
case STRING:
case DOUBLE:
valObject = inputConverter.convert(valObject);
// A non-numeric string converts to null — propagate it.
if (valObject == null) {
return null;
}
resultDouble.set(Math.abs(((DoubleWritable) valObject).get()));
return resultDouble;
case DECIMAL:
HiveDecimalObjectInspector decimalOI =
(HiveDecimalObjectInspector) argumentOI;
HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
if (val != null) {
// Copy then negate in place; never mutate the input writable.
resultDecimal.set(val);
resultDecimal.mutateAbs();
val = resultDecimal;
}
return val;
default:
throw new UDFArgumentException(
"ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
}
}
@Override
public String getDisplayString(String[] children) {
// Renders as abs(child) in EXPLAIN output.
return getStandardDisplayString("abs", children);
}
}
2.UDAF
- 多进一出
- 不知咋的,实现该类时,显示已经过时就没有深入了
- 老师提都没提到
- 找了几个继承类都过时了 所以用UDTF 实现了一下
- 但对字段会显示多行
- 应该是因为每行数据都会 调用一次 process()方法
- 不过 select(1,2,3,4,5) 这种可以实现效果
public class MySum extends GenericUDTF {
private int sum = 0;
private ArrayList outRow = new ArrayList<>();
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1.判断参数个数
List extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
if (structFieldRefs.size() < 1){
throw new UDFArgumentLengthException("参数个数至少为一个");
}
// 2.判断参数类型 这里Category类没有int类型
for (int i = 0; i < structFieldRefs.size(); i++) {
StructField structField = structFieldRefs.get(i);
if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentTypeException(i,"参数类型不匹配");
}
}
ArrayList fieldNames = new ArrayList<>();
ArrayList fieldOIs = new ArrayList();
fieldNames.add("sum");
// int 型 这个应该是 输出时 是 int
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
for (Object arg : args) {
sum += Integer.parseInt(arg.toString());
}
// 因为只有一行数据 所以也没必要情况 list
outRow.add(sum);
forward(outRow);
}
@Override
public void close() throws HiveException {
}
}
2.UDTF
1.多进多出
public class MySplitWords extends GenericUDTF {
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1.判断参数个数
List extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
if (structFieldRefs.size() != 2){
throw new UDFArgumentLengthException("参数个数自能是两个 (字符串,切割字符)");
}
// 2.判断参数类型
for (int i = 0; i < structFieldRefs.size(); i++) {
StructField structField = structFieldRefs.get(i);
if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentTypeException(i,"参数类型不匹配");
}
}
// 3.返回对象
// 因为是多出 可能是多列
// 所以需要用list集合来封装
// fileNames 列名
// fieldOIS 列类型
ArrayList fieldNames = new ArrayList<>();
ArrayList fieldOIs = new ArrayList();
// 这里是单列多行所以不用循环
fieldNames.add("word");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
// 获取参数
String words = args[0].toString();
String split = args[1].toString();
// 处理
String[] splitWords = words.split(split);
// 写出
for (String word : splitWords) {
// 因为forward()方法要求用数组写出
Object[] obj = new Object[1];
obj[0] = word;
forward(obj);
}
}
@Override
public void close() throws HiveException {
}
}
对比源码:(只截取了必要的方法)
public class GenericUDTFGetSplits extends GenericUDTF {
private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);
protected transient StringObjectInspector stringOI;
protected transient IntObjectInspector intOI;
protected transient JobConf jc;
private boolean orderByQuery;
private boolean forceSingleSplit;
private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
private DataOutput dos = new DataOutputStream(bos);
@Override
public StructObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
LOG.debug("initializing GenericUDFGetSplits");
if (SessionState.get() == null || SessionState.get().getConf() == null) {
throw new IllegalStateException("Cannot run get splits outside HS2");
}
LOG.debug("Initialized conf, jc and metastore connection");
if (arguments.length != 2) {
throw new UDFArgumentLengthException(
"The function GET_SPLITS accepts 2 arguments.");
} else if (!(arguments[0] instanceof StringObjectInspector)) {
LOG.error("Got " + arguments[0].getTypeName() + " instead of string.");
throw new UDFArgumentTypeException(0, """
+ "string" is expected at function GET_SPLITS, " + "but ""
+ arguments[0].getTypeName() + "" is found");
} else if (!(arguments[1] instanceof IntObjectInspector)) {
LOG.error("Got " + arguments[1].getTypeName() + " instead of int.");
throw new UDFArgumentTypeException(1, """
+ "int" is expected at function GET_SPLITS, " + "but ""
+ arguments[1].getTypeName() + "" is found");
}
stringOI = (StringObjectInspector) arguments[0];
intOI = (IntObjectInspector) arguments[1];
List names = Arrays.asList("split");
List fieldOIs = Arrays
. asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
StructObjectInspector outputOI = ObjectInspectorFactory
.getStandardStructObjectInspector(names, fieldOIs);
LOG.debug("done initializing GenericUDFGetSplits");
return outputOI;
}
@Override
public void process(Object[] arguments) throws HiveException {
String query = stringOI.getPrimitiveJavaObject(arguments[0]);
int num = intOI.get(arguments[1]);
// Generate applicationId for the LLAP splits
LlapCoordinator coordinator = LlapCoordinator.getInstance();
if (coordinator == null) {
throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
+ ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
}
ApplicationId applicationId = coordinator.createExtClientAppId();
LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
PlanFragment fragment = createPlanFragment(query, num, applicationId);
TezWork tezWork = fragment.work;
Schema schema = fragment.schema;
boolean generateSingleSplit = forceSingleSplit && orderByQuery;
try {
InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
orderByQuery, forceSingleSplit);
if (generateSingleSplit && splits.length > 1) {
throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
}
for (InputSplit s : splits) {
Object[] os = new Object[1];
bos.reset();
s.write(dos);
byte[] frozen = bos.toByteArray();
os[0] = frozen;
forward(os);
}
} catch (Exception e) {
throw new HiveException(e);
}
}
@Override
public void close() throws IOException {
try {
LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
driver.releaseLocksAndCommitOrRollback(true);
driver.close();
driver.destroy();
txnManager.closeTxnManager();
} catch (Exception err) {
LOG.error("Error closing driver resources", err);
throw new IOException(err);
}
}
3.UDTF升级版
1.这个函数实现了多行多列
public class MySplitWordsUP extends GenericUDTF {
private ArrayList outs = new ArrayList<>(); // 用于存储一行数据
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1.判断参数个数
List extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
if (fieldRefs.size() != 3){
throw new UDFArgumentLengthException("参数个数为3 (string,'分割符','分隔符')");
}
// 2.判断参数类型
for (int i = 0; i < fieldRefs.size(); i++) {
StructField structField = fieldRefs.get(i);
if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentTypeException(i,"参数类型不匹配,只接受基本数据类型");
}
}
// 3.返回对象
// structFiledNames 指定列名
// structFieldObjectInspector 指定列类型
List names = new ArrayList<>();
List type = new ArrayList<>();
// 这个函数有两个列
names.add("a1");
names.add("a2");
type.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
type.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
StandardStructObjectInspector standardStructObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, type);
return standardStructObjectInspector;
}
@Override
public void process(Object[] args) throws HiveException {
// 获取参数
String str = args[0].toString();
String split1 = args[1].toString(); // 切成多行
String split2 = args[2].toString(); // 切成多列
// 处理参数
String[] rows = str.split(split1);
for (String row : rows) {
String[] cols = row.split(split2);
outs.clear();
for (String col : cols) {
outs.add(col);
}
forward(outs); // 输出一行数据
}
}
@Override
public void close() throws HiveException {
}
}
如上
4.创建临时函数
- 进入hive客户端
- add jar jar包路径;
- create temporary function my_len as "主类的全限定类名(引用路径)";
注:这种函数是临时的,重新连接(新会话)后就会消失
5.创建永久函数
- 先把 jar 放到 hive 的lib目录
cp jar包路径 lib目录路径
- 启动hive
create function func_name as "主类的全限定类名(引用路径)";



