栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-12-23 迈向程序员的第五十二步

2021-12-23 迈向程序员的第五十二步

目录

一.rowkey设计原则

1.1 唯一原则

1.2 长度原则

1.3 散列原则

二.Hbase优化实操

2.1 高可用

2.2 预分区

2.3 RowKey设计(记忆)

2.4 内存优化

2.5 Hbase2.0新特性

三.Flume概述

3.1 大数据处理流程

3.2 Flume的简介

3.3 Flume的体系结构

3.4 Flume采集模型

3.4.1 模型分类

3.4.2 设计原则 

3.4.3 采集方案模板

3.4.4 三大核心组件的常用接口

四、Flume的安装

五、flume采集方案演示


一.rowkey设计原则

1.1 唯一原则

必须在设计上保证其唯一性。

由于在Hbase中数据存储是Key-Value形式,若Hbase中同一张表插入相同Rowkey,并且是同一个列族下的同一个key时,如果version设置为1的话,则原先的数据会被覆盖掉,所以务必保证Rowkey的唯一性

1.2 长度原则
rowkey的长度统一。

Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。

原因如下:

-1. 数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
-2. MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
-3. 目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。

需要指出的是不仅Rowkey的长度是越短越好,而且列族名、列名等尽量使用短名字,因为Hbase属于列式数据库,这些名字都是会写入到Hbase的持久化文件HFile中去,过长的Rowkey、列族、列名都会导致整体的存储量成倍增加。

1.3 散列原则

我们设计的Rowkey应均匀的分布在各个Hbase节点上。

拿常见的时间戳举例,假如Rowkey是按系统时间戳的方式递增,Rowkey的第一部分如果是时间戳信息的话将造成所有新数据都在一个RegionServer上堆积的热点现象,就是region热点问题。

我们可以采取下面三种方式,来解决这个Region热点问题:

-1. Reverse反转

针对固定长度的Rowkey反转后存储,这样可以使Rowkey中经常改变的部分放在最前面,可以有效的随机Rowkey。
反转Rowkey的例子通常以手机举例,可以将手机号反转后的字符串作为Rowkey,这样的就避免了以手机号那样比较固定开头(137x、15x等)导致热点问题,这样做的缺点是牺牲了Rowkey的有序性。

-2. Salting(加盐)

Salting是将每一个Rowkey加一个前缀,前缀使用一些随机字符,使得数据分散在多个不同的Region,达到Region负载均衡的目标。比如在一个有4个Region(注:以 [ ,a)、[a,b)、[b,c)、[c, )为Region起止)的Hbase表中,
加Salt前的Rowkey:abc001、abc002、abc003
我们分别加上a、b、c前缀,加Salt后Rowkey为:a-abc001、b-abc002、c-abc003  
可以看到,加盐前的Rowkey默认会在第2个region中,加盐后的Rowkey数据会分布在3个region中,理论上处理后的吞吐量应是之前的3倍。由于前缀是随机的,读这些数据时需要耗费更多的时间,所以Salt增加了写操作的吞吐量,不过缺点是同时增加了读操作的开销。

-3. Hash散列或者Mod
用Hash散列来替代随机Salt前缀的好处是能让一个给定的行有相同的前缀,这在分散了Region负载的同时,使读操作也能够推断。确定性Hash(比如md5后取前4位做前缀)能让客户端重建完整的RowKey,可以使用get操作直接get想要的行。

例如将上述的原始Rowkey经过hash处理,此处我们采用md5散列算法取前4位做前缀,结果如下
9bf0-abc001 (abc001在md5后是9bf049097142c168c38a94c626eddf3d,取前4位是9bf0)  
7006-abc002  
95e6-abc003  

若用前4个字符作为不同分区的起止,上面几个Rowkey数据会分布在3个region中。实际应用场景是当数据量越来越大的时候,这种设计会使得分区之间更加均衡。 

如果Rowkey是数字类型的,也可以考虑Mod方法。

二.Hbase优化实操

2.1 高可用
    在Hbase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个Hbase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以Hbase支持对Hmaster的高可用配置。

2.1.1 关闭Hbase集群

[root@xxx1 home]# stop-hbase.sh

2.1.2 在conf目录下创建backup-masters文件

[root@xxx2 home]# touch conf/backup-masters

2.1.3 在backup-masters文件中配置高可用HMaster节点

[root@xxx2 home]# echo "192.168.49.101" > conf/backup-masters

2.1.4 将整个conf目录scp到其他节点

[root@xxx1 home]# scp -r conf/ 192.168.49.101:/home/hbase-1.2.1/hbase/

1.5 打开页面测试查看

2.2 预分区
   每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高Hbase性能。

2.2.1 手动预分区

