本文簡單測試Kafka整合Flume,從而實現"日志 -> Flume -> Kafka"。
操作環境:
Kafka版本:1.0.1
Flume版本:1.6.0
測試前需滿足以下條件:
- 已安裝Kafka和Flume
- Kafka已啟動
- Zookeeper已啟動
vi /home/hadoop/flume-kafka.conf
#添加如下內容#命名source/channel/sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1#指定source策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/kafka-test
a1.sources.r1.inputCharset = utf-8#指定source的管道
a1.sources.r1.channels = c1#指定channel為memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100#指定sink為kafka sink,并指定sink的取數channel
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
mkdir -p /home/hadoop/kafka-test
kafka-topics.sh --create --topic kafka-test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
bin/flume-ng agent -n a1 -c conf -f /home/hadoop/flume-kafka.conf -Dflume.root.logger=info,console
kafka-console-consumer.sh -topic kafka-test --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning
echo "hello world" >> /home/hadoop/kafka-test/test.txt
Kafka + Flume集合了兩者優點,是常用的日志傳輸方案。
二者整合時,關鍵是如何配置Flume的conf文件,配置方案也可參考Flume官網。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态