目录
Mysql与JDBC
shell
linux
HDFS(分布式文件系统)
NAME_NODE与DATANODE之间的交互
HDFS上传数据流程
HDFS下载数据流程
元数据管理
HDFS优点缺点
元数据:
数据爬取 与 处理
MapReduce
01 根据原始数据的文件个数 文件大小 计算任务切片
02 根据对应的任务切片 生成对应的maptask(map任务 )
03 reducetask
数据倾斜
解决方法
Yarn( 分布式资源调度平台)
Hbase
flush的刷写时机
布隆过滤器
zookeeper
zookeeper的选举
投票机制
hive
scala
clickhouse
立方体模型
用户行为分析函数 SequenceMatch()
漏斗函数windowFunnel()
Mysql与JDBC
要学会增删改查,数据类型,建表,字段类型(引擎,编码,约束)
DDl语句:数据定义语言,用来维护数据库对象(比如建表,查看表,修改表,删除表等)
DMl语句:增删改表中数据,伴随着TCl事务控制(插入数据,更新数据,修改数据,清空数据等)
JDBC:java连接数据库数据库连接池
sql注入问题:
使用PerareStatement接口 预编译SQL语句对象 就能防止了
SELECt * FROM 用户表 WHERe NAME = ‘XXX’ AND PASSWORD =’ XXX’ OR ’a’=’a’;
public static void main(String[] args) throws Exception {
Connection conn = null;
PreparedStatement ps = null;
ResultSet resultSet = null;
try {
// 1 注册驱动
Class.forName("com.mysql.jdbc.Driver");
// 2获取连接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit26", "root", "root");
// 获取对象的时候传入SQL
ps = conn.prepareStatement("select * from tb_stu where id = ?");
// 在执行SQL语句之前 给? 占位符赋值 内部重新的编译SQL语句 检查 校验
ps.setInt(1 , 2); //1是第一个?的位置 2是数据id
//执行SQL语句
resultSet = ps.executeQuery();
System.out.println(1/0); // 解析引擎停止
List list = new ArrayList();
while (resultSet.next()){
String id = resultSet.getString(1);
String name = resultSet.getString(2);
int age = resultSet.getInt(3);
String tel = resultSet.getString(4);
String gender = resultSet.getString(5);
double score = resultSet.getDouble(6);
String address = resultSet.getString(7);
// 2) 将字段封装在执行的属性中
Student student = new Student(id,name,age,tel,gender,score,address);
list.add(student) ;
}
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最终都会关闭
resultSet.close();
ps.close();
conn.close();
}
}
JDBC:JDBC(java连接数据库的标准API) (驱动管理器)管理各种驱动
c3p0数据库连接池:
有了数据库连接池以后用户不用频繁的创建和释放连接对象
使用已经被连接池创建好的连接对象,使用完毕还回去
连接池中的连接对象可以重复性的对此使用
维护在线的连接数和创建新的连接销毁连接
public static void main(String[] args) throws Exception {
//JDBC注册驱动
Class.forName("com.mysql.jdbc.Driver");
//获取连接 地址 用户名 密码
Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_doit26","root","root");
//获取执行sql的对象
Statement stm = conn.createStatement();
//编写sql 交给stm对象去执行
String sql = "select * from student";
ResultSet resultSet = stm.executeQuery(sql);
Listlist = new ArrayList();
while (resultSet.next()){
int id = resultSet.getInt(1);
String name = resultSet.getString(2);
float chinese = resultSet.getFloat(3);
float english = resultSet.getFloat(4);
float math = resultSet.getFloat(5);
list.add(new Student2(id,name,chinese,english,math));
}
for (Student2 student2 : list) {
System.out.println(student2);
}
}
public class C3P0DatabasePool02 {
public static void main(String[] args) throws Exception {
//1 获取数据库连接池对象 数据源
// 读取指定的配置文件 获取四个参数 创建N个连接对象
ComboPooledDataSource dataSource = new ComboPooledDataSource();
// 2 使用工具类 查询数据 返回结果
QueryRunner runner = new QueryRunner(dataSource);
// 参数三 可变参数 SQL预编译的传入参数 和占位符从前向后一一对应
Student student = runner.query("select * from tb_stu where id = ?", new BeanHandler(Student.class),3);
System.out.println(student);
}
}
下面照片是c3p0的配置文件
shell
一些linux中的语法操作(这个的话,就看思维导图中的记录吧,比较清晰)
对了,这个shell脚本写的时候一定要规范,该回车的地方要回车,千万不要为了省事而你觉得写对了,这个shell语法就是有病,你不写的规范,它就执行不了,只有写规范了.才会执行.(这个道理是我写了好几个脚本才知道的问题)
我觉得最多的脚本就是scp这个吧
scp [-r] 文件文件夹 ip:$PWD
远程登录ssh ip地址 退出exit
下面这是一个zookeeper启动/查看状态的
#!/bin/bash
export JAVA_HOME=/opt/apps/jdk1.8.0_191
for i in 1 2 3
do
ssh doit0${i} "source /etc/profile;/opt/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done
定时任务 crontab -e 分时日月周(每一分的第几分执行)
linux
我觉得用的多的就是jps,别的有需要就看思维导图
还有就是防火墙:systemctl status/stop/start/disable/enable firewalld
查看网络端口 netstat -nltp | grep port 这个netstat命令得安装:yum -y install net-tools
HDFS(分布式文件系统)
NAME_NODE与DATANODE之间的交互
1 datanode 注册 namenade 接受注册
2 namenode向datanode发送集群ID 存储数据的目录 UUID唯一标识
3 datanode 汇报存储资源
4 datanode汇报自己数据存储情况
5 当datanode丢失一个数据块
1) 每3秒向namenode发送心跳请求(说明自己还在正常工作 并可以领取任务)
2) namenode任务队列生成副本复制任务task
6 当datanode宕机
10次心跳请求 30秒后
namenode 5min以后检查datanode
namenode 5min以后检查datanode
一共10min30sec后剔除宕机节点
HDFS上传数据流程
1 Actor(客户端)是可以更改切块大小的 定义切块个数
2 Actor请求 namenode 上传数据 namenode记录元数据信息
3 namenode校验并向Actor返回一个ok
4 Actor请求上传第一块数据
5 namenode向Actor返回信息的主机名
6 Actor与datanode建立连接通道
7 datanode向Actor返回ok
8 Actor本地读
本地输入流读取数据
网络输出流 (网络输出)
datanode的网络输入流读取
datanode 利用本地输出流保存在本地
第二台datanode同样 网络流->本地输入流->本地
第三台datanode同样 网络流->本地流 ->本地
HDFS下载数据流程
1 Actor请求下载
2 namenode返回元数据
3 请求对应datanode获取对应的数据块
元数据管理
1 put 一个文件后 就会产生对应的元数据(集群ID BP-ID 数据块block0 0-128M linux1 linux2 linux4........)
元数据存储在内存中或者存储在磁盘上
2 放在内存FSImage里
序列化
将内存对象中的元数据持久化到磁盘保证数据的安全
如果持久化完成以后 立马对元数据进行了更新 内存变了 此时有可能出现没有持久化的元数据的丢失
那么我们可以每次更新都重新持久化 出现IO的大量工作
FSImage内的存储方式是:Tree方法对节点的操作,方便对文件系统的目录结构操作 也就是树形结构
3 解决IO的大量工作方法
1 namenode -format初始化生成 secondary namenode
2 第一次的一些日志信息 放在内存FSImage中 持久化操作 并记录一定量的(edit_pre_006 edit_005 edit_004 edit_003 edit_002 edit_001)日志 至此之后就不会再进行序列化了
3 checkpoint机制 下载namenode 的 Fsimage000000文件(初始化文件 ,只下载一次) 定期的合并操作日志和镜像文件 , 序列化成新的镜像文件 /返回给NN节点
4 datanode将序列化的信息反序列化 和 回放日志 存储到自己的FSImage中
5 序列化FSImage 并覆盖本机旧的fsimage(如果有的话是覆盖 没有的话就是存储)
6 将序列化后的FSImage返回到namenode 覆盖旧的fismage
HDFS优点缺点
优点
将海量的数据存储在不同的廉价的服务器上
极易扩容/减
容错性 副本冗余
丰富的操作API
缺点
单节点故障 严重依赖主节点
延迟性高
存储的数据适用于一次存储多次读取
只能追加和覆盖写
不能对数据的修改和删除
不能存储大量的小文件
增加NN压力
不利于数据的后期处理
MR不适合处理小文件
追加写 , 覆盖写 ,单用户写
元数据:
文件块 , 文件存储位置 , 块大小 ,副本的个数 ,文件的权限.....
数据爬取 与 处理
1 爬虫爬取html网页上的信息
2 存储在HDFS上
3 读取hdfs上的信息
4 对数据进行分析
5 结果保存 并保存到hdfs
MapReduce
01 根据原始数据的文件个数 文件大小 计算任务切片
02 根据对应的任务切片 生成对应的maptask(map任务 )
02 根据对应的任务切片 生成对应的maptask(map任务 )
maptask找一个读取数据的类叫LineRecorReader
读取数据后生成对一个k,v
k:行的起始偏移量
v:行内容
将生成的K,V交给maptask里面的map方法
map接收到k,v后进行处理,生成新的k,v
map(K,V,context){ 处理数据
context.write(NK,NV) ; }
k,v(map处理后的数据)
map处理之后的数据利用HashPartitioner获得partition
Ⅰ 首先调用HashPartitioner 拿到对应的k,v 并生成一个分区编号partition
分区编号:return (key.hashCode() & 2147483647) % numReduceTasks;
根据设置的reducetask个数计算分区编号:job.setNumReduceTasks(1);
分区编号就是reducetask在拉取数据的时候应该拉取的数据
Ⅱ 将k,v,partition数据放到环形缓冲区里面
数据经过环形数组MapOutPutBuffer 写到数组里面去
MapOutPutBuffer 即:环形的缓冲区
按照相同分区里面的数据进行排序(区内排序)
当环形缓冲区的数据写到80%的时候 就会溢出一次 再到80%再次溢出
溢出:就是将数据放到内存中 或者写到本地磁盘(前提是如果数据过大)
注意:写磁盘这个地方,数据不会记住 所以要写磁盘, reducetask拉取的数据比较多的话也是需要写磁盘
写磁盘后再进行分组
如果分组之后内存过大也是要写磁盘
这个数据处理过程要多次与磁盘进行交互 显然处理效率比较低
好处就是保证了数据的安全性
数据不会丢失
溢出的数据进行合并排序
合并(Merge):将同一分区的数据进行合并
排序 将数据写到本地磁盘之前进行排序
利用对应的下载服务网络传输HTTP下载服务
03 reducetask
Fetcher:拉取
reducetask从不同的maptask下拉取属于自己分区内的数据
shuffle
拉取的过程叫shuffle
Merge合并 排序
shuffle拉取数据之后做一个合并排序
GroupingComparetor 分组器
merge合并排序后的进行相同数据的一个分组工作
有了分组之后 相同的组内进行迭代器的遍历
以组内容交给reduce方法去处理
将聚合后的结果写到我们对应的文件里面去
排序可以看到有很多地方进行了排序,那么这个排序到底有没有好处,可以去掉吗?
排序有两个地方
一个在环形缓冲器内进行的排序
一个在merge合并并排序的过程中(reducetask拉取数据并合并的阶段也有合并排序)
好处?
答案是没有什么好处?
可以这么说,许你不用,但是不许你没有
原因
mapreduce这个框架是为了分布式计算,而分布式计算最基础的就是排序
所以mapreduce最初衷就是为了大数据排序而设计
简单点说就是:不是排序对后续操作有何好处,而是这个[排序能为后续的应用开发带来很多好处,||||如果框架不提供排序,要自己排序,那可真的是难受的你哇哇叫,谁还用
数据倾斜
上游数据到下游数据进行数据分发的过程中出现的数据不均匀,导致有的机器运行的特别慢,有的机器运行的特别快
分的数据多
分的数据少
比如aaaaaacccccccd 根据hashcode来划分区域
reduce1分到aaaaaaccccccc
reduce2分到d
解决方法
1 将key打散(就是在每个数据后面加个随机数,这样hashcide值就不一样了)
代码:k.set(word+"-"+random.nextInt(numReduceTasks))
相同的key一定会被分到同一个分区(使用的时候,记得聚合,方法在思维导图里,自己去拿)
2 适当增加分区的个数
3 增加机器的运算资源(增大内存)
4 还有一个就是分区器HashPartitioner的分配逻辑问题 也会导致分区器的数据倾斜
package com._51doit.hadoop.hadoop.mr.day03.movie;
import com._51doit.hadoop.hadoop.beans.MovieBean;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MovieAvgRate {
static class MovieAvgRateMapper extends Mapper{
Gson gs = new Gson() ;
Text k = new Text() ;
DoubleWritable v = new DoubleWritable() ;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = null ;
try {
line = value.toString();
MovieBean mb = gs.fromJson(line, MovieBean.class);
String movie = mb.getMovie();
double rate = mb.getRate();
k.set(movie);
v.set(rate);
context.write(k,v);
} catch (Exception e) {
System.err.println("这行数据有问题: --"+line);
}
}
}
static class MovieAvgRateReducer extends Reducer{
DoubleWritable v = new DoubleWritable() ;
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int cnt = 0 ;
double sum = 0d ;
for (DoubleWritable value : values) {
cnt++ ;
sum += value.get() ;
}
double avgRate = sum/cnt ;
v.set(avgRate);
context.write(key , v);
}
public static void main(String[] args)throws Exception {
// 程序运行
Configuration conf = new Configuration();
// 1 创建一个工作对象 Job
Job job = Job.getInstance(conf, "movie_avg_rate");
// 2设置Mapper类
job.setMapperClass(MovieAvgRateMapper.class);
//3设置Reducer类
job.setReducerClass(MovieAvgRateReducer.class);
//5设置reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// 6设置读取的数据位置 [文件 , 文件夹]
FileInputFormat.setInputPaths(job, new Path("E:\mrdata\movie\input"));
// 7设置结果保存的位置
FileOutputFormat.setOutputPath(job , new Path("E:\mrdata\movie\avg_res1"));
// 提交 并等待执行完毕
job.waitForCompletion(true) ;
}
}
}
public class MovieBean implements Writable {
private String movie ;
private double rate ;
private long timeStamp ;
private String uid ;
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
@Override
public String toString() {
return "MovieBean{" +
"movie='" + movie + ''' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid='" + uid + ''' +
'}';
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeLong(timeStamp);
dataOutput.writeUTF(uid);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
movie = dataInput.readUTF() ;
rate = dataInput.readDouble() ;
timeStamp = dataInput.readLong() ;
uid = dataInput.readUTF() ;
}
}
Yarn( 分布式资源调度平台)
这个在这不好写,写不清楚,就看思维导图吧.关键是联系太多.在这讲不清楚. 简单的写一下
yarn 分布式资源调度平台和任务监控平台 MR在yarn上运行流程
客户端提交程序 job 内容
java程序{ 设置输入路径/设置输出路径/设置运行模式/yarn的位置/HDFS的位置/Mapper类/Reducer类/waitForCompletion()/setJarByClass/setNumReduceTasks等}
1 提交job给ApplicationsMaster
ApplicationsMaster生成对应的jobID
2 ApplicationsMaster向job返回jobID和临时工作目录
3 初始化JOB
那个临时工作目录就是在HDFS上创建的,就是用来给job初始化的
创建临时文件夹
上传jar包
用户自定义设置 new configuration() conf.xml等
计算任务切片(默认根据输入路径下的文件个数和文件大小计算 128M/个 具体划分方法还得看task源码) Splits[]+Maptask+Reducetask+编号
4 运行job
linux是 hadoop jar
windows是 java -cp
ResourManager主进程(两个子进程)
AplicationsMaster
5 向调度器申请资源
Schedule调度器
6 根据管理的所有的NodeMaster的资源来进行分配 生成任务队列queue
任务队列(queue):放任务用.先进先出,比较重要
分配任务放在任务队列里
让机器创建一个2G1C的容器
通过心跳机制将任务发送出去
MrApplicationMaster
7 NodeMaster领取任务并创建容器 container(1C2G)
在容器里创建一个对象MRApplicationMaster
8 MRApplicationMaster拉取工作目录中的数据信息
工作目录:第3步创建的HDFS目录
配置文件
jar包
splits[]等 当然切片里面包含*task
解析切片Splits[]
提取里面的job 获得maptask数和reducetask数
9 MRApplicationMaster向ResourceManager申请资源 根据3个maptask 2个reducetask
ResourceManager将任务交给调度器
10 调度器生成任务
将任务放到任务队列里面去
现在里面应该是有 RT RT MT MT MT 创建容器任务(已完成,没有了)
maptask任务结束后才会执行reducetask任务
各个机器再次通过心跳机制来领取任务
11 不同机器领取maptask任务并创建容器
创建新的对象YarnChild
12 YarnChild作用:从MRApplicationMaster上拉取jar包
12 解析jar包 获得里面的maptask任务
不运行 等待MRApplicationMaster发布执行指令
16 不同机器领取reductask任务
创建新的对象YarnChild
从MRApplicationMaster上拉取jar包
解析jar包 获得里面的reducetask任务
不运行 等待MRApplicationMaster发布执行指令
13 MRApplicationMaster发送maptask任务执行指令
各个机器并行运行
并行!
14 各个maptask汇报任务进度向MRApplicationMaster
15 MRApplicationMaster向ResourceManager汇报总体进度
有maptask运行失败 失败就得重新运行重新分配资源
MRApplicationMaster向ResourceManager申请资源跟15步骤一块进行
ResourceManager交给调度器 调度器将任务放在队列里面(放在Reducetask前面.因为maptask执行完才能执行Reducetask)
17 MRApplicationMaster发送reducetask任务执行指令
各个机器并行运行
18 各个reducetask汇报任务进度向MRApplicationMaster
MRApplicationMaster向ResourceManager汇报总体进度
有reducetask运行失败 失败就得重新运行重新分配资源
MRApplicationMaster向ResourceManager申请资源跟15步骤一块进行
ResourceManager交给调度器 调度器将任务放在队列里面
19 所有的task运行完毕以后 回收资源
20 回收对象 YarnChild
21 释放task容器
22 回收MRApplicationMaster对象
23 回收初始容器
24 清理工作目录(HDFS里的临时文件/目录)
Hbase
分布式数据库系统:基本使用语法看思维导图
hbase 的 基本原理要多看,思维导图中,比较重要
前提HDFS正常打开Zookeeper正常打开时间同步
hbase没有数据库的概念,可以使用namespace来达到数据库分类别管理表的作用
help使用的时候,记得把你要help的命令用双引号引起来 help "list"
列族:column 这个东西,看两个代码就懂了,请往下看
create 'tb_user','base_info','other_info' //建了一个表叫tb_user 两个列族,base_info和other_info put 'tb_user','1','base_info:name','zss' //往里面放了一个数据,第一行的base_info:name,数据是zss
你还可以base_info:age啥的,数据还是第一行,或者你更改1为别的也行,rk001,你觉得这一行数据结束了,就可以进行下一个数据的put了.
简而言之就是列族就是
一个列族中的所有列存储在相同的HFile文件中,Hbase是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。
java代码:本地数据到hbase中(记得添加依赖),使用Put导入数据 , 执行put方法 , 每次put都会进行RPC网络请求 ! 效率不高 可以使用mutation批次操作对象
package com._51doit.day03;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class LoadDataToHbase {
//mapper读
static class Demo1Mapper extends Mapper {
Text k = new Text() ;
Gson gs = new Gson() ;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
MovieBean mb = gs.fromJson(line, MovieBean.class);
String movie = mb.getMovie();//1 2340
String timeStamp = mb.getTimeStamp();
String mid = StringUtils.leftPad(movie, 5, '0');
String ts = StringUtils.rightPad(timeStamp, 10, '0');
String rowKey = mid+"_"+ts ;
k.set(rowKey);
context.write(k , mb);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// reducer 输出
static class Demo1Reducer extends TableReducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
String rk = key.toString();
for (MovieBean mb : values) {
String movie = mb.getMovie();
double rate = mb.getRate();
String timeStamp = mb.getTimeStamp();
String uid = mb.getUid();
// 封装put对象
Put put = new Put(rk.getBytes());
put.addColumn("cf".getBytes() , "movie".getBytes() , Bytes.toBytes(movie));
put.addColumn("cf".getBytes() , "rate".getBytes() ,Bytes.toBytes(rate));
put.addColumn("cf".getBytes() , "timeStamp".getBytes() ,Bytes.toBytes(timeStamp));
put.addColumn("cf".getBytes() , "uid".getBytes() ,Bytes.toBytes(uid));
context.write(null ,put);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "doit01:2181,doit02:2181,doit03:2181");
Job job = Job.getInstance(conf);
job.setMapperClass(Demo1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MovieBean.class);
FileInputFormat.setInputPaths(job, new Path("E:\多易教学\mrdata\movie\input"));
TableMapReduceUtil.initTableReducerJob("movie" , Demo1Reducer.class,job);
job.waitForCompletion(true);
}
}
import com._51doit.utils.HbaseUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
public class LoadData {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtil.getConnection();
// 批次操作对象
BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("tb_user"));
// 读取文本数据 导入到hbase中
BufferedReader br = new BufferedReader(new FileReader("d://user.txt"));
String line = null ;
List ls = new ArrayList<>() ;
while((line = br.readLine())!=null){
//u002,lss,22,F
String[] spls = line.split(",");
Put put = new Put(spls[0].getBytes());
put.addColumn("cf".getBytes() , "name".getBytes() , spls[1].getBytes());
put.addColumn("cf".getBytes() , "age".getBytes() , spls[2].getBytes());
put.addColumn("cf".getBytes() , "gender".getBytes() , spls[3].getBytes());
ls.add(put) ;
}
// 设置刷写时间间隔
mutator.setWriteBufferPeriodicFlush(1000);
mutator.mutate(ls);
// 不要让程序退出
Thread.sleep(Integer.MAX_VALUE);
mutator.flush(); //手动刷写到Hbase的HDFS里
// mutator.close(); //关闭资源
}
}
flush的刷写时机
1>当内存大小达到一定的阈值,单个的memroystore大小达到阈值128M(正好生成一个物理块,放到hdfs)
memroystore:内存空间
2>手动强制刷写flush
3> 每个region里面至少有一个memorystore,都在占用内存,这个时候,都占一点,就会很快达到机器的内存最大值. 所有的内存总和达到整个regionserver的上限阈值 所有的regionserver都要刷写(正常刷写)
但是!
不要建过多的列族,列族越多,memroystore越多,越容易达到阈值上限
4>当对应的内存没有达到128M,但是操作数据次数达到阈值.
总结
最好不要手动刷写数据,会产生一些hfile文件
不要过多的创建列族
无论数据是put还是delete都是往内存里面写数据(可以在hdfs客户端找到对应的数据块解析查看数据)
记得flush到hdfs里面去,不然网页上看不到,解析hfile的时候,要加上-p -f)
布隆过滤器
优化
面试问题
这些都去思维导图里面看:(就怕你找不到,这下可以了吧)
安利一个工具 Phoenix工具:
是运行在Hbase之上的高性能关系型数据库,可以像jdbc访问关系型数据库一样 (比较怪异) 这个等用的时候,再看吧,在思维导图中有.
zookeeper
分布式协调服务工具,可以存储数据,但是不能太大,只能是状态信息数据
zookeeper的选举
##初始状态 比较的是Id值
集群的Id: doitA(1) doitB(2) doitC(3)
1 A启动 发现集群中没有leader , 自己进入到looking(选举状态) , 投自己一票 ,广播
此时A 只有一票 票数没有过半 不能当选
2 B启动 收到A的投票信息 , 发现 集群中没有leader , 并且自己的id=2 , > 1
投自己一票 广播
3 A 收到B的广播 id=1 < 2 改投B一票 广播
4 B收到A 的广播 自己+1 票 2票 过半 当选leader , 切换leader状态 , 广播
5 A 收到信息 切换follower状态
6 C 启动 发现集群中有Leader , 切换follower状态
## 使用过程中leader故障 重新选举
投票机制
在使用中选举的时候优先查看 数据事务版本 zxid 大的当选leader
再比较ID
补充(比较重要)
zookeeper的高度可靠性
zookeeper是一个分布式的系统,多个节点
并且节点中记录的数据是完全一致(一致性),当某个zk的节点宕机之后,不会影响工作
zookeeper的选举机制(奇数台)
zookeeper的主节点不存在单点故障
zookeeper的主节点是可以动态选举出来的
单数台的原因是为了防止出现相同的票数的出现
偶数台的话,出现票数相同的情况,不太容易票选出leader
hive
数据仓库工具(服务性的软件)SQL解析和翻译工具 不是数据库
hive将SQL语句翻译成MR程序,提交给YARN进行分配调度,元数据存储在mysql中,数据存储在hdfs上
需要开启的服务
HDFS
数据存储在HDFS上
yarn
资源调度需要它
hive生成的MR程序,得由yarn处理提交job
zookeeper
数据信息协调工具, 用不到!
hive
主角当然得开启啊
一对比较重要的代码:show functions ; 和 desc function func_name ;
会了这两个代码,你就能解决好多问题.
hive中的建表和数据导入得指定数据,多看看思维导图中的hive数仓工具. 多看看git中仓库lianxi2->doit_hive->hive下得到文档
-- sql执行顺序
select
4
from
tb_name 1
where 2
group by 3
having 5
order by 6
limit 7
scala
函数:
val f1 = ()=>{
println("没有参数,没有返回值")
}
方法:
def show()={
println("没有参数")
}
隐式方法函数
关键字:implicit
一个滴滴面试题
统计线段在每个点重叠的次数 , 并按照从高到低排序输出
//代码
def main(args: Array[String]): Unit = {
val lines: Iterator[String] = Source.fromFile("d:\wokk\line.txt").getLines()
val s1 = lines.map(tp => {
val count: Array[String] = tp.split(",")
val start: Int = count(0).toInt
val end: Int = count(1).toInt
//使用推导式生成每个线段经过的所有的点
val res = for (i <- start to end)yield i
res.toArray
})//.toList.flatten.groupBy(e=>e).mapValues(_.size).toList.sortBy(-_._2).foreach(println)
// 将数据压平 列出所有的点
val s2: List[Int] = s1.toList.flatten
// 分组统计每个点出现的次数
val s3: Map[Int, List[Int]] = s2.groupBy(e => e)
val s4: Map[Int, Int] = s3.mapValues(_.size)
// 将结果转换成List集合排序
s4.toList.sortBy(-_._2).foreach(println)
}
clickhouse
ClickHouse是一个完全的列式分布式数据库管理系统(DBMS)
ClickHouse官网多看,里面有函数用法,更新比较快 只看官网你就能操作数据了,因为里面有简单的例子可以借鉴(俄罗斯老铁们发明的)
建表的时候,一定要指定引擎
clickhouse太厉害了,里面的数据,在select后就能直接调用,其它的向hivehbase都没有这功能
WITH (
SELECT *
FROM tb_shop2
WHERe (name = 'a') AND (cdate = '2017-03-01')
) AS x
SELECt *
FROM tb_shop2
WHERe (name, cdate, money) = x
//子查询的结果必须是一条数据
一些语法,这里没有列出来,还是去思维导图中看吧,比较清晰,而且详细.
立方体模型
这个在项目的时候用的多,非常重要 根据聚合维度 提前计算好所有的组合可能性进行聚合
with cube 所有可能性排出来 wuth rollup 不能掠过中间的. with totals 汇总,维度都有或都没有 select sum(vist),province,city,area from tb_with group by province,city,area with cube ;
用户行为分析函数 SequenceMatch()
// SELECt sequenceMatch('(?1)(?2)')(eventTime, eventid = 'event1', eventid = 'event2') FROM funnel_test ;
// (?N) — 在位置N匹配条件参数。
// .* — 匹配任何事件的数字。 不需要条件参数来匹配这个模式。
// (?1)(?t>1800)(?2) 匹配彼此发生超过1800秒的事件。
// 可以使用 >=, >, <, <=, == 运算符
// 数据
CREATE TABLE funnel_test
(
uid String,
eventid String,
eventTime UInt64
)
ENGINE = MergeTree
PARTITION BY (uid, eventTime)
ORDER BY (uid, eventTime)
SETTINGS index_granularity = 8192 ;
insert into funnel_test values
('uid1','event1',1551398404) ,
('uid1','event2',1551398406) ,
('uid1','event3',1551398408) ,
('uid2','event2',1551398412) ,
('uid2','event4',1551398415) ,
('uid3','event3',1551398410) ,
('uid3','event4',1551398413) ;
('uid4','event2',1551398410) ,
('uid4','event4',1551398413) ;
SELECt uid ,sequenceMatch('(?1)(?2)')(eventTime, eventid = 'event1', eventid = 'event2') FROM funnel_test group by uid;
// 返回的数据,是成功1 失败0
┌─uid──┬─sequenceMatch('(?1)(?2)')(eventTime, equals(eventid, 'event1'), equals(eventid, 'event2'))─┐
│ uid3 │ 0 │
│ uid1 │ 1 │
│ uid4 │ 0 │
│ uid2 │ 0 │
└──────┴────────────────────────────────────────────────────────────────────────────────────────────┘
漏斗函数windowFunnel()
//返回的数据是满足几个条件 // 数据同上 SELECt uid,windowFunnel(4)(toDateTime(eventTime),eventid = 'event2',eventid = 'event3') as funnel from funnel_test group by uid; // event2到event3的时间不超过4秒 ┌─uid──┬─funnel─┐ │ uid3 │ 0 │ │ uid1 │ 2 │ │ uid4 │ 1 │ │ uid2 │ 2 │ └──────┴────────┘



