栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink on Zeppelin使用示例及填坑记录

Flink on Zeppelin使用示例及填坑记录

flink在zeppelin上的使用文档,参见flink interpreter

Flink on Zeppelin 基本概念 Flink on Zeppelin Architecture

左侧的Flink解释器实际上是一个Flink客户端,负责编译和管理Flink作业的生命周期,如提交、取消作业、监控作业进度等右侧的Flink集群是执行Flink作业的地方。支持如下模式:、

MiniCluster(本地模式)、Standalone cluster(远程模式)、Yarn会话集群(Yarn模式)Yarn应用程序会话集群(Yarn -application模式)。

在Flink解释器中有两个重要组件:Scala shell和Python shell:

Scala shell是Flink解释器的入口点,它会创建Flink程序的所有入口点,如ExecutionEnvironment、StreamExecutionEnvironment和TableEnvironment。Scala shell负责编译和运行Scala代码和sql。Python shell是PyFlink的入口,它负责编译和运行Python代码。使用可参考使用PyFlink, 如何在 zeppelin 里高效的开发 PyFlink Job?
Flink解释器组

在Zeppelin中,支持Apache Flink的是由下面列出的五个解释器组成的Flink解释器组。

NameClassDescription
%flinkFlinkInterpreterCreates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment
%flink.pyflinkPyFlinkInterpreterProvides a python environment
%flink.ipyflinkIPyFlinkInterpreterProvides an ipython environment
%flink.ssqlFlinkStreamSqlInterpreterProvides a stream sql environment
%flink.bsqlFlinkBatchSqlInterpreterProvides a batch sql environment
内置的变量

Scala是Flink在Zeppelin上的默认语言,Flink Interpreter (%flink) 自动创建下面 6 个变量作为 Flink Scala 程序的入口。

senv (StreamExecutionEnvironment),benv (ExecutionEnvironment)stenv (StreamTableEnvironment for blink planner)btenv (BatchTableEnvironment for blink planner)stenv_2 (StreamTableEnvironment for flink planner)btenv_2 (BatchTableEnvironment for flink planner)z (ZeppelinContext) Flink Interpreter的运行模式

Flink Interpreter在zeppelin中的三种运行模式,正式环境使用YARN模式

Local 模式: 在本地创建一个MiniCluster,适合做POC或者小数据量的试验,必须配置如下:

Flink_HOME /opt/flink/flink-1.13.2flink.execution.mode local默认情况下:local MiniCluster的port是8081, 有4个 TM with 1 slot,可根据需要调整,示例如下:rest.port=18001,local.number-taskmanager=2,flink.tm.slot=2

Remote 模式: 连接一个已经创建好的Flink集群,一般是Flink standalone集群

Flink_HOME /opt/flink/flink-1.13.2flink.execution.mode remoteflink.execution.remote.host 172.25.xx.xxflink.execution.remote.port 8081

YARN 模式: 在Yarn集群中创建Flink Cluster

Flink_HOME /opt/flink/flink-1.13.2flink.execution.mode yarnHADOOP_CONF_DIR /etc/hadoop/conf在内部,flink会调用命令hadoop classpath,并在flink解释器进程中加载所有hadoop相关的jar文件: export HADOOP_CLASSPATH=hadoop classpath``

Yarn Application Mode: 是Zeppelin服务器主机上的一个独立的Flink解释器进程,Flink解释器运行在yarn容器中的JobManager中。

flink.execution.mode yarn-applicationHADOOP_CONF_DIR /etc/hadoop/conf在内部,flink会调用命令hadoop classpath,并在flink解释器进程中加载所有hadoop相关的jar文件: export HADOOP_CLASSPATH=hadoop classpath``

如果连接超时,则需要配置如下参数
zeppelin.interpreter.connect.timeout 600000

代码示例 简单示例
%flink
val data=benv.fromElements("hello world","hello flink","hello hadoop")
val wc=data.flatMap(line=>line.split("\s"))
.map(w=>(w,1))
.groupBy(0)
.sum(1)
z.show(wc)

Batch ETL

基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务

下载数据curl -O https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip

注意:手工删除bank.csv的第一行数据

%flink.bsql
DROp TABLE IF EXISTS bank;
CREATE TABLE bank (
    age INT,
    job STRING,
    marital STRING,
    education STRING,
    `default` STRING,
    balance STRING,
    housing STRING,
    loan STRING,
    contact STRING, 
    `day` STRING,
    `month` STRING,
    duration INT,
    campaign INT,
    pdays INT,
    privious INT,
    poutcome STRING,
    y STRING
) WITH (
'connector'='filesystem',
'path'='hdfs://172.25.xx.xx:8020/tmp/bank.csv',
'format'='csv',
'csv.field-delimiter'=';',
'csv.quote-character'='"',
'csv.ignore-parse-errors' = 'true'
);

select * from bank limit 10;
BI 数据分析

基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 BI 数据分析

%flink.bsql
select age,count(1) as total from bank
where age<${maxAge=38}
group by age order by age

%flink.bsql
select age,count(1) as total from bank
where marital='${marital=married,single|divorced|married}'
group by age order by age

使用 Flink UDF
%flink-yarn
class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
} 
btenv.registerFunction("scala_upper", new ScalaUpper())

%flink-yarn.bsql
select scala_upper(education),count(1) from bank group by education

访问hive中的数据

为了在Flink中使用Hive,你必须做以下设置。

将zeppelin.flink.enableHive设置为true将zeppelin.flink.hive.version设置为您正在使用的hive版本,如3.1.0将HIVE_CONF_DIR设置为hive-site.xml所在的位置。确保hive metastore已经启动,并且在hive-site.xml中配置了hive.metastore.uris将以下依赖项复制到flink安装的lib文件夹中。(以HDP 3.1.5.0-152,Flink 1.13.2为例,需要的依赖包如下)

flink-sql-connector-hive-3.1.2_2.11。 注意:这是一个bundled hive jar,详见:flink hive

show tables;
describe t1;
select * from t1 limit 1;

异常处理 checkpoint is not supported for batch jobs

在执行批处理SQL时,报如下异常:checkpoint is not supported for batch jobs
将conf/flink-conf.yaml中的以下内容注释掉

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
#state.backend: rocksdb
#state.checkpoints.dir: hdfs://pci01:8020/flink/checkpoint/sqlclient
#state.savepoints.dir: hdfs://pci01:8020/flink/savepoint/sqlclient
#state.backend.incremental: true
#jobmanager.execution.failover-strategy: region
#execution.checkpointing.interval: 300000
NoSuchMethodError: com.google.common.base.Preconditions.checkArgument

flink SQL连接hive报错​​java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V​​,查询资料后发现就是guava版本冲突造成的。
hive-exec-3.1.1内置的guava是19.0版本的,而hadoop中的guava是28.0-jre版本的,flink内置的guava也有多个版本。彼此之间版本就冲突了。

解决方案参见:link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj
将flink-sql-connector-hive-3.1.2_2.11 jar中的shade的guava包删除 或 重新编译前将guava依赖删除

ClassNotFoundException:org.apache.hadoop.mapred.JobConf

flink SQL连接hiver后,查询hive 表中的数据,报如下异常:
Caused by: java.lang.ClassNotFoundException: **org.apache.hadoop.mapred.JobConf** at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 52 more

注意:hadoop集群的所有节点已经配置了export HADOOP_CLASSPATH=hadoop classpath``但没有作用。
需要将如下jar包从hadoop集群复制到flink lib目录下:

参考

connectors table formats csv
最新版本Flink 1.12.0 的sql-cli配置连接yarn-session 问题处理
link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/712447.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号