Flume Architecture and Usage Examples
Flume Architecture
Source
Exec Source
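| Property Name | Default | Description |
|---|---|---|
| channels | - | |
| type | - | exec |
| command | - | The command to execute, e.g. tail -F /log |
| shell | - | Shell invocation used to run the command, e.g. /bin/sh -c runs the command with sh; only needed when the command relies on shell features such as wildcards or pipes |
| logStdErr | false | Whether the command's stderr output should be logged |
| batchSize | 20 | Maximum number of lines read and sent to the channel in one batch |
| batchTimeout | 3000 | Time (ms) to wait before pushing data downstream if batchSize has not been reached |
| restartThrottle | 10000 | Time (ms) to wait before attempting a restart |
| restart | false | Whether the command should be restarted if it dies |
| interceptors | - | Space-separated list of interceptors |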
# Example with only the required properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
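The interplay of batchSize and batchTimeout can be illustrated with a small Python model. It is a sketch under stated assumptions, not Flume's Java implementation: the hypothetical function `batch_lines` flushes a pending batch when it holds batchSize lines, or when a new line arrives batchTimeout milliseconds or more after the first pending line. Real Flume uses a background timer for the timeout flush instead of checking on arrival.

```python
def batch_lines(arrivals, batch_size=20, batch_timeout_ms=3000):
    """Group (timestamp_ms, line) pairs into batches (hypothetical model).

    A batch is flushed when it holds batch_size lines, or when a new line
    arrives batch_timeout_ms or more after the first line of the pending batch.
    Real Flume uses a background timer instead of checking on arrival.
    """
    batches = []
    pending = []
    first_ts = None
    for ts, line in arrivals:
        # Timeout check: flush the pending batch if it has waited too long.
        if pending and ts - first_ts >= batch_timeout_ms:
            batches.append(pending)
            pending = []
        if not pending:
            first_ts = ts
        pending.append(line)
        # Size check: flush as soon as the batch is full.
        if len(pending) >= batch_size:
            batches.append(pending)
            pending = []
    # Flush whatever is left when the input ends (e.g. the command exits).
    if pending:
        batches.append(pending)
    return batches


print(batch_lines([(0, "a"), (1, "b"), (2, "c")], batch_size=2))  # [['a', 'b'], ['c']]
```

A larger batchSize means fewer channel transactions per line, while batchTimeout bounds how long a slow trickle of lines can sit unsent.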
Spooling Directory Source
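| Property Name | Default | Description |
|---|---|---|
| channels | - | |
| type | - | spooldir |
| spoolDir | - | Directory to watch for new files |
| fileSuffix | .COMPLETED | Suffix appended to a file once it has been fully ingested |
| deletePolicy | never | When to delete completed files: never or immediate |
| fileHeader | false | Whether to add a header containing the absolute file path to each event |
| fileHeaderKey | file | Header key used when fileHeader is enabled |
| batchSize | 100 | Maximum number of events per batch sent to the channel (as above) |
| trackingPolicy | rename | How processed files are tracked: rename or tracker_dir (only applies when deletePolicy is never) |
| trackerDir | .flumespool | Directory that stores file-tracking metadata; a relative path is resolved against spoolDir |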
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
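One pass of the Spooling Directory Source can be modeled roughly as follows. This Python sketch assumes each line becomes one event (as with the default line-based deserializer), completed files are renamed with fileSuffix (trackingPolicy = rename) or deleted when deletePolicy = immediate, and files already carrying the suffix are skipped. `process_spool_dir` is a hypothetical name. The real source additionally requires that files are not modified and that file names are not reused once placed in spoolDir; otherwise it logs an error and stops processing.

```python
import os


def process_spool_dir(spool_dir, file_suffix=".COMPLETED", delete_policy="never",
                      file_header=False, file_header_key="file"):
    """One pass over spool_dir: emit (headers, body) per line, then mark files done."""
    events = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.abspath(os.path.join(spool_dir, name))
        # Skip sub-directories and files that were already completed.
        if name.endswith(file_suffix) or not os.path.isfile(path):
            continue
        with open(path, encoding="utf-8") as f:
            for line in f:
                headers = {file_header_key: path} if file_header else {}
                events.append((headers, line.rstrip("\n")))
        if delete_policy == "immediate":
            os.remove(path)  # deletePolicy = immediate
        else:
            os.rename(path, path + file_suffix)  # trackingPolicy = rename
    return events
```

Running it twice over the same directory emits nothing the second time, because every ingested file now ends in .COMPLETED.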
Taildir Source
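| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | TAILDIR |
| filegroups | – | Space-separated list of file groups; each group is a set of files to tail |
| filegroups.<filegroupName> | – | Absolute path of the files in the group; a regular expression may be used for the file-name part only |
| headers.<filegroupName>.<headerKey> | – | Header value added under headerKey to every event from that file group; several headers can be set per group |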
# Required properties, plus an optional header for file group f1
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
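In a filegroups value the directory part is a literal path and only the last component is a regular expression over file names. The Python sketch below applies that rule to a given list of names; the helper name `match_filegroup` and the whole-name (full-match) semantics are assumptions of this sketch, and it does not read the file system.

```python
import posixpath
import re


def match_filegroup(pattern_path, file_names):
    """Return the paths in a file group that Taildir would tail (simplified).

    The directory part is taken literally; only the file-name part is a
    regular expression, and it must match the whole file name (assumption).
    """
    directory, name_regex = posixpath.split(pattern_path)
    rx = re.compile(name_regex)
    return [posixpath.join(directory, name) for name in file_names if rx.fullmatch(name)]


print(match_filegroup("/var/log/test2/.*log.*", ["app.log", "app.log.1", "notes.txt"]))
# ['/var/log/test2/app.log', '/var/log/test2/app.log.1']
```

Note that the f1 value is also read as a regex, so the dot in example.log matches any character; escape it (example\.log) if that matters.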
Kafka Source
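| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | – | Brokers of the Kafka cluster the source connects to |
| kafka.consumer.group.id | flume | Consumer group id; give multiple sources the same id so they consume as one consumer group |
| kafka.topics | – | Comma-separated list of topics to consume |
| kafka.topics.regex | – | Regular expression selecting the topics to consume; overrides kafka.topics if set |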
# Option 1: consume an explicit list of topics
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
# Option 2: consume every topic matching a regular expression
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
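The two subscription styles above select topics differently. The Python sketch below shows which names each would cover. It assumes that whitespace around commas in kafka.topics is ignored and that the regex must match the whole topic name; both helper names are hypothetical. In a real deployment the Kafka consumer evaluates the regex against the topics that exist in the cluster.

```python
import re


def parse_topics(topics):
    """Split a kafka.topics value such as "test1, test2" into topic names."""
    return [t.strip() for t in topics.split(",") if t.strip()]


def subscribed_topics(topics_regex, existing_topics):
    """Return the existing topics a kafka.topics.regex subscription would cover."""
    rx = re.compile(topics_regex)
    return [t for t in existing_topics if rx.fullmatch(t)]


print(parse_topics("test1, test2"))  # ['test1', 'test2']
print(subscribed_topics(r"^topic[0-9]$", ["topic1", "topic12", "test1", "topic9"]))  # ['topic1', 'topic9']
```

Because `[0-9]` matches a single digit and the pattern is anchored, topic12 is not consumed.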
Event Deserializers
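| Property Name | Default | Description |
|---|---|---|
| deserializer.maxLineLength | 2048 | Maximum number of characters per event; longer lines are truncated |
| deserializer.outputCharset | UTF-8 | Character set used to encode the event bodies sent to the channel |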
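These options belong to the line-based deserializer used, for example, by the Spooling Directory Source, where they are set with the source prefix (e.g. a1.sources.src-1.deserializer.maxLineLength). The Python sketch below is a simplified model with a hypothetical function name: it splits text into lines, cuts each line to maxLineLength characters, and encodes it with outputCharset. How Flume handles the characters beyond the limit is not modeled; this sketch simply drops them.

```python
def deserialize_lines(text, max_line_length=2048, output_charset="utf-8"):
    """Turn text into event bodies, one per line (simplified line deserializer model)."""
    lines = text.split("\n")
    if lines and lines[-1] == "":
        lines.pop()  # a trailing newline does not start another event
    # Truncate each line to max_line_length characters, then encode it.
    return [line[:max_line_length].encode(output_charset) for line in lines]


print(deserialize_lines("abcdef\nxy\n", max_line_length=4))  # [b'abcd', b'xy']
```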
NetCat TCP Source
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be netcat |
| bind | – | Host name or IP address to bind to |
| port | – | Port number to bind to |
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
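The Netcat source is usually tested by hand with telnet; the Python client below does the same programmatically. It assumes the source keeps its default behavior of answering every received line with OK (the ack-every-event option), and `send_lines` is a hypothetical helper. Each newline-terminated line sent becomes one event.

```python
import socket


def send_lines(host, port, lines, timeout=5.0):
    """Send each line to a Netcat TCP Source and collect its per-event reply."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # The source turns each newline-terminated line into one event.
                sock.sendall((line + "\n").encode("utf-8"))
                # With per-event acknowledgements enabled the source answers "OK".
                replies.append(reader.readline().rstrip("\n"))
    return replies
```

Against the agent configured above, `send_lines("localhost", 6666, ["hello flume"])` would be expected to return `["OK"]`.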
Channel
Memory Channel
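| Property Name | Default | Description |
|---|---|---|
| type | – | memory |
| capacity | 100 | Maximum number of events stored in the channel |
| transactionCapacity | 100 | Maximum number of events the channel accepts from a source, or hands to a sink, per transaction |
| keep-alive | 3 | Timeout in seconds for adding or removing an event |
| byteCapacity | 80% of JVM -Xmx | Maximum total bytes of event bodies held in the channel; defaults to 80% of the JVM heap set by -Xmx |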
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacity = 800000
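Channels are transactional: a source puts a batch in one transaction and a sink takes a batch in another. The toy Python model below shows how capacity and transactionCapacity bound those batches. It is not Flume's MemoryChannel: it ignores the keep-alive wait and byteCapacity, and the class, method, and exception names are hypothetical. The constructor check reflects the rule that transactionCapacity may not exceed capacity.

```python
from collections import deque


class ChannelFullError(Exception):
    """Raised when a transaction cannot be committed (hypothetical name)."""


class MemoryChannelModel:
    """Toy model of a memory channel's event-count limits."""

    def __init__(self, capacity=100, transaction_capacity=100):
        if transaction_capacity > capacity:
            raise ValueError("transactionCapacity must not exceed capacity")
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.queue = deque()

    def put_batch(self, events):
        """One source-side transaction: commit all events or none."""
        if len(events) > self.transaction_capacity:
            raise ChannelFullError("batch exceeds transactionCapacity")
        if len(self.queue) + len(events) > self.capacity:
            raise ChannelFullError("channel capacity exceeded")
        self.queue.extend(events)

    def take_batch(self, max_events):
        """One sink-side transaction: take up to max_events events."""
        n = min(max_events, self.transaction_capacity, len(self.queue))
        return [self.queue.popleft() for _ in range(n)]


ch = MemoryChannelModel(capacity=3, transaction_capacity=2)
ch.put_batch(["e1", "e2"])
print(ch.take_batch(10))  # ['e1', 'e2']
```

A practical consequence is that the batchSize of the source and sink attached to a memory channel should not exceed its transactionCapacity.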
Kafka Channel
| Property Name | Default | Description |
|---|---|---|
| type | – | org.apache.flume.channel.kafka.KafkaChannel |
| kafka.bootstrap.servers | – | Brokers of the Kafka cluster used by the channel |
| kafka.topic | flume-channel | Kafka topic in which the channel stores events |
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
File Channel
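| Property Name | Default | Description |
|---|---|---|
| type | – | file |
| checkpointDir | ~/.flume/file-channel/checkpoint | Directory where the checkpoint is stored |
| dataDirs | ~/.flume/file-channel/data | Comma-separated list of directories for storing the log files |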
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Sink
HDFS Sink
| Alias | Description |
|---|---|
| %{host} | Value of the event header named host (any header name can be used) |
| %t | Unix time in milliseconds |
| %a | Abbreviated weekday name (Mon, Tue, …) |
| %A | Full weekday name (Monday, Tuesday, …) |
| %b | Abbreviated month name (Jan, Feb, …) |
| %B | Full month name (January, February, …) |
| %c | Date and time (Thu Mar 3 23:05:25 2005) |
| %d | Day of month (01) |
| %e | Day of month without padding (1) |
| %D | Date; same as %m/%d/%y |
| %H | Hour (00…23) |
| %I | Hour (01…12) |
| %j | Day of year (001…366) |
| %k | Hour without padding (0…23) |
| %m | Month (01…12) |
| %n | Month without padding (1…12) |
| %M | Minute (00…59) |
| %p | am or pm |
| %s | Seconds since 1970-01-01 00:00:00 UTC |
| %S | Second (00…59) |
| %y | Last two digits of the year (00…99) |
| %Y | Year (2010) |
| %z | Numeric time zone offset (for example, -0400) |
| %[localhost] | Hostname of the host where the agent runs |
| %[IP] | IP address of the host where the agent runs |
| %[FQDN] | Canonical hostname of the host where the agent runs |
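| Property Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | hdfs |
| hdfs.path | – | HDFS directory path; may be built from the escape sequences in the previous table |
| hdfs.rollInterval | 30 | Seconds to wait before rolling the current file (0 = never roll based on time) |
| hdfs.rollSize | 1024 | File size in bytes that triggers a roll (0 = never roll based on size) |
| hdfs.rollCount | 10 | Number of events written before the file is rolled (0 = never roll based on event count) |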
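The three roll settings act independently: the current file is closed and a new one started as soon as any enabled threshold is reached, and a value of 0 disables that trigger. The Python predicate below is a hedged sketch of that rule; the function name and argument layout are hypothetical, and in Flume the time-based roll is driven by a timer rather than polled like this.

```python
def should_roll(elapsed_seconds, bytes_written, events_written,
                roll_interval=30, roll_size=1024, roll_count=10):
    """Return True once any enabled roll trigger fires (0 disables a trigger)."""
    if roll_interval > 0 and elapsed_seconds >= roll_interval:
        return True
    if roll_size > 0 and bytes_written >= roll_size:
        return True
    if roll_count > 0 and events_written >= roll_count:
        return True
    return False


print(should_roll(elapsed_seconds=5, bytes_written=200, events_written=10))  # True (rollCount reached)
```

Because the defaults are small (1 KB or 10 events), they tend to produce many small HDFS files; configurations usually raise them or set some of them to 0.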
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
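The time escapes in hdfs.path are filled from the event's timestamp header (or from the agent's local time when hdfs.useLocalTimeStamp = true). With hdfs.round = true, roundValue = 10 and roundUnit = minute, as above, the timestamp is first rounded down to the last 10-minute mark, so 11:54:34 becomes 11:50:00. The Python sketch below reproduces this expansion for a subset of the escapes only; the function names are hypothetical, and substituting an empty string for a missing header is an assumption of the sketch.

```python
import re
from datetime import datetime

# Escapes whose meaning (in the default C locale) matches Python's strftime.
_SUPPORTED = set("yYmdHMSjaAbB")


def round_down(ts, value, unit):
    """Round a timestamp down to a multiple of value in unit (hdfs.round semantics)."""
    if unit == "second":
        return ts.replace(second=ts.second - ts.second % value, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % value, second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % value, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported roundUnit: {unit}")


def expand_hdfs_path(template, ts, headers=None, round_value=None, round_unit="second"):
    """Expand %{header} and a subset of %x time escapes in an hdfs.path template."""
    headers = headers or {}
    if round_value:
        ts = round_down(ts, round_value, round_unit)

    def replace(match):
        if match.group(1) is not None:
            # Missing headers become "" in this sketch (assumption).
            return headers.get(match.group(1), "")
        code = match.group(2)
        if code not in _SUPPORTED:
            raise ValueError(f"%{code} is not covered by this sketch")
        return ts.strftime("%" + code)

    return re.sub(r"%\{([^}]+)\}|%([A-Za-z])", replace, template)


event_time = datetime(2012, 6, 12, 11, 54, 34)
print(expand_hdfs_path("/flume/events/%y-%m-%d/%H%M/%S", event_time,
                       round_value=10, round_unit="minute"))
# /flume/events/12-06-12/1150/00
```

Rounding is what keeps the directory count manageable: without it, the %S component in this path would create a new directory every second.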
File Roll Sink
| Property Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | file_roll |
| sink.directory | – | Directory where the sink writes its files |
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Related Links
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources