栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-10-06攻城略地之国庆大学习总结

2021-10-06攻城略地之国庆大学习总结

目录

Mysql与JDBC 

shell

linux

HDFS(分布式文件系统)

        NAME_NODE与DATANODE之间的交互

        HDFS上传数据流程

        HDFS下载数据流程

元数据管理

HDFS优点缺点

元数据:

数据爬取 与 处理

MapReduce

01 根据原始数据的文件个数 文件大小 计算任务切片

02 根据对应的任务切片 生成对应的maptask(map任务 )

03 reducetask

数据倾斜

解决方法

Yarn( 分布式资源调度平台)

Hbase

flush的刷写时机

布隆过滤器

zookeeper

zookeeper的选举

投票机制

hive

scala

clickhouse

立方体模型

用户行为分析函数 SequenceMatch()

漏斗函数windowFunnel()


Mysql与JDBC 

        要学会增删改查,数据类型,建表,字段类型(引擎,编码,约束)

        DDl语句:数据定义语言,用来维护数据库对象(比如建表,查看表,修改表,删除表等)

        DMl语句:增删改表中数据,伴随着TCl事务控制(插入数据,更新数据,修改数据,清空数据等)

        JDBC:java连接数据库数据库连接池

        sql注入问题:

                使用PerareStatement接口 预编译SQL语句对象 就能防止了

SELECt * FROM 用户表 WHERe NAME = ‘XXX’ AND PASSWORD =’ XXX’  OR ’a’=’a’;

public static void main(String[] args) throws Exception {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet resultSet = null;
    try {
        // 1 注册驱动
        Class.forName("com.mysql.jdbc.Driver");
        // 2获取连接
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit26", "root", "root");
        // 获取对象的时候传入SQL
        
        ps = conn.prepareStatement("select * from tb_stu where  id = ?");
        // 在执行SQL语句之前  给? 占位符赋值  内部重新的编译SQL语句  检查 校验
        ps.setInt(1 , 2); //1是第一个?的位置 2是数据id
        //执行SQL语句
        resultSet = ps.executeQuery();
        System.out.println(1/0); // 解析引擎停止
        List list = new ArrayList();
        while (resultSet.next()){
            String id = resultSet.getString(1);
            String name = resultSet.getString(2);
            int age = resultSet.getInt(3);
            String tel = resultSet.getString(4);
            String gender = resultSet.getString(5);
            double score = resultSet.getDouble(6);
            String address = resultSet.getString(7);
            //  2) 将字段封装在执行的属性中
            Student student = new Student(id,name,age,tel,gender,score,address);
            list.add(student) ;
        }
        System.out.println(list);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 最终都会关闭
        resultSet.close();
        ps.close();
        conn.close();
    }
}

        JDBC:JDBC(java连接数据库的标准API) (驱动管理器)管理各种驱动

        c3p0数据库连接池:

        有了数据库连接池以后用户不用频繁的创建和释放连接对象

        使用已经被连接池创建好的连接对象,使用完毕还回去

        连接池中的连接对象可以重复性的对此使用

        维护在线的连接数和创建新的连接销毁连接

public static void main(String[] args) throws Exception {
    //JDBC注册驱动
    Class.forName("com.mysql.jdbc.Driver");
    //获取连接 地址 用户名 密码
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit26","root","root");
    //获取执行sql的对象
    Statement stm = conn.createStatement();
    //编写sql 交给stm对象去执行
    String sql = "select * from student";
    ResultSet resultSet = stm.executeQuery(sql);
    Listlist = new ArrayList();
    while (resultSet.next()){
        int id = resultSet.getInt(1);
        String name = resultSet.getString(2);
        float chinese = resultSet.getFloat(3);
        float english = resultSet.getFloat(4);
        float math = resultSet.getFloat(5);
        list.add(new Student2(id,name,chinese,english,math));
    }
    for (Student2 student2 : list) {
        System.out.println(student2);
    }
}

public class C3P0DatabasePool02 {
    public static void main(String[] args) throws Exception {
        //1 获取数据库连接池对象  数据源
        // 读取指定的配置文件 获取四个参数  创建N个连接对象
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        // 2 使用工具类 查询数据  返回结果
        QueryRunner runner = new QueryRunner(dataSource);
       // 参数三  可变参数  SQL预编译的传入参数  和占位符从前向后一一对应
        Student student = runner.query("select * from tb_stu where id = ?", new BeanHandler(Student.class),3);
        System.out.println(student);
    }
}

        下面照片是c3p0的配置文件

shell

        一些linux中的语法操作(这个的话,就看思维导图中的记录吧,比较清晰)

        对了,这个shell脚本写的时候一定要规范,该回车的地方要回车,千万不要为了省事而你觉得写对了,这个shell语法就是有病,你不写的规范,它就执行不了,只有写规范了.才会执行.(这个道理是我写了好几个脚本才知道的问题)

        我觉得最多的脚本就是scp这个吧

        scp [-r] 文件文件夹 ip:$PWD

        远程登录ssh ip地址 退出exit

        下面这是一个zookeeper启动/查看状态的

#!/bin/bash

export JAVA_HOME=/opt/apps/jdk1.8.0_191

for i in 1 2 3

do

ssh doit0${i} "source /etc/profile;/opt/apps/zookeeper-3.4.6/bin/zkServer.sh $1"

done

        定时任务 crontab -e 分时日月周(每一分的第几分执行)

linux

        我觉得用的多的就是jps,别的有需要就看思维导图

        还有就是防火墙:systemctl status/stop/start/disable/enable firewalld

        查看网络端口 netstat -nltp | grep port 这个netstat命令得安装:yum -y install net-tools

HDFS(分布式文件系统)

        NAME_NODE与DATANODE之间的交互

                1 datanode 注册 namenade 接受注册

                2 namenode向datanode发送集群ID 存储数据的目录 UUID唯一标识

                3 datanode 汇报存储资源

                4 datanode汇报自己数据存储情况

                5 当datanode丢失一个数据块

                        1) 每3秒向namenode发送心跳请求(说明自己还在正常工作 并可以领取任务)

                        2) namenode任务队列生成副本复制任务task

                6 当datanode宕机

                        10次心跳请求 30秒后

                        namenode 5min以后检查datanode

                        namenode 5min以后检查datanode

                        一共10min30sec后剔除宕机节点

        HDFS上传数据流程

        1 Actor(客户端)是可以更改切块大小的 定义切块个数

        2 Actor请求 namenode 上传数据 namenode记录元数据信息

        3 namenode校验并向Actor返回一个ok

        4 Actor请求上传第一块数据

        5 namenode向Actor返回信息的主机名

        6 Actor与datanode建立连接通道

        7 datanode向Actor返回ok

        8 Actor本地读

                本地输入流读取数据

                网络输出流 (网络输出)

                datanode的网络输入流读取

                datanode 利用本地输出流保存在本地

                第二台datanode同样 网络流->本地输入流->本地

                第三台datanode同样 网络流->本地流 ->本地

        HDFS下载数据流程

         

        1 Actor请求下载

        2 namenode返回元数据

        3 请求对应datanode获取对应的数据块

元数据管理

1 put 一个文件后 就会产生对应的元数据(集群ID BP-ID 数据块block0  0-128M linux1 linux2 linux4........)

元数据存储在内存中或者存储在磁盘上

 2 放在内存FSImage里

序列化

将内存对象中的元数据持久化到磁盘保证数据的安全

如果持久化完成以后 立马对元数据进行了更新 内存变了 此时有可能出现没有持久化的元数据的丢失

那么我们可以每次更新都重新持久化 出现IO的大量工作

