package TEST
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.Properties
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
 * Small end-to-end demo of moving data between Spark and PostgreSQL over JDBC:
 * it appends a hand-built DataFrame to a table, reads the table back through
 * Spark, and finally deletes rows with a plain JDBC statement.
 */
object PostgresqlTest {

  // Connection settings shared by every helper below.
  // NOTE(review): host and credentials are hard-coded in source; consider loading
  // them from configuration or the environment before using this outside a demo.
  private val PgHostPort = "172.16.221.208:15432"
  private val PgUser = "postgres"
  private val PgPassword = "bm@123"
  private val PgDriver = "org.postgresql.Driver"

  /** Builds the JDBC URL for `databaseName` on the configured PostgreSQL host. */
  private def jdbcUrl(databaseName: String): String =
    s"jdbc:postgresql://$PgHostPort/$databaseName"

  /** Returns a fresh (mutable) `Properties` carrying user, password and driver class. */
  private def connectionProperties(): Properties = {
    val props = new Properties()
    props.put("user", PgUser)
    props.put("password", PgPassword)
    props.put("driver", PgDriver)
    props
  }

  /**
   * Entry point: writes two sample rows to `bigdata.student`, reads the table
   * back, then deletes rows by condition.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("PostgresqlTest")
      .config("spark.ui.showConsoleProgress", "true")
      .config("spark.shuffle.reduceLocality.enabled", "false")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Build sample data.
      val rows: util.List[Row] = util.Arrays.asList(
        Row("1", "徐国胜1", "22"),
        Row("2", "邱良东2", "22")
      )
      val schema = StructType(List(
        StructField("id", StringType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("age", StringType, nullable = true)
      ))
      val studentDF: DataFrame = spark.createDataFrame(rows, schema)

      // Write the DataFrame into the PostgreSQL table.
      WriteDfToPG(studentDF, "testdb", "bigdata.student")

      // Read the table back. No action is run on dataDF here, so this only
      // defines the DataFrame; add e.g. dataDF.show() to inspect the rows.
      val dataDF: DataFrame = QueryPG(spark, "testdb", "bigdata.student")

      // Delete rows matching the condition; to clear the whole table, pass "1=1".
      // NOTE(review): this condition matches neither name inserted above
      // ("徐国胜1" / "邱良东2") — confirm that is intended.
      DeleteFromPG("testdb", "bigdata.student", "name='邱良东'")
    } finally {
      // Always release the Spark session, even when a step above fails.
      spark.stop()
    }
  }

  /**
   * Loads a PostgreSQL table as a DataFrame via Spark's JDBC reader.
   *
   * @param spark        active Spark session
   * @param databaseName database to connect to
   * @param tableName    table to read (may be schema-qualified)
   * @return a DataFrame backed by the given table
   */
  def QueryPG(spark: SparkSession, databaseName: String, tableName: String): DataFrame =
    spark.read.jdbc(jdbcUrl(databaseName), tableName, connectionProperties())

  /**
   * Appends the contents of `df` to a PostgreSQL table via Spark's JDBC writer.
   *
   * @param df           data to write
   * @param databaseName database to connect to
   * @param tableName    target table (may be schema-qualified)
   */
  def WriteDfToPG(df: DataFrame, databaseName: String, tableName: String): Unit =
    df.write.mode("Append").jdbc(jdbcUrl(databaseName), tableName, connectionProperties())

  /**
   * Deletes rows from a PostgreSQL table using a direct JDBC connection.
   *
   * The statement is assembled by string interpolation, so `tableName` and
   * `condition` must come from trusted code only — never from user input.
   *
   * @param databaseName database to connect to
   * @param tableName    table to delete from (may be schema-qualified)
   * @param condition    SQL boolean expression placed after `where`
   */
  def DeleteFromPG(databaseName: String, tableName: String, condition: String): Unit = {
    Class.forName(PgDriver)
    val delSql = s"delete from $tableName where $condition"
    val connection: Connection =
      DriverManager.getConnection(jdbcUrl(databaseName), connectionProperties())
    try {
      val delPS: PreparedStatement = connection.prepareStatement(delSql)
      try {
        delPS.execute()
      } finally {
        delPS.close()
      }
    } finally {
      // The connection was previously never closed, leaking one per call.
      connection.close()
    }
    println("delete query :" + delSql)
  }
}