Source
AVRO Source
-  AVRO Source接收被AVRO序列化之后的数据,结合AVRO Sink,可以实现复杂的流动模型 
-  案例 -  编辑文件 cd /opt/software/flume-1.11.0/data/ vim avrosource.properties在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置AVRO Source # 类型必须是avro a1.sources.s1.type = avro # 监听的主机 a1.sources.s1.bind = 0.0.0.0 # 监听的端口号 a1.sources.s1.port = 6666  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-  启动 flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.properties -Dflume.root.logger=INFO,console
-  在新窗口中启动AVRO客户端 flume-ng avro-client -H hadoop01 -p 6666 -F a.txt
 
-  
Spooling Directory Source
-  监听指定的目录,如果目录中产生了新的文件,那么自动的将新文件中的内容收集起来 
-  默认情况下,这个文件如果被收集了,那么文件的后缀就是 .COMPLETED
-  案例 -  创建目录 mkdir /opt/flume_data
-  编辑文件 vim spooldirsource.properties在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置Spooling Directory Source # 类型必须是spooldir a1.sources.s1.type = spooldir # 监听的目录 a1.sources.s1.spoolDir = /opt/flume_data # 被收集过的文件后缀 # 利用这条规则,可以过滤掉一部分不需要收集的文件 a1.sources.s1.fileSuffix = .finished  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-  执行 flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.properties -Dflume.root.logger=INFO,console
 
-  
Taildir Source
-  可以用于监听一个或者一组文件,如果被监听的文件中添加了新数据,那么新添的数据会被自动收集 
-  Exec Source需要通过指定 tail -F命令才能监听指定文件,Spooling Directory Source监听指定的目录,并不能确定文件中是否新添了数据
-  不同于Exec Source的地方在于,Taildir Source不需要指定命令,还可以监控一类文件,且Taildir Source通过记录偏移量实现断点续传效果 
-  偏移量通过属性 positionFile来决定,默认是~/.flume/taildir_position.json
-  需要注意的是,Taildir Source不支持在Windows中使用 
-  案例:监听flume_data目录下所有的log和txt文件,如果文件被添加新数据,那么自动收集 -  编辑文件 vim taildirsource.properties
-  在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置Taildir Source # 类型必须是TAILDIR a1.sources.s1.type = TAILDIR # 监听的一组文件的组名 a1.sources.s1.filegroups = f1 f2 # 文件组中的要监听的文件 a1.sources.s1.filegroups.f1 = /opt/flume_data/.*log.* a1.sources.s1.filegroups.f2 = /opt/flume_data/.*txt.* # 偏移量的存储位置 a1.sources.s1.positionFile = /opt/flume_data/taildir_position.json  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-  执行 flume-ng agent -n a1 -c $FLUME_HOME/conf -f taildirsource.properties -Dflume.root.logger=INFO,console
 
-  
NetCat TCP Source
-  Netcat TCP Source监听TCP请求,在使用的时候需要监听指定的主机和端口,从这个指定主机的指定端口来接收TCP请求,并且将TCP请求内容作为日志来进行收集 
-  默认情况下,每一条数据大小不能超过512B,可以通过参数 max-line-length来修改
Sequence Generator Source
-  序列产生器,从0开始递增到 totalEvents,默认情况下totalEvents的值Long.MAX_VALUE
-  实际过程中,会利用这个Source测试流动模型是否搭建成功 
-  案例 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置Sequence Generator Source # 类型必须是seq a1.sources.s1.type = seq # 最大值 a1.sources.s1.totalEvents = 100  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
HTTP Source
-  接收HTTP请求,并且将请求内容作为日志进行收集 
-  只能接收GET和POST请求,其中GET请求接收只能用于实验,实际过程中使用HTTP Source来接收POST请求 
-  案例 -  在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置HTTP Source # 类型必须是http a1.sources.s1.type = http # 监听端口 a1.sources.s1.port = 8888  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-  启动Flume 
-  发送POST请求 curl -X POST -d '[{"headers":{"class":"flume"},"body":"welcome~~~"}]' http://hadoop01:8888
 
-  
Custom Source
-  Flume支持用户自定义Source。Flume针对Source提供了顶级接口 Source,但是实际过程中,并不是实现Source接口,而是实现子接口之一:-  EventDrivenSource:事件驱动Source,本身是一个被动型Source,需要自己定义线程来获取数据以及封装数据
-  PollableSource:拉取Source,本身是一个主动型Source,提供了线程来获取数据,只需要考虑数据怎么封装即可
 
-  
-  由于在自定义Source的时候,还需要考虑获取格式文件中的参数值,所以还需要实现 Configurable接口
-  实际过程中,考虑到要覆盖的方法比较多,所以继承 AbstractSource
自定义EventDrivenSource
-  导入pom文件后,定义类继承 AbstractSource,实现EventDrivenSource和Configurable接口
-  打成jar包,上传到Flume安装目录的lib目录下 cd /opt/software/flume-1.11.0/lib/ rz
-  回到格式文件目录下,编辑文件 cd /opt/software/flume-1.11.0/data/ vim authdrivensource.properties在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置自定义EventDrivenSource # 类型必须是类的全路径名 a1.sources.s1.type = com.fesco.source.AuthDrivenSource # 起始值 a1.sources.s1.start = 10 # 结束值 a1.sources.s1.end = 100 # 步长 a1.sources.s1.step = 5  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-  启动Flume 
自定义PollableSource
-  定义一个类继承 AbstractSource,实现PollableSource和Configurable接口
-  打成jar包,上传到lib目录下 cd ../lib rz
-  回到格式文件目录下,编辑文件 cd ../data/ vim authpollablesource.properties在文件中添加 a1.sources = s1 a1.channels = c1 a1.sinks = k1  # 配置自定义PollableSource # 类型必须是类的全路径名 a1.sources.s1.type = com.fesco.source.AuthPollableSource # 起始值 a1.sources.s1.min = 10 # 结束值 a1.sources.s1.max = 1000 # 步长 a1.sources.s1.step = 5  a1.channels.c1.type = memory  a1.sinks.k1.type = logger  a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
- 启动flume










