栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-bug:org.apache.flink.table.api.TableException: uv is not found in visitTime, EXPR$0

Flink-bug:org.apache.flink.table.api.TableException: uv is not found in visitTime, EXPR$0

代码块
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Sample PageVisit records; the query below references their visitTime and userId fields.
        DataStreamSource<PageVisit> input = env.fromElements(
                new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
                new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),

                new PageVisit("2017-09-16 09:04:00", 1001, "/page1"),
                new PageVisit("2017-09-16 09:05:00", 1001, "/page2"),

                new PageVisit("2017-09-16 09:05:00", 1002, "/page2"),
                new PageVisit("2017-09-16 09:05:00", 1002, "/page1"),

                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));
        Table table = tEnv.fromDataStream(input);

        // Group by visitTime and compute two aggregates aliased "uv" and "suv".
        // NOTE(review): count() presumably counts non-null values, so the "suv" column
        // would count every row of the group rather than only userId == 1005 - confirm intent.
        Table select = table.groupBy($("visitTime"))
                .select($("visitTime"),
                        $("userId").count().distinct().as("uv"),
                        $("userId").isEqual(1005).count().as("suv"))
                // Declaring the output field names on the result table is left disabled here:
                //.as("visitTime","uv","suv")
                ;
        // With Row.class the output field names do not need to be declared.
        // With Result.class the fields must be renamed to match the POJO.
//      DataStream<Tuple2<Boolean, Result>> dataStream = tEnv.toRetractStream(select, Result.class);
        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(select, Row.class);
        dataStream.print();
异常
Exception in thread "main" org.apache.flink.table.api.TableException: suv is not found in visitTime, EXPR$0, EXPR$1
	at org.apache.flink.table.planner.codegen.SinkCodeGenerator$$anonfun$1.apply(SinkCodeGenerator.scala:82)
	at org.apache.flink.table.planner.codegen.SinkCodeGenerator$$anonfun$1.apply(SinkCodeGenerator.scala:79)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
	at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:517)
	at com.xiaolin.stream.RetractStreamApp.main(RetractStreamApp.java:64)
原因
Table select = table.groupBy($("visitTime"))
         .select($("visitTime"),
                 $("userId").count().distinct().as("uv")
               , $("userId").isEqual(1005).count().as("suv")
         )//返回添加字段名称,指明返回字段名称 ROW<`visitTime` STRING, `uv` BIGINT NOT NULL, `suv` BIGINT NOT NULL>
                 .as("visitTime","uv","suv")
 // 不添加的话 在 tEnv.toRetractStream(select, Result.class); 解析select与Result 对应时将找不到
 //org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink#translateToTransformation
 ROW<`visitTime` STRING, `EXPR$0` BIGINT NOT NULL, `EXPR$1` BIGINT NOT NULL>
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753613.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号