
Integrating Flink into a Spring Boot project and running the job on a Flink cluster


Background: a recent project required introducing Flink. After working through it step by step (and stepping on plenty of pitfalls), the job now runs both standalone and deployed to a cluster; this post records the pitfalls. Development environment: IDEA + Spring Boot (2.3.5.RELEASE) + Kafka (2.8.1) + MySQL (8.0.26). Without further ado, here is the working code.

The code below implements the following logic: if a device uploads no data within a given time interval, it is judged to be offline.

1. Creating the application class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages = {"com.grandhonor.firefighting.flink", "com.grandhonor.firefighting.util"})
public class DataAnalysisFlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataAnalysisFlinkApplication.class, args);
    }
}

2. The device-status job: consume messages from Kafka, evaluate them with a KeyedProcessFunction, and emit offline devices to a MySQL sink that updates their status

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceDataKafkaSource {
    private static final Logger log = LoggerFactory.getLogger(DeviceDataKafkaSource.class);
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String loginConfig;
    @Value("${customer.flink.cal-device-status-topic}")
    private String topic;
    @Autowired
    private ApplicationContext applicationContext;

    @PostConstruct
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);
        Properties properties = new Properties();
        // kafka broker IPs or hostnames; separate multiple entries with commas
        properties.setProperty("bootstrap.servers", kafkaServer);
        // kafka consumer group.id
        properties.setProperty("group.id", "data-analysis-flink-devicestatus");
        // SASL authentication mechanism: PLAIN
        properties.setProperty("sasl.mechanism", "PLAIN");
        // security protocol: SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // login username and password (JAAS configuration string)
        properties.setProperty("sasl.jaas.config", loginConfig);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);

        DataStream<String> deviceStatus = stream
                // extract only the device serial number from the message
                .map(data -> CommonConstant.GSON.fromJson(data, MsgData.class).getDevSn())
                // key the stream by device serial number
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // check whether new data arrived within the interval; if not, emit the device as offline
                .process((CalDeviceOfflineFunction) applicationContext.getBean("calDeviceOfflineFunction"));

        // write offline devices to the database
        deviceStatus.addSink((SinkFunction<String>) applicationContext.getBean("deviceStatusSink"));

        // submit the job from a separate thread so application startup is not blocked
        new Thread(() -> {
            try {
                env.execute("deviceStatusFlinkJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}
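The MsgData payload class and the CommonConstant.GSON instance used in the map step are project classes that the post does not show. Below are minimal sketches that would satisfy the code above; the field name is an assumption inferred from the getDevSn() call:

import com.google.gson.Gson;

// assumed shape of the Kafka message payload; only devSn is used above
public class MsgData {
    private String devSn; // device serial number
    public String getDevSn() { return devSn; }
}

// shared Gson instance for JSON parsing
public final class CommonConstant {
    public static final Gson GSON = new Gson();
    private CommonConstant() { }
}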

Notes:

1. @ConditionalOnProperty acts as an on/off switch that decides whether this job runs. The module can later host several Flink jobs and submit only those whose switches are enabled.

2. Spring Boot's @PostConstruct runs the job automatically when the application starts.

3. The job is submitted from a separate Thread, because env.execute() blocks; without the thread, application startup would hang inside the running Flink job (see the executeAsync() sketch after this list for a non-blocking alternative).

4. Logging must go through slf4j, the same facade Flink's own dependency jars use, so that log output also appears when the job runs on the Flink cluster:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

5. The Kafka connection has SASL login enabled; the client settings are in application.yml. The broker-side setup is covered in the official Kafka documentation (a minimal broker-side sketch also follows this list); a separate write-up may come later.
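Regarding note 3: since Flink 1.10, StreamExecutionEnvironment also provides executeAsync(), which submits the job and returns immediately, so the manual thread can be dropped. A minimal sketch against the code above:

// executeAsync (Flink 1.10+) submits the job without blocking and
// returns a JobClient handle, so no extra thread is needed
org.apache.flink.core.execution.JobClient jobClient = env.executeAsync("deviceStatusFlinkJob");
log.info("submitted flink job, jobId={}", jobClient.getJobID());

Regarding note 5: the broker side of SASL/PLAIN is configured through a JAAS file passed to the broker JVM via -Djava.security.auth.login.config. A minimal sketch with placeholder credentials (not the project's real settings):

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_root="root";
};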

3. Device offline detection

The function below keeps, per device key, the timestamp of the last received message. Each incoming message deletes the pending processing-time timer and registers a new one at lastDataTime + interval, so the timer only fires when no message has arrived for a full interval. For example, with a 30 s interval, a message at t = 100 s arms a timer for t = 130 s; a new message at t = 120 s moves it to t = 150 s.



import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class CalDeviceOfflineFunction extends KeyedProcessFunction<String, String, String> {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

    // per-device state: last data timestamp and offline interval
    private ValueState<DeviceLastDataTimestamp> deviceState;
    // trigger time of the currently registered timer for this device
    private ValueState<Long> timerState;
    // project service that returns the per-device offline interval
    @Autowired
    private DeviceService deviceService;

    @Override
    public void open(Configuration parameters) throws Exception {
        deviceState = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceState", DeviceLastDataTimestamp.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        log.info("++++++++++++++flink received deviceSn={}", value);
        // fetch the current state for this device
        DeviceLastDataTimestamp current = deviceState.value();
        if (current == null) {
            current = new DeviceLastDataTimestamp();
            current.key = value;
            current.lastDataTime = ctx.timestamp();
        }

        Long currentTimerState = timerState.value();
        if (null == currentTimerState) {
            // initialize to -1: no timer registered yet
            timerState.update(-1L);
        }

        if (-1 != timerState.value()) {
            // delete the previous timer before registering a new one
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }

        long interval = deviceService.getDeviceOfflineInterval(value);
        // record the timestamp of this message (the Kafka record timestamp)
        current.lastDataTime = ctx.timestamp();
        // record the offline-detection interval
        current.interval = interval;
        // write the state back
        deviceState.update(current);
        // remember the trigger time of the new timer
        timerState.update(current.lastDataTime + interval);
        // register the new timer
        ctx.timerService().registerProcessingTimeTimer(current.lastDataTime + interval);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // read the state for this device
        DeviceLastDataTimestamp result = deviceState.value();

        // timestamp is the timer's trigger time; if it equals lastDataTime + interval,
        // no message arrived from this device during the whole interval
        if (timestamp == result.lastDataTime + result.interval) {
            // emit the offline device
            out.collect(result.key);
            // log the result for verification
            log.info("==================" + result.key + " is offline");
        }
    }

    // per-device state holder
    static class DeviceLastDataTimestamp {
        public String key;
        public long lastDataTime;
        public long interval;
    }
}

4. Updating the device's offline status (MySQL sink)


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceStatusSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusSink.class);
    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String datasourceUrl;
    @Value("${spring.datasource.dynamic.datasource.master.username}")
    private String userName;
    @Value("${spring.datasource.dynamic.datasource.master.password}")
    private String password;
    @Value("${spring.datasource.dynamic.datasource.master.driver-class-name}")
    private String driverClass;
    private Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the driver and open the connection
        try {
            Class.forName(driverClass);
            conn = DriverManager.getConnection(datasourceUrl, userName, password);
        } catch (Exception e) {
            log.error("failed to open database connection", e);
        }
    }

    @Override
    public void invoke(String deviceSn, Context context) {
        String sql = "update biz_device t set t.status=2 where t.dev_sn=?";
        // try-with-resources closes the statement after every update
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, deviceSn);
            ps.executeUpdate();
            log.info("update biz_device t set t.status=2 where t.dev_sn={}", deviceSn);
        } catch (Exception e) {
            log.error("failed to update device status, deviceSn={}", deviceSn, e);
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}
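A design note on the sink above: each offline event prepares a fresh statement on a single plain JDBC connection. The flink-jdbc dependency already listed in the pom (section 6) offers a batching alternative; below is a sketch using Flink 1.10's JDBCOutputFormat in place of the hand-written sink (datasourceUrl, userName and password are the same @Value fields as above), not the approach the original post uses:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

// batching JDBC writer from flink-jdbc 1.10 (deprecated in later Flink versions)
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("com.mysql.cj.jdbc.Driver")
        .setDBUrl(datasourceUrl)
        .setUsername(userName)
        .setPassword(password)
        .setQuery("update biz_device t set t.status=2 where t.dev_sn=?")
        .setBatchInterval(1) // flush every record; raise to batch updates
        .finish();

// wrap each device SN in a one-column Row and hand it to the output format
deviceStatus
        .map(sn -> Row.of(sn))
        .returns(Types.ROW(Types.STRING))
        .writeUsingOutputFormat(jdbcOutput);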

5. application.yml configuration

server:
  port: 8099

spring:
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
        allow:
      web-stat-filter:
        enabled: true
    dynamic:
      druid: # global druid settings; most values match the defaults (only set parameters you understand)
        # connection pool: initial, minimum and maximum size
        initial-size: 5
        min-idle: 5
        maxActive: 20
        # maximum wait time when acquiring a connection (ms)
        maxWait: 60000
        # interval between eviction runs that close idle connections (ms)
        timeBetweenEvictionRunsMillis: 60000
        # minimum time a connection stays idle in the pool (ms)
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # monitoring filters; removing them disables SQL stats in the console, 'wall' is the SQL firewall
        filters: stat,wall,slf4j
        # enable mergeSql and slow-SQL logging via connectProperties
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3306/fire?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka broker address(es); multiple brokers are comma-separated
    producer:
      retries: 1 # values greater than 0 make the client resend failed records
      # number of messages sent per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # raise the maximum message size pushed to kafka
      properties:
        max.request.size: 52428800
    consumer:
      group-id: data-analysis-flink
      # commit offsets manually so records are only marked consumed after processing
      enable-auto-commit: false
      # where to start when no committed offset exists: latest (use earliest to replay)
      auto-offset-reset: latest
    properties:
      # session timeout before the broker considers the client dead; set generously to survive debugging pauses
      session:
        timeout:
          ms: 60000
      heartbeat:
        interval:
          ms: 30000
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="root" password="root";'
# custom settings
customer:
  # flink-related settings
  flink:
    # whether to enable the device-status job
    cal-device-status: true
    cal-device-status-topic: device-upload-data
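Since customer.flink.cal-device-status is an ordinary Spring property, the job switch from note 1 in section 2 can also be flipped at launch time without rebuilding, using Spring Boot's standard command-line property override:

java -jar data-analysis-flink.jar --customer.flink.cal-device-status=false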

6. pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>com.grandhonor.firefighting</groupId>
		<artifactId>data-analysis</artifactId>
		<version>1.0.0</version>
	</parent>
	<groupId>com.grandhonor.firefighting</groupId>
	<artifactId>data-analysis-flink</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.grandhonor.firefighting</groupId>
			<artifactId>data-analysis-service</artifactId>
			<version>${data.nanlysis.version}</version>
		</dependency>
		<dependency>
			<groupId>com.grandhonor.firefighting</groupId>
			<artifactId>data-analysis-model</artifactId>
			<version>${data.nanlysis.version}</version>
		</dependency>
		<dependency>
			<groupId>io.github.openfeign</groupId>
			<artifactId>feign-httpclient</artifactId>
			<version>10.10.1</version>
		</dependency>
		<!-- druid connection pool -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid-spring-boot-starter</artifactId>
			<version>${druid.version}</version>
		</dependency>
		<!-- dynamic datasource -->
		<dependency>
			<groupId>com.baomidou</groupId>
			<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
			<version>${dynamic.datasource.version}</version>
		</dependency>
		<!-- mysql driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.version}</version>
		</dependency>

		<!-- flink -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- flink kafka connector -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- flink json -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- flink jdbc -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-jdbc_2.11</artifactId>
			<version>1.10.0</version>
		</dependency>
	</dependencies>

	<build>
		<finalName>data-analysis-flink</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<createDependencyReducedPom>false</createDependencyReducedPom>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>module-info.class</exclude>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
									<resource>META-INF/spring.handlers</resource>
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
									<resource>reference.conf</resource>
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
									<resource>META-INF/spring.factories</resource>
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
									<resource>META-INF/spring.schemas</resource>
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.grandhonor.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Notes:

1. If the job only runs locally and is never submitted to a Flink cluster, spring-boot-maven-plugin works and the jar can be started directly with java -jar, using a build section like this:


<build>
	<finalName>data-analysis-flink</finalName>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
			<configuration>
				<mainClass>com.grandhonor.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
			</configuration>
			<executions>
				<execution>
					<goals>
						<goal>repackage</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

A jar built by spring-boot-maven-plugin fails when submitted to a Flink cluster: Spring Boot packages the application classes under BOOT-INF by default, so the Flink server reports that classes cannot be found. A jar built with maven-shade-plugin can both be run with java -jar and be submitted to the Flink server.
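For reference, both ways of running the shaded jar (the jar name comes from finalName in the build section; flink run assumes a running session cluster):

# run locally
java -jar target/data-analysis-flink.jar

# submit to the Flink cluster in detached mode
flink run -d target/data-analysis-flink.jar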

2. If the jar built by maven-shade-plugin is submitted to the cluster, the logback that Spring Boot pulls in by default must be excluded, otherwise the server-side run fails with Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Exclude it as follows:


<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
	<exclusions>
		<exclusion>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</exclusion>
	</exclusions>
</dependency>

When running locally with java -jar, the following excludes must be commented out of the shade plugin's artifactSet in the build section, otherwise startup fails with java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory:

<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>

3. When packaging with maven-shade-plugin, the transformers below must be added, with their implementation attributes, otherwise the build fails with Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ManifestResourceTransformer:


      
<transformers>
	<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
		<resource>META-INF/spring.handlers</resource>
	</transformer>
	<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
		<resource>reference.conf</resource>
	</transformer>
	<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
		<resource>META-INF/spring.factories</resource>
	</transformer>
	<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
		<resource>META-INF/spring.schemas</resource>
	</transformer>
	<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
		<mainClass>com.grandhonor.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
	</transformer>
</transformers>

7. Execution results:

1. Running locally

2. Submitting to the Flink cluster

8. Other pitfalls

1. Error: The RemoteEnvironment cannot be instantiated when running in a pre-defined context

Fix: obtain the environment with StreamExecutionEnvironment.getExecutionEnvironment(), which picks up the current execution context (local or cluster) instead of instantiating a remote environment directly:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Error: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 2048 of 32768 bytes each.

Fix: lower the job parallelism:

env.setParallelism(1);
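Lowering the parallelism reduces how many network buffers the job needs. If the parallelism is actually required, an alternative (not what the original post did) is to give the TaskManagers more network memory in flink-conf.yaml; the option names below apply to Flink 1.10+:

taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb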

3. Error: Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'

Fix: add the following to the Flink configuration file flink-conf.yaml:
classloader.check-leaked-classloader: false