FSImage内的存储方式是:Tree方法对节点的操作,方便对文件系统的目录结构操作 也就是树形结构

        3 解决IO的大量工作方法

        1 namenode -format初始化生成 secondary namenode

        2 第一次的一些日志信息 放在内存FSImage中 持久化操作 并记录一定量的(edit_pre_006 edit_005 edit_004 edit_003 edit_002 edit_001)日志 至此之后就不会再进行序列化了

        3 checkpoint机制 下载namenode 的 Fsimage000000文件(初始化文件 ,只下载一次) 定期的合并操作日志和镜像文件 , 序列化成新的镜像文件 /返回给NN节点

        4 datanode将序列化的信息反序列化 和 回放日志 存储到自己的FSImage中

        5 序列化FSImage 并覆盖本机旧的fsimage(如果有的话是覆盖 没有的话就是存储)

        6 将序列化后的FSImage返回到namenode 覆盖旧的fismage

HDFS优点缺点

优点

        将海量的数据存储在不同的廉价的服务器上

        极易扩容/减

        容错性 副本冗余

        丰富的操作API

缺点

        单节点故障 严重依赖主节点

        延迟性高

        存储的数据适用于一次存储多次读取

        只能追加和覆盖写

        不能对数据的修改和删除

        不能存储大量的小文件

        增加NN压力

        不利于数据的后期处理

        MR不适合处理小文件

        追加写 , 覆盖写 ,单用户写

元数据:

        文件块 , 文件存储位置 , 块大小 ,副本的个数 ,文件的权限.....

数据爬取 与 处理

1 爬虫爬取html网页上的信息

2 存储在HDFS上

3 读取hdfs上的信息

4 对数据进行分析

5 结果保存 并保存到hdfs

MapReduce

01 根据原始数据的文件个数 文件大小 计算任务切片

02 根据对应的任务切片 生成对应的maptask(map任务 )

        maptask找一个读取数据的类叫LineRecorReader

        读取数据后生成对一个k,v

                k:行的起始偏移量

                v:行内容

        将生成的K,V交给maptask里面的map方法

        map接收到k,v后进行处理,生成新的k,v

                map(K,V,context){ 处理数据

                context.write(NK,NV) ; }

                k,v(map处理后的数据)

                map处理之后的数据利用HashPartitioner获得partition

Ⅰ 首先调用HashPartitioner 拿到对应的k,v 并生成一个分区编号partition

        分区编号:return (key.hashCode() & 2147483647) % numReduceTasks;

        根据设置的reducetask个数计算分区编号:job.setNumReduceTasks(1);

        分区编号就是reducetask在拉取数据的时候应该拉取的数据

Ⅱ 将k,v,partition数据放到环形缓冲区里面

        数据经过环形数组MapOutPutBuffer 写到数组里面去

        MapOutPutBuffer 即:环形的缓冲区

        按照相同分区里面的数据进行排序(区内排序)

        当环形缓冲区的数据写到80%的时候 就会溢出一次 再到80%再次溢出

        溢出:就是将数据放到内存中 或者写到本地磁盘(前提是如果数据过大)

注意:写磁盘这个地方,数据不会记住 所以要写磁盘, reducetask拉取的数据比较多的话也是需要写磁盘

        写磁盘后再进行分组

        如果分组之后内存过大也是要写磁盘

        这个数据处理过程要多次与磁盘进行交互 显然处理效率比较低

        好处就是保证了数据的安全性

        数据不会丢失

        溢出的数据进行合并排序

        合并(Merge):将同一分区的数据进行合并

        排序 将数据写到本地磁盘之前进行排序

        利用对应的下载服务网络传输HTTP下载服务

03 reducetask

        Fetcher:拉取

        reducetask从不同的maptask下拉取属于自己分区内的数据

        shuffle

        拉取的过程叫shuffle

        Merge合并 排序

        shuffle拉取数据之后做一个合并排序

        GroupingComparetor 分组器

        merge合并排序后的进行相同数据的一个分组工作

        有了分组之后 相同的组内进行迭代器的遍历

        以组内容交给reduce方法去处理

        将聚合后的结果写到我们对应的文件里面去

排序可以看到有很多地方进行了排序,那么这个排序到底有没有好处,可以去掉吗?

排序有两个地方

        一个在环形缓冲器内进行的排序

        一个在merge合并并排序的过程中(reducetask拉取数据并合并的阶段也有合并排序)