create 'stf1','info','partition1',SPLITS => ['1000','2000','3000','4000']
put 'stf1', '888', 'info:name', 'lixi'

2.2.2 生成16进制序列预分区

create 'stf2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

2.2.3 按照文件中设置的规则预分区

创建splits.txt文件内容如下:

aaaa
bbbb
cccc
dddd

然后执行

create 'stf3','partition3',SPLITS_FILE => '/home/splits.txt'

2.2.4 JavaAPI创建预分区

//自定义算法,产生一系列Hash散列值存储在二维数组中
byte[][] splitKeys = 某个散列值函数
//创建HbaseAdmin实例
HbaseAdmin hAdmin = new HbaseAdmin(HbaseConfiguration.create());
//创建HTableDescriptor实例
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//通过HTableDescriptor实例和散列值二维数组创建带有预分区的Hbase表
hAdmin.createTable(tableDesc, splitKeys);

2.3 RowKey设计(记忆)
一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。

2.3.1 字符串反转

20191124000001转成10000042119102
20191124000002转成20000042119102

2.3.2 字符串拼接

20191124000001_lixi
20191124000001_rock

2.3.3 生成随机数、hash、散列值

比如:
原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913
在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。

2.4 内存优化
Hbase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给Hbase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

2.4.1 设置在HDFS中追加内容

hdfs-site.xml、hbase-site.xml


    dfs.support.append
    true
    开启HDFS追加同步,可以优秀的配合Hbase的数据同步和持久化。默认值为true

2.4.2 dataNode允许的最大文件打开数

hdfs-site.xml


    dfs.datanode.max.transfer.threads
    4096
    Hbase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096

2.4.3 设置延迟高的数据操作的等待时间

hdfs-site.xml


    dfs.image.transfer.timeout
    60000
    如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。

2.4.4 优化数据的写入效率

mapred-site.xml


    mapreduce.map.output.compress
    true
    开启这两个数据可以大大提高文件的写入效率,减少写入时间。

    mapreduce.map.output.compress.codec
    org.apache.hadoop.io.compress.GzipCodec
    开启这两个数据可以大大提高文件的写入效率,减少写入时间。org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式

2.4.5 设置RPC监听数量

hbase-site.xml


    hbase.regionserver.handler.count
    30
    默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

2.4.6 设置HStore文件大小

hbase-site.xml


    hbase.hregion.max.filesize
    10737418240
    默认值10737418240(10GB),如果需要运行Hbase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。

2.4.7 设置hbase客户端缓存

hbase-site.xml


    hbase.client.write.buffer
    4096
    用于指定Hbase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。

2.4.8 指定scan.next扫描Hbase所获取的行数

hbase-site.xml


    hbase.client.scanner.caching
    4096
    用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。

2.4.9 flush、compact、split机制

大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
涉及属性:
1. 128M就是Memstore的默认阈值
hbase.hregion.memstore.flush.size:134217728
tip:
    这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。
    
2. 
hbase.regionserver.global.memstore.upperLimit:0.4
hbase.regionserver.global.memstore.lowerLimit:0.38
tip:
    当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

2.5 Hbase2.0新特性
2017年8月22日凌晨2点左右,Hbase发布了2.0.0 alpha-2,相比于上一个版本,修复了500个补丁,我们来了解一下2.0版本的Hbase新特性。
最新文档:
http://hbase.apache.org/book.html#ttl
官方发布主页:
http://mail-archives.apache.org/mod_mbox/www-announce/201708.mbox/ 

三.Flume概述

3.1 大数据处理流程

在企业中,大数据的处理流程一般是:

1. 数据采集
2. 数据清洗ETL
3. 数据分析
4. 数据展示(BI,数据挖掘,为AI提供数据支持)

扩展:大数据在进行数据采集的时候,数据的种类可以这样分:

1. 业务数据
2. 行为日志数据
3. 内容数据
4. 购买的第三方数据

3.2 Flume的简介
1. Flume是一个分布式的、高可用、高可靠的行为日志数据的采集框架
2. 体系架构简单,是基于数据流的模型
3. 具有故障转移和恢复机制,以及良好的扩展机制。
4. 0.9.4以前的版本称之为Flume OG   之后称之为Flume NG版本

3.3 Flume的体系结构

1. source: 用来采集数据,封装成event,传输给channel
2. channel:  channel接受source的event,传输给sink
3. sink: 接收channel的event, 落地到相应的位置
4. Agent:  flume的最小运行单元,一个Agent中三个核心组件都要至少有一个。
5. event:  采集的数据的封装对象。  由消息头head(KV对)和消息体body(byte[]类型)组成
6. interceptor:  作用在source或者是sink端, 可以拦截event,进行设置等
7. selector: 作用在source端,用于将event分发到不同的channel中
8. processor:可以进行sink组设置,进行自动容灾,负载均衡

