- 1.针对问题
- 2.可能场景
- 3.达到效果
- 4.解决方案
- 4.1 config/druid.properties
- 4.2 多线程查表拆分
- 4.3 多线程读文件处理去重
- 5.处理速度
1.针对问题
针对以下四个问题进行处理
- 海量数据
- 单独表
- 多唯一性字段
- 重复数据和垃圾数据多
2.可能场景
- 老表没有对这些字段建立唯一性约束
- 进行表数据迁移
3.达到效果
- 高效
- 准确
- 无线程安全问题
4.解决方案
4.1 config/druid.properties
# mysql driverClassName:com.mysql.cj.jdbc.Driver
driverClassName:oracle.jdbc.OracleDriver
# mysql url:jdbc:mysql://host:port/数据库名字
url:jdbc:oracle:thin:@host:port:数据库名字
password:
# dbSource:
username:
maxActive:5
initialSize:5
maxWait:600000
minIdle:1
#maxIdle:15
timeBetweenEvictionRunsMillis:60000
minEvictableIdleTimeMillis:30000
validationQuery:SELECT 1 from dual
testWhileIdle:true
testOnBorrow:false
testOnReturn:false
#poolPreparedStatements:true
maxOpenPreparedStatements:100
4.2 多线程查表拆分
package service.jmw;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * First pass of the de-duplication: streams the rows of a query and spreads
 * them over hash bucket files so duplicates can later be found bucket by bucket.
 *
 * <p>For every row the de-duplication columns are combined into a key,
 * fingerprinted with MD5, and the line {@code <OB_OBJECT_ID>#<md5>} is appended
 * to {@code <FileInput>/<n>桶.txt}, where {@code n} is derived from the
 * fingerprint. Rows with equal fingerprints therefore always share a bucket.
 *
 * <p>Task parameters are passed as a {@link JSONObject}:
 * <ul>
 *   <li>{@code Sql} - query to stream; must return {@code OB_OBJECT_ID} plus
 *       the de-duplication columns read in {@link #run()}.</li>
 *   <li>{@code FileInput} - directory the bucket files are appended to.</li>
 *   <li>{@code FetchSize} - optional JDBC fetch-size hint.</li>
 * </ul>
 *
 * <p>Bucket files are opened in append mode, so the target directory should be
 * emptied before a new run. Several tasks may run concurrently on
 * {@link #poolExecutor}; appends to bucket files are serialized so lines from
 * different tasks never interleave.
 */
public class HashSplitTool implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(HashSplitTool.class);

    /** Number of bucket files the fingerprints are spread over. */
    private static final int PARTITION_COUNT = 64;

    /** Rows buffered per bucket before they are appended to disk. */
    private static final int FLUSH_THRESHOLD = 1000;

    /** Separates the de-duplication fields so ("ab","c") and ("a","bc") differ. */
    private static final char FIELD_SEPARATOR = 0x1F;

    /** Stands in for SQL NULL so it differs from the literal string "null". */
    private static final String NULL_MARKER = String.valueOf((char) 0);

    /** Serializes bucket-file appends made by concurrently running tasks. */
    private static final Object FILE_LOCK = new Object();

    /** Shared connection pool, created once by {@link #init()}. */
    private static volatile DruidDataSource druidDataSource;

    /**
     * Shared worker pool, also used by DuplicateRemoval.
     *
     * <p>Note: a ThreadPoolExecutor only starts threads beyond the core size (2)
     * once its queue (capacity 10) is full, so up to 12 submitted tasks run on
     * just 2 threads. Raise the core size (and the Druid {@code maxActive}) if
     * more tasks must truly run in parallel.
     */
    public static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            2,
            10,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /** Per-task configuration; instance-scoped so concurrent tasks don't share it. */
    private final JSONObject jsonObject;

    /**
     * Example entry point: initializes the data source and submits one split
     * task per SQL statement.
     */
    public static void main(String[] args) {
        try {
            init();
        } catch (Exception e) {
            LOGGER.error("数据源初始化失败", e);
            return;
        }
        // Each task must query a disjoint slice of the table (e.g. an id range).
        // Submitting the same SQL several times writes every row several times.
        String[] sqls = {"查询sql"};
        ThreadPoolExecutor threadPoolExecutor = poolExecutor;
        try {
            for (String sql : sqls) {
                JSONObject task = new JSONObject();
                task.put("Sql", sql);
                task.put("FileInput", "文件输出位置");
                task.put("FileOutput", "");
                threadPoolExecutor.execute(new HashSplitTool(task));
            }
        } catch (Exception e) {
            LOGGER.error("任务提交失败", e);
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * Creates the shared Druid data source from {@code config/druid.properties}.
     * Idempotent: later calls return immediately once the pool exists.
     *
     * @throws Exception if the file cannot be read or the pool cannot be created
     */
    public static synchronized void init() throws Exception {
        if (druidDataSource != null) {
            return;
        }
        try {
            Properties properties = loadPropertiesFile("config/druid.properties");
            LOGGER.info("【DBCon.properties】读取文件成功");
            druidDataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
        } catch (Exception e) {
            LOGGER.error("【DBCon.properties】获取配置失败", e);
            throw e;
        }
    }

    /**
     * Loads a properties file from the given path.
     *
     * @throws IllegalArgumentException if the path is null or empty
     * @throws IOException if the file cannot be opened or parsed
     */
    private static Properties loadPropertiesFile(String fullFile) throws IOException {
        if (fullFile == null || fullFile.isEmpty()) {
            throw new IllegalArgumentException("Properties file path can not be null:" + fullFile);
        }
        Properties p = new Properties();
        try (InputStream inputStream = new FileInputStream(new File(fullFile))) {
            p.load(inputStream);
        }
        return p;
    }

    /**
     * @param jsonObject task configuration (see class documentation)
     * @throws Exception if the shared data source cannot be initialized
     */
    public HashSplitTool(JSONObject jsonObject) throws Exception {
        this.jsonObject = jsonObject;
        init();
    }

    /**
     * Streams the query, fingerprints each row and appends it to its bucket
     * file, then prints the number of rows written and the elapsed time.
     */
    @Override
    public void run() {
        long start = System.currentTimeMillis();
        int count = 0;
        if (jsonObject != null && jsonObject.size() != 0) {
            String sql = jsonObject.getString("Sql");
            String fileInput = jsonObject.getString("FileInput");
            Integer fetchSize = jsonObject.getInteger("FetchSize");
            Map<Integer, Set<String>> buckets = new HashMap<>();
            try (DruidPooledConnection connection = druidDataSource.getConnection(2000);
                 PreparedStatement preparedStatement = connection.prepareStatement(sql,
                         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
                         ResultSet.CLOSE_CURSORS_AT_COMMIT)) {
                connection.setAutoCommit(false);
                if (fetchSize != null) {
                    // Large fetch sizes avoid one round trip per few rows (e.g. Oracle's default of 10).
                    preparedStatement.setFetchSize(fetchSize);
                }
                MDFive md5Tool = MDFive.getInstance();
                try (ResultSet resultSet = preparedStatement.executeQuery()) {
                    start = System.currentTimeMillis();
                    while (resultSet.next()) {
                        String obObjectId = resultSet.getString("OB_OBJECT_ID");
                        // TODO: read one column per de-duplication field.
                        String key = dedupKey(
                                resultSet.getString("字段A"),
                                resultSet.getString("字段B"),
                                resultSet.getString("字段C"));
                        String md5 = md5Tool.getMD5(key);
                        // Mask the sign bit: Math.abs(Integer.MIN_VALUE) is still negative.
                        int partition = (md5.hashCode() & Integer.MAX_VALUE) % PARTITION_COUNT;
                        Set<String> ids = buckets.get(partition);
                        if (ids == null) {
                            ids = new HashSet<>();
                            buckets.put(partition, ids);
                        }
                        ids.add(obObjectId + "#" + md5);
                        if (ids.size() >= FLUSH_THRESHOLD) {
                            count += ids.size();
                            flushBucket(fileInput, partition, ids);
                        }
                    }
                }
                connection.commit();
                System.out.println("内存数据清空");
                for (Map.Entry<Integer, Set<String>> entry : buckets.entrySet()) {
                    count += entry.getValue().size();
                    flushBucket(fileInput, entry.getKey(), entry.getValue());
                }
            } catch (SQLException e) {
                LOGGER.error("查询或读取结果集失败: {}", sql, e);
            }
        }
        System.out.println("数量为:" + count);
        long end = System.currentTimeMillis();
        System.out.println((end - start) / 1000.0 + "s");
    }

    /**
     * Joins the de-duplication fields unambiguously: fields are separated by a
     * control character and SQL NULL gets a distinct marker.
     */
    private static String dedupKey(String... fields) {
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                key.append(FIELD_SEPARATOR);
            }
            key.append(fields[i] == null ? NULL_MARKER : fields[i]);
        }
        return key.toString();
    }

    /** Appends a bucket's buffered lines to its file and empties the buffer. */
    private static void flushBucket(String dir, int partition, Set<String> ids) {
        String fileName = partition + "桶.txt";
        try {
            writeFile(dir, fileName, ids);
        } catch (IOException e) {
            LOGGER.error("写入桶文件失败: {}", fileName, e);
        }
        ids.clear();
    }

    /**
     * Appends each id as one UTF-8 line to {@code path/fileName}, creating the
     * directory if needed. Appends are serialized across all tasks.
     */
    private static void writeFile(String path, String fileName, Set<String> ids) throws IOException {
        File dir = new File(path);
        // Re-check isDirectory(): another task may have created it concurrently.
        if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Cannot create directory: " + dir);
        }
        File file = new File(dir, fileName);
        synchronized (FILE_LOCK) {
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(file, true), StandardCharsets.UTF_8))) {
                for (String id : ids) {
                    writer.write(id);
                    writer.write('\n');
                }
            }
        }
    }

    /**
     * Closes the statement first, then returns the connection to the pool.
     * Each step is attempted even if the other fails; null arguments are ignored.
     */
    public static void close(DruidPooledConnection connection, PreparedStatement preparedStatement) {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } catch (SQLException e) {
            LOGGER.error("关闭PreparedStatement失败", e);
        }
        try {
            if (connection != null) {
                connection.recycle();
            }
        } catch (SQLException e) {
            LOGGER.error("归还连接失败", e);
        }
    }

    /**
     * Thread-safe MD5 helper producing lowercase hex digests of the UTF-8 bytes
     * of a string. Each thread uses its own MessageDigest, so the shared
     * instance can be used from any number of pool threads.
     */
    static class MDFive {
        private static final MDFive INSTANCE = new MDFive();

        private static final char[] HEX = "0123456789abcdef".toCharArray();

        private static final ThreadLocal<MessageDigest> DIGEST = new ThreadLocal<MessageDigest>() {
            @Override
            protected MessageDigest initialValue() {
                try {
                    return MessageDigest.getInstance("MD5");
                } catch (NoSuchAlgorithmException e) {
                    // Every Java platform implementation is required to support MD5.
                    throw new IllegalStateException("MD5 algorithm not available", e);
                }
            }
        };

        private MDFive() {
        }

        /** Returns the shared, stateless instance. */
        public static MDFive getInstance() {
            return INSTANCE;
        }

        /**
         * @param source text to hash (encoded as UTF-8)
         * @return 32-character lowercase hexadecimal MD5 digest
         */
        public String getMD5(String source) {
            // digest() also resets the per-thread MessageDigest for the next call.
            byte[] digest = DIGEST.get().digest(source.getBytes(StandardCharsets.UTF_8));
            char[] hex = new char[digest.length * 2];
            for (int i = 0; i < digest.length; i++) {
                hex[2 * i] = HEX[(digest[i] >> 4) & 0xF];
                hex[2 * i + 1] = HEX[digest[i] & 0xF];
            }
            return new String(hex);
        }
    }
}
4.3 多线程读文件处理去重
package service.jmw.weektwo;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import service.jmw.HashSplitTool;
import java.io.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
public class DuplicateRemoval implements Runnable {
public static void main(String[] args) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("FileInput", "读取目录");
jsonObject.put("FileOutput", "去重后存放目录");
ThreadPoolExecutor threadPoolExecutor = HashSplitTool.poolExecutor;
try{
threadPoolExecutor.execute(new DuplicateRemoval(jsonObject));
}catch (Exception e){
e.printStackTrace();
}finally {
threadPoolExecutor.shutdown();
}
}
JSONObject jsonObject;
public DuplicateRemoval(JSONObject jsonObject) {
this.jsonObject = jsonObject;
}
@Override
public void run(){
double countTime = 0;
if (jsonObject.size() != 0) {
String fileInput = jsonObject.getString("FileInput");
String fileOutput = jsonObject.getString("FileOutput");
Set set = new HashSet();
File folder = new File(fileInput);
File[] files = folder.listFiles();
for (File file :
files) {
StringBuilder stringBuilder = new StringBuilder();
if (!file.exists()) {
continue;
}
List list = null;
try {
list = IOUtils.readLines(new FileInputStream(file), "UTF-8");
} catch (IOException e) {
e.printStackTrace();
}
long start = System.currentTimeMillis();
if (list!=null && list.size()!=0){
for (String line: list) {
if (StringUtils.isEmpty(line)) {
continue;
}
String[] split = line.split("#");
if (split.length!=2){
continue;
}else {
if (set.contains(split[1])) {
stringBuilder.append(split[0]+"n");
} else {
set.add(split[1]);
}
}
if (stringBuilder.length()>1024 * 1024) {
try {
writeFile(fileOutput, "objectId.txt", stringBuilder.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
if (stringBuilder.length()!=0) {
try {
writeFile(fileOutput, "objectId.txt", stringBuilder.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
set.clear();
long end = System.currentTimeMillis();
System.out.println((end - start) / 1000.0 + "s");
countTime = countTime + (end - start) / 1000.0;
stringBuilder.delete(0,stringBuilder.length());
}
}
System.out.println(countTime+ "s");
}
private void writeFile(String fileOutput, String s, String stringBuilder) throws IOException {
File file = new File(fileOutput+s);
BufferedWriter f = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true), "UTF-8"));
if (!file.exists()) {
file.mkdirs();
}
try {
f.write(stringBuilder);
} catch (IOException e) {
e.printStackTrace();
} finally {
f.flush();
f.close();
}
}
}
5.处理速度
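处理速度大约为每秒 2 万条数据(前提是数据量足够大,拆分和去重都在持续运行);开 10 个线程时,大约 20~30 分钟可以完成 2.5 亿条数据的去重。(注意:示例线程池的核心线程数为 2、队列容量为 10,提交的任务不超过 12 个时只会有 2 个线程在运行;要真正并发 10 个线程,需要把 corePoolSize 调到 10,并相应调大连接池的 maxActive。)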
这里有很多可以优化效率和空间的地方,希望大家指出,如有错误欢迎指正



