栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

自定义FlinkSource模拟生成数据

自定义FlinkSource模拟生成数据

首先创建一个实体类,用于把模拟数据映射成对象

package com.zxl.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;
@AllArgsConstructor
@NoArgsConstructor
@Data
//疫情名单实体类
public class Sick {
    //姓名
    private String name;
    //性别
    private String sex;
    //年龄
    private Integer age;
    //身份证号
    private Long   id;
    //地址
    private String   area;
    //病情状况
    private String disease_status;
    //发病日期
    private String  date;
}

然后实现SourceFunction接口

package com.zxl.source;


import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

//自定义疫情名单生成类
public class DataSource implements SourceFunction {
    private static Random random= new Random();
    private static boolean isRunning = true;

    //姓氏数组598
   static String[] first_name={ "赵","钱","孙","李","周","吴","郑","王","冯","陈","褚","卫","蒋","沈","韩","杨","朱","秦","尤",
            "许","何","吕","施","张","孔","曹","严","华","金","魏","陶","姜","戚","谢","邹","喻","柏","水",
            "窦","章","云","苏","潘","葛","奚","范","彭","郎","鲁","韦","昌","马","苗","凤","花","方","俞",
            "任","袁","柳","酆","鲍","史","唐","费","廉","岑","薛","雷","贺","倪","汤","滕","殷","罗","毕",
            "郝","邬","安","常","乐","于","时","傅","皮","卞","齐","康","伍","余","元","卜","顾","孟","平",
            "黄","和","穆","萧","尹","姚","邵","湛","汪","祁","毛","禹","狄","米","贝","明","臧","计","伏",
            "成","戴","谈","宋","茅","庞","熊","纪","舒","屈","项","祝","董","梁","杜","阮","蓝","闵","席",
            "季","麻","强","贾","路","娄","危","江","童","颜","郭","梅","盛","林","刁","钟","徐","邱","骆",
            "高","夏","蔡","田","樊","胡","凌","霍","虞","万","支","柯","昝","管","卢","莫","经","房","裘",
            "缪","干","解","应","宗","丁","宣","贲","邓","郁","单","杭","洪","包","诸","左","石","崔","吉",
            "钮","龚","程","嵇","邢","滑","裴","陆","荣","翁","荀","羊","于","惠","甄","曲","家","封","芮",
            "羿","储","靳","汲","邴","糜","松","井","段","富","巫","乌","焦","巴","弓","牧","隗","山","谷",
            "车","侯","宓","蓬","全","郗","班","仰","秋","仲","伊","宫","宁","仇","栾","暴","甘","钭","厉",
            "戎","祖","武","符","刘","景","詹","束","龙","叶","幸","司","韶","郜","黎","蓟","溥","印","宿",
            "白","怀","蒲","邰","从","鄂","索","咸","籍","赖","卓","蔺","屠","蒙","池","乔","阴","郁","胥",
            "能","苍","双","闻","莘","党","翟","谭","贡","劳","逄","姬","申","扶","堵","冉","宰","郦","雍",
            "却","璩","桑","桂","濮","牛","寿","通","边","扈","燕","冀","浦","尚","农","温","别","庄","晏",
            "柴","瞿","阎","充","慕","连","茹","习","宦","艾","鱼","容","向","古","易","慎","戈","廖","庾",
            "终","暨","居","衡","步","都","耿","满","弘","匡","国","文","寇","广","禄","阙","东","欧","殳",
            "沃","利","蔚","越","夔","隆","师","巩","厍","聂","晁","勾","敖","融","冷","訾","辛","阚","那",
            "简","饶","空","曾","毋","沙","乜","养","鞠","须","丰","巢","关","蒯","相","查","后","荆","红",
            "游","郏","竺","权","逯","盖","益","桓","公","仉","督","岳","帅","缑","亢","况","郈","有","琴",
            "归","海","晋","楚","闫","法","汝","鄢","涂","钦","商","牟","佘","佴","伯","赏","墨","哈","谯",
            "篁","年","爱","阳","佟","言","福","南","火","铁","迟","漆","官","冼","真","展","繁","檀","祭",
            "密","敬","揭","舜","楼","疏","冒","浑","挚","胶","随","高","皋","原","种","练","弥","仓","眭",
            "蹇","覃","阿","门","恽","来","綦","召","仪","风","介","巨","木","京","狐","郇","虎","枚","抗",
            "达","杞","苌","折","麦","庆","过","竹","端","鲜","皇","亓","老","是","秘","畅","邝","还","宾",
            "闾","辜","纵","侴","万俟","司马","上官","欧阳","夏侯","诸葛","闻人","东方","赫连","皇甫","羊舌",
            "尉迟","公羊","澹台","公冶","宗正","濮阳","淳于","单于","太叔","申屠","公孙","仲孙","轩辕","令狐",
            "钟离","宇文","长孙","慕容","鲜于","闾丘","司徒","司空","兀官","司寇","南门","呼延","子车","颛孙",
            "端木","巫马","公西","漆雕","车正","壤驷","公良","拓跋","夹谷","宰父","谷梁","段干","百里","东郭",
            "微生","梁丘","左丘","东门","西门","南宫","第五","公仪","公乘","太史","仲长","叔孙","屈突","尔朱",
            "东乡","相里","胡母","司城","张廖","雍门","毋丘","贺兰","綦毋","屋庐","独孤","南郭","北宫","王孙"};

