栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink Table & SQL 行列转换

Flink Table & SQL 行列转换

总结Flink Table & SQL 中,如何将一个数组(Array)或对象(Map)展开。

测试数据

用datagen连接器构造测试数据。DDL如下:

CREATE TABLE source_table (
    userId INT,
    eventTime as '2021-10-01 08:08:08',
    eventType as 'click',
    productId INT,
    -- 数组(Array)类型
    productImages as ARRAY['image1','image2'],
    -- 对象(Map)类型
    pageInfo as MAP['pageId','1','pageName','yyds']
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '2',
    'fields.userId.kind' = 'random',
    'fields.userId.min' = '2',
    'fields.userId.max' = '2',
    'fields.productId.kind' = 'sequence',
    'fields.productId.start' = '1',
    'fields.productId.end' = '2'
);

测试数据如下:

userId      eventTime      			eventType      productId      productImages     	 	pageInfo
  2       2021-10-01 08:08:08     click           1          [image1, image2] 	{pageId=1, pageName=yyds}
  2       2021-10-01 08:08:08     click           2          [image1, image2] 	{pageId=1, pageName=yyds}   
展开Array 1. 单列多行

将数组展开为单列多行。

基于UNNEST
SELECt userId,eventTime,eventType,productId,productImage 
FROM source_table, UNNEST(productImages) as t(productImage);
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;

@FunctionHint(
        input = @DataTypeHint("ARRAY"),
        output = @DataTypeHint("String"))
public class ExpandArrayOneColumnMultRowUDTF extends TableFunction {
    public void eval(String... productImages) {
        for (String productImage : productImages) {
            collect(productImage);
        }
    }
}

// SQL
SELECt userId, eventTime, eventType, productId, productImage FROM source_table
  , LATERAL TABLE (ExpandArrayOneColumnMultRowUDTF(`productImages`)) AS t(productImage);
结果示例
userId      eventTime          	eventType      productId      productImage
2       2021-10-01 08:08:08       click           1              image1
2       2021-10-01 08:08:08       click           1              image2
2       2021-10-01 08:08:08       click           2              image1
2       2021-10-01 08:08:08       click           2              image2
展开Map 1. 多列单行

将Map展开为多列单行。

基于UNNEST
// 默认,用这种方式即可展开Map
SELECt userId,eventTime,eventType,productId,pageInfo['pageId'] as pageId,pageInfo['pageName'] as pageName 
FROM source_table;
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Map;

@FunctionHint(
        input = @DataTypeHint("MAP"),
        output = @DataTypeHint("ROW"))
public class ExpandMapMultColumnOneRowUDTF extends TableFunction {
    public void eval(Map pageInfo) {
        // 原来的一行,对应输出一行
        collect(Row.of(pageInfo.get("pageId"), pageInfo.get("pageName")));
    }
}

// SQL使用
SELECt userId,eventTime,eventType,productId,pageId,pageName FROM source_table
  , LATERAL TABLE (ExpandMapMultColumnOneRowUDTF(`pageInfo`)) AS t(pageId,pageName)
结果示例
userId      eventTime      			eventType      productId      pageId      pageName
2       2021-10-01 08:08:08       click           1             1            yyds
2       2021-10-01 08:08:08       click           2             1            yyds
2. 多列多行

将Map展开为多列多行。

基于UNNEST
SELECt userId,eventTime,eventType,productId,mapKey,mapValue 
FROM source_table, UNNEST(pageInfo) as t(mapKey,mapValue);
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Map;

@FunctionHint(
        input = @DataTypeHint("MAP"),
        output = @DataTypeHint("ROW"))
public class ExpandMapMultColumnMultRowUDTF extends TableFunction {
    public void eval(Map pageInfo) {
        for (Map.Entry entry : pageInfo.entrySet()) {
            // 原来的一行,每个Key都输出一行
            collect(Row.of(entry.getKey(), entry.getValue()));
        }
    }
}

// SQL使用
SELECt userId,eventTime,eventType,productId,mapKey,mapValue FROM source_table
  , LATERAL TABLE (ExpandMapMultColumnMultRowUDTF(`pageInfo`)) AS t(mapKey,mapValue)
结果示例
userId      eventTime      			eventType      productId      	mapKey      	  mapValue
2       2021-10-01 08:08:08       click           1              pageId            1
2       2021-10-01 08:08:08       click           1              pageName         yyds
2       2021-10-01 08:08:08       click           2              pageId            1
2       2021-10-01 08:08:08       click           2              pageName         yyds
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663635.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号