1.java环境配置
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
2. /usr/local/hadoop/etc/hadoop/中core-site.xml
hadoop.tmp.dir file:/usr/local/hadoop/tmp A base for other temporary directories. fs.defaultFS hdfs://localhost:9000
3./usr/local/hadoop/etc/hadoop/中hdfs-site.xml
dfs.replication 1 dfs.namenode.name.dir file:/usr/local/hadoop/tmp/dfs/name dfs.datanode.data.dir file:/usr/local/hadoop/tmp/dfs/data
实验二
1.Eclipse中的MergeFile.java代码
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
class MyPathFilter implements PathFilter {
String reg = null;
MyPathFilter(String reg) {
this.reg = reg;
}
public boolean accept(Path path) {
if (!(path.toString().matches(reg)))
return true;
return false;
}
}
public class MergeFile {
Path inputPath = null; //待合并的文件所在的目录的路径
Path outputPath = null; //输出文件的路径
public MergeFile(String input, String output) {
this.inputPath = new Path(input);
this.outputPath = new Path(output);
}
public void doMerge() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://localhost:9000");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
//下面过滤掉输入目录中后缀为.abc的文件
FileStatus[] sourceStatus = fsSource.listStatus(inputPath,
new MyPathFilter(".*\.abc"));
FSDataOutputStream fsdos = fsDst.create(outputPath);
PrintStream ps = new PrintStream(System.out);
//下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
for (FileStatus sta : sourceStatus) {
//下面打印后缀不为.abc的文件的路径、文件大小
System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen()
+ " 权限:" + sta.getPermission() + " 内容:");
FSDataInputStream fsdis = fsSource.open(sta.getPath());
byte[] data = new byte[1024];
int read = -1;
while ((read = fsdis.read(data)) > 0) {
ps.write(data, 0, read);
fsdos.write(data, 0, read);
}
fsdis.close();
}
ps.close();
fsdos.close();
}
public static void main(String[] args) throws IOException {
MergeFile merge = new MergeFile(
"hdfs://localhost:9000/user/hadoop/",
"hdfs://localhost:9000/user/hadoop/merge.txt");
merge.doMerge();
}
}
2.附录
(1)写入文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
/**
 * HDFS write example: creates a file named "test" on the default
 * filesystem (hdfs://localhost:9000) and writes "Hello world" into it.
 */
public class Chapter3 {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// Encode with an explicit charset; the no-arg getBytes() uses the
// platform default and is not portable.
byte[] buff = "Hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8); // content to write
String filename = "test"; // name of the file to create
// try-with-resources closes the stream and filesystem even when
// write() throws (the original leaked both on error).
try (FileSystem fs = FileSystem.get(conf);
FSDataOutputStream os = fs.create(new Path(filename))) {
os.write(buff, 0, buff.length);
System.out.println("Create:" + filename);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)判断文件是否存在
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * HDFS existence check example: reports whether the file "test" exists
 * on the default filesystem (hdfs://localhost:9000).
 */
public class Chapter3 {
public static void main(String[] args) {
try {
String filename = "test"; // path to test for existence
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// try-with-resources closes the filesystem even if exists() throws
// (the original leaked it on error).
try (FileSystem fs = FileSystem.get(conf)) {
if (fs.exists(new Path(filename))) {
System.out.println("文件存在");
} else {
System.out.println("文件不存在");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(3)读取文件
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
/**
 * HDFS read example: opens the file "test" on the default filesystem
 * (hdfs://localhost:9000) and prints its first line.
 */
public class Chapter3 {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
Path file = new Path("test");
// try-with-resources closes reader, stream and filesystem even if a
// read throws (the original leaked them on error). The reader is
// given an explicit charset; the no-arg InputStreamReader decodes
// with the platform default, which is not portable.
try (FileSystem fs = FileSystem.get(conf);
FSDataInputStream getIt = fs.open(file);
BufferedReader d = new BufferedReader(
new InputStreamReader(getIt, java.nio.charset.StandardCharsets.UTF_8))) {
String content = d.readLine(); // read the first line of the file
System.out.println(content);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
实验三
1./usr/local/hbase/hbase-tmp配置
hbase.rootdir file:///usr/local/hbase/hbase-tmp
2.配置/usr/local/hbase/conf/hbase-site.xml
hbase.rootdir hdfs://localhost:9000/hbase hbase.cluster.distributed true hbase.unsafe.stream.capability.enforce false
3.ExampleForHBase.java代码
1.import org.apache.hadoop.conf.Configuration;
2.import org.apache.hadoop.hbase.*;
3.import org.apache.hadoop.hbase.client.*;
4.import org.apache.hadoop.hbase.util.Bytes;
5.
6.import java.io.IOException;
7.public class ExampleForHBase {
8. public static Configuration configuration;
9. public static Connection connection;
10. public static Admin admin;
11. public static void main(String[] args)throws IOException{
12. init();
13. createTable("student",new String[]{"score"});
14. insertData("student","zhangsan","score","English","69");
15. insertData("student","zhangsan","score","Math","86");
16. insertData("student","zhangsan","score","Computer","77");
17. getData("student", "zhangsan", "score","English");
18. close();
19. }
20.
21. public static void init(){
22. configuration = HBaseConfiguration.create();
23. configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
24. try{
25. connection = ConnectionFactory.createConnection(configuration);
26. admin = connection.getAdmin();
27. }catch (IOException e){
28. e.printStackTrace();
29. }
30. }
31.
32. public static void close(){
33. try{
34. if(admin != null){
35. admin.close();
36. }
37. if(null != connection){
38. connection.close();
39. }
40. }catch (IOException e){
41. e.printStackTrace();
42. }
43. }
44.
45. public static void createTable(String myTableName,String[] colFamily) throws IOException {
46. TableName tableName = TableName.valueOf(myTableName);
47. if(admin.tableExists(tableName)){
48. System.out.println("talbe is exists!");
49. }else {
50. TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
51. for(String str:colFamily){
52. ColumnFamilyDescriptor family =
53.ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
54. tableDescriptor.setColumnFamily(family);
55. }
56. admin.createTable(tableDescriptor.build());
57. }
58. }
59.
60. public static void insertData(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {
61. Table table = connection.getTable(TableName.valueOf(tableName));
62. Put put = new Put(rowKey.getBytes());
63. put.addColumn(colFamily.getBytes(),col.getBytes(), val.getBytes());
64. table.put(put);
65. table.close();
66. }
67.
68. public static void getData(String tableName,String rowKey,String colFamily, String col)throws IOException{
69. Table table = connection.getTable(TableName.valueOf(tableName));
70. Get get = new Get(rowKey.getBytes());
71. get.addColumn(colFamily.getBytes(),col.getBytes());
72. Result result = table.get(get);
73. System.out.println(new String(result.getValue(colFamily.getBytes(),col==null?null:col.getBytes())));
74. table.close();
75. }
76.}



