栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume自定义Source 代码和详细步骤

Flume自定义Source 代码和详细步骤

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。
搭建maven项目

pom.xml


	4.0.0

	com.yy.brick
	flume-log-extrator
	0.0.1-SNAPSHOT
	jar

	yy-brick-flume
	http://maven.apache.org

	
		UTF-8
	

	
		
			com.alibaba
			fastjson
			1.2.40
		
		
			org.apache.commons
			commons-collections4
			4.1
		
		
			org.apache.commons
			commons-lang3
			3.6
		
		
			org.apache.flume
			flume-ng-core
			1.7.0
		

	
	
		
		
			
				org.codehaus.mojo
				build-helper-maven-plugin
				1.7

				
					
						add-resource
						generate-resources
						
							add-resource
						
						
							
								
									src/main/resources
								
							
						
					
				
			
			
				org.apache.maven.plugins
				maven-surefire-plugin
				2.17
				
					true
				
			
		
	

官网代码块 Flume 1.9.0 Developer Guide — Apache Flume

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651204.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号