Maven configuration (summary of the garbled pom.xml fragment) —
Dependencies: c3p0:c3p0 0.9.1.2; mysql:mysql-connector-java 5.1.39; org.apache.spark:spark-sql_2.11 ${spark.version}; org.scalikejdbc:scalikejdbc_2.11 3.1.0; org.scalikejdbc:scalikejdbc-config_2.11 3.1.0.
Build plugins: net.alchim31.maven:scala-maven-plugin 3.2.2 — execution "scala-compile-first" (phase process-resources: add-source, compile) and "scala-test-compile" (phase process-test-resources: testCompile); org.apache.maven.plugins:maven-compiler-plugin 3.5.1 — compile goal; org.apache.maven.plugins:maven-shade-plugin 2.4.3 — shade goal at package phase, filtering *:* to exclude META-INF signature files.
object WordCountForeachRDD {

  /**
   * Word count over a socket text stream, followed by seven progressively
   * better ways of saving each batch's results to MySQL via foreachRDD.
   * Listens on localhost:8888; batch interval is 2 seconds.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WordCountForeachRDD").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 8888)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // --- Save to MySQL, attempt (1): DOES NOT WORK (kept for illustration). ---
    // The connection and statement are created on the driver, but rdd.foreach
    // runs on the executors. PreparedStatement/Connection are not serializable,
    // so Spark fails the job with a NotSerializableException when shipping the
    // closure.
    wordCounts.foreachRDD { (rdd, time) =>
      // executed at the driver
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
      val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
      rdd.foreach { record =>
        // executed at the worker (executor) — statement would have to be
        // serialized and sent over the network, which is impossible
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
      }
      statement.close()
      conn.close()
    }

    // --- Save to MySQL, attempt (2): correct but slow. ---
    // A connection is opened and closed for EVERY record on the executors.
    // Frequent connect/disconnect cycles put heavy load on the database.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreach { record =>
        // executed at the worker (executor)
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
        statement.close()
        conn.close()
      }
    }

    // --- Save to MySQL, attempt (3): one connection per partition. ---
    // All records of a partition (possibly thousands) share one connection,
    // cutting the number of connections by orders of magnitude.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        // executed at the worker (executor)
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root")
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        conn.close()
      }
    }

    // --- Save to MySQL, attempt (4): connection pool. ---
    // Connections are reused across batches instead of being re-created.
    // Remaining drawback: still one round trip to MySQL per record.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        // hand the connection back to the pool when done
        ConnectionPool.returnConnection(conn)
      }
    }

    // --- Save to MySQL, attempt (5): pool + JDBC batching. ---
    // addBatch/executeBatch sends the whole partition in one round trip.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // --- Save to MySQL, attempt (6): batching + manual transaction. ---
    // Auto-commit is disabled so the whole batch commits atomically.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        // commit the batch in one transaction, then restore auto-commit
        // before returning the connection to the pool
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // --- Save to MySQL, attempt (7): bounded batches of 500 rows. ---
    // Large partitions are flushed every 500 records so a single huge batch
    // does not accumulate unbounded memory on either side.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement("insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
          // flush and commit every 500 records
          if (index != 0 && index % 500 == 0) {
            statement.executeBatch()
            conn.commit()
          }
        }
        // flush the final (partial) batch
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // Start the streaming job only AFTER all output operations are registered;
    // foreachRDD calls made after start() would be silently ignored.
    ssc.start()
    // Block until the streaming application is terminated.
    ssc.awaitTermination()
  }
}
package output;

import com.mchange.v2.c3p0.ComboPooledDataSource;

import java.sql.Connection;
import java.sql.SQLException;

/**
 * Static c3p0-backed MySQL connection pool shared by the streaming job.
 * Pool parameters: min 2, initial 10, max 40 connections, and up to 100
 * cached statements.
 */
public class ConnectionPool {

    private static final ComboPooledDataSource dataSource = new ComboPooledDataSource();

    static {
        dataSource.setJdbcUrl("jdbc:mysql://hadoop:3306/test"); // JDBC URL of the target database
        dataSource.setUser("root");                             // database user
        dataSource.setPassword("root");                         // database password
        dataSource.setMaxPoolSize(40);                          // maximum pooled connections
        dataSource.setMinPoolSize(2);                           // minimum pooled connections
        dataSource.setInitialPoolSize(10);                      // connections created at startup
        dataSource.setMaxStatements(100);                       // size of the global statement cache
    }

    /**
     * Borrows a connection from the pool.
     *
     * @return a pooled {@link Connection}, or {@code null} if the pool could
     *         not supply one (the {@link SQLException} is logged and swallowed;
     *         callers must null-check — preserved original contract)
     */
    public static Connection getConnection() {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Returns a connection to the pool. c3p0 hands out proxy connections, so
     * calling {@code close()} releases the connection back to the pool rather
     * than physically closing it. Null-safe.
     *
     * @param connection the connection to release; ignored if {@code null}
     */
    public static void returnConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}



