运行类
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
public class KmeansRun {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
long start = System.currentTimeMillis();//开始时间
String dataName = args[0];
int k = Integer.parseInt(args[1]);
// int k = 3;
// String dataName = "iris.txt";//文件的输入路径
String outName ="result.txt";
String outPath = DataUtil.HDFS_OUTPUT + "/" + outName;;//文件的输出路径
String dataPath = DataUtil.HDFS_INPUT + "/" + dataName;
double dis ;//旧簇与新簇的差
ArrayList> data;
ArrayList> center;//簇中心
ArrayList> newcenter ;//新的簇中心
//读取数据
data = DataUtil.readCenter(dataPath);
//计算 center
center = CenterRandom.centerRandomChoice(data,k);
//带有簇标记的data
ArrayList dataindex ;
//循环计算新的中心点,直到中心点不再改变
for (int i = 0; i < 1000; i++) {
System.out.println("--------------------------"+i+"----------------------------------");
//分配 data
dataindex = DistributionData.distributData(data,center);
//计算新的 center
newcenter = CalCenters.calCenters(dataindex,k);
// System.out.println(i+"-------------dataindex--------------------");
// for (int p = 0; p < dataindex.size(); p++) {
// System.out.println(dataindex.get(p).getIndex()+" "+dataindex.get(p).getData());
// }
// System.out.println("-----------------newcenter------------------");
// for (int j = 0; j < newcenter.size(); j++) {
// System.out.println(newcenter.get(j));
// }
//System.out.println(newcenter);
//计算新簇中心与旧簇中心的差
dis = CalUtil.calDistanceBetweenCenters(newcenter,center);
System.out.println("------------------------dis----------------------------");
System.out.println(dis);
//如果差为0,退出循环
if(dis ==0){
break;
}
//保存新簇,下个循环新簇就变成旧簇了
center = newcenter;
}
// System.out.println(newcenter);
// System.out.println("最后的结果");
// for (int i = 0; i < dataindex.size(); i++) {
// System.out.println(dataindex.get(i).getIndex()+" "+dataindex.get(i).getData());
// }
// System.out.println("dataindex.size==================="+dataindex.size());
//最终的聚类结果
dataindex = DistributionData.distributData(data,center);
writeOut.writeout(outPath,dataindex);
long end = System.currentTimeMillis();//结束时间
System.out.println("共耗时"+(end-start)+"毫秒");//耗时
}
}
创建带有簇标签的数据存储方式
import java.util.ArrayList;
// A data point together with the cluster label assigned to it.
public class DataWithIndex {
    /** Mutable pair of (point coordinates, cluster label). */
    static class dataWithIndex {
        ArrayList<Double> data; // coordinates of the point
        int index; // cluster label, 0..k-1

        /** Returns the point's coordinates (the stored list itself, not a copy). */
        public ArrayList<Double> getData() {
            return data;
        }

        /** Returns the cluster label; 0 until {@link #setIndex} is called. */
        public int getIndex() {
            return index;
        }

        /** Stores the point's coordinates; the list is kept by reference. */
        public void setData(ArrayList<Double> data) {
            this.data = data;
        }

        /** Stores the cluster label. */
        public void setIndex(int index) {
            this.index = index;
        }
    }

    public DataWithIndex() {
    }
}
选取初始的随机中心类
import java.util.ArrayList;
import java.util.Random;
public class CenterRandom {
    /**
     * Randomly picks {@code k} distinct points from {@code data} as initial cluster centers.
     *
     * @param data input points
     * @param k number of centers to pick; {@code k <= 0} yields an empty list
     * @return {@code k} copies of distinct points from {@code data}
     * @throws IllegalArgumentException if {@code data} has fewer than {@code k} distinct points
     */
    public static ArrayList<ArrayList<Double>> centerRandomChoice(ArrayList<ArrayList<Double>> data, int k) {
        return centerRandomChoice(data, k, new Random());
    }

