import com.alibaba.fastjson.JSONObject;
import com.mx.config.mysql.MysqlConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.JdbcRDD;
import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
import java.sql.*;
import java.util.Properties;
public class MysqlUtils implements Serializable{
private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(MysqlUtils.class);
static String MYSQL_DRIVER;
static String MYSQL_CONNECTION_URL;
static String MYSQL_USERNAME;
static String MYSQL_PWD;
private static JavaSparkContext sc = null;
public MysqlUtils(JavaSparkContext sc) {
MysqlUtils.sc = sc;
MYSQL_DRIVER = MysqlConfig.MYSQL_DRIVER;
MYSQL_CONNECTION_URL = MysqlConfig.MYSQL_CONNECTION_URL;
MYSQL_USERNAME = MysqlConfig.MYSQL_USERNAME;
MYSQL_PWD = MysqlConfig.MYSQL_PWD;
}
public JavaRDD getData(String sql) {
DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);
JdbcRDD jdbcRDD = new JdbcRDD<>(sc.sc(), dbConnection, sql, 0, 10, 2, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));
return JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));
}
public Integer insert(String sql) {
int autoInckey = -1;
try {
DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);
Connection conn = dbConnection.apply();
PreparedStatement pstmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
pstmt.executeUpdate();
ResultSet rs = pstmt.getGeneratedKeys();
if (rs.next()) {
autoInckey = rs.getInt(1);
} else {
return autoInckey;
}
rs.close();
pstmt.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
return autoInckey;
}
public Boolean insertBak(String sql) {
boolean isSuccess = false;
try {
DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);
Connection conn = dbConnection.apply();
Statement stmt = conn.createStatement();
isSuccess = stmt.execute(sql);
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
return isSuccess;
}
public Boolean update(String updateRunningInfoSql) {
boolean isSuccess = false;
try {
DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);
Connection conn = dbConnection.apply();
Statement stmt = conn.createStatement();
isSuccess = stmt.execute(updateRunningInfoSql);
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
return isSuccess;
}
static class DbConnection extends AbstractFunction0 implements Serializable {
private final String driverClassName;
private final String connectionUrl;
private final String userName;
private final String password;
public DbConnection(String driverClassName, String connectionUrl, String userName, String password) {
this.driverClassName = driverClassName;
this.connectionUrl = connectionUrl;
this.userName = userName;
this.password = password;
}
@Override
public Connection apply() {
try {
Class.forName(driverClassName);
} catch (ClassNotFoundException e) {
LOGGER.error("Failed to load driver class", e);
}
Properties properties = new Properties();
properties.setProperty("user", userName);
properties.setProperty("password", password);
Connection connection = null;
try {
connection = DriverManager.getConnection(connectionUrl, properties);
} catch (SQLException e) {
LOGGER.error("Connection failed", e);
}
return connection;
}
}
static class MapResult extends AbstractFunction1 implements Serializable {
@Override
public JSONObject apply(ResultSet resultSet) {
ResultSetmetaData metaData;
JSONObject jsonObj = new JSONObject();
try {
metaData = resultSet.getmetaData();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnLabel(i);
String value = resultSet.getString(columnName);
jsonObj.put(columnName, value);
}
} catch (SQLException e) {
e.printStackTrace();
}
return jsonObj;
}
}
}