3.4 Flume采集模型

3.4.1 模型分类

1)单一数据模型

在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为 Web Server --> Source --> Channel --> Sink --> HDFS。

 2) 多数据流模型

多 Agent 串行传输数据流模型

多 Agent 汇聚数据流模型

 

单 Agent 多路数据流模型

 

Sinkgroups 数据流模型

 

3.4.2 设计原则 
Source--> Channel
  1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating 和 multiplexing。
  2.多个Sources可以写入同一个 Channel
Channel-->Sink
  1.多个Sinks又可以组合成Sinkgroups从Channel中获取数据,既可以loadbalancing和failover机制。
  2.多个Sinks可以从同一个Channel中取数据。
  3.单个Sink只能从单个Channel中取数据

3.4.3 采集方案模板
参考官网

3.4.4 三大核心组件的常用接口

1)Source

Spooling Directory Source
TailDir Source
Kafka Source
Http Source
Avro Source
Exec Source

2)channel

Memory Channel
File Channel

3)Sink

HDFS Sink
Logger Sink
Kafka Sink
Avro Sink
Hive Sink
Hbase Sink

四、Flume的安装

1)上传,解压,更名,配置环境变量,验证

[root@xxx01 ~]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/
[root@xxx01 ~]# cd /usr/local/
[root@xxx01 local]# mv apache-flume-1.8.0-bin/ flume
[root@xxx01 local]# vim /etc/profile
#FLUME environment
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH

[root@xxx01 local]# vim /etc/profile
[root@xxx01 local]# source /etc/profile
[root@xxx01 local]# flume-ng version

2)配置flume的环境脚本

[root@xxx01 local]# cd flume/conf
[root@xxx01 conf]# cp flume-env.sh.template flume-env.sh
[root@xxx01 conf]# vim flume-env.sh
........省略..........
export JAVA_HOME=/usr/local/jdk
........省略..........

3)集群模式的说明

flume的集群模式指的就是各个节点都安装了flume而已

五、flume采集方案演示

案例1)avro+memory+logger

logger通常用于测试,数据流中的event最终显示在屏幕上

1)采集方案的配置

[root@xxx01 ~]# mkdir flumeconf
[root@xxx01 ~]# vim ./flumeconf/avro-mem-logger.properties
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 定义Source的相关属性
a1.sources.r1.type = avro
# 绑定本机的ip或者是hostname
a1.sources.r1.bind = xxx01 
# 要监听的本机上的某一个端口号,  当程序启动时,该端口号就会被使用
a1.sources.r1.port = 10086

# 定义channel的相关属性
a1.channels.c1.type = memory
#  内存存储容量 event的最大数量
a1.channels.c1.capacity=1000
#  从内存中出来时,一次性提交的event的数量
a1.channels.c1.transactionCapacity=100

#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16

2)启动方案

flume-ng agent -c /usr/local/flume/conf -f ./flumeconf/avro-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试:因为用的是avro的source,那么必须使用avro-client进行测试

[root@xxx01 ~]# mkdir flumedata
[root@xxx01 ~]# echo "hellworld" > flumedata/data.txt
[root@xxx01 ~]# flume-ng avro-client -c /usr/local/flume/conf/ -H xxx01 -p 10086 -F ./flumedata/data.txt

案例2)exec+memory+logger

注意:使用exec源,监听的文件,要提前创建

1)采集方案的编写

[root@xxx01 ~]# vim ./flumeconf/exec-mem-logger.properties
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt

# 定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16

2)启动采集方案

flume-ng agent -f ./flumeconf/exec-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试

echo "helloworld" >> ./flumedata/data.txt

案例3)exec+memory+hdfs

1)采集方案的编写

[root@xxx01 ~]# vim ./flumeconf/exec-mem-hdfs.conf
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt

# 定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#定义Sink的相关属性
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = /flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = wcm
a1.sinks.k1.hdfs.fileSuffix = .wsy
# 下面三个条件满足其一,就会产生新文件
# 新文件产生的时间周期,单位是秒,   如果设置为0表示不会产生新文件。
a1.sinks.k1.hdfs.rollInterval = 60    
# 当前文件达到1000字节,就会产生新文件
a1.sinks.k1.hdfs.rollSize = 1000
# 当前文件的event数量达到10条,就会产生新文件
a1.sinks.k1.hdfs.rollCount = 10
# 如果writeFormat指定了Text,那么fileType必须是DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream

