浅墨散人 浅墨散人
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • Flume

    • Flume
    • Flume的介绍及原理
    • Flume的安装和使用
    • Flume的使用实例
      • 介绍
      • 实例
        • exec到hdfs
        • spooldir到hdfs
        • spooldir到hdfs
        • 参考
  • BigData
  • Flume
2018-08-10
目录

Flume的使用实例

# 介绍

本文主要是记录Flume的各种sources和sink结合使用时的配置信息,由于Flume支持的sources和sink很多,这里只是拿一些常用的来记录。 本文会根据情况不定期更新

# 实例

# exec到hdfs

实现目标:实现从sources(exec)到sink(hdfs)的数据采集,实现将一条shell命令的结果存储到hdfs中

# agent配置

# 定义agent、sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#--------sources-----------
# exec sources
a1.sources.r1.type = exec
#执行一个cmd命令,将执行结果作为输入
#这里的command可以是一个linux命令(tail -F /fileName)或者执行一个shell脚本
#例如:可以使用tail -F命令监控某.log文件的变化,然后输出到hdfs中
a1.sources.r1.command = echo test exec to hdfs

#--------sink--------
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/%y-%m-%d/%H%M/%S
#a1.sinks.k1.hdfs.filePrefix = %{fileName}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.inUseSuffix =
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#防止出现NullPoinet异常
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#防止写入到HDFS中文件内容乱码
a1.sinks.k1.hdfs.fileType = DataStream

# channels使用内存来缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 使sources和sink通过channel关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

配置说明: 以上配置实现了输入(echo test exec to hdfs)到输出hdfs(/flume/test/%y-%m-%d/%H%M/%S)目录中

  1. sources中配置了一个command,命令为:echo test exec to hdfs
  2. sink中配置了输出到hdfs中具体的某个目录中,文件名的前缀后缀等信息
  3. channels还是使用内存来缓存数据

# 启动agent

[root@hadoop1 conf-example]# flume-ng agent --conf conf --conf-file /usr/local/hadoop/flume-1.7.0/conf-example/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
.........省略部分信息.........
17/10/19 00:27:31 INFO hdfs.BucketWriter: Creating /flume/test/17-10-19/0020/00/events-.1508344051412.txt.tmp
17/10/19 00:27:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1
2
3
4

# 验证

img 查看hdfs中该文件内容: img

# spooldir到hdfs

监听文件夹中文件变化,然后存储到hdfs中

实现目标:监控某目录下文件变化,如果有新文件则将该文件存储到hdfs中

# 配置agent

# 定义agent、sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#--------sources-----------
# exec sources
a1.sources.r1.type = spooldir
#监控/root/test目录中文件的变化
#注意:不能往监控目中重复丢同名文件,否则会出现如下异常
#java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /root/test/a.txt.COMPLETED
a1.sources.r1.spoolDir = /root/test
a1.sources.r1.fileHeader = true

#--------sink--------
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/%y-%m-%d/%H%M/%S
#a1.sinks.k1.hdfs.filePrefix = %{fileName}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.inUseSuffix =
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#防止出现NullPoinet异常
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#防止写入到HDFS中文件内容乱码
a1.sinks.k1.hdfs.fileType = DataStream

# channels使用内存来缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 使sources和sink通过channel关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 启动agent

flume-ng agent --conf conf --conf-file /usr/local/hadoop/flume-1.7.0/conf-example/spooldir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
1

# 验证:新增了一个a.txt文件

在 /root/test目录中新增一些文件,然后查看agent终端日志信息,和hdfs文件系统中文件信息

[root@hadoop1 test]# echo abcd efg > a.txt
[root@hadoop1 test]# ls
a.txt.COMPLETED
1
2
3

在往/root/test目录中添加了一个a.txt文件后,flume自动标记了该文件添加了一个.COMPLETED后缀。 查看agent控制台输出 img 查看hdfs中文件变化 img

# 验证:添加中文也是没问题的

[root@hadoop1 test]# echo 中文 > b.txt
[root@hadoop1 test]# ls
a.txt.COMPLETED  b.txt.COMPLETED
1
2
3

查看agent控制台输出 img 查看hdfs中文件变化 img

# 验证:添加已经存在的重名文件

[root@hadoop1 test]# echo 123abc > a.txt
[root@hadoop1 test]# ls
a.txt  a.txt.COMPLETED  b.txt.COMPLETED
[root@hadoop1 test]#
1
2
3
4

此时查看agent控制台日志,得到如下错误:

java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /root/test/a.txt.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
1
2
3
4
5
6
7
8
9
10
11
12

结论:

  1. 不能在监控的目录下添加重名的文件,否则会出现File name has been re-used异常
  2. 不能监控文件内容的变化,只能监控不同文件的变化。例如:新增了一个文件b.txt,那么flume会将b.txt作为sources接入

# spooldir到hdfs

# 参考

http://flume.apache.org/FlumeUserGuide.html (opens new window) http://blog.csdn.net/qianshangding0708/article/details/49666821 (opens new window)

#Flume
最后更新时间: 2022/7/23 10:17:11
Flume的安装和使用

← Flume的安装和使用

最近更新
01
分区分桶
08-21
02
数据模型(重要)
08-21
03
安装和编译
08-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式