好处?

        答案是没有什么好处?

        可以这么说,许你不用,但是不许你没有

原因

        mapreduce这个框架是为了分布式计算,而分布式计算最基础的就是排序

        所以mapreduce最初衷就是为了大数据排序而设计

        简单点说就是:不是排序对后续操作有何好处,而是这个[排序能为后续的应用开发带来很多好处,||||如果框架不提供排序,要自己排序,那可真的是难受的你哇哇叫,谁还用

数据倾斜

        上游数据到下游数据进行数据分发的过程中出现的数据不均匀,导致有的机器运行的特别慢,有的机器运行的特别快

        分的数据多

        分的数据少

比如aaaaaacccccccd 根据hashcode来划分区域

        reduce1分到aaaaaaccccccc

        reduce2分到d

解决方法

        1 将key打散(就是在每个数据后面加个随机数,这样hashcide值就不一样了)

代码:k.set(word+"-"+random.nextInt(numReduceTasks))

相同的key一定会被分到同一个分区(使用的时候,记得聚合,方法在思维导图里,自己去拿)

        2 适当增加分区的个数

        3 增加机器的运算资源(增大内存)

        4 还有一个就是分区器HashPartitioner的分配逻辑问题 也会导致分区器的数据倾斜

package com._51doit.hadoop.hadoop.mr.day03.movie;

import com._51doit.hadoop.hadoop.beans.MovieBean;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class MovieAvgRate {
    static  class  MovieAvgRateMapper extends Mapper{
        Gson gs = new Gson() ;
        Text k = new  Text() ;
        DoubleWritable v = new DoubleWritable() ;
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = null ;
            try {
                line = value.toString();
                MovieBean mb = gs.fromJson(line, MovieBean.class);
                String movie = mb.getMovie();
                double rate = mb.getRate();
                k.set(movie);
                v.set(rate);
                context.write(k,v);
            } catch (Exception e) {
                System.err.println("这行数据有问题: --"+line);
            }
        }
    }

    static  class  MovieAvgRateReducer extends Reducer{
        DoubleWritable v = new DoubleWritable() ;
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int cnt = 0 ;
            double sum  = 0d ;
            for (DoubleWritable value : values) {
                cnt++ ;
                sum += value.get() ;
            }
            double avgRate = sum/cnt ;
            v.set(avgRate);
            context.write(key , v);
        }

        public static void main(String[] args)throws  Exception {
            // 程序运行
            Configuration conf = new Configuration();
            // 1 创建一个工作对象 Job
            Job job = Job.getInstance(conf, "movie_avg_rate");
            // 2设置Mapper类
            job.setMapperClass(MovieAvgRateMapper.class);
            //3设置Reducer类
            job.setReducerClass(MovieAvgRateReducer.class);

            //5设置reduce的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            // 6设置读取的数据位置  [文件 , 文件夹]
            FileInputFormat.setInputPaths(job, new Path("E:\mrdata\movie\input"));
            // 7设置结果保存的位置
            FileOutputFormat.setOutputPath(job , new Path("E:\mrdata\movie\avg_res1"));
            // 提交 并等待执行完毕
            job.waitForCompletion(true) ;
        }
    }
}




public class MovieBean implements Writable {
    private   String movie ;
    private  double rate ;
    private  long timeStamp ;
    private  String uid ;

    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    @Override
    public String toString() {
        return "MovieBean{" +
                "movie='" + movie + ''' +
                ", rate=" + rate +
                ", timeStamp=" + timeStamp +
                ", uid='" + uid + ''' +
                '}';
    }

    
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(movie);
        dataOutput.writeDouble(rate);
        dataOutput.writeLong(timeStamp);
        dataOutput.writeUTF(uid);
    }
    
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        movie = dataInput.readUTF() ;
        rate = dataInput.readDouble() ;
        timeStamp = dataInput.readLong() ;
        uid = dataInput.readUTF() ;
    }
}

Yarn( 分布式资源调度平台)

这个在这不好写,写不清楚,就看思维导图吧.关键是联系太多.在这讲不清楚. 简单的写一下

