场景:监听服务器某个文件夹下的内容,当出现新文件时,就将文件的内容动态添加到hdfs中,并用hive读取
第一步,nifi监听并并将数据写入hdfs
nifi采用的版本是1.11.4,采用的processor包含两个:putHDFS与getFile,getFile主要目的是监听某个特定文件夹,如果有新增的文件就将文件中的内容写入到hdfs。这里以一个电影的测试数据集为例,主要包含id、title和genres三个字段,文件格式为csv格式。
需要配置getFile的几个参数:
| Input Directory | 需要监听的文件夹,例如:/tmp/test |
| Keep Source File | 读取该文件后是否保留该文件,若填false,读取后会删掉该文件 |
| Minimum File Age | 文件存在多久后会读取该文件,例如:5sec |
而后需要配置PutHDFS的几个参数:
| Hadoop Configuration Resources | 需要填写hadoop下的core-site.xml和hdfs-site.xml的路径 |
| Directory | 写入hdfs的目录,例如:/mov |
| Conflict Resolution Strategy | 有新数据来的时候的写入方式,append:追加、replace:替换 |
经过这样设置,达到的效果就是,在/tmp/test下如果有新文件写入,在新文件存在5s后,读取该文件后把该文件删除并把读取到的数据写入到hdfs的同名文件下。通常在/tmp/test写入的文件的文件名一般需要缀上日期
第二步,创建内部表,建立内部表和hdfs对应路径的关联:
cerate table mov( movieId string, title string, genres string ) stored as textfile location '/mov';
其中,location需要配置具体hdfs的路径
最后,调用查询语句完成查询:
select * from mov;
其中要特别说明的是,由于nifi监听的文件夹中新写入的文件的文件名按日期来划分,因此写入到hdfs中的文件也是按日期来划分的。若调用上述查询语句,是将所有文件的所有数据都读取出来。



