-
什么是大数据?
指所涉及的数据集规模已超过了传统数据软件获取、存储、管理和分析的能力。
-
大数据的特点5V:大量、高速、多样、低价值密度、真实性
volume、velocity、variety、value、veracity
-
Google三驾马车GFS、MapReduce、BigTable
(HDFS) (Hbase) -
Hadoop生态系统:HDSF、MapReduce、Hbase、Zookeeper、OoZie、Pig、Hive(七个)
-
hadoop三大基本组件HDSF、MapReduce、YARN
-
HDFS的局限性:
- 查询性能低
- 基于一次写入多次读取设计(不支持并发写入)
- 不支持文件修改
- 不支持缓存
- 不适合存储小文件
-
HDFS的主从架构(守护进程):NameNode、DataNode、SecondaryNameNode
文章链接
-
NameNode:负责存储和管理文件系统元数据信息,包括namespace目录结构、文件块位置信息、访问权限等
访问hdfs的唯一入口
-
DataNode:存储文件具体的数据块
决定hdfs集群的整体数据存储能力,通过和namenode配合维护数据块
-
SecondaryNamenode:主节点的辅助,帮助主节点进行数据的合并
辅助namenode从namenode下载fsimage文件和edits log文件进行合并
-
-
元数据的管理机制:内存元数据、fsimage、edits文件,以及checkpoint过程(主要解决什么问题?参与该过程的节点有什么?)
-
内存元数据(meta data):namenode把所有元数据存储在内存中,成为内存元数据。内存中的元数据是最完整的,包括文件自身属性信息、文件块信息映射信息。用于元数据查询。
断点数据丢失,数据不会持久化。namenode辅佐了元数据文件来保证元数据的安全完整
-
fsimage内存镜像文件:内存元数据的一个持久化的检查点,仅包含hadoop文件系统中文件自身属性相关的元数据信息,不包含文件块位置的信息,位置信息只存储在内存中。由datanode启动加入集群时向namenode进行数据块的汇报得到的,并且后续简短指定时间进行数据块报告。持久化的动作是一种数据从内存到磁盘的IO过程,会对namenode正常服务造成一定的影响,不能频繁地进行持久化。持久化存储元数据
-
edits log编辑文件:数据操作日志,记录更改记录,可于运算出元数据,为了避免两次持久化之间数据丢失的问题设计的。文件中记录的是hdfs所有更改操作的日志,文件系统客户端执行的更改操作首先会被记录到edits文件中。
-
checkpoint机制:核心是fsimage与edits log合并以生成新的fsimage的过程,fsimage就会不断更新、edits log文件不会太大
流程:
- sn向nn发起询问是否需要checkpoint
- nn读取fstime中上次checkpoint时间,结合edits文件大小等因素,决定开始checkpoint
- nn新建一个edits文件,让新操作写入到edits.new中(避免在checkpoint过程中edits文件改变)
- SNN向NN请求(http get)获取fsimage和edits文件
- SNN将edits与fsimage合并,生成新的fsimage.checkpoint文件
- SNN将fsimage.checkpoint发送给NN
- NN收到fsimage.checkpoint后将其与旧的fsimage替换,更新fstime文件记录本次checkpoint操作
checkpoint触发条件由core-site.xml配置
dfs.namenode.checkpoint.period=3600 //两次连续的checkpoint之间的时间间隔。默认1小时 dfs.namenode.checkpoint.txns=1000000 //最大没有执行checkpoint事务的数量,满足将强制执行紧急checkpoint,即使尚未达到检查点周期。默认100万事务数量。
-
管理过程:
- 系统启动时,读取fsimage和edits至内存,形成内存元数据meta data
- client向namenode发起数据增删查请求
- namenode接收到请求后,在内存元数据中执行增删查操作,并向client返回操作结果
- 如果是增删操作,则同时记录数据操作日志edits
- 使用snn,在适当时机将操作日志合并到fsimage中(checkpoint过程)
-
-
HA模式(高可用模式)
- 允许配置两个相同的nn,一个为active mode处于活动模式,另一个为standby mode处于待机模式
- 两个nn数据保持一致,一旦活动节点失效,则由待机节点切换为活动节点,保证hadoop的正常运行
- 两个nn之间的数据共享可使用nfs或Quorum Journal Node实现
HA模式中,checkpoint通过standby namenode实现,不需要再配置snn
jourmalnode是轻量化的节点,用于两个nn之间的通信,需要配置三个以上的节点
-
心跳机制:用于dn与nn之间的通信
- register:datanode启动时,将自身信息发送给namenode
- block report:将block的信息汇报给namenode,使得namenode可以维护block和datanode之间的映射关系
- 周期性send heartbeat:
- 向namenode报告剩余存储空间等信息
- 执行namenode返回过来的指令
- 如果namenode没有收到datanode周期性的heartbeat则认为该dn已失效,nn会将失效节点中的block重新备份到其他dn上
-
什么是block:所有的文件将被拆分为若干block,以存放再不同的datanode中(每个block将有三个备份保存到不同datanode)
-
为什么HDFS不适合存储小文件?
每个文件最少一个block,每个block的元数据都会在namenode占用内存如果存在大量的小文件,它们会吃掉namenode节点的大量内存。
-
处理小文件不适合存储的问题,可以把多个文件归档成为一个文件,归档后还可以透明地访问每一个文件。
-
创建archive:(archive归档是通过mapreduce程序完成的,需要启动yarn集群)
hadoop archive -archiveName test.har(archive名称) -p /smallfile /outputdir
路径写法(-p后面的内容):
-p /foo/bar a/b/c e/f/g,这里的/foo/bar是a/b/c与e/f/g的父路径,所以完整路径为/foo/bar/a/b/c与/foo/bar/e/f/g
-
-
HDFS命令:
hdfs dfs -ls / 查看当前目录信息 hdfs dfs -mkdir /hello 创建文件夹 hdfs dfs -mkdir -p /hello/world 创建多级文件夹 hdfs dfs -get /hdfs路径 /本地路径 下载文件到本地 hdfs dfs -copyToLocal /hdfs路径 /本地路径 hdfs dfs -put /本地路径 /hdfs路径 上传文件 hdfs dfs -copyFromLocal /本地路径 /hdfs路径 hdfs dfs -cat /文件 查看hdfs中的文件
-
HDFS Java API:hdfs命令的抽象,支持远程访问hdfs,应用程序能够以和读写本地数据系统相同的方式从hdfs读取数据,或者将数据写入到hdfs
-
50070Web端口:hdfs的web服务地址,显示hdfs服务状态和日志
-
Hadoop的安装模式及特点(单机、伪分布、多节点集群)
- 单机模式:hadoop运行在一个单独的JVM中,便于开发调试
- 伪分布式集群模式:各节点在不同的java进程中,用于模拟集群环境
- 多节点集群安装模式:各节点安装在不同的系统中,是可用于生产的集群环境
-
Hadoop由什么语言开发?Java,所以必须先安装JDK
-
SSH免密的原理是什么?RSA非对称加密
通过加密算法将需要传输的数据进行处理后,进行tcp传输,在两端之间通过密文交互达到安全目的,本质是数据加密
-
怎么启动Hadoop:cd /home/hadoop/hadoop-3.1.2/
sbin/start-all.sh
-
重要的配置文件:
- core-site.xml:hadoop系统的通用关键属性
- workers:定义所有dn地址
- hdfs-site.xml:hdfs属性配置
- yarn-site.xml:yarn基本属性
- mapred-site.xml:mapreduce计算框架的属性配置
- /etc/profile:涉及系统的环境,与环境变量相关
- /etc/hosts:负责ip地址与域名快速解析的文件,hosts映射文件
-
简单的网络异常排查
(1)如果ping外网域名不通,但ping外网IP通,问:问题可能出在哪?
(2)如果ping网关通,但ping外网不IP通,问:问题可能出在哪?
…… -
简单Linux命令:
jps 检查守护进程 cd cd ~跳到home目录 .表示目前所在的目录 ..表示目前目录的上一层目录 ../..当前目录的上上两层 vi ls mkdir ssh
scp scp [可选参数] file_source file_target 基于ssh登陆进行安全的远程文件拷贝命令 scp local_file remote_username@remote_ip:remote_folder 或者 scp local_file remote_username@remote_ip:remote_file 或者 scp local_file remote_ip:remote_folder 或者 scp local_file remote_ip:remote_file scp /home/space/music/1.mp3 root@www.runoob.com:/home/root/others/music scp /home/space/music/1.mp3 root@www.runoob.com:/home/root/others/music/001.mp3 scp /home/space/music/1.mp3 www.runoob.com:/home/root/others/music scp /home/space/music/1.mp3 www.runoob.com:/home/root/others/music/001.mp3
vi编辑器:
i 编辑模式
esc 非编辑模式
:
w
q
wq
x
q!
$跳到文件末尾
/关键字 查找关键字
-
什么是YARN(yet another resource negotiator)
统一资源管理和调度平台
-
Hadoop的哪个版本开始支持YARN的?
MRv2
-
YARN的守护进程:ResourceManager、NodeManager
- resourcemanager(Master):集群中各个节点的管理者,yarn的常驻进程,数目是一个,负责整个集群上资源监控、管理等
- nodemanager(Slave):集群中单个节点的代理,yarn的常驻进程,工作节点上都有一个,启动和监控工作节点上的容器(cpu,内存等资源)
-
双层调度范式:
- 一级调度ResourceManager->Schedular:resourcemanager接收作业,在指定NM(nodemanager)节点上启动AM,在scheduler中执行调度算法为AM(applicationmanager)分配资源,管理各个AM
- 二级调度NodeManager->ApplicationMaster:applicationmaster运行在NM上,向RM请求资源,接收RM的资源分配,在NM上启动container,在container中执行application,并监控application的执行状态
- ApplicationMaster和Container的功能
- container:yarn的资源表示模型,CPU、内存、网络等资源分配给container,由container提供给其中的任务
- applicationmaster:运行在resourcemanager中,管理yarn中所有的application
-
Map和Reduce任务运行在哪里?Container
-
ApplicationMaster运行在哪里?Container
- 视频
- WordCount代码(实验3)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
public class WordCountWithCombine extends Configured implements Tool {
public static void main( String[] args ) throws Exception {
int res = ToolRunner.run(new WordCountWithCombine(), args);
System.exit(res);
}
public static String tempValue;
// 配置作业的主要参数和流程overwrite
public int run(String[] args) throws Exception {
// 创建作业,配置作业所需参数
Configuration conf = new Configuration();
// 创建作业
Job job = Job.getInstance(conf, "WordCountWithCombine");
String arg1 = args[0], arg2 = args[1];
System.out.println("args1=====" + arg1);
System.out.println("args2=====" + arg2);
tempValue = "this value is from run";
// 注入作业的主类
job.setJarByClass(WordCountWithCombine.class);
// 为作业注入Map和Reduce类
job.setMapperClass(Map.class);
//job.setCombinerClass(Combine.class);
//job.setReducerClass(Reduce.class);
//job.setNumReduceTasks(4);
// 指定输入类型为:文本格式文件;注入文本输入格式类
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(arg1));
//TextInputFormat.addInputPath(job, new Path("/mapred_input1"));
// 指定输出格式为:文本格式文件;注入文本输入格式类
job.setOutputFormatClass(TextOutputFormat.class);
// 指定Key为文本格式;注入文本类
job.setOutputKeyClass(Text.class);
// 执行Value为整型格式;注入整型类
job.setOutputValueClass(IntWritable.class);
// 指定作业的输出目录
TextOutputFormat.setOutputPath(job, new Path(arg2));
//TextOutputFormat.setOutputPath(job, new Path("/mapred_output1"));
// 作业的执行流程
// 执行MapReduce
boolean res = job.waitForCompletion(true);
if(res)
return 0;
else
return -1;
}
// Map过程
public static class Map extends Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private int num = 0;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();//text类型不能直接进行拆分,先转为字符串
System.out.println("file: " + ((FileSplit)context.getInputSplit()).getPath().toString());
System.out.println("map " + String.valueOf(num) + ": " + key.toString() + "================" + line);
num ++;
System.out.println(tempValue);
StringTokenizer tokenizer = new StringTokenizer(line); //split line to words by space
while (tokenizer.hasMoreTokens()) { //operate all word by loop
word.set(tokenizer.nextToken());
context.write(word, one); //write KV to context, word is key, word number is value
}
}
}
// Combine过程
public static class Combine extends Reducer {
private int num = 0;
@Override
public void reduce(Text key, Iterable val, Context context) //mothod for each key,input format key(value1,value2,......)
throws IOException, InterruptedException {
int sum = 0;
Iterator values = val.iterator();
while (values.hasNext()) {
sum += values.next().get(); //sum value(one word count)
}
System.out.print("Combine " + String.valueOf(num) + ": " + key.toString() + "================" + Integer.toString(sum)+ "n");
num ++;
context.write(key, new IntWritable(sum)); //write one key and its count
}
}
// Reduce过程
public static class Reduce extends Reducer {
private int num = 0;
@Override
public void reduce(Text key, Iterable val, Context context) //mothod for each key,input format key(value1,value2,......)
throws IOException, InterruptedException {
int sum = 0;
Iterator values = val.iterator();
while (values.hasNext()) {
sum += values.next().get(); //sum value(one word count)
}
System.out.print("Reduce " + String.valueOf(num) + ": " + key.toString() + "================" + Integer.toString(sum)+ "n");
num++;
context.write(key, new IntWritable(sum)); //write one key and its count
}
}
}
-
创建Job,注入各种功能类(map类、reduce类、combiner类、输出键类型、输出值类型、输入目录……)
public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCountWithCombine(), args);//启动job任务 System.exit(res);//退出进程,成功退出或者失败退出 } public static String tempValue; // 配置作业的主要参数和流程overwrite public int run(String[] args) throws Exception { // 创建作业,配置作业所需参数 Configuration conf = new Configuration(); // 创建作业 Job job = Job.getInstance(conf, "WordCountWithCombine"); String arg1 = args[0], arg2 = args[1]; System.out.println("args1=====" + arg1); System.out.println("args2=====" + arg2); tempValue = "this value is from run"; // 注入作业的主类 job.setJarByClass(WordCountWithCombine.class); // 为作业注入Map和Reduce类 job.setMapperClass(Map.class);//指定Map阶段用的类 //job.setCombinerClass(Combine.class); //job.setReducerClass(Reduce.class); //job.setNumReduceTasks(4); job.setInputFormatClass(TextInputFormat.class);// 指定输入类型为:文本格式文件;注入文本输入格式类 TextInputFormat.addInputPath(job, new Path(arg1));//指定读取路径 //TextInputFormat.addInputPath(job, new Path("/mapred_input1")); job.setOutputFormatClass(TextOutputFormat.class);// 指定输出格式为:文本格式文件;注入文本输入格式类 job.setOutputKeyClass(Text.class);// 指定Key为文本格式;注入文本类 k2 k3 job.setOutputValueClass(IntWritable.class);// 执行Value为整型格式;注入整型类 v2 v3 TextOutputFormat.setOutputPath(job, new Path(arg2));// 指定作业的输出目录 //TextOutputFormat.setOutputPath(job, new Path("/mapred_output1")); // 作业的执行流程 // 执行MapReduce boolean res = job.waitForCompletion(true); if (res) return 0; else return -1; } -
Map代码(方法体)
public static class Map extends Mapper
{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private int num = 0; @Override public void map(LongWritable key, Text value, Context context)//k1 v1 k2 throws IOException, InterruptedException { String line = value.toString();//text类型不能直接进行拆分,先转为字符串 System.out.println("file: " + ((FileSplit)context.getInputSplit()).getPath().toString()); System.out.println("map " + String.valueOf(num) + ": " + key.toString() + "================" + line);//输入的第几行,有哪些 num ++; System.out.println(tempValue); StringTokenizer tokenizer = new StringTokenizer(line); //split line to words by space while (tokenizer.hasMoreTokens()) { //operate all word by loop word.set(tokenizer.nextToken()); context.write(word, one); //write KV to context, word is key, word number is value } } } -
Reduce代码(方法体)
public static class Reduce extends Reducer
{//k2 v2 k3 v3 private int num = 0; @Override public void reduce(Text key, Iterable val, Context context) //method for each key,input format key(value1,value2,......) throws IOException, InterruptedException { //new k2 val v2 集合 上下文对象 int sum = 0; Iterator values = val.iterator(); while (values.hasNext()) { sum += values.next().get(); //sum value(one word count) } System.out.print("Reduce " + String.valueOf(num) + ": " + key.toString() + "================" + Integer.toString(sum)+ "n"); num++; context.write(key, new IntWritable(sum)); //write one key and its count } } -
MapReduce过程:
- Mapper过程
- inputdata:hdfs中输入的数据
- inputsplit:数据分片,每个map操作一个inputsplit
- inputformat:负责读取inputsplit和执行键值对拆分
- mapper:收到inputformat后的键值对,执行map过程
- combiner:对map过程的键值对进行一次处理以减少输出至reducer的数据量,相当于map本地的reduce
- partition:确定map的输出键和reduce的映射关系,partition前有一个barrier(完毕后才能进入reducer同步障),保证所有mapper执行
- Reducer过程
- reducer:执行reduce过程
- outputformat:负责job的输出,输出至hdfs
- Mapper过程
-
MapReduce算法设计(作业1)
-
复杂(二次)排序
-
特点及与关系数据库的区别
- 特点:
- 是一个分布式的、版本化的、面向列的、多维度的存储系统,在设备上具备高性能和高可用性
- Hbase是Goole BigTable的开源实现
- 以表的形式组织数据,但结构与关系数据库不同
- 不支持复杂的实时查询,但支持map-reduce实现
- 更高层的pig和hive可在hbase上实现一些受限的sql查询
- 适合大稀疏矩阵的crud操作
- 区别:
- Hbas按列存储,关系数据库按行存储
- hbase基于hdfs实现
- hbase具有容错性,空值不浪费存储空间
- hbase是稀疏型矩阵,可存储通用数据,各种数据在同一张表中,结构简单
- 特点:
-
守护进程(Hbase本身守护进程+Zookeeper的守护进程)
-
Hbase必须基于HDFS运行
-
region如何划分
按行键自动分裂成多个regions,一个region由[startkey,endkey)表示,不通region会被master分配给相应的HRegionServer进行管理,一个HRegionServer可管理多个region,一个Region分到的是若干完整的行,这些行中包含有多个列族,每个列族又生成不同的store
-
HMaster由Zookeeper选举生成
通过选举保证任何时候集群中只有一个Master,Master和RegionServers启动时会向ZooKeeper注册,实时监控Region server上线和下线信息并通知给Master,存储Hbase和schema、元数据表和所有Region的寻址入口,Zookeeper由Hbase启动,ZooKeeper的引入使得Master不再是单点故障
-
- Rowkey:table主键,table中记录按照row key排序
- Column Family:列族,一个Table在水平方向有一个或者多个列族,列族可由任意多个column组成,列族支持动态扩展,无须预定义数量及类型,二进制存储,用户需自行进行类型转换
- Column:
- Cell:通过row和columns确定的为一个存贮单元称为cell,由{row key,column(=+),version}唯一确定的单元,cell中的数据时没有类型的,全部时字节码形式存贮
- TimeStamp:时间戳,每个cell都保存着同一份数据的多个版本,版本通过时间戳来索引,时间戳的类型时64位整型,时间戳可以由hbase赋值,此时时间戳时精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳,每个cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。
-
简单的Hbase Shell命令
- 如何进入Hbase Shell
- list
- scan
- create
- get(获取指定行数据、指定行指定列族、指定行指定列族中的指定列(cell))
- put
-
meta表:
用于保存集群中regions的位置信息,存储各region的记录,查询数据先从meta表中获取数据所在的region信息,一个域的meta表已可存储相当多的数据。
-
什么是Hive?
基于HDFS的数据仓库
-
基本工作原理:
- 表数据存储在HDFS
- 表结构存储在关系数据库(内嵌的Deby、MySQL)
- 编写HiveQL语言执行查询
- Hive将HiveQL语言编译成MapReduce程序提交给YARN执行查询
-
HiveQL编写语句:
- 简单查询
- Group By统计平均值
- 和
- 个数
-
Hive的内部表和外部表
-
内部表:数据(表数据和元数据)完全由Hive管理,数据保存在Hive和HDFS的默认路径下,删除内部表时,表数据和元数据都会一起删除,修改内部表时会同步到元数据
默认路径:/user/hive/warehouse/数据库名/数据文件名
-
外部表:外部表的表数据由HDFS管理,Hive管理外部表元数据,而内部表的表数据和元数据都由Hive管理,外部表的表数据存储位置由用户指定,删除外部表时,只删除表的元数据,表数据仍然存储在HDFS中,修改外部表结构和分区修改时,需要对表的元数据进行修复。
-
-
Hive的安装模式
- 内嵌模式:元数据采用Hive内嵌的Derby数据库存储,只允许一个会话连接,仅用于个人实验或测试所用
- 本地模式:元数据采用本地或网络Mysql数据库存储,通过mysql客户端访问元数据
- 远程模式:元数据采用远程mysql数据库存储,用于非java客户端访问元数据库,在hive端启动metastoreserver,客户端利用thrift协议通过metastoreserver访问源数据库
-
Hive的运行环境(HDFS+YARN)
-
command not found:
命令错误,或者没有配环境变量,或者没有下载该变量
-
No such file or directory
文件路径错误
-
Could not resolve hostname node1
无法解析,hosts文件没有配好,没有为每一个主机添加ip映射,增加节点IP地址和节点名称
-
File Exists
文件已存在,可以改名
-
Cannot set priority of datanode process(一般出现在HDFS启动后)
Cannot set priority of secondarynamenode process
-
Hive的安装模式
- 内嵌模式:元数据采用Hive内嵌的Derby数据库存储,只允许一个会话连接,仅用于个人实验或测试所用
- 本地模式:元数据采用本地或网络Mysql数据库存储,通过mysql客户端访问元数据
- 远程模式:元数据采用远程mysql数据库存储,用于非java客户端访问元数据库,在hive端启动metastoreserver,客户端利用thrift协议通过metastoreserver访问源数据库
-
Hive的运行环境(HDFS+YARN)
-
command not found:
命令错误,或者没有配环境变量,或者没有下载该变量
-
No such file or directory
文件路径错误
-
Could not resolve hostname node1
无法解析,hosts文件没有配好,没有为每一个主机添加ip映射,增加节点IP地址和节点名称
-
File Exists
文件已存在,可以改名
-
Cannot set priority of datanode process(一般出现在HDFS启动后)
Cannot set priority of secondarynamenode process
看各个节点相关守护进程,再看日志,寻找具体的出错信息



