Flume 1.9
cdh 5.16.2
Error details:
24 Mar 2022 10:58:07,452 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:150) - Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/streaming/RecordWriter
at org.apache.flume.sink.hive.HiveSink.createSerializer(HiveSink.java:220)
at org.apache.flume.sink.hive.HiveSink.configure(HiveSink.java:203)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:453)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hive.hcatalog.streaming.RecordWriter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more
Cause: the error indicates a jar is missing from Flume's classpath.
Fix: copy hive-hcatalog-streaming.jar from $HIVE_HOME/hcatalog/share/hcatalog/ into $FLUME_HOME/lib/.
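The copy can be done as follows. Note that on CDH the jar name usually carries a version suffix, so it is safer to list the directory first and use a wildcard (the exact file name here is an assumption; check what is actually present):

```shell
# List the HCatalog jars shipped with Hive; the streaming jar may be
# named e.g. hive-hcatalog-streaming-<version>.jar on CDH.
ls $HIVE_HOME/hcatalog/share/hcatalog/

# Copy the streaming jar into Flume's classpath, then restart the agent.
cp $HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-streaming*.jar $FLUME_HOME/lib/
```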
After adding the jar, a second error appeared at runtime:
22/03/24 11:09:26 WARN hive.HiveSink: k2 : Failed connecting to EndPoint {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
at org.apache.flume.sink.hive.HiveWriter.&lt;init&gt;(HiveWriter.java:99)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:346)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:297)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hive.hcatalog.streaming.StreamingException: Cannot stream to table that has not been bucketed : {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.&lt;init&gt;(AbstractRecordWriter.java:69)
at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.&lt;init&gt;(DelimitedInputWriter.java:115)
at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:66)
at org.apache.flume.sink.hive.HiveWriter.&lt;init&gt;(HiveWriter.java:89)
... 6 more
22/03/24 11:09:26 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:269)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
at org.apache.flume.sink.hive.HiveWriter.&lt;init&gt;(HiveWriter.java:99)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:346)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:297)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
... 3 more
Caused by: org.apache.hive.hcatalog.streaming.StreamingException: Cannot stream to table that has not been bucketed : {metaStoreUri='thrift://cdh-1:9083', database='ods', table='ods_flume_log', partitionVals=[20220324] }
at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.&lt;init&gt;(AbstractRecordWriter.java:69)
at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.&lt;init&gt;(DelimitedInputWriter.java:115)
at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:66)
at org.apache.flume.sink.hive.HiveWriter.&lt;init&gt;(HiveWriter.java:89)
... 6 more
Cause: the Flume Hive sink writes through the Hive Streaming API, which requires the target table to be transactional (ACID). The message "Cannot stream to table that has not been bucketed" means ods_flume_log does not meet the streaming requirements: the table must be bucketed, stored as ORC, and have transactions enabled.
Fix: recreate the table as a transactional table.
1. Enable transaction-related settings. Run the following in the Hive CLI:
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
2. Create a partitioned, bucketed table with transactions enabled:
create table ods_flume_log(line string)
partitioned by (dt string)
clustered by (line) into 1 buckets
stored as orc tblproperties ('transactional'='true');
Explanation:
partitioned by (dt string) - partition column
clustered by (line) into 1 buckets - bucketing column and bucket count
stored as orc - ORC storage format (required for transactional tables)
tblproperties ('transactional'='true') - tblproperties sets table-level properties; here it enables transactions
Since my table has only one column and is already partitioned by date, a single bucket is enough here; choose a bucket count that fits your own data volume.
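For reference, a Hive-sink configuration consistent with the endpoint shown in the log above might look like this. The sink name k2 comes from the log; the agent and channel names (a1, c1) and the partition pattern are illustrative assumptions, not taken from the original setup:

```properties
# Hypothetical Flume agent config matching the endpoint in the error log.
# a1 / c1 are placeholder agent and channel names.
a1.sinks.k2.type = hive
a1.sinks.k2.channel = c1
a1.sinks.k2.hive.metastore = thrift://cdh-1:9083
a1.sinks.k2.hive.database = ods
a1.sinks.k2.hive.table = ods_flume_log
# Partition value like 20220324; requires a timestamp, supplied here locally.
a1.sinks.k2.hive.partition = %Y%m%d
a1.sinks.k2.useLocalTimeStamp = true
a1.sinks.k2.serializer = DELIMITED
a1.sinks.k2.serializer.fieldnames = line
```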
After restarting Flume, data was written into Hive normally.
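To confirm the writes landed, something like the following can be run in the Hive CLI (table and partition value taken from the log above):

```sql
-- Check that the streamed partition exists and contains rows.
SHOW PARTITIONS ods.ods_flume_log;
SELECT COUNT(*) FROM ods.ods_flume_log WHERE dt = '20220324';
```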



