在TDH集群环境获取jar包目录下dict.json文件,调用udf函数getDictName( ), 传入两个入参,分别是一级编码和二级编码(srcCode及srcSbuCode),最终返回值为"字典名称"(dictName)。
比如函数入参传入‘1001’和‘0’,返回值为“董事长”,入参传入‘1001’和‘3’,返回值为“财务负责人”。
对于解析字典的需求,现有做法是在大数据集群hdfs指定路径上传xml文件,通过调用udf函数,在udf函数逻辑中解析xml文件,获取字典名称。
该做法可以正常实现要求,但是也存在两个劣势:1) 需要在hdfs指定目录上传xml文件,否则无法读取到文件;2) 函数入参中除传入两个入参外,同时要求传入xml文件路径。如果路径错误或者没有传入,就存在执行失败或者无法解析出正确字典名称的问题。
2.2 优化方案针对现有的解决方案,如何避免以上两个问题?是否存在优化方案,可以做到不上传xml文件到hdfs目录,同时函数只传入两个入参(srcCode及srcSbuCode),在满足以上前提下,可以正常获取返回值dictName。
需求已经明确了,接下来就是考虑如何实现需求。通过分析发现,可以通过以下步骤实现:
1)编写json文件,文件名称为“dict.json”(该文件需要满足json文件格式),上传至java工程项目中resources目录下;
2)在udf函数getDictName( )代码逻辑中编写解析json文件逻辑;
3)项目打包后上传jar包至集群指定的自定义函数目录(非hdfs路径);
4)在spark on yarn 模式下,通过sql调用udf函数getDictName( srcCode,srcSbuCode),传入两个入参后保证在集群环境下可以获取返回值dictName。
附:json文件格式
{
"1001":{
"0":"董事长",
"1":"控制人",
"2":"主要负责人",
"3":"财务负责人",
"4":"监事长"
},
"1002":{
"0":"证件地址",
"1":"联系地址",
"2":"家庭地址",
"3":"单位地址",
"4":"户籍地址",
"5":"出生地址"
},
......
}
3.2 编写udf函数
3.2.1 编写函数工具类
package com.ccsg.udf.utils;
import java.io.*;
public class FileUtil {
public static String readFile(InputStream inputStream) throws IOException {
Reader reader = new InputStreamReader(inputStream, "utf-8");
int ch = 0;
StringBuffer sb = new StringBuffer();
while ((ch = reader.read()) != -1) {
sb.append((char) ch);
}
reader.close();
return sb.toString();
}
}
3.2.2 编写函数核心逻辑
package com.ccsg.udf.function.udf; import com.alibaba.fastjson.JSON; import com.ccsg.udf.utils.FileUtil; import org.apache.commons.lang.StringUtils; import org.apache.spark.sql.api.java.UDF2; import java.io.InputStream; import java.util.Map; public class GetDictName implements UDF2{ private static final String DEFAULT_DICT_NAME = " "; public String call(String srcCode, String srcSubCode) throws Exception { // 获取文件所在包路径,调用工具类FileUtil中方法readFile()读取json文件 InputStream inputStream = UDFGetDictName.class.getClassLoader().getResourceAsStream("dict.json"); String jsonStr = FileUtil.readFile(inputStream); // 定义返回结果 String dictName = null; // 数据结构为嵌套Map结构: Map > if (StringUtils.isNotBlank(jsonStr) && StringUtils.isNotBlank(srcCode) && StringUtils.isNotBlank(srcSubCode)) { // 解析json字符串,获取最终返回结果dictName Map jsonMap = (Map) JSON.parse(jsonStr); Map destMap = (Map ) jsonMap.get(StringUtils.trim(srcCode)); if (null != jsonMap && null != destMap && jsonMap.containsKey(StringUtils.trim(srcCode)) && destMap.containsKey(StringUtils.trim(srcSubCode))) { dictName = destMap.get(StringUtils.trim(srcSubCode)); return dictName; } else { return DEFAULT_DICT_NAME; } } return DEFAULT_DICT_NAME; } }
说明:
1)建议类中获取文件所在包路径方式:类名.class.getClassLoader().getResourceAsStream(“dict.json”);该方式支持本地和集群环境下正常找到json文件所在目录;
2)调用Json转map方式进行逻辑处理: Map
3)已验证以下两种方式支持在本地环境找到文件所在目录,不支持集群环境,所以不作为优先考虑方式
方式1:String fileRelativePath = Thread.currentThread().getContextClassLoader().getResource(“dict.json”).getFile();
方式2:UDFGetDictName.class.getResource("/") + “dict.json”;
4)充分考虑数据场景,考虑代码健壮性,对可能传入错误参数 或者传入空值 进行优化处理,避免运行时报空指针异常;
5)以上udf函数编写使用spark中自定义函数框架,通过实现UFD2接口,重写call()方式处理,两个入参及返回值都为String类型。
public static void main(String[] args) throws Exception {
GetDictName gdn = new GetDictName();
// 1 正确场景:" 1001", "0"
String result1 = gdn.call("1001", "0");
// 2 入参为空场景
String result2 = gdn.call(" ", " ");
String result3 = gdn.call("", " ");
String result4 = gdn.call(null, " ");
String result5 = gdn.call("1001", " ");
// 3 错误数据 "1100", "18" / "1100", "1" / "1001", "7"
String result6 = gdn.call("1100", "18");
String result7 = gdn.call("1100", "1");
String result8 = gdn.call("1001", "7");
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
System.out.println(result5);
System.out.println(result6);
System.out.println(result7);
System.out.println(result8);
}
返回结果:
只有result1存在返回值“董事长”,其他都返回默认值(空格)
3.3 打包注意事项 3.3.1 目录结构图
说明:在resources目录下上传指定Json文件,如idea中该目录不存在则新建即可。
3.3.3 执行maven install效果src/main/resources ***.properties **/*.json true
说明:可以看到打包后json文件在根目录下
1)上传jar包到集群指定自定义函数目录
2)sql中调用函数
select t.id_kind, GetDictName('1001',t.id_kind) as dict_name from person t where t.id_kind in ('0','1','2');
返回值:
id_kind dict_name 0 董事长 1 控制人 2 主要负责人
说明:项目中使用类名作为函数名称
4 回顾总结1)充分考虑需求,结合Json文件格式和业务逻辑,合理设计实优化方案;
2)考虑使用udf函数,保证集群环境下找到Json文件目录并读取内容,重点验证获取路径方式,即优先考虑方式:类名.class.getClassLoader().getResourceAsStream(“dict.json”);
3)实现业务逻辑时充分考虑各种数据场景,增强代码健壮性;
4)本需求涉及IO流、反射、udf函数专业知识,需要对java及大数据基础有深入了解。



