栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink java模拟生成自定义流式数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink java模拟生成自定义流式数据

思路如下:

  1. 定义一个POJO类,注意flink里使用的类必须有一个无参的构造方法
  2. 自定义DataSource实现SourceFunction接口
  3. 使用ctx.collect()传入想要发送的数据就可以了

首先定义一个POJO类:

class MyData {
    public int keyId;
    public long timestamp;
    public double value;

    public MyData() {
    }

    public MyData(int accountId, long timestamp, double value) {
        this.keyId = accountId;
        this.timestamp = timestamp;
        this.value = value;
    }

    public long getKeyId() {
        return keyId;
    }

    public void setKeyId(int keyId) {
        this.keyId = keyId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "MyData{" +
                "keyId=" + keyId +
                ", timestamp=" + timestamp +
                ", value=" + value +
                '}';
    }
}

生成自己的数据:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class CreateMyData {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceStream = env.addSource(new MyDataSource());
        env.setParallelism(3);
        sourceStream.print();
        env.execute();
    }


    private static class MyDataSource implements SourceFunction {
        // 定义标志位,用来控制数据的产生
        private boolean isRunning = true;
        private final Random random = new Random(0);

        @Override
        public void run(SourceContext ctx) throws Exception {
            while (isRunning) {
                ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));
                Thread.sleep(1000L); // 1s生成1个数据
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/298472.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号