    //名字数组213
   static String[] last_name={
            "晗","淽","蕊","菡","娟","文","芳","莉","雅","萱","漩","娅","媛","怡","佩","淇",
            "娜","莹","晓","玲","紫","秋","倩","小","洁","明","一","静","媛","瑞","颖","欢",
            "霄","枫","丽","秀","卿","爱","秀","娟","英","华","巧","娜","希","珠","翠","雅",
            "芝","萍","霭","柔","竹","荔","枝","思","云","莲","真","环","荣","莺","艳","凡",
            "琼","勤","珍","贞","莉","希","宁","亚","宜","可","妍","晴","涵","洁","湘","恬",
            "允","妍","柔","音","妤","语","郁","诗","宁","孝","妮","翎","羽","霏","雪","佳",
            "智","素","伊","婕","媛","咏","瞳","梦","芩","苑","星","绚","若","珞","悠","艾",
            "颖","桂","娣","叶","璧","璐","娅","琦","妍","茜","秋","珊","莎","锦","琬","茗",
            "羽","伊","宁","飘","黛","青","婷","娴","瑾","露","瑶","婵","雁","蓓","纨","仪",
            "冰","爽","昭","寒","聪","婷","灵","丹","蓉","眉","君","琴","蕊","薇","菁","梦",
            "岚","苑","婕","馨","馨","瑗","琰","瑶","嫣","倩","韵","融","园","艺","咏","卿",
            "聪","澜","纯","毓","悦","昭","冰","爽","琬","茗","羽","希","宁","欣","飘","雨",
            "育","涵","琴","晴","丽","美","瑶","梦","茜","倩","希","夕","月","悦","乐","彤",
            "影","珍","依","沫","玉"};
    //姓名
    private static String getName(){
        int a=random.nextInt(598);
        int b=random.nextInt(213);
        String name= first_name[a] + last_name[b];
        return name;
    }
	//性别
   static String[] sex={"男","女"};
    private static String getSex(){
        int sm=random.nextInt(2);
        String rs=sex[sm];
        return rs;
    }
    //随机生成日期和时间
    public static String getDate(){
        String[] year={"2019","2020","2021"};
        String[] month={"1","2","3","4","5","6","7","8","9","10","11","12"};
        String[] day={"1","2","3","4","5","6","7","8","9","10",
                "11","12","13","14","15","16","17","18","19","20",
                "21","22","23","24","25","26","27","28","29","30"};
        int years=random.nextInt(3);
        int months=random.nextInt(12);
        int days=random.nextInt(30);
        String dates=year[years]+"-"+month[months]+"-"+day[days];

        return dates;
    }
    //病情状况
    static String[] disease={"确诊","无症状","疑似","重症",
                      "治愈","死亡"};
    private static String getDisease_status(){
        int m= random.nextInt(6);
        String ms=disease[m];
        return ms;
    }
    static String[] provinces = {"北京", "天津", "上海", "重庆", "新疆", "西藏", "宁夏", "内蒙古",
            "广西", "黑龙江", "吉林", "辽宁", "河北", "山东", "江苏", "安徽",
            "浙江", "福建", "广东", "海南", "云南", "贵州", "四川", "湖南",
            "湖北", "河南", "山西", "陕西", "甘肃", "青海", "江西", "台湾", "香港", "澳门"};
    private static String getArea(){
        int are= random.nextInt(34);
        String area=provinces[are];
        return area;
    }

    //生成模拟数据对象
    public static Sick getSick() {
                Sick sick=new Sick();
                sick.setName(getName());
                sick.setSex(getSex());
                sick.setAge(random.nextInt(100));
                sick.setId(Long.valueOf(IdGen.getRandomId()));
                sick.setArea(getArea());
                sick.setDisease_status(getDisease_status());
                sick.setDate(getDate());
                return sick;
             }
    @Override
    public void run(SourceFunction.SourceContext sourceContext) throws Exception {

        while (isRunning){

            sourceContext.collect(getSick());
			//每个3秒发送一条数据
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

最后创建测试类,测试数据

package com.zxl.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataTest {
    public static void main(String[] args) throws Exception {
		//创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
		//调用Flink自定义Source
        DataStreamSource source = environment.addSource(new DataSource());
		//打印数据
        source.print();
		//启动程序
        environment.execute();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/423154.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号