Flume Usage Examples
# Introduction
This article records configuration examples for combining various Flume sources and sinks. Since Flume supports a large number of sources and sinks, only some commonly used combinations are covered here. The article will be updated from time to time as needed.
# Examples
# exec to hdfs
Goal: collect data from an exec source into an HDFS sink, i.e. store the result of a shell command in HDFS.
# Agent configuration
# Define the agent, its sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#--------sources-----------
# exec source
a1.sources.r1.type = exec
# Run a command and use its output as the input.
# The command can be a Linux command (e.g. tail -F /fileName) or a shell script.
# For example, tail -F can watch a .log file and ship its new lines to HDFS (a sketch of this variant follows the configuration below).
a1.sources.r1.command = echo test exec to hdfs
#--------sink--------
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/%y-%m-%d/%H%M/%S
#a1.sinks.k1.hdfs.filePrefix = %{fileName}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.inUseSuffix =
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the local timestamp to avoid a NullPointerException (the time escapes in hdfs.path need a timestamp header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text (DataStream) instead of the default SequenceFile so the content stored in HDFS is not garbled
a1.sinks.k1.hdfs.fileType = DataStream
# The channel buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
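As the comment above mentions, the exec command can also be something like tail -F on a log file. A quick way to try that variant (a sketch; the log path /root/test.log is made up for illustration) is to set a1.sources.r1.command = tail -F /root/test.log and then append lines to the file from another shell:

```bash
# Append a few test lines so the tail -F variant of the exec source has something to ship
# (assumes a1.sources.r1.command has been changed to: tail -F /root/test.log)
for i in 1 2 3; do
  echo "line $i $(date '+%F %T')" >> /root/test.log
  sleep 1
done
```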
Configuration notes: the configuration above takes the output of the command (echo test exec to hdfs) as input and writes it to the HDFS directory /flume/test/%y-%m-%d/%H%M/%S.
- The source is configured with a single command: echo test exec to hdfs
- The sink is configured with the target HDFS directory and the file name prefix, suffix, and related options; the example below shows how the time escapes in the path are resolved
- The channel again buffers data in memory
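The escape sequences in hdfs.path are filled in from the event timestamp (here the local time, because useLocalTimeStamp = true), and round/roundValue/roundUnit round that timestamp down to the nearest 10 minutes before substitution. A small sketch with GNU date, using the timestamp that appears in the startup log below:

```bash
# How /flume/test/%y-%m-%d/%H%M/%S resolves for an event at 2017-10-19 00:27:31
date -d '2017-10-19 00:27:31' '+/flume/test/%y-%m-%d/%H%M/%S'   # without rounding: /flume/test/17-10-19/0027/31
date -d '2017-10-19 00:20:00' '+/flume/test/%y-%m-%d/%H%M/%S'   # rounded down to 10 minutes: /flume/test/17-10-19/0020/00
```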
# Start the agent
[root@hadoop1 conf-example]# flume-ng agent --conf conf --conf-file /usr/local/hadoop/flume-1.7.0/conf-example/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
......... part of the startup output omitted .........
17/10/19 00:27:31 INFO hdfs.BucketWriter: Creating /flume/test/17-10-19/0020/00/events-.1508344051412.txt.tmp
17/10/19 00:27:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
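The command above runs the agent in the foreground with console logging. If you want it to keep running after the terminal closes, one option (a sketch, not from the original article; the log path /tmp/flume-a1.log is arbitrary) is:

```bash
# Run the same agent in the background and capture its console output in a file
nohup flume-ng agent --conf conf \
  --conf-file /usr/local/hadoop/flume-1.7.0/conf-example/exec-hdfs.conf \
  --name a1 -Dflume.root.logger=INFO,console \
  > /tmp/flume-a1.log 2>&1 &
tail -F /tmp/flume-a1.log   # follow the agent log
```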
# Verification
Check the contents of the file in HDFS:
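A minimal command-line check (a sketch; the date/time bucket matches the startup log above and will differ for your run):

```bash
# List what the HDFS sink has written and print the file the exec source produced
hdfs dfs -ls -R /flume/test
hdfs dfs -cat /flume/test/17-10-19/0020/00/events-*.txt   # expected content: test exec to hdfs
```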
# spooldir to hdfs
Watch a directory for new files and store them in HDFS.
Goal: monitor a directory; whenever a new file appears there, store its contents in HDFS.
# Agent configuration
# Define the agent, its sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#--------sources-----------
# spooldir source
a1.sources.r1.type = spooldir
# Watch the /root/test directory for new files.
# Note: never drop a file with an already-used name into the watched directory again, or the following exception occurs:
#java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /root/test/a.txt.COMPLETED
a1.sources.r1.spoolDir = /root/test
a1.sources.r1.fileHeader = true
#--------sink--------
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/%y-%m-%d/%H%M/%S
#a1.sinks.k1.hdfs.filePrefix = %{fileName}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.inUseSuffix =
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the local timestamp to avoid a NullPointerException (the time escapes in hdfs.path need a timestamp header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text (DataStream) instead of the default SequenceFile so the content stored in HDFS is not garbled
a1.sinks.k1.hdfs.fileType = DataStream
# The channel buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start the agent
flume-ng agent --conf conf --conf-file /usr/local/hadoop/flume-1.7.0/conf-example/spooldir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# Verification: adding a new file a.txt
Add some files to the /root/test directory, then check the agent's terminal log and the files in HDFS.
[root@hadoop1 test]# echo abcd efg > a.txt
[root@hadoop1 test]# ls
a.txt.COMPLETED
After a.txt was added to /root/test, Flume marked it as processed by appending a .COMPLETED suffix to it.
Check the agent console output and the new files in HDFS.
# Verification: Chinese content also works
[root@hadoop1 test]# echo 中文 > b.txt
[root@hadoop1 test]# ls
a.txt.COMPLETED b.txt.COMPLETED
Check the agent console output and the new files in HDFS.
# Verification: adding a file whose name has already been used
[root@hadoop1 test]# echo 123abc > a.txt
[root@hadoop1 test]# ls
a.txt a.txt.COMPLETED b.txt.COMPLETED
[root@hadoop1 test]#
Now the agent console log shows the following error:
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /root/test/a.txt.COMPLETED
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
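When this happens the spooling source stops processing the directory until the agent is restarted. One way to recover (a sketch, not from the original article) is to give the new file a name that has not been used before and then restart the agent:

```bash
# Rename the conflicting file to a unique name, then restart the flume-ng agent
mv /root/test/a.txt "/root/test/a.$(date +%s).txt"
```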
Conclusions:
- Do not add a file with an already-used name to the watched directory, or the File name has been re-used exception occurs (see the sketch below for one way to avoid this).
- The spooldir source does not detect changes inside an existing file; it only picks up new files. For example, when a new file b.txt appears, Flume ingests b.txt as a new source file.
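A common way to avoid the re-used name problem is to produce files somewhere outside the spooling directory and move them in under unique, final names; a minimal sketch (the source path and naming scheme are made up, and /root is assumed to be on the same filesystem as /root/test so the final mv is atomic):

```bash
# Stage the file outside the spooling directory, then move it in under a unique name
src=/var/log/app/app.log                       # hypothetical file to ingest
tmp=$(mktemp -p /root staging.XXXXXX)          # staging file outside /root/test
cp "$src" "$tmp"
mv "$tmp" "/root/test/app.$(date +%Y%m%d%H%M%S).log"
```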
# References
- http://flume.apache.org/FlumeUserGuide.html
- http://blog.csdn.net/qianshangding0708/article/details/49666821
Last updated: 2022/7/23 10:17:11