栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

storm迁移flink研究——如何利用Flink对数据进行分流,按自定义逻辑分阶连续处理

storm迁移flink研究——如何利用Flink对数据进行分流,按自定义逻辑分阶连续处理

Storm迁移flink主要问题:

    Storm通过自定义的Bolt类实现自己的业务逻辑,如何在flink中实现

通过flink的ProcessFuction类实现,可以通过继承该类,在processElement方法中实现自己的业务逻辑。

    Storm按照业务类型分发数据处理的逻辑,如何在flink中实现

通过flink的旁路输出特性实现,对原始的数据流按照某些分类标准分类,输出到不同的子数据流中处理。

总体处理流程:

    Flink从Kafka中读取数据,作为初始数据流initDataStream;

    对初始数据流进行分类处理,将子数据输出到数据流dataStream1、dataStream2、…、dataStreamn中;

    按照自己的业务逻辑,对子数据流dataStream1_1、dataStream2_1、…、dataStreamn_1处理,得到dataStream1_2、dataStream2_2、…、dataStreamn_2,经过多次处理,最终得到dataStream1_x、dataStream2_y、…、dataStreamn_z

    最后利用SinkFuction进行落地处理

例:利用flink对北京、上海两城市民信息进行过滤,过滤掉工资小于1000和大于20000的数据,然后按姓名_城市_收入的格式输出

输入:个人信息{姓名、年龄、性别、城市、收入}

输出:姓名_城市_收入,如:a_b_100

Person类:

public class Person {

    private String name;
    private Integer age;
    private Sex sex;
    private Integer salary;
    private Country country;
    //getter、setter...
}

enum Sex {
    Male,
    Female
}

enum Country {
    Beijing,
    Shanghai
}

生产者程序:

public class FlinkProducer {

    //kafka配置文件
    private static final Properties properties;

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //连接kafka
        FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), properties);
        //初始化数据源
        String[] initData = getData();
        DataStreamSource source = env.fromElements(initData);
        //发送数据
        source.addSink(flinkKafkaProducer);
        //执行flink程序
        env.execute();
    }

    //模拟数据源
    private static String[] getData() {
        String[] data = new String[100];
        for (int i = 0; i < 100; i++) {
            Person person = new Person();
            person.setName("name");
            person.setAge(18);
            //设置性别
            int sex = (int) (Math.random() * 100);
            if (sex % 2 == 0) {
                person.setSex(Sex.Female);
            } else {
                person.setSex(Sex.Male);
            }
            //设置城市
            int country = (int) (Math.random() * 100);
            if (country % 2 == 0) {
                person.setCountry(Country.Beijing);
            } else {
                person.setCountry(Country.Shanghai);
            }
            //设置收入
            person.setSalary((int) (Math.random() * 50000));
            data[i] = JsonUtils.serialize(person);
        }
        return data;
    }

}

消费者程序:

public class FlinkConsumer {

    //kafka配置文件
    private static final Properties properties;

    //男性标签
    private static final OutputTag MALE = new OutputTag<>("male", Types.STRING);

    //女性标签
    private static final OutputTag FEMALE = new OutputTag<>("female", Types.STRING);

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.flink连接kafka并读取数据源
        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties);
        //3.获取初始数据源
        DataStreamSource initDataStream = env.addSource(flinkKafkaConsumer);
        //4.对数据源进行分类处理
        SingleOutputStreamOperator process = initDataStream.process(new SexSplitBolt());
        //5.分别获取仅仅包含男性、女性数据的数据流
        DataStream dataStream_male = process.getSideOutput(MALE);
        DataStream dataStream_female = process.getSideOutput(FEMALE);
        //6.过滤掉工资小于1000和大于20000的数据
        SingleOutputStreamOperator dataStream_male_1 = dataStream_male.process(new SalarySplitBolt());
        SingleOutputStreamOperator dataStream_female_1 = dataStream_female.process(new SalarySplitBolt());
        //7.给每个人的工资增加20%
        SingleOutputStreamOperator dataStream_male_1_2 = dataStream_male_1.process(new SalaryAddBolt());
        SingleOutputStreamOperator dataStream_female_1_2 = dataStream_female_1.process(new SalaryAddBolt());
        //8.进行落地处理,输出处理结果到控制台中
        dataStream_male_1_2.addSink(new OutputBolt());
        dataStream_female_1_2.addSink(new OutputBolt());
        //9.执行flink程序
        env.execute();
    }

    static class SexSplitBolt extends ProcessFunction {

        @Override
        public void processElement(String s, ProcessFunction.Context context, Collector collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSex() == Sex.Male) {
                context.output(MALE, s);
            } else {
                context.output(FEMALE, s);
            }
        }
    }

    static class SalarySplitBolt extends ProcessFunction {

        @Override
        public void processElement(String s, ProcessFunction.Context context, Collector collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSalary() < 1000 || person.getSalary() > 20000) {
                return;
            }
            collector.collect(s);
        }
    }

    static class SalaryAddBolt extends ProcessFunction {

        @Override
        public void processElement(String s, ProcessFunction.Context context, Collector collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            person.setSalary((int) (person.getSalary() * 1.2));
            collector.collect(s);
        }
    }

    static class OutputBolt implements SinkFunction {
        @Override
        public void invoke(String value, Context context) {
            Person person = JsonUtils.deserialize(value, Person.class);
            System.out.println("已处理数据:" + person.getName() + "_" + person.getCountry() + "_" + person.getSalary());
        }
    }
   
}

控制台输出:
已处理数据:name_Beijing_9320

已处理数据:name_Beijing_1049

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_16721

已处理数据:name_Beijing_3638

已处理数据:name_Shanghai_18862

已处理数据:name_Shanghai_8183

已处理数据:name_Shanghai_18627

已处理数据:name_Shanghai_16510

已处理数据:name_Beijing_18983

已处理数据:name_Beijing_10734

已处理数据:name_Shanghai_1594

已处理数据:name_Beijing_3640

已处理数据:name_Shanghai_17757

已处理数据:name_Shanghai_16700

已处理数据:name_Beijing_1175

已处理数据:name_Beijing_11788

已处理数据:name_Shanghai_15732

已处理数据:name_Beijing_15137

已处理数据:name_Beijing_12853

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_1188

已处理数据:name_Shanghai_3202

已处理数据:name_Shanghai_3986

已处理数据:name_Shanghai_8193

已处理数据:name_Beijing_16093

已处理数据:name_Beijing_8274

已处理数据:name_Beijing_3630

已处理数据:name_Beijing_11962

已处理数据:name_Beijing_14441

已处理数据:name_Shanghai_5312

已处理数据:name_Beijing_1815

已处理数据:name_Shanghai_11287

已处理数据:name_Beijing_5848

已处理数据:name_Beijing_11056

已处理数据:name_Shanghai_12407

已处理数据:name_Shanghai_12652

已处理数据:name_Beijing_19864

已处理数据:name_Shanghai_18935

已处理数据:name_Shanghai_16894

已处理数据:name_Shanghai_9732

36570 [TaskExecutorLocalStateStoresManager shutdown hook] INFO o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759022.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号