    /**
     * Same as {@link #centerRandomChoice(ArrayList, int)} but draws from the given generator, so
     * runs can be reproduced with a seeded {@link Random}.
     */
    public static ArrayList<ArrayList<Double>> centerRandomChoice(ArrayList<ArrayList<Double>> data, int k, Random random) {
        ArrayList<ArrayList<Double>> center = new ArrayList<>();
        if (k <= 0) {
            return center;
        }
        int elementsSize = data.size();
        int[] order = new int[elementsSize];
        for (int i = 0; i < elementsSize; i++) {
            order[i] = i;
        }
        // Partial Fisher-Yates shuffle: order[0..pos] is a uniform sample without replacement.
        // Sampling with replacement (or accepting equal coordinates) could yield two identical
        // centers, one of which would end up with no points and an empty center vector.
        for (int pos = 0; pos < elementsSize && center.size() < k; pos++) {
            int swap = pos + random.nextInt(elementsSize - pos);
            int tmp = order[pos];
            order[pos] = order[swap];
            order[swap] = tmp;
            ArrayList<Double> candidate = data.get(order[pos]);
            if (!center.contains(candidate)) {
                // Copy so later code can never mutate an input row through a center.
                center.add(new ArrayList<>(candidate));
            }
        }
        if (center.size() < k) {
            throw new IllegalArgumentException("need " + k + " distinct points to seed the centers, but only "
                    + center.size() + " are available");
        }
        return center;
    }
}
计算新的簇中心类
import java.util.ArrayList;
public class CalCenters {
//首先计算簇标记为0的簇中心,然后计算簇标记为1,2,,,k
public static ArrayList> calCenters(ArrayList dataindex,int k){
ArrayList> center = new ArrayList<>();
ArrayList add ;
for (int n = 0; n < k; n++) {
ArrayList centerone = new ArrayList<>();
int count = 0;//计数,计算该类中有多少个点
//遍历dataindex,如果centerindex里没有簇标记,就添加,否则就更新centerindex.data
for (DataWithIndex.dataWithIndex one : dataindex) {
if (one.getIndex() == n) {
if (centerone.size() == 0) {
centerone.addAll(one.getData());
//centerone = one.getData();
count++;
} else {
add = CalUtil.addElement(centerone, one.getData());
centerone = add;
count++;
}
}
}
for (int i = 0; i < centerone.size(); i++) {
centerone.set(i,centerone.get(i)/count);
}
center.add(centerone);
}
return center;
}
}
计算新的簇标签
import java.util.ArrayList;
public class DistributionData {
//计算数据属于哪个簇
public static ArrayList distributData(ArrayList> data, ArrayList> center){
ArrayList dataindex = new ArrayList<>();
double dis;
int index = 0;
for (int i = 0; i < data.size(); i++) {
DataWithIndex.dataWithIndex onedataindex = new DataWithIndex.dataWithIndex();
double min = 10000.0;
for (int j = 0; j < center.size(); j++) {
dis = CalUtil.calDistance(data.get(i),center.get(j));
if(dis
各种数值计算类
import java.util.ArrayList;
// 计算工具类,两值距离,选择最近中心点等
public class CalUtil {
// 计算两向量距离,欧式
public static double calDistance(ArrayList element1, ArrayList element2){
double disSum = 0;
for(int i=0;i addElement(ArrayList element1, ArrayList element2){
for(int i=0;ioldCenter, ArrayList>newCenter){
// 因为data的读入顺序相同,所以最终收敛时聚类中心的顺序也相同
// 只要遍历计算距离即可,不用考虑中心点本身顺序
if(oldCenter.size() > newCenter.size())
return 1000;
double sum = 0;
for(int i=0;i
文件读取类(我这里是直接粘贴的hdfs上的读取方式,一行一行的读)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
import java.util.ArrayList;
// 实现hdfs中中心点文件的读取和删除
// 中心点文件放在output中
public class DataUtil {
public static final String HDFS_INPUT = "hdfs://hadoop102:8020/input"; // input地址
public static final String HDFS_OUTPUT = "hdfs://hadoop102:8020/output"; // output地址
// 切割字符串成double数组集合
public static ArrayList splitStringIntoArray(String line) {
ArrayList center = new ArrayList();
String[] lineContextArry = line.split("t");
for (String s : lineContextArry) {
Double c = Double.parseDouble(s);
center.add(c);
}
return center;
}
// 获取文件对应的hdfs系统下的linereader
private static LineReader getLineReader(String filePath) throws IOException {
Path path = new Path(filePath);
Configuration conf = new Configuration();
FileSystem fileSystem = path.getFileSystem(conf);
FSDataInputStream fsdis = fileSystem.open(path);
LineReader lineReader = new LineReader(fsdis, conf);
return lineReader;
}
// 读入center
private static void readCenterLines(LineReader lineReader, ArrayList> centers) throws IOException {
Text line = new Text();
// 每一行进行一次读取
while (lineReader.readLine(line) > 0) {
ArrayList center = splitStringIntoArray(line.toString().trim());
centers.add(center);
}
lineReader.close();
}
// 读取中心点
// 可能是文件夹,遍历读取
public static ArrayList> readCenter(String centerPath) throws IOException {
ArrayList> centers = new ArrayList>();
Path path = new Path(centerPath);
Configuration conf = new Configuration();
FileSystem fileSystem = path.getFileSystem(conf);
if (fileSystem.isDirectory(path)) {
// 文件夹,遍历读取
FileStatus[] listFile = fileSystem.listStatus(path);
for (FileStatus fileStatus : listFile) {
LineReader lineReader = getLineReader(fileStatus.getPath().toString());
readCenterLines(lineReader, centers);
}
} else {
// 普通文件,直接读取
LineReader lineReader = getLineReader(centerPath);
readCenterLines(lineReader, centers);
}
return centers;
}
}
文件写出类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
public class writeOut {
public static void writeout(String outPath, ArrayList dataindex) throws URISyntaxException, IOException, InterruptedException {
//1、创建配置文件
// 要确认是org.apache.hadoop.conf.Configuration;
Configuration conf = new Configuration();
// 2、获取文件系统
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), conf, "hong");
// 3、调用ApI操作
// 创建文件并写入数据
int a;
FSDataOutputStream in = fs.create(new Path(outPath));
for (int i = 0; i < dataindex.size(); i++) {
//写出簇标签
a = dataindex.get(i).getIndex();
String s = String.valueOf(a);
char c = s.charAt(0);
in.writeInt(c);
//in.writeByte(dataindex.get(i).getIndex());
in.write('t');
//写出一个data
for (int j = 0; j < dataindex.get(i).getData().size(); j++) {
in.write(dataindex.get(i).getData().get(j).toString().getBytes());
//in.write(data.get(i).get(j).toString().getBytes());
in.write('t');
}
in.write('n');
in.flush();
}
fs.close();
}
}



