Fluentd

Fluentd is another tool for processing log files. Like Logstash, it has three components: input, filter, and output. Multiple input and output plugins are available to suit your use case. Here, we will demonstrate an example similar to those seen previously: reading from a log file and pushing its contents into Kafka.

Download Fluentd from https://www.fluentd.org/download. As we are using Ubuntu, select Debian installation. Download td-agent_2.3.4-0_amd64.deb and install it using Software Center in Ubuntu.

Once it is installed on the system, validate it using the following command:

    sudo td-agent --dry-run

Output similar to the following confirms that the installation is working:

    2017-02-25 16:19:49 +0530 [info]: reading config file path="/etc/td-agent/td-agent.conf"
    2017-02-25 16:19:49 +0530 [info]: starting fluentd-0.12.31 as dry run mode
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.5.3'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.4.1'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-mongo' version '0.7.16'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-s3' version '0.8.0'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td' version '0.10.29'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluentd' version '0.12.31'
    2017-02-25 16:19:49 +0530 [info]: adding match pattern="td.*.*" type="tdlog"
    2017-02-25 16:19:49 +0530 [info]: adding match pattern="debug.**" type="stdout"
    2017-02-25 16:19:49 +0530 [info]: adding source type="forward"
    2017-02-25 16:19:49 +0530 [info]: adding source type="http"
    2017-02-25 16:19:49 +0530 [info]: adding source type="debug_agent"
    2017-02-25 16:19:49 +0530 [info]: using configuration file: <ROOT>
      <match td.*.*>
        @type tdlog
        apikey xxxxxx
        auto_create_table 
        buffer_type file
        buffer_path /var/log/td-agent/buffer/td
        buffer_chunk_limit 33554432
        <secondary>
          @type file
          path /var/log/td-agent/failed_records
          buffer_path /var/log/td-agent/failed_records.*
        </secondary>
      </match>
      <match debug.**>
        @type stdout
      </match>
      <source>
        @type forward
      </source>
      <source>
        @type http
        port 8888
      </source>
      <source>
        @type debug_agent
        bind 127.0.0.1
        port 24230
      </source>
    </ROOT>
  

To create a pipeline in Fluentd, you write a configuration file that Fluentd reads and uses to process events. The default location of the Fluentd configuration file is /etc/td-agent/td-agent.conf. The following configuration reads from a log file and pushes each event into a Kafka topic:

    <source>
      @type tail
      path /home/ubuntu/demo/files/test
      pos_file /home/ubuntu/demo/fluentd/test.log.pos
      tag fluentd.example
      format none
    </source>
    <match *.**>
      @type kafka_buffered
      brokers localhost:9092
      default_topic fluentd-example
      max_send_retries 1
    </match>
  

The preceding configuration uses two of the six available directives. The source directive defines where the data comes from. @type specifies which input plugin is used. Here, we are using tail, which tails the log file. This suits a use case where the input is a running log file to which events are continually appended; it works like the tail -f command in Linux. The tail input plugin takes several parameters. path is the absolute path of the log file. pos_file is the file that keeps track of the last read position in the input file. tag is the tag assigned to each event. If you want to define the input format, such as CSV, or apply a regular expression, use format. As this parameter is mandatory, we used none, which passes the input text through as is.
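The role of the position file can be illustrated with a small shell sketch. It is not how Fluentd is implemented, and the file that Fluentd actually writes has a different format; the sketch only stores a byte offset so that each read returns just the newly appended lines. The scratch directory and the read_new_lines function are hypothetical names used for illustration.

```shell
#!/bin/sh
# Sketch: read only the bytes appended to a log since the last read,
# remembering the offset in a position file (a simplified stand-in
# for what pos_file does for the tail input plugin).
set -eu

workdir=$(mktemp -d)
log_file="$workdir/test"          # hypothetical log file
pos_file="$workdir/test.log.pos"  # hypothetical position file

: > "$log_file"       # start with an empty log
echo 0 > "$pos_file"  # nothing has been read yet

# Print the bytes added after the stored offset, then store the new offset.
read_new_lines() {
    offset=$(cat "$pos_file")
    size=$(wc -c < "$log_file" | tr -d ' ')
    if [ "$size" -gt "$offset" ]; then
        tail -c "+$((offset + 1))" "$log_file" | head -c "$((size - offset))"
    fi
    echo "$size" > "$pos_file"
}

echo "first event" >> "$log_file"
first_read=$(read_new_lines)

echo "second event" >> "$log_file"
second_read=$(read_new_lines)

echo "$first_read"
echo "$second_read"
```

The second call returns only the second line, because the offset saved by the first call tells it where to resume. This is also why the position file lets an agent pick up where it left off after a restart.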

The match directive tells Fluentd what to do with the input. The *.** pattern says that whatever comes in through the log files should be pushed into the Kafka topic. If you want to use different topics for error and information logs, define the patterns as error and info and tag the inputs accordingly. brokers is the host and port where the Kafka broker is running. default_topic is the name of the topic to which you want to push the events. If you want to retry after a message fails to send, set max_send_retries to one or more.
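The idea of separate topics for error and information logs can be sketched as follows. The script writes a candidate configuration to a scratch file so that it can be inspected before it replaces the real one. The log file paths, position file paths, and the topic names fluentd-error and fluentd-info are assumptions made for illustration; only the parameters already described above are used.

```shell
#!/bin/sh
# Sketch: generate a Fluentd configuration that routes two tagged
# inputs to two different Kafka topics. Paths and topic names are
# hypothetical.
set -eu

conf=$(mktemp)

cat > "$conf" <<'EOF'
<source>
  @type tail
  path /home/ubuntu/demo/files/error.log
  pos_file /home/ubuntu/demo/fluentd/error.log.pos
  tag error
  format none
</source>
<source>
  @type tail
  path /home/ubuntu/demo/files/info.log
  pos_file /home/ubuntu/demo/fluentd/info.log.pos
  tag info
  format none
</source>
<match error>
  @type kafka_buffered
  brokers localhost:9092
  default_topic fluentd-error
  max_send_retries 1
</match>
<match info>
  @type kafka_buffered
  brokers localhost:9092
  default_topic fluentd-info
  max_send_retries 1
</match>
EOF

echo "configuration written to $conf"
```

If td-agent is installed, the generated file can be checked without starting the agent by running sudo td-agent --dry-run -c with the printed path. Both topics would also need to be created in Kafka before events are sent to them.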

Replace the contents of /etc/td-agent/td-agent.conf with the preceding configuration.

Now, create the topic in Kafka, as follows:

    /bin/kafka-topics.sh --create --topic fluentd-example --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Start the Fluentd agent, as follows:

    sudo td-agent

Start the Kafka console consumer, as follows:

    /bin/kafka-console-consumer.sh --topic fluentd-example --bootstrap-server localhost:9092

Once the processes have started without any exceptions, start adding statements to /home/ubuntu/demo/files/test, as shown in the following screenshot:

The output on Kafka will be as shown in the following screenshot:
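The step of adding statements to the input file can also be done from the shell. The following is a minimal sketch; the append_events function and the sample text are hypothetical, and the demonstration writes to a scratch file rather than to the real input file.

```shell
#!/bin/sh
# Sketch: append numbered sample events to a log file, one per line,
# the way an application would append to a running log.
set -eu

append_events() {
    target=$1   # file to append to
    count=$2    # number of lines to add
    i=1
    while [ "$i" -le "$count" ]; do
        echo "sample event $i" >> "$target"
        i=$((i + 1))
    done
}

# Demonstrated on a scratch file; for the walkthrough, pass
# /home/ubuntu/demo/files/test as the first argument instead.
demo_file=$(mktemp)
append_events "$demo_file" 3
cat "$demo_file"
```

When the function is pointed at the file named in the source directive, each appended line should appear in the console consumer as one event. Assuming the Kafka output plugin's default JSON output, the text is expected to arrive under a message key.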
