栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spark写数据存入MySQL

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark写数据存入MySQL

Spark写数据存入MySQL

依赖:

    
        
            org.apache.spark
            spark-core_2.12
            3.1.2
        
        
            org.apache.spark
            spark-sql_2.12
            3.1.2
        
        
            mysql
            mysql-connector-java
            8.0.25
        
    

PS:Spark依赖的版本注意要对应,否则可能出现类缺失的问题

导包:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.*;

java代码:

public class toMySQL {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark_mysql");
        //创建一个SparkSession类型的spark对象
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //转成JavaSparkContext对象
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        sc.setLogLevel("ERROR");

        //创建Properties对象
        Properties properties = new Properties();
        properties.setProperty("user", "root"); // 用户名
        properties.setProperty("password", "123456"); // 密码
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("numPartitions","10");


        //创建数据
        JavaRDD rdd = sc.textFile("datas/1.txt");

        JavaRDD rdd1 = rdd.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(String s) throws Exception {
                String s1 = s.replace(",", "").replace(".", "").replace("?", "");
                return Arrays.asList(s1.split(" ")).iterator();
            }
        });

        JavaPairRDD rdd2 = rdd1.mapToPair(new PairFunction() {
            @Override
            public Tuple2 call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        JavaPairRDD rdd3 = rdd2.reduceByKey(new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        //将数据转化成Row对象形式,用以存入数据库
        JavaRDD rdd4 = rdd3.map(new Function, Row>() {
            @Override
            public Row call(Tuple2 ss) throws Exception {
                return RowFactory.create(ss._1, ss._2);
            }
        });

        //动态构造Dataframe元数据
        List structFields = new ArrayList();
        structFields.add(DataTypes.createStructField("word", DataTypes.StringType,true));
        structFields.add(DataTypes.createStructField("count",DataTypes.IntegerType,true));

        //构建StructType,用于最后Dataframe元数据的描述
        StructType scheme = DataTypes.createStructType(structFields);
        //创建临时视图
        Dataset dataframe = sparkSession.createDataframe(rdd4, scheme);
        //将数据写入对应数据库对应的表中
        dataframe.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/test","words1",properties);
        sparkSession.stop();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318615.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号