yarn 分布式资源调度平台和任务监控平台 MR在yarn上运行流程

客户端提交程序 job 内容

java程序{ 设置输入路径/设置输出路径/设置运行模式/yarn的位置/HDFS的位置/Mapper类/Reducer类/waitForCompletion()/setJarByClass/setNumReduceTasks等}

        1 提交job给ApplicationsMaster

                ApplicationsMaster生成对应的jobID

        2 ApplicationsMaster向job返回jobID和临时工作目录

        3 初始化JOB

        那个临时工作目录就是在HDFS上创建的,就是用来给job初始化的

        创建临时文件夹

        上传jar包

        用户自定义设置 new configuration() conf.xml等

        计算任务切片(默认根据输入路径下的文件个数和文件大小计算 128M/个 具体划分方法还得看task源码) Splits[]+Maptask+Reducetask+编号

        4 运行job

        linux是 hadoop jar

        windows是 java -cp

        ResourManager主进程(两个子进程)

        AplicationsMaster

        5 向调度器申请资源

        Schedule调度器

        6 根据管理的所有的NodeMaster的资源来进行分配 生成任务队列queue

        任务队列(queue):放任务用.先进先出,比较重要

        分配任务放在任务队列里

        让机器创建一个2G1C的容器

        通过心跳机制将任务发送出去

        MrApplicationMaster

        7 NodeMaster领取任务并创建容器 container(1C2G)

        在容器里创建一个对象MRApplicationMaster

        8 MRApplicationMaster拉取工作目录中的数据信息

        工作目录:第3步创建的HDFS目录

        配置文件

        jar包

        splits[]等 当然切片里面包含*task

        解析切片Splits[]

        提取里面的job 获得maptask数和reducetask数

        9 MRApplicationMaster向ResourceManager申请资源 根据3个maptask 2个reducetask

        ResourceManager将任务交给调度器

        10 调度器生成任务

        将任务放到任务队列里面去

        现在里面应该是有 RT RT MT MT MT 创建容器任务(已完成,没有了)

        maptask任务结束后才会执行reducetask任务

        各个机器再次通过心跳机制来领取任务

        11 不同机器领取maptask任务并创建容器

        创建新的对象YarnChild

        12 YarnChild作用:从MRApplicationMaster上拉取jar包

        12 解析jar包 获得里面的maptask任务

        不运行 等待MRApplicationMaster发布执行指令

        16 不同机器领取reductask任务

        创建新的对象YarnChild

        从MRApplicationMaster上拉取jar包

        解析jar包 获得里面的reducetask任务

        不运行 等待MRApplicationMaster发布执行指令

        13 MRApplicationMaster发送maptask任务执行指令

        各个机器并行运行

        并行!

        14 各个maptask汇报任务进度向MRApplicationMaster

        15 MRApplicationMaster向ResourceManager汇报总体进度

        有maptask运行失败 失败就得重新运行重新分配资源

        MRApplicationMaster向ResourceManager申请资源跟15步骤一块进行

        ResourceManager交给调度器 调度器将任务放在队列里面(放在Reducetask前面.因为maptask执行完才能执行Reducetask)

        17 MRApplicationMaster发送reducetask任务执行指令

        各个机器并行运行

        18 各个reducetask汇报任务进度向MRApplicationMaster

        MRApplicationMaster向ResourceManager汇报总体进度

        有reducetask运行失败 失败就得重新运行重新分配资源

        MRApplicationMaster向ResourceManager申请资源跟15步骤一块进行

        ResourceManager交给调度器 调度器将任务放在队列里面

        19 所有的task运行完毕以后 回收资源

        20 回收对象 YarnChild

        21 释放task容器

        22 回收MRApplicationMaster对象

        23 回收初始容器

        24 清理工作目录(HDFS里的临时文件/目录)

Hbase

分布式数据库系统:基本使用语法看思维导图

        hbase 的 基本原理要多看,思维导图中,比较重要

        前提HDFS正常打开Zookeeper正常打开时间同步

        hbase没有数据库的概念,可以使用namespace来达到数据库分类别管理表的作用

        help使用的时候,记得把你要help的命令用双引号引起来 help "list"

