大数据采集工具flume各种采集方案案例
以下是整理flume的各种采集方式 代码直接用
一、source类型是netcat
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
二、source类型是spooldir
a1.sources = r1 ##给agent的source起名
a1.sinks = k1 ##给agent的sinks起名
a1.channels = c1 ##给agent的channels起名
a1.sources.r1.type = spooldir ##文件夹
a1.sources.r1.spoolDir = /root/flume ##要采集的目录
a1.sources.r1.fileHeader = true ##采集过的文件是否需要添加一个后缀
a1.sinks.k1.type = logger
a1.channels.c1.type = memory ##把缓存数据放到内存
a1.channels.c1.capacity = 1000 ##管道里面最多可以存放多少事件
a1.channels.c1.transactionCapacity = 100 ##每次对最接收多少事件
a1.sources.r1.channels = c1 ## 把source和channels连接上
a1.sinks.k1.channel = c1 ##把sinks和channel连接上
命令:…/bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
三、source 类型是avro
这个配置source是avro类型,是一个服务器,这个服务器
开启一个端口8088,目是接收数据的
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = linux1 ##当前这一台机器的ip
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令:…/bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
四、source是socket类型
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type =syslogtcp
a1.sources.r1.bind=linux1
a1.sources.r1.port=8080
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
五、sink类型是avro
数据发送者 是一个客户端 目的就是发送数据
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2 ##当前这一台机器的ip
a1.sources.r1.port = 666
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
六、sink类型是两个avro
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666
a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
七、sink 类型是hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = hdfs #sink到hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
##filePrefix 默认值:FlumeData
##写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。
a1.sinks.k1.hdfs.filePrefix = events-
##默认值:30
##hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒
##如果设置成0,则表示不根据时间来滚动文件
#注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,
#并新打开一个临时文件来写入数据;
a1.sinks.k1.hdfs.rollInterval = 30
##默认值:1024
##当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
##如果设置成0,则表示不根据临时文件大小来滚动文件;
a1.sinks.k1.hdfs.rollSize = 0
##默认值:10
##当events数据达到该数量时候,将临时文件滚动成目标文件;
##如果设置成0,则表示不根据events数据来滚动文件;
a1.sinks.k1.hdfs.rollCount = 0
##batchSize 默认值:100
##每个批次刷新到HDFS上的events数量;
a1.sinks.k1.hdfs.batchSize = 1
##useLocalTimeStamp
##默认值:flase
##是否使用当地时间。
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
八、sink类型是kafka类型的
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
九、Source 是mysql
a1.sources = r1
a1.sinks = k1
a1.channels = c1
###########sources#################
r1
a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1