
Spark Streaming: Seven Ways to Save WordCount Results to MySQL

Below are the Maven dependencies, a WordCountForeachRDD program that works through seven increasingly efficient ways of writing streaming word counts to MySQL via foreachRDD, and the c3p0-based connection pool the later variants rely on.

    
pom.xml (dependencies and build configuration):

<dependencies>
    <!-- c3p0 connection pool -->
    <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.1.2</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.39</version>
    </dependency>

    <!-- Spark SQL (spark.version is defined in the omitted <properties> section) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- ScalikeJDBC -->
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc_2.11</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc-config_2.11</artifactId>
        <version>3.1.0</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- The original listing breaks off here; the
                                         standard signature-file excludes are assumed. -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
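All seven variants below insert into a MySQL table named wordcount with columns ts, word, and count. The original never shows the DDL, so the following one-off setup is a minimal sketch with an assumed schema, reusing the hadoop1/root credentials the examples use:

import java.sql.DriverManager

// Hypothetical one-off table setup; the exact schema is an assumption.
object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
    val stmt = conn.createStatement()
    stmt.executeUpdate(
      """create table if not exists wordcount(
        |  ts    bigint      not null,
        |  word  varchar(64) not null,
        |  count int         not null
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}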
WordCountForeachRDD.scala:

package output

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountForeachRDD {
  def main(args: Array[String]) {
    // Word count over a socket text stream, in 2-second batches
    val sparkConf = new SparkConf().setAppName("WordCountForeachRDD").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 8888)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Approach 1: save the results to MySQL. This code does NOT run.
    wordCounts.foreachRDD { (rdd, time) =>
      // The connection and statement are created here, at the driver
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
      val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
      // The statement would have to be shipped from the driver to the
      // executors over the network, i.e. serialized, but JDBC connection
      // and statement objects are not serializable, so the job fails.
      rdd.foreach { record =>
        // Executed at the worker (executor): insert each record
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
      }
      statement.close()
      conn.close()
    }
    // Variants 2-7 below refine this. In a real job you would register
    // exactly one of these output operations, then call ssc.start(); the
    // start is done once at the bottom of main, since no output operation
    // may be registered after the context has started.

    // Approach 2: save the results to MySQL. This one works.
    wordCounts.foreachRDD { (rdd, time) =>
      // Runs at the driver
      rdd.foreach { record =>
        // Runs at the executor (worker): a connection is created for every
        // single record and closed right after use. Opening and closing
        // connections this frequently hurts database performance badly.
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
        statement.close()
        conn.close()
      }
    }

    // Approach 3: one connection per partition
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        // Runs at the executor (worker): one connection is created per
        // partition. If a partition holds 1000 records, all 1000 share a
        // single connection, cutting the connection count a thousandfold.
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")

        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        conn.close()
      }
    }

    // Approach 4: connection pool
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        // With a connection pool, connections are reused rather than
        // created per partition, which improves performance further.
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")

        partitionRecords.foreach { case (word, count) =>
          // Drawback: records are still inserted one at a time, so every
          // insert is a separate round trip to MySQL.
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        // Return the connection to the pool when done
        ConnectionPool.returnConnection(conn)
      }
    }

    // Approach 5: JDBC batching
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          // Batch the inserts instead of executing them one by one
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // Approach 6: batching with a manually committed transaction
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        // Turn off auto-commit
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        // Commit the transaction manually once the batch has been submitted
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // Approach 7: commit in fixed-size batches
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
          // We decide how many records make up one batch; here every
          // 500 records are executed and committed together.
          if (index != 0 && index % 500 == 0) {
            statement.executeBatch()
            conn.commit()
          }
        }
        // Flush and commit whatever remains in the final partial batch
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // Start the streaming job and block until it terminates
    ssc.start()
    ssc.awaitTermination()
  }

}
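The pom above also declares scalikejdbc_2.11 and scalikejdbc-config_2.11, which the listing never actually uses. For completeness, here is a minimal sketch of what a pool-plus-transaction variant could look like with ScalikeJDBC; the object and method names are our assumptions, not from the original:

import scalikejdbc._

object WordCountScalikeJdbc {
  // Runs once per executor JVM when the object is first referenced.
  // scalikejdbc.ConnectionPool is unrelated to the c3p0-based
  // output.ConnectionPool defined below.
  Class.forName("com.mysql.jdbc.Driver")
  ConnectionPool.singleton("jdbc:mysql://hadoop1:3306/test", "root", "root")

  // Intended to be called from rdd.foreachPartition, mirroring approach 6:
  // one transaction per partition.
  def savePartition(ts: Long, records: Iterator[(String, Int)]): Unit = {
    DB.localTx { implicit session =>
      records.foreach { case (word, count) =>
        sql"insert into wordcount(ts, word, count) values ($ts, $word, $count)"
          .update.apply()
      }
    }
  }
}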
ConnectionPool.java:

package output;

import com.mchange.v2.c3p0.ComboPooledDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public class ConnectionPool {
    private static ComboPooledDataSource dataSource = new ComboPooledDataSource();

    static {
        dataSource.setJdbcUrl("jdbc:mysql://hadoop1:3306/test"); // database URL
        dataSource.setUser("root");                              // database user
        dataSource.setPassword("root");                          // database password
        dataSource.setMaxPoolSize(40);     // maximum number of pooled connections
        dataSource.setMinPoolSize(2);      // minimum number of pooled connections
        dataSource.setInitialPoolSize(10); // initial number of pooled connections
        dataSource.setMaxStatements(100);  // maximum number of cached statements
    }

    public static Connection getConnection() {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void returnConnection(Connection connection) {
        // c3p0 hands out proxy connections, so close() here returns the
        // connection to the pool instead of physically closing it.
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
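One caveat the pooled variants above share: if an insert throws, the connection is never returned to the pool. A small hedged sketch of a safer pattern (the helper is ours, not the original's):

import java.sql.Connection
import output.ConnectionPool

object PooledConnections {
  // Hypothetical helper: get/use/return with try/finally so a failed
  // insert cannot leak a pooled connection.
  def withPooledConnection[A](body: Connection => A): A = {
    val conn = ConnectionPool.getConnection
    try body(conn)
    finally ConnectionPool.returnConnection(conn)
  }
}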
