
Fixing the Flink error: Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException

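Bug description

After writing a WordCount program, using IntelliJ IDEA's built-in conversion of explicit (anonymous-class) code to lambda expressions can trigger this error. For example: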

package com.zhiyong.flinkStudy;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception{
        // System.out.println("Java environment OK");

        String inputPath = "E:/study/flink/data/test1";


        //initLogRecord.initLog();
        //log.info("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");

        // Message: "Flink environment OK, starting batch wordCount on path <inputPath>"
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");

        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);

        // Transformation: lambdas produced by IDEA's conversion (this is the failing version)
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
            String[] split = s.trim().split("\\s+");
            for (String cell : split) {
                collector.collect(cell);
            }

        }).map((MapFunction<String, Tuple2<String, Integer>>) s -> Tuple2.of(s, 1)).groupBy(0).sum(1);

        // Sink: write the data
        result.print();

        // Execute
        //env.execute("only older versions need this line after print");
    }
}

Running it fails with:

Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(flinkWordCountDemo1.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
	at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
	at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:32)

Process finished with exit code 1

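This is a case of IDEA meaning well and making things worse. In Spark you can let IDEA convert code to lambda expressions freely (at least I have not run into any major problem so far). With Flink, remember: do not casually convert to lambda expressions! Do not casually convert to lambda expressions! Do not casually convert to lambda expressions! Important things are worth saying three times.

Cause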

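With a lambda expression, the framework has to work out the data types involved by itself. Because of Java type erasure, a lambda does not carry the generic parameters of types such as Collector or Tuple2, and the current version of Flink cannot recover them, which leads to the error. The exception message itself names the ways out: give a type hint with returns(...), implement ResultTypeQueryable, or use an (anonymous) class that implements the function interface.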
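To make the cause more concrete, here is a minimal sketch in plain Java, with no Flink involved, of how generic type arguments survive on an anonymous class but not on a lambda. It is only an illustration: `ErasureDemo` and `keepsTypeArguments` are names made up for this example, and Flink's own TypeExtractor differs in detail.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    // Returns true if the object's class still records generic type arguments
    // for an interface it implements (for example Function<String, Integer>).
    static boolean keepsTypeArguments(Object function) {
        for (Type implemented : function.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler writes Function<String, Integer> into the class file.
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Lambda: the class generated at runtime only implements the raw Function interface.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymous));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambda));
    }
}
```

On a standard OpenJDK build the first line reports true and the second false, which is the same kind of missing information the exception complains about.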

Solutions

Approach 1: Drop the lambda expression

package com.zhiyong.flinkStudy;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception{

        String inputPath = "E:/study/flink/data/test1";

        // Message: "Flink environment OK, starting batch wordCount on path <inputPath>"
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");

        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);

        // Transformation: anonymous classes keep the generic types visible to Flink
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.trim().split("\\s+");
                for (String cell : split) {
                    collector.collect(cell);
                }

            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        }).groupBy(0).sum(1);

        // Sink: write the data
        result.print();
    }
}

Once the lambda expressions are written out explicitly, with their input and output types spelled out, the program runs normally:

Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(好,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)

Process finished with exit code 0

Approach 2: Declare the generics explicitly in an implementing class

First implement FlatMapFunction:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        String[] split = s.trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}

Then implement MapFunction:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class FlinkWordCountDemo2MapFunction implements MapFunction<String, Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {
        return Tuple2.of(s, 1);
    }
}

Then call them from the main method:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;

public class FlinkWordCountDemo2 {
    public static void main(String[] args) throws Exception{
        String inputPath = "E:/study/flink/data/test1";
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);

        AggregateOperator<?> result = data.flatMap(new FlinkWordCountDemo2FlatMapFunction()).map(new FlinkWordCountDemo2MapFunction())
                .groupBy(0).sum(1);

        result.print();
    }
}

The correct result appears:

Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(好,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)

Process finished with exit code 0

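If the implementing class does not declare its generics, for example if you get lazy and drop the type parameters from the FlatMapFunction implementation: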

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction {
//    @Override
//    public void flatMap(String s, Collector<String> collector) throws Exception {
//        String[] split = s.trim().split("\\s+");
//        for (String cell : split) {
//            collector.collect(cell);
//        }
//    }

    @Override
    public void flatMap(Object o, Collector collector) throws Exception {
        String[] split = o.toString().trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}

And if you get lazy again and drop the type parameters from the MapFunction implementation:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class FlinkWordCountDemo2MapFunction implements MapFunction {

//    @Override
//    public Tuple2<String, Integer> map(String s) throws Exception {
//        return Tuple2.of(s, 1);
//    }

    @Override
    public Object map(Object o) throws Exception {
        return Tuple2.of(o.toString(), 1);
    }
}

With FlinkWordCountDemo2 left unchanged, running it still fails with the same kind of error:

Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkWordCountDemo2.java:17)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
	at com.zhiyong.flinkStudy.FlinkWordCountDemo2.main(FlinkWordCountDemo2.java:17)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1384)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1412)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1369)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:575)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
	... 1 more

Process finished with exit code 1
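The difference between the typed and the raw implementing class can be seen without Flink. The following plain-Java sketch uses `java.util.function.Function` as a stand-in for Flink's function interfaces; `RawTypeDemo`, `TypedLength`, `RawLength` and `describe` are hypothetical names chosen for this illustration, and the reflection call only approximates what a type extractor looks at.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Function;

public class RawTypeDemo {

    // Declares its type arguments, like the working FlinkWordCountDemo2MapFunction.
    static class TypedLength implements Function<String, Integer> {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    }

    // Implements the raw interface, like the failing variant without generics.
    @SuppressWarnings("rawtypes")
    static class RawLength implements Function {
        @Override
        public Object apply(Object o) {
            return o.toString().length();
        }
    }

    // Describes what reflection can recover about the first implemented interface.
    static String describe(Class<?> functionClass) {
        Type implemented = functionClass.getGenericInterfaces()[0];
        if (implemented instanceof ParameterizedType) {
            Type[] typeArgs = ((ParameterizedType) implemented).getActualTypeArguments();
            return Arrays.toString(typeArgs);
        }
        return "no type arguments recorded";
    }

    public static void main(String[] args) {
        System.out.println("TypedLength: " + describe(TypedLength.class));
        System.out.println("RawLength:   " + describe(RawLength.class));
    }
}
```

For the typed class the input and output types can be read back; for the raw class there is nothing to read, which matches the "could not be inferred" message in the stack trace.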

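Flink's support for lambda expressions still has room to improve, and at least for now this rule has to be followed. The advantage of approach 2 is that it extracts the computation logic into its own classes, which shortens the main class, makes it easier to read, and allows the same logic to be reused.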
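The exception message also mentions returns(...). As a hedged sketch of that route, which the programs in this article did not use, the lambdas can stay if each one is followed by an explicit type hint. The class name `FlinkWordCountReturnsSketch`, the helper `wordCount` and the sample input are made up for illustration; the block assumes the Flink dependencies from the POM are on the classpath.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkWordCountReturnsSketch {

    // Builds the word count with lambdas and declares each lambda's
    // output type through returns(...), so Flink does not have to infer it.
    static DataSet<Tuple2<String, Integer>> wordCount(DataSet<String> lines) {
        return lines
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.trim().split("\\s+")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .groupBy(0)
                .sum(1);
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical in-memory input instead of the file used in the article.
        DataSet<String> lines = env.fromElements("haha hehe", "hehe 123");
        wordCount(lines).print();
    }
}
```

Whether this reads better than the anonymous classes is a matter of taste; the type hints have to be kept in sync with the lambdas by hand.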

Notes

Version in which the problem occurred: Flink 1.14.3. The POM is as follows:

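<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>study</artifactId>
        <groupId>study.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flinkStudy</artifactId>
    <packaging>pom</packaging>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.3</flink.version>
        <!-- property name assumed; only the value 2.12 survived in the listing -->
        <scala.version>2.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-filesystems</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>
    </dependencies>
</project>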

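I do not know whether later versions will resolve this. Being able to use implicit conversions and lambda expressions freely, as in the Scala version of Spark, is what feels like functional programming (FP). The way the Java version of Flink currently works still looks more like procedural programming...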

Please credit the source when reposting: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/774015.html