Flume: A Tool for Collecting Massive Amounts of Log Data
1. Introduction to Flume
1.1 The big data processing pipeline
In an enterprise, big data is usually processed in the following stages:
Data collection
Data storage
Data cleansing
Data analysis
Data visualization
See the figure below:

Among data collection tools, the Flume framework holds a notable share of the market.
1.2 What is Flume?
Flume is a distributed, reliable, and highly available service for efficiently collecting, aggregating, and moving large amounts of log data. Its architecture is simple and flexible, based on streaming data flows, and it is robust and fault tolerant.
The official site (http://flume.apache.org/) describes it as follows:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Flume was originally developed by Cloudera as a real-time log collection system and was widely recognized and adopted by the industry. As its features grew, however, its weaknesses gradually surfaced: a bloated code base, poorly designed core components, and non-standard core configuration. Unstable log delivery was especially severe in release 0.9.4.
To solve these problems, on October 22, 2011 Cloudera made a milestone change to Flume: the core components, core configuration, and code architecture were refactored, and the project was moved under the Apache umbrella, changing its name from Cloudera Flume to Apache Flume.
1.3 Version differences
To distinguish it from earlier releases, the refactored version is collectively called Flume NG (next generation), while the version before the refactoring is called Flume OG (original generation). Flume currently ships startup scripts for Linux only; there is no startup script for Windows.
2. Flume architecture
2.1 Architecture overview
The core of Flume at runtime is the Agent. The agent is Flume's smallest independently running unit; one agent is one JVM process. It is a complete data collection unit with three core components: source, channel, and sink. Through these components, Events can flow from one place to another, as shown below:

2.2 Components and their roles
- Client:
The client that produces the data; it runs in an independent thread.
- Event:
A unit of data, consisting of headers and a body. (Events can be log records, Avro objects, and so on.)
- Flow:
An abstraction of the movement of Events from a source point to a destination point.
- Agent:
An independent Flume process running in a JVM that contains the Source, Channel, and Sink components.
Each machine runs one agent, but a single agent can contain multiple sources and sinks.
- Source:
The data collection component. A source collects data from the Client and hands it to the Channel.
- Channel:
A pipe that receives data from the source side and buffers it so it can be pushed to the sink side.
- Sink:
Pulls data from the channel and pushes it to a persistent store or to the next Agent.
- selector:
The channel selector. It acts on the source side and decides which channel(s) the data is sent to.
- interceptor:
Flume allows data to be intercepted with interceptors. Interceptor chains are supported; interceptors act on events at the source stage, before the events reach the channel.
3. Installing Flume
3.1 Installation and environment variables
3.1.1 Prepare the package
Upload apache-flume-1.8.0-bin.tar.gz to the /root/soft/ directory on the Linux machine.
3.1.2 Extract the package
[root@master soft]# pwd
/root/soft
[root@master soft]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/
3.1.3 Rename the directory
[root@master soft]# cd /usr/local/
[root@master local]# mv apache-flume-1.8.0-bin/ flume
3.1.4 Configure environment variables
[root@master local]# vi /etc/profile
........(omitted)..........
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH
# reload the environment variables
[root@master apps]# source /etc/profile
3.1.5 Verify the installation
[root@master local]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d
3.2 Configuration files
[root@master local]# cd flume/conf/
[root@master conf]# ll    # check that there is a flume-env.sh.template file
[root@master conf]# cp flume-env.sh.template flume-env.sh
[root@master conf]# vim flume-env.sh
........(omitted)..........
export JAVA_HOME=/usr/local/jdk
........(omitted)..........
4. Deploying Flume
4.1 Data flow models
- Single data flow model
- Multiple data flow models
4.1.1 Single data flow model
Within a single Agent, one Source, one Channel, and one Sink form a single data flow, as shown below. The whole flow is:
Web Server --> Source --> Channel --> Sink --> HDFS.

4.1.2 Multiple data flow models
**1)** Multi-agent serial data flow model

**2)** Multi-agent consolidation (fan-in) data flow model

**3)** Single-agent multi-flow (fan-out) data flow model

**4)** Sinkgroups data flow model

4.1.3 Summary
The data flow models that Flume provides follow a few important rules (a configuration sketch illustrating them follows below).
Source --> Channel
1. A single Source can be combined with multiple Channels to build a flow, either replicating or multiplexing the events.
2. Multiple Sources can write into a single Channel.
Channel --> Sink
1. Multiple Sinks can be combined into Sinkgroups that drain a Channel, providing load balancing or failover.
2. Multiple Sinks can also read from a single Channel.
3. A single Sink can only read from a single Channel.
Based on these five rules, you can design a data flow model that meets your needs.
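As a hedged illustration of the fan-out and sink-group rules above (this sketch is not from the original article; the file name fanout-demo.conf, port 7777, and the names a1/r1/c1/c2/s1/s2/s3/g1 are assumptions): one syslogtcp source replicates events into two memory channels, sink s1 drains c1, and sinks s2 and s3 both drain c2 as a failover sink group.
[root@master flumeconf]# vi fanout-demo.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2 s3
# one source fans out to two channels (rule: Source --> Channel, replicating)
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 7777
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.channels.c1.type = memory
a1.channels.c2.type = memory
# each sink reads from exactly one channel (rule: a single Sink takes data from a single Channel)
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.sinks.s2.type = logger
a1.sinks.s2.channel = c2
a1.sinks.s3.type = logger
a1.sinks.s3.channel = c2
# the two sinks draining the same channel are grouped with a failover processor (rule: Sinkgroups)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s2 s3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.s2 = 10
a1.sinkgroups.g1.processor.priority.s3 = 5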
4.2 Configuration basics
4.2.1 Naming the components
To define a flow within a single agent, you connect the sources and sinks through channels: list the sources, sinks, and channels for the agent, then point each source and each sink at a channel. A source instance can specify multiple channels, but a sink instance can specify only one channel. The format is as follows:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
For example:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
4.2.2 Configuring component properties
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
For example:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
4.3 Commonly used sources and sinks
4.3.1 Common Flume sources
# Avro source:
avro
# Syslog TCP source:
syslogtcp
# Syslog UDP Source:
syslogudp
# HTTP Source:
http
# Exec source:
exec
# JMS source:
jms
# Thrift source:
thrift
# Spooling directory source:
spooldir
# Kafka source:
org.apache.flume.source.kafka.KafkaSource
.....
4.3.2 Common Flume channels
# Memory Channel
memory
# JDBC Channel
jdbc
# Kafka Channel
org.apache.flume.channel.kafka.KafkaChannel
# File Channel
file
4.3.3 Common Flume sinks
# HDFS Sink
hdfs
# HIVE Sink
hive
# Logger Sink
logger
# Avro Sink
avro
# Kafka Sink
org.apache.flume.sink.kafka.KafkaSink
# Hbase Sink
hbase
5. Case studies
For configuration details, see: https://flume.apache.org/releases/content/1.10.0/FlumeUserGuide.html
5.1 Case study: avro + memory + logger
Avro Source: listens on a specified Avro port and receives files sent by an Avro client. As long as an application sends a file through that Avro port, the source component picks up the content of the file. The output destination here is the Logger.
5.1.1 Write the collection plan
[root@master flume]# mkdir flumeconf
[root@master flume]# cd flumeconf
[root@master flumeconf]# vi avro-logger.conf
# name the components
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1
# source properties
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=master
a1.sources.avro-sour1.port=9999
# channel properties
a1.channels.mem-chan1.type=memory
# sink properties
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100
# bind the components together
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
5.1.2 Start the agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.1.3 Test the data flow
Open another terminal to test:
[root@master ~]# mkdir flumedata
[root@master ~]# cd flumedata/
[root@master flumedata]#
[root@master flumedata]# date >> test.data
[root@master flumedata]# cat test.data
Thu Nov 21 21:22:36 CST 2019
[root@master flumedata]# ping master >> test.data
[root@master flumedata]# cat test.data
....(omitted)....
[root@master flumedata]# flume-ng avro-client -c /usr/local/flume/conf/ -H master -p 9999 -F ./test.data
5.2 Real-time collection (tailing a file): exec + memory + hdfs
Exec Source: runs a specified command and uses the output of that command as its data source. The most common command is tail -F <file>: whenever the application writes data to the log file, the source picks up the newest content.
memory: the data is transported through a Memory channel.
hdfs: the output destination is HDFS.
5.2.1 Configuration
[root@master flumeconf]# vim exec-hdfs.conf
# source properties
a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/flumedata/test.data
# sink properties
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:8020/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.rollInterval=3
a1.sinks.k1.hdfs.rollSize=20
a1.sinks.k1.hdfs.rollCount=5
a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
# channel properties
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# bind the components together
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
5.2.2 Start the agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
5.2.3 Test the data flow
[root@master flumedata]# ping master >> test.data
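To confirm that the HDFS sink is writing, you can list the output directory (a hedged check; the date/time subdirectories depend on when the agent runs):
[root@master flumedata]# hdfs dfs -ls -R /flume/tailout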
5.3 Real-time collection (watching a directory): spooldir + memory + logger
spooldir: the source is a directory; as soon as a file appears in the directory, it is ingested.
mem: the data is transported through memory.
logger: the data is sent to the log.
5.3.1 Configuration
[root@master flumeconf]# vi spool-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir = /home/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000
a1.channels.c1.type=memory
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=100
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
5.3.2 Start the agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.3.3 Test
[root@master ~]# for i in `seq 1 10`; do echo $i >> /home/flume/spool/$i;done
5.4 Case study: http + memory + logger
http: the data source is the HTTP protocol, which normally accepts GET or POST requests. Every HTTP request is converted into a Flume Event by a pluggable handler.
mem: the transport channel is memory.
logger: the output format is the Logger.
5.4.1 Configuration
[root@master flumeconf]# vi http-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
a1.channels.c1.type=memory
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
5.4.2 Start the agent
[root@master ~]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.4.3 Test
[root@master ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://master:6666
6. Using interceptors
While Flume is running, it can modify or drop Events in flight; this is done with interceptors (Interceptors). Interceptors have the following characteristics:
An interceptor must implement the org.apache.flume.interceptor.Interceptor interface.
An interceptor can modify or drop events based on whatever criteria its developer chooses.
Interceptors follow the chain-of-responsibility pattern; multiple interceptors can be chained in a specified order.
The list of events returned by one interceptor is passed to the next interceptor in the chain.
If an interceptor needs to drop events, it simply leaves them out of the list of events it returns.
6.1 Common interceptors
- Timestamp Interceptor: adds the current timestamp (in milliseconds) to the event headers under the key timestamp; the value is the current timestamp. Not used very often.
- Host Interceptor: adds the hostname or IP address of the machine running the Flume agent to the event headers under the key host (the key can also be customized).
- Static Interceptor: adds a fixed, static key/value pair to the event headers.
6.2 Case study: syslogtcp + memory + hdfs
Timestamp, host, and static interceptors are applied; the data source is SyslogTcp, the transport channel is a memory channel, and the final destination is HDFS.
6.2.1 Configuration:
[root@master flumeconf]# vi ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=6666
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=master
a1.channels.c1.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
6.2.2 Start the agent:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console
6.2.3 Test:
[root@master ~]# echo "hello world hello qiangeng" | nc master 6666
6.3 Case study: regex + syslogtcp + memory + hdfs
The interceptor is a regex filter interceptor, the data source is Syslogtcp, the transport channel is a MemChannel, and the final destination is HDFS.
6.3.1 Configuration
[root@master flumeconf]# vi regex-ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
# do not wrap the regex in quotes
a1.sources.r1.interceptors.i1.regex=^[0-9].*$
a1.sources.r1.interceptors.i1.excludeEvents=false
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
6.3.2 Start the agent:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./regex-ts.conf -n a1 -Dflume.root.logger=INFO,console
6.3.3 Test:
[root@master ~]# echo "hello world hello qiangeng" | nc master 6666
[root@master ~]# echo "123123123 hello world hello qiangeng" | nc master 6666
6.4 Custom interceptors
6.4.0 Requirements
To improve Flume's extensibility, users can define their own interceptors.
Data in the event body that starts with a digit is stored under hdfs://master:8020/flume/number.log (sink s1).
Data in the event body that starts with a letter is stored under hdfs://master:8020/flume/character.log (sink s2).
Data in the event body that starts with any other character is stored under hdfs://master:8020/flume/other.log (sink s3).
6.4.1 pom.xml
See /code/pom.xml for reference:
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
6.4.2 Code
See code/MyInterceptor for the full source:
public class MyInterceptor implements Interceptor {
private static final String LOCATION_KEY = "location";
private static final String LOCATION_NUM = "number";
private static final String LOCATION_CHAR = "character";
private static final String LOCATION_OTHER = "other";
@Override
public void initialize() {
}
/** Called when a single Event is intercepted. @param event the intercepted Event. @return the Event emitted by this interceptor. */
@Override
public Event intercept(Event event) {
// add a key/value pair to the header of the intercepted event
// 1. get the body of the intercepted event
byte[] body = event.getBody();
// 2. check whether the body starts with a digit
if (body[0] >= '0' && body[0] <= '9') {
// add a key/value pair to this event's headers marking which channel it should go to
event.getHeaders().put(LOCATION_KEY, LOCATION_NUM);
}
else if (body[0] >= 'a' && body[0] <= 'z' || body[0] >= 'A' && body[0] <= 'Z') {
event.getHeaders().put(LOCATION_KEY, LOCATION_CHAR);
}
else {
event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
}
return event;
}
/** Intercept a batch of Events. @param list the list of intercepted Events. @return the processed list of Events. */
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
/** A builder class used to obtain interceptor instances. */
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
6.4.3 Build and upload
Package the interceptor with Maven, then upload the jar, together with the fastjson dependency, to Flume's lib directory.
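A hedged sketch of this step (the project directory and the artifact name flume-interceptor-1.0.jar are assumptions based on a typical Maven layout; adjust them to whatever your build actually produces):
# run inside the Maven project directory
[root@master myinterceptor]# mvn clean package -DskipTests
# copy the resulting jar into Flume's lib directory
[root@master myinterceptor]# cp target/flume-interceptor-1.0.jar /usr/local/flume/lib/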
6.4.4 Write the collection plan (saved here as mytest.conf, the file used when starting the agent below)
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
# source properties
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345
# interceptors
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.MyInterceptor$MyBuilder
# selector properties
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3
# channel properties
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
# sink properties
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text
6.4.5 Start the agent
[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console
6.4.6 Test:
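To exercise the three routes, send lines with different leading characters to the syslogtcp port configured above, then inspect the three HDFS output directories (a hedged sketch; the sample strings are arbitrary):
[root@master ~]# echo "1 starts with a digit" | nc master 12345
[root@master ~]# echo "abc starts with a letter" | nc master 12345
[root@master ~]# echo '#-# starts with another character' | nc master 12345
[root@master ~]# hdfs dfs -ls -R /flume/customInterceptor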
7. Using selectors
7.1 Overview
Channel selectors in Flume act at the source stage: they decide which Channel(s) a particular event accepted by the Source should be written to. The selector tells the channel processor, which then writes the event to the Channel(s).
How the components inside an Agent interact:
Because Flume does not use two-phase commit, an event is written to one Channel and committed before it is written to the next Channel. If writing to one Channel throws an exception, the same event that has already been written to other Channels cannot be rolled back. When such an exception occurs, the channel processor throws a ChannelException and the transaction fails. If the Source then writes the same event again (in most cases it will retry; only sources such as Syslog and Exec cannot, because there is no way to regenerate the same data), the duplicate event is written to the Channels whose earlier commits succeeded, so duplicates appear inside Flume.
Channel selectors are configured through the channel processor. A selector can mark one set of Channels as required and another set as optional (a small sketch of this follows after the two selector types below).
Flume provides two kinds of selectors; if the Source configuration does not specify one, the replicating channel selector is used automatically.
- replicating: this selector copies every event to all of the Channels listed in the Source's channels parameter.
- multiplexing: a selector designed for dynamic event routing; it chooses which Channel an event should be written to based on the value of a specific event header.
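A hedged sketch of the required/optional distinction for the multiplexing selector (the channel names and the extra optional channel c3 are assumptions, not taken from the cases below): events whose state header is USER must reach c1 and are additionally, but optionally, copied to c3.
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.optional.USER = c3
a1.sources.r1.selector.default = c2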
7.2 Case study: replicating selector
7.2.1 Configuration
[root@master flumeconf]# vi rep.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicating
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
7.2.2 Start the agent:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console
7.2.3 Test:
[root@master ~]# echo "hello world hello qianfeng" | nc master 6666
7.3 Case study: multiplexing selector
7.3.1 Configuration
[root@master flumeconf]# vi mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
7.3.2 Start the agent:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console
7.3.3 Test:
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666
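Both requests above carry state ORDER and are therefore routed to channel c2. To exercise the USER mapping (routing to c1), a hedged additional request could look like this:
[root@master ~]# curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://master:6666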
7.4 Custom interceptors
7.4.0 Requirements
To improve Flume's extensibility, users can define their own interceptors.
Data in the event body that starts with a digit is stored under hdfs://qianfeng01:8020/flume/number.log (sink s1).
Data in the event body that starts with a letter is stored under hdfs://qianfeng01:8020/flume/character.log (sink s2).
Data in the event body that starts with any other character is stored under hdfs://qianfeng01:8020/flume/other.log (sink s3).
7.4.1 pom.xml
See /code/pom.xml for reference:
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
7.4.2 Code
public class MyInterceptor implements Interceptor {
private static final String LOCATION_KEY = "location";
private static final String LOCATION_NUMBER = "number";
private static final String LOCATION_CHARACTER = "character";
private static final String LOCATION_OTHER = "other";
/** Called when a single Event is intercepted. @param event the intercepted Event. @return the Event emitted by this interceptor; if null is returned, the Event is dropped. */
@Override
public Event intercept(Event event) {
// 1. get the body of the intercepted Event
byte[] body = event.getBody();
// 2. check whether the body starts with a digit
if (body[0] >= '0' && body[0] <= '9') {
event.getHeaders().put(LOCATION_KEY, LOCATION_NUMBER);
} else if (body[0] >= 'a' && body[0] <= 'z' || (body[0] >= 'A' && body[0] <= 'Z')) {
event.getHeaders().put(LOCATION_KEY, LOCATION_CHARACTER);
} else {
event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void initialize() {
}
@Override
public void close() {
}
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
7.4.3 Build and upload
Package the interceptor with Maven, then upload the jar, together with the fastjson dependency, to Flume's lib directory (same procedure as in 6.4.3).
7.4.4 Write the collection plan (saved as mytest.conf)
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
# source properties
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345
# interceptors
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.flume.MyInterceptor$MyBuilder
# selector properties
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3
# channel properties
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
# sink properties
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text
7.4.5 Start the agent
[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console
7.4.6 Test: same as in 6.4.6; send lines starting with a digit, a letter, and another character to port 12345, then check the three HDFS output directories.