栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink消费kafka消息实战,java项目开发案例全程实录视频

Flink消费kafka消息实战,java项目开发案例全程实录视频

  1. JDK:1.8.0_191

  2. spring boot:1.5.9.RELEASE

  3. spring-kafka:1.3.8.RELEASE

  4. Flink:1.7

在机器192.168.1.101上部署三个容器(消息生产者、zookeeper、kafka)

构建kafka相关的环境不是本文重点,因此这里利用docker快速实现,步骤如下:

  1. 在机器192.168.1.101上安装docker和docker-compose;

  2. 创建docker-compose.yml文件,内容如下:

version: ‘2’

services:

zookeeper:

image: wurstmeister/zookeeper

ports:

  • “2181:2181”

kafka1:

image: wurstmeister/kafka:2.11-0.11.0.3

ports:

  • “9092:9092”

environment:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092

KAFKA_LISTENERS: PLAINTEXT://:9092

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

KAFKA_CREATE_TOPICS: “topic001:2:1”

volumes:

  • /var/run/docker.sock:/var/run/docker.sock

producer:

image: bolingcavalry/kafka01103producer:0.0.1-SNAPSHOT

ports:

  • “8080:8080”
  1. 在docker-compose.yml所在目录执行命令docker-compose up -d,即可启动容器;

如果您想了解更多docker环境下kafka消息生产者的细节,请参考《如何使用Docker内的kafka服务》;

在机器192.168.1.104上安装Apache Bench

不同的操作系统安装Apache Bench的命令也不一样:

  1. ubuntu上的安装命令apt-get install apache2-utils;

  2. centos上的安装命令yum install httpd-tools;

源码下载

接下来的实战是编写Flink应用的源码,您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:

| 名称 | 链接 | 备注 |

| :-- | :-- | :-- |

| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |

| git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |

| git仓库地址(ssh) | [git@github.com](ma

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

ilto:git@github.com):zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |

这个git项目中有多个文件夹,本章源码在flinkkafkademo这个文件夹下,如下图红框所示:

开发Flink应用,部署到机器192.168.1.102
  1. Flink环境搭建请参考《Flink1.7从安装到体验》;

  2. 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

根据提示,输入groupId为com.bolingcavalry,artifactId为flinkkafkademo,其他的直接按下回车键即可使用默认值,这样就得到了一个maven工程:flinkkafkademo;

3. 打开工程的pom.xml文件,增加以下两个依赖:

org.apache.flink

flink-connector-kafka-0.11_2.12

${flink.version}

com.alibaba

fastjson

1.2.28

  1. 新增一个辅助类,用于将kafka消息中的内容转换成java对象:

public class JSonHelper {

public static long getTimeLongFromRawMessage(String raw){

SingleMessage singleMessage = parse(raw);

return null==singleMessage ? 0L : singleMessage.getTimeLong();

}

public static SingleMessage parse(String raw){

SingleMessage singleMessage = null;

if (raw != null) {

singleMessage = JSONObject.parseObject(raw, SingleMessage.class);

}

return singleMessage;

}

}

  1. SingleMessage对象的定义:

public class SingleMessage {

private long timeLong;

private String name;

private String bizID;

private String time;

private String message;

public long getTimeLong() {

return timeLong;

}

public void setTimeLong(long timeLong) {

this.timeLong = timeLong;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public String getBizID() {

return bizID;

}

public void setBizID(String bizID) {

this.bizID = bizID;

}

public String getTime() {

return time;

}

public void setTime(String time) {

this.time = time;

}

public String getMessage() {

return message;

}

public void setMessage(String message) {

this.message = message;

}

}

  1. 实时处理的操作都集中在StreamingJob类,源码的关键位置已经加了注释,就不再赘述了:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.api.java.tuple.Tuple;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

import org.apache.flink.streaming.api.watermark.Watermark;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import javax.annotation.Nullable;

import java.util.Properties;

public class StreamingJob {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000); // 要设置启动检查点

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();

props.setProperty(“bootstrap.servers”, “kafka1:9092”);

props.setProperty(“group.id”, “flink-group”);

//数据源配置,是一个kafka消息的消费者

FlinkKafkaConsumer011 consumer =

new FlinkKafkaConsumer011<>(“topic001”, new SimpleStringSchema(), props);

//增加时间水位设置类

consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks (){

@Override

public long extractTimestamp(String element, long previousElementTimestamp) {

return JSONHelper.getTimeLongFromRawMessage(element);

}

@Nullable

@Override

public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673456.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号