kafka常用命令,【Kafka】測試Kafka整合Flume

 2023-11-14 阅读 14 评论 0

摘要:本文簡單測試Kafka整合Flume,從而實現"日志 -> Flume -> Kafka"。 操作環境: Kafka版本:1.0.1 Flume版本:1.6.0 測試前需滿足以下條件: 已安裝Kafka和FlumeKafka已啟動Zookeeper已啟動 1. 配置Flume的conf文件 vi /home/hadoop/fl

本文簡單測試Kafka整合Flume,從而實現"日志 -> Flume -> Kafka"。
操作環境:
Kafka版本:1.0.1
Flume版本:1.6.0
測試前需滿足以下條件:

  1. 已安裝Kafka和Flume
  2. Kafka已啟動
  3. Zookeeper已啟動

1. 配置Flume的conf文件

vi /home/hadoop/flume-kafka.conf
#添加如下內容#命名source/channel/sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1#指定source策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/kafka-test
a1.sources.r1.inputCharset = utf-8#指定source的管道
a1.sources.r1.channels = c1#指定channel為memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100#指定sink為kafka sink,并指定sink的取數channel
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

2. 創建Flume的監控目錄

mkdir -p /home/hadoop/kafka-test

3. Kafka創建topic

kafka-topics.sh --create --topic kafka-test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

4. 啟動Flume

 bin/flume-ng agent -n a1 -c conf -f /home/hadoop/flume-kafka.conf -Dflume.root.logger=info,console

5. 啟動Kafka消費者

kafka-console-consumer.sh -topic kafka-test --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning

6. 測試效果

6.1 Flume監控目錄生成文件
echo "hello world" >> /home/hadoop/kafka-test/test.txt
6.2 Kafka中已消費到數據

在這里插入圖片描述

總結

Kafka + Flume集合了兩者優點,是常用的日志傳輸方案。
二者整合時,關鍵是如何配置Flume的conf文件,配置方案也可參考Flume官網。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/172994.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息