Flume Basic Usage
1. Transferring data via telnet
1.1.0 Create a directory
mkdir tmpconf
1.1.1 Create a file; the name must end in .conf
touch tmpconf/a1.conf
1.1.2 Add the configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source component r1: bind is your host IP, port is the listening port
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.100.201
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# Describe/configure the channel component; here we buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.1.3 Start the agent on this node (note: run from the apache-flume-1.8.0-bin directory; the -n value must match the agent name used in the config, here a1)
bin/flume-ng agent -c conf -f tmpconf/a1.conf -n a1 -Dflume.root.logger=INFO,console
1.1.4 Connect from a second node with telnet
telnet 192.168.100.201 44444
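Each line typed in the telnet session becomes one Flume event: the netcat source acknowledges it with OK, and the logger sink prints the event on the agent's console. A rough example session:
telnet 192.168.100.201 44444
hello flume
OK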
2. Collecting data into HDFS
1.2.0 Preparation
1. Create another config file
touch tmpconf/b1.conf
2. Create a file /export/taillogs/access_log; a script will periodically write data into it
/export/taillogs/access_log
3. Write the data-generator script
vim tail-file.sh
Script contents:
#!/bin/bash
# Append the current date to the log file twice per second
while true
do
date >> /export/taillogs/access_log;
sleep 0.5;
done
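The script assumes the /export/taillogs directory already exists; create it before starting the generator:
mkdir -p /export/taillogs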
1.2.1 Add the configuration to the file
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# Describe/configure tail -F source1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /export/taillogs/access_log
# Describe sink1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:8020/spooldir/
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
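With only hdfs.path set, the sink falls back to its default roll settings and tends to produce many small files. The properties below are standard hdfs sink roll options worth considering; the values are illustrative, not part of the original notes:
# write raw text instead of SequenceFiles
a1.sinks.k1.hdfs.fileType=DataStream
# roll every 30 s or 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval=30
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0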
1.2.2 Start
Start the agent on the node with the config file:
bin/flume-ng agent -c conf -f tmpconf/b1.conf -n a1 -Dflume.root.logger=INFO,console
Start the data-generator script:
sh tail-file.sh
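To verify that events are landing, list the sink's target directory:
hdfs dfs -ls /spooldir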
3. Cascading two agents
1.3.0 Preparation
Install Flume on all three VMs:
scp -r apache-flume-1.8.0-bin hadoop02:$PWD
scp -r apache-flume-1.8.0-bin hadoop03:$PWD
Configure Flume on hadoop01:
vim tmpconf/c1.conf
Configure Flume on hadoop02:
vim tmpconf/c2.conf
1.3.1 Add the configuration on hadoop01
##################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access_log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## The avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.100.202
a1.sinks.k1.port = 4141
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.3.2 Add the configuration on hadoop02
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## The avro source acts as a receiving server
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.100.202
a1.sources.r1.port = 4141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.100.201:8020/avro
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.3.3 Start
Start the downstream agent on hadoop02 first (the avro sink cannot connect until the receiver is up):
bin/flume-ng agent -c conf -f tmpconf/c2.conf -n a1 -Dflume.root.logger=INFO,console
Then start hadoop01:
bin/flume-ng agent -c conf -f tmpconf/c1.conf -n a1 -Dflume.root.logger=INFO,console
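With both agents up and the generator script writing lines, files should appear under the avro path:
hdfs dfs -ls hdfs://192.168.100.201:8020/avro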
4. Failover across collector nodes: events are written in priority order; if the first node dies, the second takes over, and when the first node recovers it resumes work
1.4.0 Preparation
Create a config file on hadoop01:
vim tmpconf/d1.conf
Create a config file on hadoop02:
vim tmpconf/d2.conf
Create a config file on hadoop03:
vim tmpconf/d3.conf
1.4.1 hadoop01 configuration file
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#
## set sink group
agent1.sinkgroups = g1
## sinks in the group
agent1.sinkgroups.g1.sinks = k1 k2
#
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/taillogs/access_log
#
##set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
## set sink1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020
#
## set sink2
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020
#
## set failover: higher priority wins, so hadoop02 (k1) is preferred over hadoop03 (k2)
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 2
agent1.sinkgroups.g1.processor.priority.k2 = 1
## maximum backoff (ms) for a failed sink before it is retried
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
1.4.2 hadoop02 configuration file
# Set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## avro source: receives events from the upstream agent (hadoop01)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 52020
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
## set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:8020/flume/failover/
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.4.3 hadoop03 configuration file
# Set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## avro source: receives events from the upstream agent (hadoop01)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
## set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:8020/flume/failover/
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.4.4 Start commands
Start the collectors first. On hadoop03:
bin/flume-ng agent -n a1 -c conf -f tmpconf/d3.conf -Dflume.root.logger=DEBUG,console
On hadoop02:
bin/flume-ng agent -n a1 -c conf -f tmpconf/d2.conf -Dflume.root.logger=DEBUG,console
Then on hadoop01:
bin/flume-ng agent -n agent1 -c conf -f tmpconf/d1.conf -Dflume.root.logger=DEBUG,console
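A quick way to watch failover in action (a suggested check, not part of the original notes): start the generator on hadoop01, then stop and restart the preferred collector.
sh tail-file.sh &
# Ctrl+C the hadoop02 agent: events fail over to hadoop03 (priority 1).
# Restart it: once the backoff expires (maxpenalty = 10000 ms), hadoop02 (priority 2) takes over again.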
5. Load balancing
1.5.0 Preparation
1. On hadoop01:
vim tmpconf/e1.conf
2. On hadoop02:
vim tmpconf/e2.conf
3. On hadoop03:
vim tmpconf/e3.conf
1.5.1 hadoop01 configuration file
#agent name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2
# set sink group
a1.sinkgroups = g1
# sinks in the group
a1.sinkgroups.g1.sinks = k1 k2
#set sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access_log
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 52021
# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 52021
# set load balancing: round-robin across k1 and k2, backing off failed sinks
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# maximum backoff (ms) for a failed sink
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
1.5.2 hadoop02 configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 52021
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.5.3 hadoop03 configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52021
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.5.4 Start
Start the collectors first. On hadoop03:
bin/flume-ng agent -n a1 -c conf -f tmpconf/e3.conf -Dflume.root.logger=DEBUG,console
On hadoop02:
bin/flume-ng agent -n a1 -c conf -f tmpconf/e2.conf -Dflume.root.logger=DEBUG,console
Then on hadoop01:
bin/flume-ng agent -n a1 -c conf -f tmpconf/e1.conf -Dflume.root.logger=DEBUG,console
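With all three agents up, start the generator on hadoop01; with the round_robin selector, the events printed by the logger sinks should alternate between the hadoop02 and hadoop03 consoles:
sh tail-file.sh &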
6. Interceptors
1.6.0 Preparation
On hadoop01:
vim tmpconf/h1.conf
On hadoop02:
vim tmpconf/h2.conf
1.6.1 Add the configuration on hadoop01
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## The static interceptor inserts a user-defined key-value pair into the header of every event it collects
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
1.6.2 Add the configuration on hadoop02 (identical to hadoop01's, since both nodes collect the same three logs)
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## The static interceptor inserts a user-defined key-value pair into the header of every event it collects
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
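Both h1.conf and h2.conf point their avro sinks at node03:41414, but the notes do not include the receiving agent's configuration. A minimal sketch of what it might look like, assuming node03 runs the collector (the NameNode host and HDFS path are placeholders); the %{type} escape routes events into per-type directories using the header set by the static interceptors:
# Hypothetical collector config for node03 (not part of the original notes)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# avro source: receives from the hadoop01/hadoop02 agents above
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 41414
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# hdfs sink: %{type} expands to the header value (access / nginx / web)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/source/logs/%{type}
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1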
1.6.3 Start
hadoop01:
bin/flume-ng agent -c conf -f tmpconf/h1.conf -n a1 -Dflume.root.logger=DEBUG,console
hadoop02:
bin/flume-ng agent -c conf -f tmpconf/h2.conf -n a1 -Dflume.root.logger=DEBUG,console