列族:column 这个东西,看两个代码就懂了,请往下看        

create 'tb_user','base_info','other_info' //建了一个表叫tb_user 两个列族,base_info和other_info
put 'tb_user','1','base_info:name','zss' //往里面放了一个数据,第一行的base_info:name,数据是zss

你还可以base_info:age啥的,数据还是第一行,或者你更改1为别的也行,rk001,你觉得这一行数据结束了,就可以进行下一个数据的put了.

简而言之就是列族就是

        一个列族中的所有列存储在相同的HFile文件中,Hbase是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。

        java代码:本地数据到hbase中(记得添加依赖),使用Put导入数据 , 执行put方法 , 每次put都会进行RPC网络请求 ! 效率不高 可以使用mutation批次操作对象

package com._51doit.day03;

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;


public class LoadDataToHbase {
    //mapper读
    static class Demo1Mapper extends Mapper {
        Text  k = new  Text() ;
        Gson gs = new Gson() ;
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                MovieBean mb = gs.fromJson(line, MovieBean.class);
                
                String movie = mb.getMovie();//1   2340
                String timeStamp = mb.getTimeStamp();
                String mid = StringUtils.leftPad(movie, 5, '0');
                String ts = StringUtils.rightPad(timeStamp, 10, '0');
                String rowKey = mid+"_"+ts ;
                k.set(rowKey);
                context.write(k , mb);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    // reducer 输出
    static class Demo1Reducer extends TableReducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            String rk = key.toString();
            for (MovieBean mb : values) {
                String movie = mb.getMovie();
                double rate = mb.getRate();
                String timeStamp = mb.getTimeStamp();
                String uid = mb.getUid();
                // 封装put对象
                Put put = new Put(rk.getBytes());

                put.addColumn("cf".getBytes() , "movie".getBytes() , Bytes.toBytes(movie));
                put.addColumn("cf".getBytes() , "rate".getBytes() ,Bytes.toBytes(rate));
                put.addColumn("cf".getBytes() , "timeStamp".getBytes() ,Bytes.toBytes(timeStamp));
                put.addColumn("cf".getBytes() , "uid".getBytes() ,Bytes.toBytes(uid));
                context.write(null ,put);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "doit01:2181,doit02:2181,doit03:2181");
        Job job = Job.getInstance(conf);

        job.setMapperClass(Demo1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);

        FileInputFormat.setInputPaths(job, new Path("E:\多易教学\mrdata\movie\input"));

        TableMapReduceUtil.initTableReducerJob("movie" , Demo1Reducer.class,job);
        job.waitForCompletion(true);
    }
}

import com._51doit.utils.HbaseUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;


public class LoadData {
    public static void main(String[] args) throws Exception {
        Connection conn = HbaseUtil.getConnection();
        // 批次操作对象
        BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("tb_user"));
        // 读取文本数据  导入到hbase中
        BufferedReader br = new BufferedReader(new FileReader("d://user.txt"));
        String line = null ;
        List ls = new ArrayList<>() ;
        while((line = br.readLine())!=null){
            //u002,lss,22,F
            String[] spls = line.split(",");
            Put put = new Put(spls[0].getBytes());
            put.addColumn("cf".getBytes() , "name".getBytes() , spls[1].getBytes());
            put.addColumn("cf".getBytes() , "age".getBytes() , spls[2].getBytes());
            put.addColumn("cf".getBytes() , "gender".getBytes() , spls[3].getBytes());
            ls.add(put) ;
        }
       // 设置刷写时间间隔
        mutator.setWriteBufferPeriodicFlush(1000);
        mutator.mutate(ls);
        // 不要让程序退出
       Thread.sleep(Integer.MAX_VALUE);
       mutator.flush(); //手动刷写到Hbase的HDFS里
       // mutator.close();  //关闭资源
    }
}

flush的刷写时机

1>当内存大小达到一定的阈值,单个的memroystore大小达到阈值128M(正好生成一个物理块,放到hdfs)

