2. Order producer
Maven dependencies:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
package com.demo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import com.demo.utils.ConstantUtils;
import com.demo.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OrderProducer {
private static Logger logger = LoggerFactory.getLogger(OrderProducer.class);
public static void main(String[] args) throws IOException {
// set up the producer
Producer<String, String> producer = null;
ObjectMapper mapper = new ObjectMapper();
try {
Properties props = new Properties();
// Kafka broker list
props.put("metadata.broker.list", ConstantUtils.metaDATA_BROKER_LIST_VALUE);
// Serializer class for message values
props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
// Serializer class for message keys
props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<>(config);
// Batch of messages to publish
List<KeyedMessage<String, String>> messages = new ArrayList<>();
// Produce a random number of order messages every 5 seconds
while (true) {
int random = RandomUtils.getRandomNum(20);
if (random == 0) {
continue;
}
messages.clear();
for (int i = 0; i < random; i++) {
int orderRandom = RandomUtils.getRandomNum(random * 10);
Order order = new Order("name" + orderRandom, (float) orderRandom);
Random rnd = new Random();
int rndvalue = rnd.nextInt(5);
order.setMerchantCode("10388100011" + rndvalue);
order.setDeptCode("1000" + rndvalue);
// Order message: topic plus the order serialized as JSON
KeyedMessage<String, String> message = new KeyedMessage<>(
ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order));
messages.add(message);
}
producer.send(messages);
logger.warn("orderNum:" + random + ",message:" + messages.toString());
Thread.sleep(5000);
}
} catch (Exception e) {
logger.error("Failed to produce order messages", e);
} finally {
if (producer != null) {
producer.close();
}
}
}
}
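The Order class is not included in the listing above; the following is a minimal sketch reconstructed from how it is used (field names and the no-argument constructor are assumptions; the no-arg constructor is needed by Jackson for deserialization, and Serializable by Spark):
package com.demo;
import java.io.Serializable;
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private Float price;
private String merchantCode;
private String deptCode;
// No-arg constructor for Jackson deserialization
public Order() {
}
public Order(String name, Float price) {
this.name = name;
this.price = price;
}
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Float getPrice() { return price; }
public void setPrice(Float price) { this.price = price; }
public String getMerchantCode() { return merchantCode; }
public void setMerchantCode(String merchantCode) { this.merchantCode = merchantCode; }
public String getDeptCode() { return deptCode; }
public void setDeptCode(String deptCode) { this.deptCode = deptCode; }
}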
3. Order statistics
package com.demo;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import com.demo.utils.ConstantUtils;
import com.demo.utils.SparkUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
public class OrderSparkStreaming {
private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
private static AtomicLong orderCount = new AtomicLong(0);
private static AtomicDouble totalPrice = new AtomicDouble(0);
public static void main(String[] args) throws InterruptedException {
// Create a streaming context with a 10-second batch interval
JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("OrderSparkStreaming",
"local[2]", null, Durations.seconds(10));
Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", ConstantUtils.metaDATA_BROKER_LIST_VALUE);
kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
topicsSet);
// Jackson mapper for converting between JSON and Order objects
final ObjectMapper mapper = new ObjectMapper();
JavaDStream<Order> orderDStream = orderMsgStream
.map(new Function<Tuple2<String, String>, Order>() {
private static final long serialVersionUID = 1L;
@Override
public Order call(Tuple2<String, String> t2) throws Exception {
Order order = mapper.readValue(t2._2, Order.class);
return order;
}
}).cache();
// Operate on each RDD in the DStream
orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
long count = orderJavaRDD.count();
if (count > 0) {
// Accumulate the running order count
orderCount.addAndGet(count);
// Map each order in the RDD to its price, producing a new RDD of prices,
// then reduce that RDD to the sum of all order prices in this batch
JavaRDD<Float> priceMap = orderJavaRDD.map(new Function<Order, Float>() {
private static final long serialVersionUID = 1L;
@Override
public Float call(Order order) throws Exception {
return order.getPrice();
}
});
Float sumPrice = priceMap.reduce(new Function2<Float, Float, Float>() {
private static final long serialVersionUID = 1L;
@Override
public Float call(Float a, Float b) throws Exception {
return a + b;
}
});
// Add this batch's price sum to the running total of all orders so far
totalPrice.getAndAdd(sumPrice);
// Print the running order count and total price; in production these could be written to a database
System.out.println("-------Total order count : " + orderCount.get()
+ " with total price : " + totalPrice.get());
}
}
});
// For each RDD in the DStream, aggregate order totals per merchant/department key
orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
JavaPairRDD<String, Float> javaPairRDD = orderJavaRDD.mapToPair(new PairFunction<Order, String, Float>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Float> call(Order order) throws Exception {
String key = order.getMerchantCode() + "-" + order.getDeptCode();
Float value = order.getPrice();
return new Tuple2<>(key, value);
}
});
JavaPairRDD<String, Float> javaPairRDD2 = javaPairRDD.reduceByKey(new Function2<Float, Float, Float>() {
private static final long serialVersionUID = 1L;
@Override
public Float call(Float aFloat, Float aFloat2) throws Exception {
return aFloat + aFloat2;
}
});
javaPairRDD2.foreach(new VoidFunction<Tuple2<String, Float>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Float> t2) throws Exception {
System.out.println("key = " + t2._1 + " value=" + t2._2);
}
}
);
}
});
orderDStream.print();
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
jssc.stop();
}
}
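As the comments note, in production the per-key aggregates would typically be written to a database rather than printed. Below is a minimal sketch of that idea with plain JDBC, replacing the foreach over javaPairRDD2 inside the second foreachRDD; the JDBC URL, credentials, and the order_stats table are illustrative assumptions, not part of the original code:
// Hypothetical JDBC sink for the per-merchant aggregates; connection details are assumptions.
javaPairRDD2.foreachPartition(new VoidFunction<java.util.Iterator<Tuple2<String, Float>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(java.util.Iterator<Tuple2<String, Float>> it) throws Exception {
// Open one connection per partition; connections cannot be serialized to executors
try (java.sql.Connection conn = java.sql.DriverManager.getConnection(
"jdbc:mysql://localhost:3306/demo", "user", "password");
java.sql.PreparedStatement ps = conn.prepareStatement(
"INSERT INTO order_stats (merchant_dept, total_price) VALUES (?, ?)")) {
while (it.hasNext()) {
Tuple2<String, Float> t2 = it.next();
ps.setString(1, t2._1);
ps.setFloat(2, t2._2);
ps.addBatch();
}
// Flush the whole partition in one batched round trip
ps.executeBatch();
}
}
});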
4. Helper class ConstantUtils
package com.demo.utils;
public class ConstantUtils {
public final static String metaDATA_BROKER_LIST_VALUE = "10.18.0.34:9092";
public final static String AUTO_OFFSET_RESET_VALUE = "smallest";
public final static String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";
public final static String ORDER_TOPIC = "orderTopic";
}
5. Helper class RandomUtils
package com.demo.utils;
import java.util.Random;
public class RandomUtils {
public static int getRandomNum(int bound){
Random random = new Random();
return random.nextInt(bound);
}
public static void main(String[] args) throws InterruptedException {
while (true) {
int randomNum = getRandomNum(20);
System.out.println(randomNum);
for (int i = 0; i < randomNum; i++) {
System.out.println(getRandomNum(randomNum * 10));
}
Thread.sleep(1000);
}
}
}
6. Helper class SparkUtils
package com.demo.utils;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class SparkUtils {
public static JavaSparkContext getJavaSparkContext(String appName, String master,
String logLevel) {
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel(logLevel);
return jsc;
}
public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
String logLevel, Duration batchDuration) {
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, batchDuration);
if (logLevel != null) {
jsc.sparkContext().setLogLevel(logLevel);
}
return jsc;
}
public static JavaRDD<String> createRddExternal(JavaSparkContext jsc, String filePath) {
if (jsc == null) {
return null;
}
// Create an RDD by reading an external file
JavaRDD<String> readmeRdd = jsc.textFile(filePath);
return readmeRdd;
}
public static <T> JavaRDD<T> createRddCollect(JavaSparkContext jsc, List<T> list) {
if (jsc == null) {
return null;
}
// Create an RDD from an in-memory collection
JavaRDD<T> listRdd = jsc.parallelize(list);
return listRdd;
}
public static <K, V> JavaPairRDD<K, V> createPairRddCollect(JavaSparkContext jsc,
List<Tuple2<K, V>> list) {
if (jsc == null) {
return null;
}
// Create a pair RDD from an in-memory collection of tuples
JavaPairRDD<K, V> pairRDD = jsc.parallelizePairs(list);
return pairRDD;
}
}
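A short usage sketch of these helpers (the SparkUtilsDemo class name and the sample data are illustrative assumptions):
package com.demo.utils;
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkUtilsDemo {
public static void main(String[] args) {
// Local context for a quick test; "WARN" keeps the console output readable
JavaSparkContext jsc = SparkUtils.getJavaSparkContext("SparkUtilsDemo", "local[2]", "WARN");
// Build an RDD from an in-memory list and count its elements
JavaRDD<Integer> rdd = SparkUtils.createRddCollect(jsc, Arrays.asList(1, 2, 3, 4, 5));
System.out.println("count = " + rdd.count());
jsc.stop();
}
}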
7. Statistics output
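The output format follows the println statements in OrderSparkStreaming, along these lines (the values below are illustrative, not from a real run):
-------Total order count : 15 with total price : 832.0
key = 103881000113-10003 value=145.0
key = 103881000110-10000 value=97.0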
8. Monitoring job execution
The monitoring pages show task execution details, which makes it easier to tune performance when many jobs are running. In local mode the Spark web UI is available at http://localhost:4040 by default.