Dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>
PS: Make sure the versions of the Spark dependencies match each other, otherwise you may run into missing-class errors.
Imports:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.*;
Java code:
public class toMySQL {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark_mysql");
        // Create a SparkSession from the SparkConf
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        // Wrap the underlying SparkContext in a JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        sc.setLogLevel("ERROR");
        // JDBC connection properties
        Properties properties = new Properties();
        properties.setProperty("user", "root");       // username
        properties.setProperty("password", "123456"); // password
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("numPartitions", "10");
        // Build the data: read the text file and count words
        JavaRDD<String> rdd = sc.textFile("datas/1.txt");
        JavaRDD<String> rdd1 = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String s1 = s.replace(",", "").replace(".", "").replace("?", "");
                return Arrays.asList(s1.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Convert each (word, count) pair into a Row so it can be stored in the database
        JavaRDD<Row> rdd4 = rdd3.map(new Function<Tuple2<String, Integer>, Row>() {
            @Override
            public Row call(Tuple2<String, Integer> ss) throws Exception {
                return RowFactory.create(ss._1, ss._2);
            }
        });
        // Dynamically build the DataFrame schema fields
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("word", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        // Build the StructType that describes the DataFrame's metadata
        StructType scheme = DataTypes.createStructType(structFields);
        // Create the DataFrame from the Row RDD and the schema
        Dataset<Row> dataframe = sparkSession.createDataFrame(rdd4, scheme);
        // Write the data into the target table of the target database
        dataframe.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        sparkSession.stop();
    }
}
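If you want to verify the write, you can read the table back over the same JDBC connection right before sparkSession.stop(). This is a minimal sketch that assumes the same URL, table name ("words1"), and Properties object used above:

        // Read the table back from MySQL to confirm the word counts were written
        Dataset<Row> readBack = sparkSession.read()
                .jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        readBack.show();                        // print a sample of (word, count) rows
        System.out.println(readBack.count());  // number of distinct words written

Note that SaveMode.Overwrite drops and recreates the target table on each run; use SaveMode.Append instead if you want to keep existing rows.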