# round的作用,用于指定是否滚动文件夹  false 表示不滚动文件夹
a1.sinks.k1.hdfs.round = true      
# 设置文件夹滚动的时间单位
a1.sinks.k1.hdfs.roundUnit = minute
# 设置文件夹固定的时间数字大小
a1.sinks.k1.hdfs.roundValue = 2
#  如果目录上设置了时间格式字符串,比如%Y等,那么下面的属性应该设置为true,除非event的head里有一个叫timestamp的消息头
a1.sinks.k1.hdfs.useLocalTimeStamp = true

2)启动方案

flume-ng agent -f ./flumeconf/exec-mem-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt

案例4)spool+memory+logger

spool源,是用来监听目录下的新文件的,并通过更名的方式来决定该文件已经采集完。注意,监听的目录必须提前存在。是一个可靠源

exec源不可靠

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/spool-mem-logger.properties
# 列出每个组件的名称
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

#设置source组件的属性
a1.sources.r1.type=spooldir
#要监听的目录必须提前创建
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000

#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#设置sink组件的属性
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16

2)启动方案

flume-ng agent -f ./flumeconf/spool-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 ~]# echo "helloworld" >>data/subdir/a.txt
[root@xxx01 ~]# cd data/subdir/
[root@xxx01 subdir]# ll
总用量 4
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
[root@xxx01 subdir]# echo "helloworld" >>b.txt
[root@xxx01 subdir]# echo "helloworld" >>c.txt
[root@xxx01 subdir]# echo "helloworld" >>d.txt
[root@xxx01 subdir]# ll
总用量 16
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 b.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 c.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 d.txt.gyy
注意:因为每次监听都会更名,因此再次监听的文件名不能与之前的名字重复。

案例5)spool+file+hdfs

1)方案的编写

[root@xxx01 ~]# vim flumeconf/spool-file-hdfs.properties
# 命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

#设置spool源
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000

#设置file的channel
a1.channels.c1.type=file

#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://xxx01:8020/flume/hdfs/%Y
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=false
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute

2)启动采集方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/spool-file-hdfs.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 subdir]# echo "helloworld" >>e.txt
[root@xxx01 subdir]# echo "helloworld" >>f.txt
[root@xxx01 subdir]# echo "helloworld" >>g.txt

案例6)http+memory+logger

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/http-mem-logger.properties
# list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

#设置每个组件的接口以及属性
a1.sources.r1.type=http
#该源要监听的host或者是ip
a1.sources.r1.bind=xxx01
#该源要监听的port
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSonHandler


a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/http-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console

3)使用curl指令发送post协议进行测试

[root@xxx03 ~]# curl -X POST -d '[{"headers":{"girlfriend1":"zhangjunning","girlfriend":"nazha"},"body":"they are my girlfriends"}]' http://xxx01:10086


解析:
-X  用来指定http的请求方式,如post或者是get
-d  用来模拟要发送的数据
第三个参数表示要将数据发送到的地址。

案例7)syslogtcp+memory+logger

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/syslogtcp-mem-logger.properties
# list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

#设置每个组件的接口以及属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=xxx01
a1.sources.r1.port=10086
a1.sources.r1.eventSize=2500


a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/syslogtcp-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console

3)使用nc指令来发送tcp协议,进行测试

先安装nc指令:yum -y install nmap-ncat

[root@xxx03 ~]#  echo "helloworld" | nc xxx01 10086
nc的语法:   
  nc  host  port

案例8)taildir+memory+hdfs

taildir与spooling这两个源的比较

- 相同点:
	1. 都是可靠源
	2. 监听的都是目录
	3. 该目录一定要提前创建
- 不同点:
	1. spooling监听完的文件会被重命名
	2. spooling监听的目录里的文件不能重名
	3. spooling监听的是目录里的新文件
	4. taildir监听的文件不会被重命名,可以一直监听文件里的新行。 

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/taildir-mem-hdfs.properties
# 命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

#设置spool源
a1.sources.r1.type=TAILDIR
#设置要监听的文件所属组,可以设置多个
a1.sources.r1.filegroups=g1 g2
#规定每一个组要监听的文件的绝对路径,可以使用正则表达式来表示一批文件
a1.sources.r1.filegroups.g1=/root/data/dir2/.*.txt
a1.sources.r1.filegroups.g2=/root/data/dir2/.*.csv
#a1.sources.r1.positionFile=/root/taildir_position.json

#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://xxx01:8020/flume/hdfs/%Y-%m
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/taildir-mem-hdfs.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

目录要提前创建出来

[root@xxx01 dir2]# echo "chenyun" > a.txt
[root@xxx01 dir2]# echo "chenyun" >> a.txt
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.json   # 不会被采集的

去检查HDFS上的目录

(=-=,flume的实操几乎一样,hbase的优化需要多多注意,今天星期四,明天星期五~~~)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687261.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号