栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark streaming+kafka订单累计统计和分组统计

spark streaming+kafka订单累计统计和分组统计

1. 添加maven依赖

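        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>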
2. 订单生产者
package com.demo;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import com.demo.utils.ConstantUtils;
import com.demo.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


/**
 * Demo Kafka producer that publishes batches of randomly generated orders, serialized as
 * JSON, to {@link ConstantUtils#ORDER_TOPIC}. It loops until the process is stopped, the
 * thread is interrupted, or an error occurs.
 */
public class OrderProducer {
    private static final Logger logger = LoggerFactory.getLogger(OrderProducer.class);

    /** Pause between two consecutive batches, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 5000L;

    /**
     * Configures the producer and sends order batches in an endless loop.
     *
     * @param args unused
     * @throws IOException declared for compatibility with existing callers; failures inside
     *                     the loop are logged rather than propagated
     */
    public static void main(String[] args) throws IOException {
        Producer<String, String> producer = null;
        ObjectMapper mapper = new ObjectMapper();
        // One generator for the whole run instead of a new instance per order.
        Random rnd = new Random();

        try {
            Properties props = new Properties();
            // Kafka broker list.
            props.put("metadata.broker.list", ConstantUtils.metaDATA_BROKER_LIST_VALUE);
            // Serializer class for message values.
            props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
            // Serializer class for message keys.
            props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);

            producer = new Producer<String, String>(new ProducerConfig(props));

            // Reused buffer holding the messages of the current batch.
            List<KeyedMessage<String, String>> messages = new ArrayList<>();

            // Every 5 seconds, produce a random number (1..19) of order messages.
            while (true) {
                int random = RandomUtils.getRandomNum(20);
                if (random == 0) {
                    // An empty batch is pointless; draw again without sleeping.
                    continue;
                }
                messages.clear();
                for (int i = 0; i < random; i++) {
                    int orderRandom = RandomUtils.getRandomNum(random * 10);
                    Order order = new Order("name" + orderRandom, Float.valueOf(orderRandom));

                    // The same digit is appended to both codes so they stay paired.
                    int rndvalue = rnd.nextInt(5);
                    order.setMerchantCode("10388100011" + rndvalue);
                    order.setDeptCode("1000" + rndvalue);

                    // Message body: topic plus the order serialized as JSON.
                    messages.add(new KeyedMessage<String, String>(
                            ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order)));
                }

                producer.send(messages);
                logger.warn("orderNum:{},message:{}", random, messages);
                Thread.sleep(SEND_INTERVAL_MS);
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Order producer interrupted", e);
        } catch (Exception e) {
            // Pass the throwable itself so the logger prints the full stack trace.
            logger.error("Order producer failed", e);
        } finally {
            // producer is still null when configuration or construction failed.
            if (producer != null) {
                producer.close();
            }
        }
    }
}
3. 订单统计
package com.demo;


import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.demo.utils.ConstantUtils;
import com.demo.utils.SparkUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;


import kafka.serializer.StringDecoder;
import scala.Tuple2;

import static jdk.nashorn.internal.objects.NativeArray.reduce;


public class OrderSparkStreaming {
    private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
    private static AtomicLong orderCount = new AtomicLong(0);
    private static AtomicDouble totalPrice = new AtomicDouble(0);

    public static void main(String[] args) throws InterruptedException {

        // Create context with a 2 seconds batch interval
        JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
                "local[2]", null, Durations.seconds(10));

        Set topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
        Map kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", ConstantUtils.metaDATA_BROKER_LIST_VALUE);
        kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream orderMsgStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                topicsSet);

        // json与对象映射对象
        final ObjectMapper mapper = new ObjectMapper();
        JavaDStream orderDStream = orderMsgStream
                .map(new Function, Order>() {
                    
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Order call(Tuple2 t2) throws Exception {
                        Order order = mapper.readValue(t2._2, Order.class);
                        return order;
                    }
                }).cache();

        // 对DStream中的每一个RDD进行操作
        orderDStream.foreachRDD(new VoidFunction>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD orderJavaRDD) throws Exception {
                long count = orderJavaRDD.count();
                if (count > 0) {
                    // 累加订单总数
                    orderCount.addAndGet(count);
                    // 对RDD中的每一个订单,首先进行一次Map操作,产生一个包含了每笔订单的价格的新的RDD
                    // 然后对新的RDD进行一次Reduce操作,计算出这个RDD中所有订单的价格总和
                    JavaRDD priceMap = orderJavaRDD.map(new Function() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Order order) throws Exception {
                            return order.getPrice();
                        }
                    });

                    Float sumPrice = priceMap.reduce(new Function2() {
                        
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Float a, Float b) throws Exception {
                            return a + b;
                        }
                    });
                    // 然后把本次RDD中所有订单的价格总和累加到之前所有订单的价格总和中。
                    totalPrice.getAndAdd(sumPrice);

                    // 数据订单总数和价格总和,生产环境中可以写入数据库
                    System.out.println("-------Total order count : " + orderCount.get()
                            + " with total price : " + totalPrice.get());
                }
            }
        });

        // 对DStream中的每一个RDD进行操作
        orderDStream.foreachRDD(new VoidFunction>() {

            @Override
            public void call(JavaRDD orderJavaRDD) throws Exception {

                JavaPairRDD javaPairRDD = orderJavaRDD.mapToPair(new PairFunction() {

                    @Override
                    public Tuple2 call(Order order) throws Exception {
                        String key = order.getMerchantCode() + "-" + order.getDeptCode();
                        Float value = order.getPrice();
                        return new Tuple2(key, value);
                    }
                });

                JavaPairRDD javaPairRDD2 = javaPairRDD.reduceByKey(new Function2() {
                    @Override
                    public Float call(Float aFloat, Float aFloat2) throws Exception {
                        return aFloat + aFloat2;
                    }
                });

                javaPairRDD2.foreach(new VoidFunction>() {
                                         @Override
                                         public void call(Tuple2 t2) throws Exception {
                                             System.out.println("key = " + t2._1 + " value=" + t2._2);
                                         }
                                     }
                );

            }
        });

        orderDStream.print();

        jssc.start(); // Start the computation

        jssc.awaitTermination(); // Wait for the computation to terminate
        jssc.stop();
    }
}
4. 辅助类ConstantUtils
package com.demo.utils;

/**
 * Shared Kafka configuration values used by the order producer and the streaming job.
 * This is a pure constants holder and cannot be instantiated or subclassed.
 */
public final class ConstantUtils {

    /** Value for the {@code metadata.broker.list} Kafka property (host:port of the broker). */
    public static final String metaDATA_BROKER_LIST_VALUE = "10.18.0.34:9092";

    /** Value for the {@code auto.offset.reset} Kafka consumer property. */
    public static final String AUTO_OFFSET_RESET_VALUE = "smallest";

    /** Encoder class used for both {@code serializer.class} and {@code key.serializer.class}. */
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";

    /** Name of the Kafka topic that carries the order messages. */
    public static final String ORDER_TOPIC = "orderTopic";

    private ConstantUtils() {
        // Constants holder; not meant to be instantiated.
    }
}
5. 辅助类RandomUtils
package com.demo.utils;

import java.util.Random;


public class RandomUtils {

    /**
     * Returns a pseudo-random integer in the range {@code [0, bound)}.
     *
     * @param bound exclusive upper bound; must be positive
     * @return a value between 0 (inclusive) and {@code bound} (exclusive)
     * @throws IllegalArgumentException if {@code bound} is not positive
     */
    public static int getRandomNum(int bound) {
        // Reuses the per-thread generator instead of allocating a new Random on every call.
        return java.util.concurrent.ThreadLocalRandom.current().nextInt(bound);
    }

    public static void main(String[] args) throws InterruptedException{
        while(true){
            int randomNum = getRandomNum(20);
            System.out.println(randomNum);

            for(int i=0; i 
6. 辅助类SparkUtils 
package com.demo.utils;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;


public class SparkUtils {

    
    public static JavaSparkContext getJavaSparkContext(String appName, String master,
                                                       String logLeverl) {
        SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        jsc.setLogLevel(logLeverl);

        return jsc;
    }

    
    public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
                                                               String logLeverl,Duration batchDuration) {
        SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);

        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,batchDuration);
        return jsc;
    }


    
    public static JavaRDD createRddExternal(JavaSparkContext jsc, String filePath) {
        if (jsc == null) {
            return null;
        }

        // 文件读取方式创建RDD
        JavaRDD readmeRdd = jsc.textFile(filePath);

        return readmeRdd;

    }

    
    public static JavaRDD createRddCollect(JavaSparkContext jsc, List list) {
        if (jsc == null) {
            return null;
        }

        // 创建RDD
        JavaRDD listRdd = jsc.parallelize(list);

        return listRdd;
    }

    
    public static JavaPairRDD createPairRddCollect(JavaSparkContext jsc,
                                                                     List> list) {
        if (jsc == null) {
            return null;
        }

        // 创建RDD
        JavaPairRDD pairRDD = jsc.parallelizePairs(list);

        return pairRDD;
    }
}

7. 统计输出

 8. 监控查看执行情况

通过监控可以查看任务执行相关信息,以便在任务多的时候进行性能优化调整。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706748.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号