- Practical Real-time Data Processing and Analytics
- Shilpi Saxena, Saurabh Gupta
Fluentd
Fluentd is another tool for processing log files. Like Logstash, Fluentd has three components: input, filter, and output. Multiple input and output plugins are available to suit the needs of your use case. Here, we will demonstrate an example similar to the previous ones, that is, reading from a log file and pushing the events into Kafka.
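As a rough sketch of how these three components map onto a configuration file, each corresponds to a directive: <source> for input, <filter> for in-flight transformation, and <match> for output. The plugin names and values below are illustrative only; the working configuration for this example appears later in this section:
<source>
  # input: tail a log file (illustrative path)
  @type tail
  path /var/log/example.log
  tag example.app
  format none
</source>
<filter example.**>
  # filter: enrich each event with the host name
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
  </record>
</filter>
<match example.**>
  # output: print events to stdout
  @type stdout
</match>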
Download Fluentd from https://www.fluentd.org/download. As we are using Ubuntu, select the Debian installation. Download td-agent_2.3.4-0_amd64.deb and install it using the Software Center in Ubuntu.
Once it is installed on the system, validate it using the following command:
sudo td-agent --dry-run
The following output will be generated, certifying that everything is in order:
2017-02-25 16:19:49 +0530 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2017-02-25 16:19:49 +0530 [info]: starting fluentd-0.12.31 as dry run mode
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.5.3'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.4.1'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-mongo' version '0.7.16'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-s3' version '0.8.0'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td' version '0.10.29'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
2017-02-25 16:19:49 +0530 [info]: gem 'fluentd' version '0.12.31'
2017-02-25 16:19:49 +0530 [info]: adding match pattern="td.*.*" type="tdlog"
2017-02-25 16:19:49 +0530 [info]: adding match pattern="debug.**" type="stdout"
2017-02-25 16:19:49 +0530 [info]: adding source type="forward"
2017-02-25 16:19:49 +0530 [info]: adding source type="http"
2017-02-25 16:19:49 +0530 [info]: adding source type="debug_agent"
2017-02-25 16:19:49 +0530 [info]: using configuration file: <ROOT>
  <match td.*.*>
    @type tdlog
    apikey xxxxxx
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    buffer_chunk_limit 33554432
    <secondary>
      @type file
      path /var/log/td-agent/failed_records
      buffer_path /var/log/td-agent/failed_records.*
    </secondary>
  </match>
  <match debug.**>
    @type stdout
  </match>
  <source>
    @type forward
  </source>
  <source>
    @type http
    port 8888
  </source>
  <source>
    @type debug_agent
    bind 127.0.0.1
    port 24230
  </source>
</ROOT>
In Fluentd, to create a pipeline, you need to write a configuration file that Fluentd can read and use to run the pipeline. The default location of the Fluentd configuration file is /etc/td-agent/td-agent.conf. The following configuration file reads from the log file and pushes each event into a Kafka topic:
<source>
  @type tail
  path /home/ubuntu/demo/files/test
  pos_file /home/ubuntu/demo/fluentd/test.log.pos
  tag fluentd.example
  format none
</source>
<match *.**>
  @type kafka_buffered
  brokers localhost:9092
  default_topic fluentd-example
  max_send_retries 1
</match>
The configuration file uses two of the six available directives. The source directive defines where all data comes from. @type tells Fluentd which input plugin is being used. Here, we are using tail, which tails the log file. This suits the use case where the input is a running log file to which events/logs are continuously appended at the end, much like the tail -f operation in Linux. The tail input plugin takes several parameters: path is the absolute path of the log file; pos_file is the file that keeps track of the last read position in the input file; tag is the tag assigned to each event; and format defines the input format, for example CSV, or a regular expression to parse each line. As this parameter is mandatory, we used none, which passes the input text through as is.
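If the log lines had a known structure, the source could parse them with a regular expression instead of passing them through. The pattern and field names below are purely illustrative and are not part of the configuration used in this example:
<source>
  @type tail
  path /home/ubuntu/demo/files/test
  pos_file /home/ubuntu/demo/fluentd/test.log.pos
  tag fluentd.example
  # parse lines such as: 2017-02-25 16:19:49 INFO something happened
  format /^(?<logtime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?<level>[A-Z]+) (?<message>.*)$/
</source>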
The match directive tells Fluentd what to do with the input. The *.** pattern means that whatever comes in through the log files is pushed into the Kafka topic. If you want to use different topics for error and information logs, tag the input accordingly and define match patterns for those tags, as sketched below. brokers is the host and port where the Kafka broker is running. default_topic is the name of the topic to which you want to push the events. If you want to retry after a message failure, set max_send_retries to one or more.
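As a sketch of such routing, assuming the inputs were tagged app.error and app.info (hypothetical tags and topic names, reusing the same kafka_buffered output plugin):
<match app.error>
  @type kafka_buffered
  brokers localhost:9092
  default_topic error-logs
  max_send_retries 1
</match>
<match app.info>
  @type kafka_buffered
  brokers localhost:9092
  default_topic info-logs
  max_send_retries 1
</match>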
Replace the existing configuration in /etc/td-agent/td-agent.conf with the preceding one.
Now, create the topic on Kafka, as follows:
/bin/kafka-topics.sh --create --topic fluentd-example --zookeeper localhost:2181 --partitions 1 --replication-factor 1
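Optionally, you can confirm that the topic was created; this verification step is not part of the original walkthrough but uses the same script:
/bin/kafka-topics.sh --describe --topic fluentd-example --zookeeper localhost:2181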
Start the Fluentd agent, as follows:
sudo td-agent
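If you prefer running td-agent as a service, you can start it and follow its log file, which by default is written to /var/log/td-agent/td-agent.log; this is a supplementary check rather than part of the original steps:
sudo service td-agent start
tail -f /var/log/td-agent/td-agent.log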
Start the Kafka consumer, as follows:
/bin/kafka-console-consumer.sh --topic fluentd-example --bootstrap-server localhost:9092
Once the process has started without any exceptions, start appending statements to /home/ubuntu/demo/files/test.
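For example, you could append a couple of lines from another terminal (the text itself is arbitrary):
echo "first test event" >> /home/ubuntu/demo/files/test
echo "second test event" >> /home/ubuntu/demo/files/test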
Each appended line will appear as a message on the Kafka console consumer.