
SpringBoot + Spark integration demo


For Spark concepts and Linux local-mode deployment, click here.

1: Configuration file pom.xml
 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>zymTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Spark SQL (also pulls in Spark Core) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.2.0</version>
        </dependency>
        <!-- MySQL JDBC driver, used by the testSparkRddMysql example -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
</project>

2: Test class
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;



public class SparkTest {


    public static void main(String[] args) {
        //testSparkRddTxt();

        //testSparkRddCsv();

        //testSparkRddMysql();

        testSparkRddJson();
    }


    //Spark test: external text file (txt)
    //RDD: Resilient Distributed Dataset
    public static void testSparkRddTxt(){
        //1. Set up the environment
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.driver.host","localhost");
        //sparkConf.set("SPARK_LOCAL_HOSTNAME","localhost");
        sparkConf.setAppName("JavaSparkDemo").setMaster("local[*]");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN");

        //2. Process the data: word count over a space-separated text file
        JavaRDD<String> fileRDD = jsc.textFile("D:\\TEMP\\word.txt");

        JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);

        //3. Print the results
        List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
        result.forEach(System.out::println);

        //4. Release resources
        jsc.stop();
    }
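    // A hypothetical example (not from the original article): if D:\TEMP\word.txt contained
    //   hello spark
    //   hello world
    // the collected result would contain tuples such as (hello,2), (spark,1), (world,1);
    // the ordering of the collected tuples is not guaranteed.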

    //Spark test: external CSV file
    public static void testSparkRddCsv(){
        //1. Set up the environment
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.driver.host","localhost");
        sparkConf.setAppName("JavaSparkDemo").setMaster("local[*]");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN");

        //2. Read the external file as lines of text to create an RDD
        JavaRDD<String> fileRDD = jsc.textFile("D:\\TEMP\\testcsv.csv");
        //3. Split each line on commas and count occurrences of each value
        JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);

        System.out.println(wordAndCountRDD.collect());
        jsc.stop();

    }
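    // A hypothetical example (not from the original article): for a testcsv.csv containing
    //   a,b,a
    //   b,c
    // collect() would return something like [(a,2), (b,2), (c,1)]; the method counts
    // occurrences of individual cell values rather than parsing the CSV into columns.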


    //Spark reading from a MySQL database
    //Requires the mysql-connector-java dependency (added in the pom above)
    public static void testSparkRddMysql(){
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSQLTest3")
                .config("spark.driver.host", "localhost")
                .config("spark.some.config.option", "some-value")
                .master("local[*]")
                .getOrCreate();


        //Dataset is a strongly typed collection of data
        Dataset<Row> jdbcDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://10.0.173.220:3307/dtbk_rzt?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
                .option("dbtable", "(SELECt * FROM location_authorize) tmp")
                .option("user", "dtbk_dev_2")
                .option("password", "1qaz@WSX")
                .option("driver","com.mysql.jdbc.Driver")
                .load();

        jdbcDF.printSchema();
        jdbcDF.show();

        //Convert the Dataset to an RDD of Row
        JavaRDD<Row> rowJavaRDD = jdbcDF.javaRDD();
        System.out.println(rowJavaRDD.collect());

        spark.stop();
    }
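    // Not in the original article: the jdbc-backed Dataset above can also be registered as a
    // temporary view and queried with Spark SQL, in the same way as the JSON example below:
    //   jdbcDF.createOrReplaceTempView("location_authorize");
    //   spark.sql("select count(*) from location_authorize").show();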


    //Spark test: JSON file
    public static void testSparkRddJson(){
        //1. Set up the environment
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSQLTest3")
                .config("spark.driver.host", "localhost")
                .config("spark.some.config.option", "some-value")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("D:\\TEMP\\testjson.json");
        df.printSchema();
        df.show();

        df.createOrReplaceTempView("t_person");
        spark.sql("select age,name from t_person where age > 3").show();

        spark.stop();
    }
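    // A hypothetical input file (not from the original article): spark.read().json() expects
    // one JSON object per line by default, so testjson.json could look like
    //   {"name":"zhangsan","age":2}
    //   {"name":"lisi","age":5}
    // and the query above would then return only the rows with age > 3.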

}
3: Summary

The test class contains four test methods in total, covering txt file analysis, csv files, JSON data processing, and a direct connection to a MySQL database. All of the methods have been tested and print their results correctly. The JDBC connection to MySQL is especially convenient because SQL statements can be written directly against the database.
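The demo above is driven by a plain main method. As a rough sketch of one way the same Spark code might be wired into an actual SpringBoot application (my own assumption, not shown in the original article: it presumes the spring-boot-starter dependency is added to the pom, and the class and bean names are illustrative):

import org.apache.spark.sql.SparkSession;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SparkDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkDemoApplication.class, args);
    }

    //Create a single local-mode SparkSession and share it as a Spring bean
    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .appName("JavaSparkDemo")
                .config("spark.driver.host", "localhost")
                .master("local[*]")
                .getOrCreate();
    }

    //Run the JSON demo on startup, reusing the injected SparkSession
    @Bean
    public CommandLineRunner sparkDemoRunner(SparkSession spark) {
        return args -> {
            spark.read().json("D:\\TEMP\\testjson.json").createOrReplaceTempView("t_person");
            spark.sql("select age,name from t_person where age > 3").show();
        };
    }
}

Building the SparkSession as a Spring bean keeps one local-mode session for the whole application instead of creating a new one inside every test method.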
