之前的脚本从hive 迁移至spark-hive执行
由于业务需求需要使用UDTF处理数据
公司spark版本为2.4
udtf代码如下:
package com.sankuai;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class UserInPairsUdtf extends GenericUDTF {
private List colName = Lists.newlinkedList();
private List resType = Lists.newlinkedList();
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
colName.add("fieldName1");
colName.add("fieldName2");
colName.add("fieldName3");
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
// 返回分别为列名 和 列类型
return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
}
@Override
public void process(Object[] objects) throws HiveException {
.................................................................
}
}
}
@Override
public void close() throws HiveException {
}
}
将此代码打包上传至 HDFS，并已成功创建 function。
执行sql报错:
Please make sure your function overrides `public StructObjectInspector initialize(ObjectInspector[] args)`.; line 6 pos 0
执行引擎为 Hive 时没有报错。
仔细研究发现
GenericUDTF中有两个initialize方法:
Spark 调用的是已经过时的 initialize(ObjectInspector[] argOIs)；该方法在 GenericUDTF 基类中的默认实现会直接抛异常。因此在 Spark 下必须覆写过时的 initialize(ObjectInspector[] argOIs)，而我只覆写了新版 initialize(StructObjectInspector argOIs)，所以我的方法并不会被调用。
将代码修改如下:
package com.sankuai;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class UserInPairsUdtf extends GenericUDTF {
private List colName = Lists.newlinkedList();
private List resType = Lists.newlinkedList();
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
colName.add("fieldName1");
colName.add("fieldName2");
colName.add("fieldName3");
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
// 返回分别为列名 和 列类型
return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
}
@Override
public void process(Object[] objects) throws HiveException {
.................................................................
}
}
}
@Override
public void close() throws HiveException {
}
}



