- 批数据集成
- 静态数据集数据被组织成"批"地(时间窗口)周期性的迁移到另一个系统(专门的数据端)抽取、转换、集成为通用数据格式持久化集成数据
- 数据仓库数据分析等数据服务应用
- 特点
- 时效性要求不高(时间窗口较大)数据精确性要求高数据有长久的价值
- 整合分散、过量冗余、不一致的数据,提供完整、准确、易共享的数据系统。解决信息孤岛问题,提高数据使用价值提供高支持管理决策能力的数据服务
- 底层数据结构透明:对数据访问等消费应用提供了统一接口,无需暴露底层数据结构高性能和扩展性:把数据集成和数据访问分成了两个过程,因此访问时数据已经处于准备好的状态。数据集成后提供真正单一数据视图高可重用性:由于有了实际的物理存储,数据可以为各种应用提供可重用的数据视图,而不用担心底层实际的数据源的可用性。数据管控能力加强,降低管理成本
- 一致性
- 数据一致性语义一致性
- 数据源的异构性数据模式的异构性数据类型的异构性(eg:浮点数float/double)取值的异构性(eg:性别0/1 F/M)语义的异构性(eg:不同数据的id)
- 容错性:各个数据源有很强的自治性,它们可以在不通知集成系统的前提下改变自身的结构和数据,不要select *这样子发生迁移、扩展时不会出现问题。可伸缩性
- spark的数据库如何确定spark的文件节点的确定
- sqoopflume
- 什么是Sqoop
- Sqoop:SQL–to–HadoopSqoop项目始于2009年,早期为Hadoop的第三方模块,后来成为Apache的独立项目Sqoop是一个主要在Hadoop和关系数据库之间进行批量数据迁移的工具
- 版本:Sqoop1与Sqoop2完全不兼容
- 优点
- 架构简单部署简单功能全面稳定性较高速度较快
- 访问方式单一命令行方式容易出错,格式紧耦合安全机制不够完善,存在密码泄露风险
- 优点
- 访问方式多样集中管理连接器安全机制较完善支持多用户
- 架构较复杂部署较繁琐稳定性一般速度一般
- 数据导入:RDBMS->Hadoop
- 数据导出:Hadoop->RDBMS
- Sqoop安装
- 需要准备Java JDK和Hadoop需要准备数据库驱动,比如:下载mysql的数据库驱动,移动到Sqoop的lib目录中
明文密码:
sqooplist-databases --connect jdbc:mysql://IP:PORT/ --username USER --password PASSWD;
输入密码:
sqooplist-databases --connect jdbc:mysql://IP:PORT/ --username USER -P
密码文件
sqooplist-databases --connect jdbc:mysql://IP:PORT/ --username USER --password-file file:/temp2.1.4.2. 列出数据库的所有表
明文密码:
sqooplist-tables --connect jdbc:mysql://IP:PORT/testDB --username USER --password PASSWD;
输入密码:
sqooplist-tables --connect jdbc:mysql://IP:PORT/ testDB --username USER -P
密码文件:
sqooplist-tables --connect jdbc:mysql://IP:PORT/testDB --username USER --password-file file:/temp2.1.4.3. 全量数据导入
sqoop import --connect jdbc:mysql://IP:PORT/testDB --username USER --password PASSWD --query "select * from testdbwhere $CONDITIONS" --target-dir/input --fields-terminated-by "\01" --hive-drop-import-delims --null-string "\N" --null-non-string "\N" --split-by id -m 6
参数说明
--query:sql查询语句--target-dir:hdfs目标目录--hive-drop-import-delims:删除数据中包含的Hive默认分隔符(^A, ^B, n)--null-string:string类型空值的替换符(Hive中Null用n表示)--null-non-string:非string类型空值的替换符--split-by:数据切片字段(int类型,m>1时必须指定)-m:Mapper任务数,默认为4 2.1.4.4. 基于递增列的增量数据导入(Append)
sqoop import --connect jdbc:mysql://IP:PORT/testDB --username USER --password PASSWD --query "select * from table1 where $CONDITIONS" --target-dir/input --split-by id -m 6 --incremental append --check-column id --last-value 4
参数说明
--incremental append:基于递增列的增量导入(将递增列值大于阈值的所有数据增量导入Hadoop)--check-column:递增列(int)--last-value:阈值(int) 2.1.4.5. 基于时间列的增量数据导入(LastModified方式)
sqoop import --connect jdbc:mysql://IP:PORT/testDB --username USER --password PASSWD --query "select * from table1 where $CONDITIONS" --target-dir/input --split-by id -m 1 --incremental lastmodified --merge-key id --check-column time --last-value "2020-05-01 00:00:00"
参数说明
--incremental lastmodified:基于时间列的增量导入(将时间列大于等于阈值的所有数据增量导入Hadoop)--merge-key:合并列(主键,合并键值相同的记录)--check-column:时间列(timestamp)--last-value:阈值(timestamp) 2.2. Flume 2.2.1. Flume简介
- 什么是Flume
- Flume是一个分布式海量日志采集、聚合和传输系统Flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。
- 基于事件的海量数据采集数据流模型:Source->Channel->Sink事务机制:支持重读重写,保证消息传递的可靠性内置丰富插件:轻松与各种外部系统集成高可用:Agent主备切换Java实现:开源,优秀的系统框架设计,模块分明,易于开发
- Event:事件,最小数据传输单元,由Header和Body组成
- Agent
- 代理,JVM进程,最小运行单元由Source、Channel、Sink三个核心组件构成负责将外部数据源产生的数据以Event的形式传输到目的地
- 对接各种外部数据源,将采集到的数据封装成Event,然后写入Channel一个Source可向多个Channel发送EventFlume内置类型丰富的Source,包括avro、exec、jms、spooldir、netcat、sequence generator、syslog、kafka、http、legacy、自定义。
- Event暂存容器,负责保存Source发送的Event,直至被Sink成功读取,可以存放在memory、jdbc、file、kafka channel等为了平衡Source采集、Sink读取的速度,可视为Flume内部的消息队列线程安全并具有事务性,支持Source写失败重写和Sink读失败重读
- 负责从Channel读取Event,然后将其写入目的地,目的地既可以是外部存储,也可以是下一阶段的Agent。包括hdfs、logger、avro、ipc、file、null、hbase、Kafka Sink自定义。一个Sink只能从一个Channel中读取EventSink成功读取Event后,向Channel提交事务,Event被删除,否则Channel会等待Sink重新读取
- 1个Source --> 多个Channel1个Channel --> 多个Sink1个Sink --> 1个Channel
越长越容易出错
| 扇入流 | 扇出流 |
|---|---|
- 安装
- 运行环境:JDK1.7及以上版本下载安装Flume,修改配置验证Flume
- 在Flume安装目录下,运行命令./bin/flume-ng version,查看Flume版本信息
a1.sources = r1 a1.sinks= k1 a1.channels= c1 a1.sources.r1.type = netcat // Source类型为netcat, 监听指定的Socket端口 a1.sources.r1.bind = localhost a1.sources.r1.port= 44444 a1.sinks.k1.type = logger // Sink类型为logger, 将Event输出到控制台 a1.channels.c1.type = memory // Channel类型为memory, Event缓存在内存中 a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity= 100 a1.sources.r1.channels= c1 a1.sinks.k1 channel=c1 //一个Sink只能连接一个Channel
- 针对不同source、channel、sink类型的配置自行搜索Flume运行步骤
- 编写Agent配置文件(*.conf)运行Agent,命令如下:
- # bin/flume-ng agent --confconf--conf-file example.conf--name a1 -D flume.root.logger= INFO, console--conf:Agent配置文件目录--conf-file:Agent配置文件名(*.conf)--name:Agent名称-D:JVM参数
- 全称:Extract-Transform-Load定义:
- 用来描述数据从来源端,经过萃取、转换、加载,至目的端的过程。将数据从孤立的数据源中采集出来,汇集到可被计算平台高效访问的目的地。
- 常用在数据仓库,但不限于数仓。整个流程可用任何语言开发完成。常与ELT(Extract-Load-Transform)混用,通常数据量越大、转换逻辑越复杂、目的端数据库运算能力越强,越倾向使用ELT,以便运用目的端数据库的并行处理能力。
- 业务场景
- 汇总业务交易数据将应用数据从旧系统迁移到新系统整合近期公司收购或合并的数据整合来自外部供应商、合作伙伴的数据
- 数据仓库(Data Warehouse)决策支持系统(Decision Support System)联机分析处理(online Analytical Processing)数据挖掘(Data Mining)商务智能(Business Intelligence)
- Extract(不同的数据源 —> OperationalDataStore)
- 大量调研
- 数据来自几个业务系统?系统各自数据库服务器运行什么DBMS?存在多少手工数据?存在多少非结构化数据?
- 与DW的数据库系统相同的数据源:直接连接,select 语句直接访问与DW的数据库系统不同的数据源
- 直接连接将源数据导出.txt/.xls后,再将这些文件导入ODS通过程序接口完成
- 数据清洗(是一个反复的过程,需要不断修正)
- 不完整的数据(名称、区域信息等)
- 补全
- 用SQL语句找出后,在业务系统修正后抽取
- 将重复字段导出,交由客户确认
- 转换不一致数据
- 不同业务系统相同类型的数据要统一
- 将业务系统数据按照数据仓库粒度进行聚合
- 数据指标
- Transform 的工作量约占2/3数据的加载一般是数据清洗了之后,直接写入数据仓库中去
- Datastage
- IBM 公司的商业软件,最专业的ETL工具价格不菲,适合大规模的ETL 应用使用难度:★★★★
- 商业软件,相当专业的ETL 工具价格比Datastage 便宜一点,也适合大规模的ETL 应用使用难度:★★
- 免费,开源纯Java 编写,只需JVM 即可部署,可跨平台、扩展性好使用难度:★★
- DatePipeline、Talend、Datax、Oracle Goldengate自己探索
- 数据连接:数据源的数据库技术不一致性能
- 集中式架构扩展性差分布式架构扩展性强
- 匹配、合并、更改数据非常重要
- 数据质量决定产品质量上界
- 数据是按触发器获取,还是按时间间隔获取若企业对数据的实时性要求高,则需考虑流处理能力
- 什么是HDFS
- Hadoop分布式文件系统(Hadoop Distributed File System)HDFS是Apache Hadoop的核心子项目HDFS是GFS的开源实现Master/slave架构
- 运行在大量廉价商用机器上,提供容错机制:故障检测、快速自动恢复机制简单可靠的聚合模型:一次写入多次读取,支持追加,不允许修改,保证数据一致性流式数据访问:批量读而非随机读,关注吞吐量而非时间可靠存储和处理大量数据的可伸缩性通过分布数据和逻辑到数据所在的多个节点上进行平行处理来提高效率
- 高容错、高可用、高扩展
- 数据冗余,多Block多副本,副本丢失后自动恢复(一般三副本)NameNodeHA、安全模式
- 典型文件大小GB~TB,百万以上文件数量,PB以上数据规模
- 构建在廉价的商用服务器上提供了容错和恢复机制
- 流式数据访问数据位置暴露给计算框架处理逻辑接近数据,而不是数据接近处理逻辑
- 实时性差,不适合低延迟数据访问小文件问题,不适合大量小文件存储
- 元数据占用NameNode大量内存空间
- 每个文件、目录和Block的元数据都要占150Byte存储1亿个元素,大约需要20GB内存如果一个文件为10KB,1亿个文件大小仅有1TB,却要消耗掉20GB内存
- 一个文件同时只能有一个写入者
- 仅支持追加写入
- 写操作
- 读操作
- Shell命令
- Hadoop fs (使用面最广,可以操作任何文件系统)hdfs dfs (只能操作HDFS文件系统)
- hadoopfs –helphadoopfs –ls/tmphadoopfs –text /tmp/*|lesshadoopfs –get /tmp/file localfilehadoopfs –put localfile/tmp/等
- 大数据文件存储格式
- 面向行(Row-based)存储面向列列(Column-based)存储混合存储
- 更有效处理海量数据,存储量要求安全、可靠、完整性更适合存储、压缩、读取、查询,节约存储,保证效率等
- 行存储
- 同一行的数据存储在一起,即连续存储如果只需要访问行的一小部分数据,亦需要将整行读入内存面向行的存储适合于整行数据需要同时处理的情况。
- SequenceFile文件MapFile文件Avro Datafile
- 列存储
- 整个文件被切割为若干列数据,每一列数据一起存储面向列的格式使得读取数据时,可以跳过不需要的列,适合于只处于行的一小部分字段的情况读写需要更多的内存空间,因为需要缓存行在内存中(为了获取多行中的某一列)不适合流式写入,因为一旦写入失败,当前文件无法恢复,而面向行的数据在写入失败时可以重新同步到最后一个同步点,所以Flume采用的是面向行的存储格式。
- ORCFileParquet
- 参考一参考二
- 行存储的优点
- 一次性完成,消耗的时间比列存储少能够保证数据的完整性
- 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,因此可以使用更高效的压缩算法,进一步节约存储空间。只读取需要的列,不用全表扫描,不会产生冗余数据支持向量运算,能够获取更好的扫描性能
- 什么是数据仓库:数据仓库是一个面向主题的、集成的、随时间变化的、非易失的数据集合,用于支持管理者的决策过程。特点
- 面向主题
- 系统业务级别主题决策时关心的重点方面
- 多个异构数据源经过ETL处理
- 历史操作数据周期性迁移至数仓反映了某一历史时间点的数据快照
- 一旦进入数仓,数据就不应再有改变保留数据变化的历史轨迹
- 缓冲层(数据接入层)
- 概念
- 业务系统与ODS层之间的临时缓冲区,用于临时接入业务系统的原始细节数据是否设立缓冲层,视情况而定
- 细粒度:与业务系统保持一致的原始细节数据,不做任何处理有效期:临时(1~2天),并定期清除
- 概念
- ODS:Operational Data Store,操作数据存储,一个面向主题的、集成的、可变的、当前的细节数据集合,用于支持即时性、操作型、集成性的信息需求业务系统和基础明细层之间的过度区,用于存储业务系统的当前原始细节数据
- 来源:业务系统或缓冲层细粒度:数据结构、数据间逻辑关系、数据粒度均与业务系统保持一致的原始细节数据高集成:按主题进行全面集成,拥有业务数据的完整视图低质量:不做处理或仅做简单清洗,并加时间戳(一般不做数据清洗、聚合和汇总)有效期:短期(1~2个月),并定期清除
- 当作为业务系统与数仓之间的隔离区,避免数仓直接从业务系统抽取数据,最小化对业务系统的干扰承接部分业务系统的原始细节数据(低粒度)的查询和报表功能,降低业务系统的查询和分析压力
- 概念
- DWD:Data Warehouse Detail数据仓库的中间层,从ODS层获取数据,按主题进行组织,经过清洗、校验、转换、合并等规范化处理,形成业务数据的完整视图
- 来源:ODS层细粒度:与ODS基本一致,不做聚合和汇总高质量:经过清洗、校验、转换、合并(根据业务进行数据关联形成宽表)等规范化处理高集成:按主题进行组织,拥有业务数据的完整视图有效期:永久
- 面向业务分析需求,对基础明细层的细节数据进行细粒度(轻度)的聚合、汇总和统计提前计算出细粒度的基础汇总指标,减轻后期数据分析的压力
- 数据集市是面向单一主题域(如销售、财务等)、为特定业务部门构建的小规模数据集合数据源可以是数据仓库(从属数据集市),也可以是业务系统(独立数据集市)数据集市是数据仓库的子集用于在线交互式分析(秒级响应)
- 什么是Hive
- 基于Hadoop的一个数据仓库工具将结构化的数据文件映射为一张数据库表提供SQL查询功能,将SQL语句转变成MapReduce任务来执行。
- 通过类SQL来分析大数据,而避免了写MapReduce程序来分析数据,这样使得分析数据更容易。数据底层存储在HDFS上,Hive本身并不提供数据的存储功能Hive是将数据映射成数据库和一张张的表,库和表的元数据信息一般存在关系型数据库上(比如MySQL)。数据存储方面:它能够存储很大的数据集,并且对数据完整性、格式要求并不严格。数据处理方面:因为Hive语句最终会生成MapReduce任务去计算,所以不适用于实时计算的场景,它适用于离线分析
- 内部表
- 默认创建的是内部表(managed table),存储位置在hive.metastore.warehouse.dir设置,默认位置是/user/hive/warehouse。导入数据的时候是将文件剪切(移动)到指定位置,即原有路径下文件不再存在删除表的时候,数据和元数据都将被删除默认创建的就是内部表create table xxx (xx xxx)
- 外部表文件可以在外部系统上,只要有访问权限就可以外部表导入文件时不移动文件,仅仅是添加一个metadata删除外部表时原数据不会被删除分辨外部表内部表可以使用DESCRIBE FORMATTED table_name命令查看创建外部表命令添加一个external即可,即create external table xxx (xxx)外部表指向的数据发生变化的时候会自动更新,不用特殊处理
- 有些时候数据是有组织的,比方按日期/类型等分类,而查询数据的时候也经常只关心部分数据,比方说我只想查2017年8月8号,此时可以创建分区,查询具体某一天的数据时,不需要扫描全部目录,所以会明显优化性能一个Hive表在HDFS上是有一个对应的目录来存储数据,普通表的数据直接存储在这个目录下,而分区表数据存储时,是再划分子目录来存储的使用partionedby (xxx)来创建表的分区
- 分桶是相对分区进行更细粒度的划分。分桶将整个数据内容安装某列属性值得hash值进行区分,按照取模结果对数据分桶。如取模结果相同的数据记录存放到一个文件。桶表也是一种用于优化查询而设计的表类型。创建通表时,指定桶的个数、分桶的依据字段,hive就可以自动将数据分桶存储。查询时只需要遍历一个桶里的数据,或者遍历部分桶,这样就提高了查询效率clustered by (user_id) sorted by(leads_id) into 10 buckets
- 写入数据
- 从本地文件系统中导入:load data local inpath'xxx.txt' into table xxx;从HDFS上导入:
- load data inpath'/home/xxx/add.txt' into table xxxalter table db.access_logadd partition (dt=‘2020-06-01') location 'hdfs://ns/hive/warehouse/access_log/dt=2020-06-01 ';
- 与关系型数据库的SQL 略有不同,但支持了绝大多数的语句如DDL、DML 以及常见的聚合函数、连接查询、条件查询。HIVE不适合用于联机(online)事务处理,也不提供实时查询功能。适合应用在基于大量不可变数据的批处理作业。



