栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

访问HDFS集群文件并将内容写入Hbase,附带条件筛选

访问HDFS集群文件并将内容写入Hbase,附带条件筛选

访问HDFS集群文件并将内容写入Hbase,附带条件筛选 注释在代码中均有,可自行查看

测试的目标文件:

链接:https://pan.baidu.com/s/1TXQhbGAd8YSppBpljj_ldw
提取码:b78n

需要将其先上传到Hdfs集群上:hadoop fs -put 文件名称 路径

一共三个功能自定义函数,一个main函数;


package com.Hbase.putdatas;





import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.TreeMap;

public class testa {

    public static TreeMap> readFile(String paths) throws URISyntaxException, IOException, InterruptedException {
        // 01. 读取文件的数据
        TreeMap> map = null;
        try {
            map=new TreeMap<>();
            //BufferedReader reader = new BufferedReader(new FileReader(path));

            //连接hdfs集群
            String path=paths;                          //函数传入要访问的 hdfs路径
            Configuration conf = new Configuration();
            URI uri = new URI(path);
            FileSystem fs =FileSystem.get(uri,conf,"atguigu");  //设置hdfs集群的用户
            FSDataInputStream in=fs.open(new Path(path));

            String line=null;

            while((line = in.readLine()) != null){
                ArrayList values =new ArrayList<>();

                System.out.println(line);       //查看文件内容是否正确

                String[] split =line.split("t");
                values.add(split[1]);                   //添加一行中除列族中的每个列值到values中
                values.add(split[2]);
                values.add(split[3]);
                values.add(split[4]);
                values.add(split[5]);
                values.add(split[6]);
                values.add(split[7]);
                map.put(Integer.parseInt(split[0]),values);
                //每次循环,清空list中的数据,避免list中出现重复数据
                //values.clear();   //经测试会把ArrayList中的内容全部清空,没办法用,只能把new ArrayList放到循环中。
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return map;
    }

    public static Connection connHbase(String hostname) throws IOException {
        Configuration conf= HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",hostname);
        Connection conn=null;

        conn = ConnectionFactory.createConnection(conf);

        return conn;
    }

    // 02. 连接hbase

    
    public static void writeToHbaseTable(String hostname,String tablename,TreeMap> values,String cf) throws IOException {

        Connection conn=connHbase(hostname);
        //读取指定表
        Table table=conn.getTable(TableName.valueOf(tablename));
        ArrayList puts=new ArrayList<>();

        System.out.println(values.size());  //测试数组长度

        for (Integer integer : values.keySet()) {
            Put put = new Put(Bytes.toBytes(integer.toString()));
            //byte[] family 列族, byte[]qualifier 列,  byte[] value 值
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("name"),Bytes.toBytes(values.get(integer).get(0)));   //写入hbase数据部分
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("age"),Bytes.toBytes(values.get(integer).get(1)));
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("number"),Bytes.toBytes(values.get(integer).get(2)));
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("data"),Bytes.toBytes(values.get(integer).get(3)));
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("salary"),Bytes.toBytes(values.get(integer).get(4)));
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("achievements"),Bytes.toBytes(values.get(integer).get(5)));
            put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("days"),Bytes.toBytes(values.get(integer).get(6)));
            puts.add(put);
        }
        table.put(puts);


    }


    // 02. 将文件数据写入到hbase表中
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
        //测试是否将数据写入到了map中
//        TreeMap> listTreeMap = readFile("./data/stuDatas.txt");
//        for (Integer integer : listTreeMap.keySet()) {
//            System.out.println("rowkey:"+integer+",columns:"+listTreeMap.get(integer).toString());
//        }

//        //测试hbase连接
//        System.out.println(connHbase("192.168.10.105"));



        //测试 写入hbase数据
        writeToHbaseTable("192.168.10.105","people2",       //写入虚拟机ip,表名,要访问的hdfs集群路径,列族名称
                readFile("hdfs://hadoop105:8020/emp.txt"),"info");

    }

}

到这里读取HDFS集群文件写入Hbase的任务就完成了。





并根据薪资(salary),筛选出大于2000的职工信息:


package com.Hbase.putdatas;

import jnr.ffi.Struct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.TreeMap;

public class ScreenThan2000 {

    public static TreeMap> readFile(String paths) throws URISyntaxException, IOException, InterruptedException {




        // 01. 读取文件的数据
        TreeMap> map = null;
        try {
            map=new TreeMap<>();
            //BufferedReader reader = new BufferedReader(new FileReader(path));

            //连接hdfs集群
            String path=paths;
            Configuration conf = new Configuration();
            URI uri = new URI(path);
            FileSystem fs =FileSystem.get(uri,conf,"atguigu");
            FSDataInputStream in=fs.open(new Path(path));

            String line=null;

            while((line = in.readLine()) != null){
                ArrayList values =new ArrayList<>();

                System.out.println(line);       //查看文件内容是否正确

                String[] split =line.split("t");
                values.add(split[1]);
                values.add(split[2]);
                values.add(split[3]);
                values.add(split[4]);
                values.add(split[5]);
                values.add(split[6]);
                values.add(split[7]);
                map.put(Integer.parseInt(split[0]),values);
                //每次循环,清空list中的数据,避免list中出现重复数据
                //values.clear();   //经测试会把ArrayList中的内容全部清空,没办法用,只能把new ArrayList放到循环中。
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return map;
    }

    public static void writeToHbaseTable(TreeMap> values)  {


        ArrayList puts=new ArrayList<>();

        System.out.println(values.size());  //测试数组长度

        System.out.println(values);
        values.forEach((key, value) -> {                //遍历values的key和value
            if (Float.parseFloat(value.get(4)) > 2000) {
                System.out.println(key+"t"+value.get(0)+"t"+value.get(1)+"t"+
                        value.get(2)+"t"+value.get(3)+"t"+value.get(4)+"t"+value.get(5)+"t"+value.get(6));
            }

        });

    }

    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {




        //测试 写入hbase数据
        writeToHbaseTable(readFile("hdfs://hadoop105:8020/emp.txt"));

    }

}

共勉!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/602569.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号