栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hive-编写UDTF函数一进多出(详细教程~~~)

Hive-编写UDTF函数一进多出(详细教程~~~)

创建项目的话,和之前写UDF函数的流程是一样的,如果不懂的,看这篇文章:
HIVE-编写UDF函数

在包udf中再创建一个MyUDTF类,继承UDTF函数,实现接口:

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

public class MyUDTF extends GenericUDTF {
    public void process(Object[] args) throws HiveException {
        
    }

    public void close() throws HiveException {

    }
}

按提示所实现的方法有两个,其实不够,还要把初始化方法给他加上。

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;


public class MyUDTF extends GenericUDTF {
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        return super.initialize(argOIs);
    }

    //处理输入数据
    public void process(Object[] args) throws HiveException {

    }
    //收尾方法
    public void close() throws HiveException {

    }
}

完成的业务代码:

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

import java.util.ArrayList;
import java.util.List;


public class MyUDTF extends GenericUDTF {

    //输出数据的集合
    private  ArrayList outPutList= new ArrayList();

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //输出数据的默认别名,可以被别名覆盖
        List fieldNames=new ArrayList();
        fieldNames.add("word");

        //输出数据的类型
        List fieldOIs = new ArrayList();

        //最终返回值
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    //处理输入数据:hello,atguigu,hive
    public void process(Object[] args) throws HiveException {
        //1.取出输入数据
        String input=args[0].toString();

        //2.按照","分割字符串
        String[] words = input.split(",");

        //3.遍历数据写出
        for (String word : words) {
            //清空集合
            outPutList.clear();

            //将数据放入集合
            outPutList.add(word);

            //输出数据
            forward(outPutList);
        }
    }
    //收尾方法
    public void close() throws HiveException {

    }
}

进行打包,重新拖入到hive的lib目录下:
添加到类路径:

hive (default)> add jar /opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar
              > ;
Added [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar] to class path
Added resources: [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar]

创建函数:

hive (default)> create temporary function myudtf as "com.atguigu.udf.MyUDTF"; 
OK
Time taken: 0.733 seconds

注意:如果出现问题的话,怎么试都没法成功,就去重新启动hive
input表的数据:

hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common

创建表:

hive (default)>  create table input(words string) ;

加载数据进去:

load data local inpath '/opt/module/datas/input.txt'
into table input;

查看数据:

hive (default)> select * from input;
OK
input.words
hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common
Time taken: 1.386 seconds, Fetched: 4 row(s)

使用UDTF函数:

hive (default)> select my_udtf(words) from input;
OK
word
hello
spark
hello
hive
hello
zhoujielun
linjunjie
dengziqi
hello
hadoop
mapreduce
yarn
common
Time taken: 3.788 seconds, Fetched: 13 row(s)

实现了分割逗号,一进多出的效果。
上面这个UDTF函数只能够分割逗号,没法分割其他符合,如果想实现根据我传入的符号进行分割,可以对上面的代码坐下 修改:

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;


public class MyUDTF extends GenericUDTF {

    //输出数据的集合
    private  ArrayList outPutList= new ArrayList();

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //输出数据的默认别名,可以被别名覆盖
        List fieldNames=new ArrayList();
        fieldNames.add("word");

        //输出数据的类型
        List fieldOIs = new ArrayList();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        //最终返回值
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    //处理输入数据:hello,atguigu,hive
    public void process(Object[] args) throws HiveException {
        //1.取出输入数据
        String input=args[0].toString();
        String splitCharater = args[1].toString();
        //2.按照","分割字符串
        String[] words = input.split(splitCharater);

        //3.遍历数据写出
        for (String word : words) {
            //清空集合
            outPutList.clear();

            //将数据放入集合
            outPutList.add(word);

            //输出数据
            forward(outPutList);
        }
    }
    //收尾方法
    public void close() throws HiveException {

    }
}

再重新测试:

hive (default)> select my_udtf(words,',') from input;
OK
word
hello
spark
hello
hive
hello
zhoujielun
linjunjie
dengziqi
hello
hadoop
mapreduce
yarn
common
Time taken: 0.429 seconds, Fetched: 13 row(s)


hive (default)> select my_udtf(words,'hello') from input;
OK
word

,spark

,hive

,zhoujielun,linjunjie,dengziqi

,hadoop,mapreduce,yarn,common
Time taken: 0.539 seconds, Fetched: 8 row(s)

实现可以根据自己输入的分隔符来进行分割。

上面是通过UDTF实现分割的功能,接下来通过Hive来实现wordCount的功能,数据的话还是采用input表里面的数据:
第一步,对其进行炸裂:

 select explode(split(words,',')) word  from input;

这个语句和上面UDTF所实现的功能是一样的,结果如下:

实现完这个上面的结果之后,就要炸裂出来的结果进行分组,将单词相同的分成一组,所以对explode炸裂出来的结果取个别名, 对这个别名进行group by分组, 再count(*),就求出每个单词的个数:

select word,count(*) 
from  
(select explode(split(words,',')) word  from input)t1
group by word;

结果如下:

word	_c1
common	1
dengziqi	1
hadoop	1
hello	4
hive	1
linjunjie	1
mapreduce	1
spark	1
yarn	1
zhoujielun	1
Time taken: 36.758 seconds, Fetched: 10 row(s)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335092.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号