memroystore:内存空间

2>手动强制刷写flush

3> 每个region里面至少有一个memorystore,都在占用内存,这个时候,都占一点,就会很快达到机器的内存最大值. 所有的内存总和达到整个regionserver的上限阈值 所有的regionserver都要刷写(正常刷写)

        但是!

不要建过多的列族,列族越多,memroystore越多,越容易达到阈值上限

4>当对应的内存没有达到128M,但是操作数据次数达到阈值.

总结

最好不要手动刷写数据,会产生一些hfile文件

不要过多的创建列族

无论数据是put还是delete都是往内存里面写数据(可以在hdfs客户端找到对应的数据块解析查看数据)

记得flush到hdfs里面去,不然网页上看不到,解析hfile的时候,要加上-p -f)

布隆过滤器

优化

面试问题

这些都去思维导图里面看:(就怕你找不到,这下可以了吧)

安利一个工具 Phoenix工具:

是运行在Hbase之上的高性能关系型数据库,可以像jdbc访问关系型数据库一样 (比较怪异) 这个等用的时候,再看吧,在思维导图中有.

zookeeper

分布式协调服务工具,可以存储数据,但是不能太大,只能是状态信息数据

zookeeper的选举

##初始状态 比较的是Id值

        集群的Id: doitA(1) doitB(2) doitC(3)

        1 A启动 发现集群中没有leader , 自己进入到looking(选举状态) , 投自己一票 ,广播

        此时A 只有一票 票数没有过半 不能当选

        2 B启动 收到A的投票信息 , 发现 集群中没有leader , 并且自己的id=2 , > 1

        投自己一票 广播

        3 A 收到B的广播 id=1 < 2 改投B一票 广播

        4 B收到A 的广播 自己+1 票 2票 过半 当选leader , 切换leader状态 , 广播

        5 A 收到信息 切换follower状态

        6 C 启动 发现集群中有Leader , 切换follower状态

## 使用过程中leader故障 重新选举

投票机制

在使用中选举的时候优先查看 数据事务版本 zxid 大的当选leader

        再比较ID

补充(比较重要)

        zookeeper的高度可靠性

        zookeeper是一个分布式的系统,多个节点

        并且节点中记录的数据是完全一致(一致性),当某个zk的节点宕机之后,不会影响工作

        zookeeper的选举机制(奇数台)

        zookeeper的主节点不存在单点故障

        zookeeper的主节点是可以动态选举出来的

        单数台的原因是为了防止出现相同的票数的出现

        偶数台的话,出现票数相同的情况,不太容易票选出leader

hive

数据仓库工具(服务性的软件)SQL解析和翻译工具 不是数据库

        hive将SQL语句翻译成MR程序,提交给YARN进行分配调度,元数据存储在mysql中,数据存储在hdfs上

需要开启的服务

        HDFS

                数据存储在HDFS上

        yarn

        资源调度需要它

        hive生成的MR程序,得由yarn处理提交job

zookeeper

        数据信息协调工具, 用不到!

hive

        主角当然得开启啊

一对比较重要的代码:show functions ; 和 desc function func_name ;

会了这两个代码,你就能解决好多问题.

hive中的建表和数据导入得指定数据,多看看思维导图中的hive数仓工具. 多看看git中仓库lianxi2->doit_hive->hive下得到文档

-- sql执行顺序
select
           4
from
tb_name    1
where      2
group by   3
having     5
order by   6
limit      7

scala

函数:

        val f1 = ()=>{

                println("没有参数,没有返回值")

        }

方法:

        def show()={

                println("没有参数")

        }

隐式方法函数

        关键字:implicit

一个滴滴面试题

        统计线段在每个点重叠的次数 , 并按照从高到低排序输出

