Contents

一、What is Flume?
二、Usage steps
三、Common Sinks and Sources
    1. Sink: HDFS, Hive, Logger, HBase, HBase2, ElasticSearch, Kafka
    2. Source: Kafka, Avro, Syslog (Syslog TCP, Multiport Syslog TCP), HTTP
    3. Channels: Memory, JDBC, Kafka, File
四、Custom Sink and Source: MysqlSink, MysqlSource
一、What is Flume?

Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and moving large volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data, and it can also apply simple processing to the data and write it out to a variety of data receivers.
二、Usage steps

Taking the configuration below as an example, save it under the Flume directory and name it flume-kafka.conf (the start command further down reads it from the job/ directory, so adjust the path to wherever you saved it).
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/car.log
a1.sources.r1.checkperiodic = 50

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test_flume_kafka
a1.sinks.k1.kafka.bootstrap.servers = udp01:9092,udp02:9092,udp03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
Then run:
nohup ${dir_file_home}/bin/flume-ng agent --conf conf/ --conf-file job/flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console
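Once the agent is up, a quick way to confirm that events are actually reaching Kafka is to consume the target topic. The broker address and topic below simply mirror the example configuration; adjust the command path to your own Kafka installation:

kafka-console-consumer.sh --bootstrap-server udp01:9092 --topic test_flume_kafka --from-beginning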
三、Common Sinks and Sources
The following are relatively common sinks and sources, compiled from the official documentation.
1. Sink

1. HDFS

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
# Name prefix for files created by Flume in the HDFS directory; defaults to FlumeData
a1.sinks.k1.hdfs.filePrefix = events-
# Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t); defaults to false
a1.sinks.k1.hdfs.round = true
# Rounded down to the highest multiple of this (in the unit configured by hdfs.roundUnit) less than the current time; defaults to 1
a1.sinks.k1.hdfs.roundValue = 10
# The unit of the round-down value: second, minute or hour
a1.sinks.k1.hdfs.roundUnit = minute
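One note on the example above: the time-based escape sequences in hdfs.path (%y-%m-%d, %H%M, %S) are resolved from a timestamp header on each event. If your source does not set one, the sink will fail, so you typically either let the sink use local time or add a timestamp interceptor. A minimal sketch of both options; the interceptor lines assume the agent has a source named r1:

# Option 1: let the HDFS sink use the agent's local time when resolving escape sequences
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Option 2: stamp each event with a timestamp header at the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp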
2. Hive

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
# Hive metastore URI (e.g. thrift://a.b.com:9083)
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
# Hive database name
a1.sinks.k1.hive.database = logsdb
# Hive table name
a1.sinks.k1.hive.table = weblogs
# If the table is partitioned by (continent: string, country: string, time: string),
# then 'Asia,India,2014-02-26-01-21' indicates continent=Asia, country=India, time=2014-02-26-01-21
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
# (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes, e.g. "\t"
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames = id,,msg
3. Logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. HBase

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
# The name of the table in HBase to write to
a1.sinks.k1.table = foo_table
# The column family in HBase to write to
a1.sinks.k1.columnFamily = bar_cf
# Default increment column = "iCol", payload column = "pCol"
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
5. HBase2

HBase2Sink is the equivalent of HBaseSink for HBase version 2. It provides the same functionality and configuration parameters as HBaseSink, except for the hbase2 tag in the sink type and the package/class names.
a1.channels = c1
a1.sinks = k1
# note: the type differs from the hbase sink
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
# the serializer class also differs from the hbase sink
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHbase2EventSerializer
a1.sinks.k1.channel = c1
6. ElasticSearch

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
# Comma separated list of hostname:port; if the port is not present, the default port 9300 will be used
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
7. Kafka

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
2. Source

1. Kafka

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
2. Avro

Listens on a port for incoming Avro events.
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
# IP address to bind to
a1.sources.r1.bind = 0.0.0.0
# port to listen on
a1.sources.r1.port = 4141
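The Avro source is usually the receiving end of agent-to-agent chaining: an upstream agent's Avro sink points at the host and port this source listens on. A minimal sketch of the matching sink on a hypothetical upstream agent a2 (the hostname is a placeholder for the machine running the Avro source above):

a2.sinks = k1
a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
# host and port of the downstream agent's Avro source
a2.sinks.k1.hostname = collector-host
a2.sinks.k1.port = 4141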
3. Syslog

Syslog TCP

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
Multiport Syslog TCP

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
4. HTTP

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
3. Channels

1. Memory

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
2. JDBC

a1.channels = c1
a1.channels.c1.type = jdbc
3. Kafka

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
4. File

a1.channels = c1
a1.channels.c1.type = file
# defaults to ~/.flume/file-channel/checkpoint
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
# defaults to ~/.flume/file-channel/data
a1.channels.c1.dataDirs = /mnt/flume/data

四、Custom Sink and Source

1. MysqlSink
The required dependencies:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.9.0</version>
</dependency>
<!-- the MySQL JDBC driver is also needed at runtime by MysqlSink; the version here is only an example -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
The code:
package hadoopexample.FlumeOption;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.conf.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class MysqlSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);
    private Connection connect;
    private Statement state;
    private String columnName;
    private String url;
    private String user;
    private String password;
    private String tableName;

    @Override
    public synchronized void stop() {
        // release the JDBC resources before stopping
        try {
            if (state != null) state.close();
            if (connect != null) connect.close();
        } catch (SQLException e) {
            logger.error("failed to close JDBC resources", e);
        }
        super.stop();
    }

    @Override
    public synchronized void start() {
        super.start();
        try {
            // url is expected to look like jdbc:mysql://host:3306/database
            connect = DriverManager.getConnection(url, user, password);
            state = connect.createStatement();
        } catch (SQLException e) {
            logger.error("failed to open JDBC connection", e);
        }
    }

    @Override
    public Status process() throws EventDeliveryException {
        // get the channel bound to this sink
        Channel ch = getChannel();
        // get a transaction from the channel
        Transaction transaction = ch.getTransaction();
        // start the transaction
        transaction.begin();
        try {
            // take one event from the channel; if the channel is empty, back off
            Event event = ch.take();
            if (event == null) {
                transaction.commit();
                return Status.BACKOFF;
            }
            // process the event: the third tab-separated field holds the comma-separated column values
            String rawbody = new String(event.getBody());
            String body = rawbody.split("\t")[2];
            if (body.split(",").length == columnName.split(",").length) {
                String sql = "insert into " + tableName + "(" + columnName + ") values (" + body + ")";
                state.executeUpdate(sql);
                // commit the transaction
                transaction.commit();
                return Status.READY;
            } else {
                // the event does not match the expected column count, roll back
                transaction.rollback();
                return Status.BACKOFF;
            }
        } catch (Throwable throwables) {
            transaction.rollback();
            if (throwables instanceof Error) {
                throw (Error) throwables;
            } else {
                throw new EventDeliveryException(throwables);
            }
        } finally {
            // close the transaction
            transaction.close();
        }
    }

    @Override
    public void configure(Context context) {
        // read settings from the agent configuration file
        columnName = context.getString("column_name");
        Preconditions.checkNotNull(columnName, "column_name must be set");
        url = context.getString("url");
        user = context.getString("user");
        password = context.getString("password");
        tableName = context.getString("tableName");
    }
}
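Once the class is packaged into a jar and placed on Flume's classpath (the lib/ or plugins.d directory), it is wired into an agent like any built-in sink by using the fully qualified class name as the type. A minimal sketch; the property names are exactly what configure() reads from the Context, while the channel name, JDBC URL, credentials, and table/column values are placeholders:

a1.sinks = k1
a1.sinks.k1.type = hadoopexample.FlumeOption.MysqlSink
a1.sinks.k1.channel = c1
# property names match what configure() reads
a1.sinks.k1.column_name = id,name,msg
a1.sinks.k1.url = jdbc:mysql://localhost:3306/flume_db
a1.sinks.k1.user = root
a1.sinks.k1.password = 123456
a1.sinks.k1.tableName = flume_events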
2. MysqlSource

For a custom MysqlSource, see this write-up, which is very complete:

[Flume自定义Source之MysqlSource](https://blog.csdn.net/zyz_home/article/details/106754957)
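Whatever the implementation looks like, a custom source is registered the same way as the custom sink above: package it, put the jar on Flume's classpath, and reference the fully qualified class name as the source type. A minimal sketch with a hypothetical class name; the remaining properties depend on what the implementation in the linked article reads from its Context:

a1.sources = r1
a1.channels = c1
# hypothetical fully qualified class name of the custom source
a1.sources.r1.type = com.example.flume.source.MysqlSource
a1.sources.r1.channels = c1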



