spark 概念及linux 本地模式部署请点这里
一:配置文件 pom.xml(关键依赖:org.apache.spark : spark-sql_2.13 : 3.2.0,mysql : mysql-connector-java : 6.0.6;编译级别 Java 8)。二:测试类如下。
import org.apache.hadoop.shaded.org.eclipse.jetty.websocket.common.frames.DataFrame;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
//testSparkRddTxt();
//testSparkRddCsv();
//testSparkRddMysql();
testSparkRddJson();
}
//spark 测试外部文件(txt)
//rdd:resilient distributed dataset ,弹性分布式数据集
public static void testSparkRddTxt(){
//1.环境准备
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.driver.host","localhost");
//sparkConf.set("SPARK_LOCAL_HOSTNAME","localhost");
sparkConf.setAppName("JavaSparkDemo").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel("WARN");
//2.处理数据
JavaRDD fileRDD = jsc.textFile("D:\TEMP\word.txt");
JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
//3.输出结果
List> result = wordAndCountRDD.collect();
result.forEach(System.out::println);
//4.关闭资源
jsc.stop();
}
//spark 测试外部文件(csv)
public static void testSparkRddCsv(){
//1.环境准备
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.driver.host","localhost");
sparkConf.setAppName("JavaSparkDemo").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel("WARN");
//2.读取外部文件创建rdd,以字符串读取
JavaRDD fileRDD = jsc.textFile("D:\TEMP\testcsv.csv");
//3.把文件内容使用,分割
JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
JavaPairRDD wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
System.out.println(wordAndCountRDD.collect());
jsc.stop();
}
//spark 操作mysql数据库
//添加依赖:mysql-connector-java
public static void testSparkRddMysql(){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest3")
.config("spark.driver.host", "localhost")
.config("spark.some.config.option", "some-value")
.master("local[*]")
.getOrCreate();
//DataSet 是具有强类型的数据集合
Dataset jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://10.0.173.220:3307/dtbk_rzt?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
.option("dbtable", "(SELECt * FROM location_authorize) tmp")
.option("user", "dtbk_dev_2")
.option("password", "1qaz@WSX")
.option("driver","com.mysql.jdbc.Driver")
.load();
jdbcDF.printSchema();
jdbcDF.show();
//转化为RDD
JavaRDD rowJavaRDD = jdbcDF.javaRDD();
System.out.println(rowJavaRDD.collect());
spark.stop();
}
//spark 测试json
public static void testSparkRddJson(){
//1.环境准备
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest3")
.config("spark.driver.host", "localhost")
.config("spark.some.config.option", "some-value")
.master("local[*]")
.getOrCreate();
Dataset df = spark.read().json("D:\TEMP\testjson.json");
df.printSchema();
df.show();
df.createOrReplaceTempView("t_person");
spark.sql("select age,name from t_person where age > 3").show();
spark.stop();
}
}
三:总结
测试类中一共写了四个测试方法,包括分析 txt 文件、csv 文件、json 数据处理,以及直连 mysql 数据库。方法都经过测试,可以正常打印结果;特别是通过 jdbc 直连 mysql 可以直接写 sql 语句,很方便。



