Flume 1.7.0 custom interceptor: a complete example

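Flume version: Flume 1.7.0
A Flume interceptor acts on events as the source reads them. Its main purposes are:
– adding useful information to the event header;
– filtering events by their content (header/body), as an initial round of data cleansing.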
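To make the two roles concrete, here is a minimal sketch that runs without Flume on the classpath. SimpleEvent is a hypothetical stand-in for org.apache.flume.Event (a header map plus a byte[] body), and both the "source" header key and the drop rule (blank lines and lines starting with #) are illustrative assumptions. It follows the documented contract of Interceptor.intercept(Event): return the possibly modified event, or null to drop it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Sketch only: SimpleEvent is a hypothetical stand-in for org.apache.flume.Event. */
class HeaderAndFilterSketch {

  static final class SimpleEvent {
    private final Map<String, String> headers = new HashMap<>();
    private final byte[] body;

    SimpleEvent(String body) { this.body = body.getBytes(StandardCharsets.UTF_8); }
    Map<String, String> getHeaders() { return headers; }
    byte[] getBody() { return body; }
  }

  // Hypothetical cleansing rule: drop blank lines and lines starting with '#'.
  private static final Pattern DROP = Pattern.compile("^\\s*(#.*)?$");

  /** Same contract as Interceptor.intercept(Event): return the event, or null to drop it. */
  static SimpleEvent intercept(SimpleEvent event, String source) {
    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    if (DROP.matcher(body).matches()) {
      return null;                              // filtered out: never reaches the channel
    }
    event.getHeaders().put("source", source);   // enrich the header, body left untouched
    return event;
  }
}
```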

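An interceptor is a simple pluggable component that sits between a source and its channel: after the source receives data and before that data is written to the channel, interceptors can transform or drop events. Each interceptor only processes events received by the source it is configured on. Flume ships with several built-in interceptors, and you can also write your own, which is very useful in real business scenarios. The requirement in this article is to use a custom interceptor to prepend an identifier field to every record (that is, to insert an identifier at the beginning of the event body), so that later processing can tell where the data came from.
Writing a custom interceptor only requires implementing the Interceptor interface, together with its nested Builder. This article uses Maven; the steps are as follows: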
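Several interceptors on one source form a chain that is applied to each batch, in the order given by the source's interceptors list, before the batch reaches the channel. The sketch below is a simplification under stated assumptions: events are plain String bodies rather than Flume events, and ListInterceptor, prefix and dropStartingWith are hypothetical names. It shows why the configured order matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Sketch of an interceptor chain; events are simplified to String bodies. */
class InterceptorChainSketch {

  /** A list-level interceptor: may rewrite, keep or drop each event in a batch. */
  interface ListInterceptor extends UnaryOperator<List<String>> {}

  /** Applies interceptors in configured order; the result is what the channel would receive. */
  static List<String> runChain(List<String> batch, List<ListInterceptor> chain) {
    List<String> events = batch;
    for (ListInterceptor interceptor : chain) {
      events = interceptor.apply(events);
    }
    return events;
  }

  /** Prepends identifier + separator to every body (the requirement of this article). */
  static ListInterceptor prefix(String value, String separator) {
    return events -> {
      List<String> out = new ArrayList<>(events.size());
      for (String e : events) {
        out.add(value + separator + e);
      }
      return out;
    };
  }

  /** Drops every event whose body starts with the given text. */
  static ListInterceptor dropStartingWith(String start) {
    return events -> {
      List<String> out = new ArrayList<>();
      for (String e : events) {
        if (!e.startsWith(start)) {
          out.add(e);
        }
      }
      return out;
    };
  }
}
```

With the filter first, a body starting with "tmp" is dropped before it is prefixed; with the prefix first, every body starts with "bodyinterceptor|", so the filter never matches and nothing is dropped.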

Step 1: add the flume-ng-core dependency:

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

Step 2: create a class that implements the Interceptor interface:

package cn.com.bonc.flume;

import java.nio.charset.Charset;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Interceptor that prepends a static, pre-configured identifier and separator
 * to the body of every event: newBody = value + key + body.
 *
 * Properties:<p>
 *
 *   key: Separator placed between the identifier and the original body.
 *        (default is "key")<p>
 *
 *   value: Identifier prepended to the body.
 *        (default is "value")<p>
 *
 * Sample config:<p>
 *
 * <code>
 *   agent.sources.r1.channels = c1<p>
 *   agent.sources.r1.type = SEQ<p>
 *   agent.sources.r1.interceptors = i1<p>
 *   agent.sources.r1.interceptors.i1.type = cn.com.bonc.flume.BodyInterceptor$Builder<p>
 *   agent.sources.r1.interceptors.i1.key = |<p>
 *   agent.sources.r1.interceptors.i1.value = NYC_01<p>
 * </code>
 *
 */
public class BodyInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory.getLogger(BodyInterceptor.class);

  private final String key;
  private final String value;

  /**
   * Only {@link BodyInterceptor.Builder} can build me
   */
  private BodyInterceptor(String key, String value) {
    this.key = key;
    this.value = value;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Modifies events in-place: prepends value + key to the body.
   */
  @Override
  public Event intercept(Event event) {
    // body is the original data, newBody the processed data;
    // value is the identifier to add, key is the separator
    Charset utf8 = Charset.forName("UTF-8");
    String body = new String(event.getBody(), utf8);
    String newBody = value + key + body;
    // Encode explicitly as UTF-8: getBytes() without arguments uses the JVM default charset
    event.setBody(newBody.getBytes(utf8));
    return event;
  }

  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events the batch of events read by the source
   * @return the same list, with every event body modified in place
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instance of the BodyInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private String key;
    private String value;

    @Override
    public void configure(Context context) {
      key = context.getString(Constants.KEY, Constants.KEY_DEFAULT);
      value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
    }

    @Override
    public Interceptor build() {
      logger.info(String.format(
          "Creating BodyInterceptor: key=%s,value=%s",key, value));
      return new BodyInterceptor(key, value);
    }

  }

  public static class Constants {
    public static final String KEY = "key";
    public static final String KEY_DEFAULT = "key";

    public static final String VALUE = "value";
    public static final String VALUE_DEFAULT = "value";
  }
}

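Notes on the code: BodyInterceptor intercepts each event and prepends the configured identifier and separator to its body; the nested Builder class is what Flume uses to configure and create the interceptor.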
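The $Builder in the type setting is the JVM binary name of the nested class. The sketch below approximates the lifecycle, assuming Flume instantiates the named builder reflectively, calls configure with the interceptor's sub-properties and then calls build. FakeContext, Prefixer and fromTypeName are hypothetical stand-ins so the example runs without the Flume jars; the real types are org.apache.flume.Context and Interceptor.Builder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Sketch of the Builder lifecycle with hypothetical stand-ins for Flume types. */
class BuilderLifecycleSketch {

  /** Stand-in for Context: the interceptor's sub-properties (the part after "i1."). */
  static final class FakeContext {
    private final Map<String, String> props;
    FakeContext(Map<String, String> props) { this.props = props; }
    String getString(String k, String def) { return props.getOrDefault(k, def); }
  }

  /** Stand-in for the interceptor itself: prepends value + key to the body. */
  static final class Prefixer {
    private final String key;
    private final String value;
    Prefixer(String key, String value) { this.key = key; this.value = value; }

    byte[] intercept(byte[] body) {
      String s = new String(body, StandardCharsets.UTF_8);
      return (value + key + s).getBytes(StandardCharsets.UTF_8);
    }
  }

  /** Mirrors BodyInterceptor.Builder: configure() reads settings, build() creates the instance. */
  public static final class Builder {
    private String key;
    private String value;

    public Builder() {}

    void configure(FakeContext ctx) {
      key = ctx.getString("key", "key");        // same defaults as BodyInterceptor.Constants
      value = ctx.getString("value", "value");
    }

    Prefixer build() { return new Prefixer(key, value); }
  }

  /** Roughly what happens to the configured "type": load class, instantiate, configure, build. */
  static Prefixer fromTypeName(String typeName, Map<String, String> props) {
    try {
      Builder builder = (Builder) Class.forName(typeName).getDeclaredConstructor().newInstance();
      builder.configure(new FakeContext(props));
      return builder.build();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + typeName, e);
    }
  }
}
```

BuilderLifecycleSketch.Builder.class.getName() returns a name ending in BuilderLifecycleSketch$Builder, the same shape as cn.com.bonc.flume.BodyInterceptor$Builder in the agent configuration.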

Step 3: package the code above into a jar, upload it to the server running Flume, and place it in $FLUME_HOME/lib.
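If the type in the agent configuration does not match a class actually packaged in the jar, the agent typically fails at startup with a class-not-found error. A hypothetical helper like the one below can check a built jar before uploading it; JarClassCheck and its methods are illustrative names, not part of Flume.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

/** Hypothetical helper: does a jar contain the class named in the Flume config? */
class JarClassCheck {

  /** "a.b.C$Builder" -> "a/b/C$Builder.class", the entry name of the compiled class in a jar. */
  static String entryName(String binaryClassName) {
    return binaryClassName.replace('.', '/') + ".class";
  }

  /** True if the jar at jarPath contains the compiled class with the given binary name. */
  static boolean containsClass(String jarPath, String binaryClassName) {
    try (JarFile jar = new JarFile(jarPath)) {
      return jar.getEntry(entryName(binaryClassName)) != null;
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot read " + jarPath, e);
    }
  }
}
```

For example, containsClass("target/your-interceptor.jar", "cn.com.bonc.flume.BodyInterceptor$Builder") should return true for a correctly built jar; the jar path here is a placeholder for whatever your build produces.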

Step 4: add the interceptor settings to the Flume agent configuration file (the agent.sources.filedir.interceptors lines), as shown below:

agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources = filedir

# source configuration
agent.sources.filedir.type = spooldir
agent.sources.filedir.spoolDir = /home/hadoop/bodyinterceptortest
agent.sources.filedir.deletePolicy= immediate
agent.sources.filedir.decodeErrorPolicy= IGNORE
agent.sources.filedir.includePattern= ^.*\\.txt$
agent.sources.filedir.ignorePattern= ^(.)*\\.tmp$
agent.sources.filedir.channels = memoryChannel
agent.sources.filedir.interceptors = i1
agent.sources.filedir.interceptors.i1.type = cn.com.bonc.flume.BodyInterceptor$Builder
agent.sources.filedir.interceptors.i1.key = |
agent.sources.filedir.interceptors.i1.value= bodyinterceptor
# channel configuration
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 250000
agent.channels.memoryChannel.transactionCapacity = 200000
agent.channels.memoryChannel.byteCapacityBufferPercentage = 20
agent.channels.memoryChannel.byteCapacity = 524288000

# sink configuration - cs_calllog_acess
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = xxx
agent.sinks.kafkaSink.flumeBatchSize = 50000
agent.sinks.kafkaSink.kafka.bootstrap.servers = xxx:9092
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
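Flume's properties-file configuration provider reads the agent file with java.util.Properties, so the usual properties escaping applies: a doubled backslash in the file becomes a single backslash, and whitespace after = is skipped. The sketch below (ConfigParseSketch and its SAMPLE string are illustrative, modelled on the configuration above) shows what the spooldir pattern and interceptor settings look like once parsed.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Sketch: parse agent-config lines the way java.util.Properties does. */
class ConfigParseSketch {

  // Modelled on the agent config above; the Java literal "\\\\" is two backslashes in the file.
  static final String SAMPLE =
      "agent.sources.filedir.includePattern= ^.*\\\\.txt$\n"
    + "agent.sources.filedir.interceptors.i1.type = cn.com.bonc.flume.BodyInterceptor$Builder\n"
    + "agent.sources.filedir.interceptors.i1.key = |\n";

  static Properties load(String text) {
    Properties p = new Properties();
    try {
      p.load(new StringReader(text));   // applies properties escaping, skips space after '='
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return p;
  }
}
```

After loading, includePattern is the regex ^.*\.txt$, which matches a name such as access.txt but not access.txt.tmp, and i1.key is exactly |.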

Step 5: start Flume so that the interceptor takes effect, using the following command:

bin/flume-ng agent --conf conf -f conf/bodyinterceptor.conf -n agent -Dflume.root.logger=INFO,console

Step 6: verify.

Source data:
106468759xxxx|250848|31|2018-05-19 15:32:07.0240096|6|dbscar.com|long.dbscar.com
106468759xxxx|250848|31|2018-05-19 15:32:04.2311649|6|baidu.com|map.baidu.com
106469884xxxx|250762|21|2018-05-19 15:32:05.5038380|6|mlizhi.com|admin.mlizhi.com
106468759xxxx|250848|31|2018-05-19 15:32:16.3832167|6|baidu.com|map.baidu.com
106468759xxxx|250848|31|2018-05-19 15:32:11.6441823|6|baidu.com|map.baidu.com
106464042xxxx|260304|31|2018-05-19 15:33:11.6937589|6|amap.com|restapi.amap.com
106468698xxxx|250040|32|2018-05-19 15:31:23.5020435|6|gepush.com|talk.gepush.com
106468759xxxx|250848|31|2018-05-19 15:32:11.7572139|6|baidu.com|map.baidu.com
106467913xxxx|258834|31|2018-05-19 15:31:22.4791450|6|tianxing.com|open.tianxing.com
106461499xxxx|253284|11|2018-05-19 15:33:44.4231527|6|huya.com|streamhls.huya.com
106468759xxxx|250848|31|2018-05-19 15:32:13.2241204|6|baidu.com|map.baidu.com
106468759xxxx|250848|31|2018-05-19 15:32:18.6443484|6|baidu.com|map.baidu.com
106464087xxxx|258497|31|2018-05-19 15:32:58.2904478|6|sinaimg.cn|n.sinaimg.cn
106468695xxxx|253231|21|2018-05-19 15:33:44.5599505|6|amap.com|mps.amap.com
Result:
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:07.0240096|6|dbscar.com|long.dbscar.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:04.2311649|6|baidu.com|map.baidu.com
bodyinterceptor|106469884xxxx|250762|21|2018-05-19 15:32:05.5038380|6|mlizhi.com|admin.mlizhi.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:16.3832167|6|baidu.com|map.baidu.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:11.6441823|6|baidu.com|map.baidu.com
bodyinterceptor|106464042xxxx|260304|31|2018-05-19 15:33:11.6937589|6|amap.com|restapi.amap.com
bodyinterceptor|106468698xxxx|250040|32|2018-05-19 15:31:23.5020435|6|gepush.com|talk.gepush.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:11.7572139|6|baidu.com|map.baidu.com
bodyinterceptor|106467913xxxx|258834|31|2018-05-19 15:31:22.4791450|6|tianxing.com|open.tianxing.com
bodyinterceptor|106461499xxxx|253284|11|2018-05-19 15:33:44.4231527|6|huya.com|streamhls.huya.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:13.2241204|6|baidu.com|map.baidu.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:18.6443484|6|baidu.com|map.baidu.com
bodyinterceptor|106464087xxxx|258497|31|2018-05-19 15:32:58.2904478|6|sinaimg.cn|n.sinaimg.cn
bodyinterceptor|106468695xxxx|253231|21|2018-05-19 15:33:44.5599505|6|amap.com|mps.amap.com
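Rather than comparing the two listings by eye, a small check can confirm that every output line is an input line carrying the bodyinterceptor| prefix. This is a hypothetical helper, not part of Flume; it compares the lines as a multiset because reading back from a Kafka topic with more than one partition may not preserve the original order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for checking interceptor output against the source data. */
class PrefixCheck {

  /** Output lines that do not correspond to value + separator + some (unused) input line. */
  static List<String> unexpected(List<String> input, List<String> output, String value, String separator) {
    Map<String, Integer> remaining = new HashMap<>();
    for (String line : input) {
      remaining.merge(value + separator + line, 1, Integer::sum);
    }
    List<String> bad = new ArrayList<>();
    for (String line : output) {
      Integer n = remaining.get(line);
      if (n == null || n == 0) {
        bad.add(line);
      } else {
        remaining.put(line, n - 1);   // consume one expected occurrence
      }
    }
    return bad;
  }

  /** True if output is exactly the prefixed input lines, in any order. */
  static boolean allPrefixed(List<String> input, List<String> output, String value, String separator) {
    return input.size() == output.size() && unexpected(input, output, value, separator).isEmpty();
  }
}
```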

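Conclusion: the expected result was achieved, and the identifier bodyinterceptor was added to the beginning of every record. The stability of the program still needs further testing.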

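One caveat applies, however: parsing the event body in order to set headers is not recommended. Flume is essentially a water channel, and a channel does not process the water while it flows through; if the water needs processing, do it after it has flowed out.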