//代码
def main(args: Array[String]): Unit = {
    val lines: Iterator[String] = Source.fromFile("d:\wokk\line.txt").getLines()
     val s1 = lines.map(tp => {
      val count: Array[String] = tp.split(",")
      val start: Int = count(0).toInt
      val end: Int = count(1).toInt
       //使用推导式生成每个线段经过的所有的点
      val res = for (i <- start to end)yield i
       res.toArray
    })//.toList.flatten.groupBy(e=>e).mapValues(_.size).toList.sortBy(-_._2).foreach(println)
    // 将数据压平  列出所有的点
    val s2: List[Int] = s1.toList.flatten
    // 分组统计每个点出现的次数
    val s3: Map[Int, List[Int]] = s2.groupBy(e => e)
    val s4: Map[Int, Int] = s3.mapValues(_.size)
    // 将结果转换成List集合排序
    s4.toList.sortBy(-_._2).foreach(println)
  }

clickhouse

        ClickHouse是一个完全的列式分布式数据库管理系统(DBMS)

        ClickHouse官网多看,里面有函数用法,更新比较快 只看官网你就能操作数据了,因为里面有简单的例子可以借鉴(俄罗斯老铁们发明的)

        建表的时候,一定要指定引擎

        clickhouse太厉害了,里面的数据,在select后就能直接调用,其它的向hivehbase都没有这功能

WITH (
         SELECT *
         FROM tb_shop2
         WHERe (name = 'a') AND (cdate = '2017-03-01')
     ) AS x
 SELECt *
 FROM tb_shop2
 WHERe (name, cdate, money) = x
//子查询的结果必须是一条数据

一些语法,这里没有列出来,还是去思维导图中看吧,比较清晰,而且详细.

立方体模型

        这个在项目的时候用的多,非常重要 根据聚合维度 提前计算好所有的组合可能性进行聚合

with cube		所有可能性排出来
wuth rollup	不能掠过中间的.
with totals	汇总,维度都有或都没有
select sum(vist),province,city,area from tb_with group by province,city,area with cube ;

用户行为分析函数 SequenceMatch()
//	SELECt sequenceMatch('(?1)(?2)')(eventTime, eventid = 'event1', eventid = 'event2') FROM funnel_test ;
//		(?N) — 在位置N匹配条件参数。
//		.* — 匹配任何事件的数字。 不需要条件参数来匹配这个模式。
	//	 (?1)(?t>1800)(?2) 匹配彼此发生超过1800秒的事件。
//			可以使用 >=, >, <, <=, == 运算符
//	数据
CREATE TABLE funnel_test 
( 	
	uid String, 
	eventid String, 
	eventTime UInt64
) 
ENGINE = MergeTree 
PARTITION BY (uid, eventTime) 
ORDER BY (uid, eventTime) 
SETTINGS index_granularity = 8192 ;
insert into funnel_test values
('uid1','event1',1551398404) ,
('uid1','event2',1551398406) ,
('uid1','event3',1551398408) ,
('uid2','event2',1551398412) ,
('uid2','event4',1551398415) ,
('uid3','event3',1551398410) ,
('uid3','event4',1551398413) ;
('uid4','event2',1551398410) ,
('uid4','event4',1551398413) ;

SELECt uid ,sequenceMatch('(?1)(?2)')(eventTime, eventid = 'event1', eventid = 'event2') FROM funnel_test group by uid;
//	返回的数据,是成功1 失败0
┌─uid──┬─sequenceMatch('(?1)(?2)')(eventTime, equals(eventid, 'event1'), equals(eventid, 'event2'))─┐
│ uid3 │                                                                                          0 │
│ uid1 │                                                                                          1 │
│ uid4 │                                                                                          0 │
│ uid2 │                                                                                          0 │
└──────┴────────────────────────────────────────────────────────────────────────────────────────────┘

漏斗函数windowFunnel()

//返回的数据是满足几个条件
//	数据同上
SELECt uid,windowFunnel(4)(toDateTime(eventTime),eventid = 'event2',eventid = 'event3') as funnel from funnel_test group by uid;
	//	event2到event3的时间不超过4秒
 ┌─uid──┬─funnel─┐
│ uid3 │      0 │
│ uid1 │      2 │
│ uid4 │      1 │
│ uid2 │      2 │
└──────┴────────┘

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311495.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号