Ingesting Filebeat Data Into Your Apache Camel Flow

Access the code on GitHub!

Filebeat is a lightweight log shipper from Elastic. At the most basic level, we point it at some log files and add regular expressions for the lines we want to transport elsewhere. As of version 6.2, Filebeat supports the following outputs:

Elasticsearch
Logstash
Kafka
Redis
File
Console

As expected, interaction between Elastic components works great! I found Filebeat uploading directly into Elasticsearch to be easy and efficient; however, in some cases we may not be able to run both on the same host, or configure the hosts so that Filebeat can connect to Elasticsearch directly.

Enter Apache Camel. Apache Camel is a powerful open source integration framework based on Enterprise Integration Patterns; it provides us a way to route data without having to write a lot of the plumbing ourselves. We will use existing Apache Camel components to ingest data from Filebeat and route it elsewhere.

One of the supported output types is Logstash. Filebeat communicates with Logstash using a protocol called Lumberjack. This protocol does not have a published spec and is considered internal to Elastic products. Even so, integration components exist and the protocol code is open source, so there is no reason not to use it, especially when the alternative is writing one’s own module/plugin for Filebeat.

In your filebeat.yml you will need to configure the prospector input and the logstash output; below are samples:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /path/to/log/file(s)
  include_lines: ['WARNING']

output.logstash:
  # The Camel Component listening
  hosts: ["localhost:5044"]

To run:
./filebeat -e -c filebeat.yml
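To make the include_lines setting above more concrete, here is a minimal Java sketch of the filtering it describes: a line is kept when at least one of the listed regular expressions matches somewhere in it. Filebeat does this filtering itself, so none of this code is needed in the pipeline. The class name IncludeLinesSketch and the sample log lines are made up for illustration, and Java's regex dialect is not identical to the one Filebeat uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustration only: mimics the idea behind Filebeat's include_lines option.
// IncludeLinesSketch is a hypothetical name, not part of Filebeat or Camel.
class IncludeLinesSketch {

    // Keep every line in which at least one pattern matches anywhere.
    static List<String> filter(List<String> lines, List<String> includePatterns) {
        List<Pattern> compiled = new ArrayList<>();
        for (String expression : includePatterns) {
            compiled.add(Pattern.compile(expression));
        }
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            for (Pattern pattern : compiled) {
                if (pattern.matcher(line).find()) {
                    kept.add(line);
                    break; // one match is enough, avoid adding the line twice
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up log lines for the demonstration.
        List<String> lines = Arrays.asList(
            "INFO service started",
            "WARNING disk almost full",
            "ERROR connection refused");
        for (String line : filter(lines, Arrays.asList("WARNING"))) {
            System.out.println(line);
        }
    }
}
```

With the sample input, only the WARNING line survives, which is the subset Filebeat would ship to the Camel listener.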

There is an existing Camel Component for lumberjack. We set our Camel program to listen on localhost on port 5044 for Filebeat messages. We’ll be using version 2.21 of both Camel and the Lumberjack Component.

In addition to camel-core, you will need the following dependencies:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-lumberjack</artifactId>
  <version>2.21.0</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-stream</artifactId>
  <version>2.21.0</version>
</dependency>
<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.3.1</version>
</dependency>

The code inside your route to consume these messages is as simple as:

from("lumberjack:localhost")
  .to("wherever"); // placeholder: substitute any Camel endpoint URI

Once the Camel Component has reassembled the payload from Lumberjack, we can then route it like any other message. The Lumberjack Camel component result body is a Map<String, Object>. It may be desirable to pull out a specific field or treat the result as JSON (since that’s what it is effectively).

from("lumberjack:localhost") 
  .setBody(simple("${body[message]}")) 
  .to("stream:out");
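For readers who have not seen a Filebeat event as a Java map, the following plain-Java sketch shows the kind of lookup that ${body[message]} performs, plus a nested lookup. The sample event is hand-built and hypothetical: the field names (@timestamp, message, offset, beat.hostname) are typical of Filebeat events, but the exact set depends on your Filebeat version and configuration. EventFieldSketch and lookup are names invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hand-built stand-in for the Map<String, Object> body.
// The field names and values below are assumptions, not captured output.
class EventFieldSketch {

    static Map<String, Object> sampleEvent() {
        Map<String, Object> beat = new LinkedHashMap<>();
        beat.put("hostname", "web-01");

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("@timestamp", "2018-03-01T12:00:00.000Z");
        event.put("message", "WARNING disk almost full");
        event.put("offset", 1024);
        event.put("beat", beat);
        return event;
    }

    // Walk a path of keys through nested maps; returns null when any step is
    // missing or when an intermediate value is not a map.
    static Object lookup(Map<String, Object> event, String... path) {
        Object current = event;
        for (String key : path) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> event = sampleEvent();
        System.out.println(lookup(event, "message"));
        System.out.println(lookup(event, "beat", "hostname"));
    }
}
```

Inside a route, the same lookup could live in a Processor, which helps when a field is optional and a missing value needs explicit handling.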

The next route converts the body to a JSON string. This is useful if we want to put the result on a JMS queue for later ingestion into a system that expects a JSON body, such as Elasticsearch. The conversion is handled by Google’s Gson library:

from("lumberjack:localhost")
  .process(new Processor() {
    public void process(Exchange exchange) throws Exception {
      // The Lumberjack component delivers each event as a Map (needs java.util.Map)
      Map<?, ?> logLine = exchange.getIn().getBody(Map.class);
      Gson gson = new Gson();
      // Serialize the whole event, nested fields included, to a JSON string
      String newLine = gson.toJson(logLine);
      exchange.getIn().setBody(newLine);
    }
  })
  .to("stream:out");
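To show what the resulting string looks like without adding Gson to the classpath, here is a small dependency-free sketch that serializes a nested map to JSON. It is not how Gson works internally and is not a replacement for it: it only handles maps, iterables, strings, numbers, booleans and null, and it does not guard against values such as NaN. JsonSketch and its sample event are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a tiny JSON writer for map-shaped events.
// Use a real library such as Gson in the actual route.
class JsonSketch {

    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(String.valueOf(entry.getKey())))
                  .append(':')
                  .append(toJson(entry.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof Iterable) {
            StringBuilder sb = new StringBuilder("[");
            boolean first = true;
            for (Object item : (Iterable<?>) value) {
                if (!first) sb.append(',');
                first = false;
                sb.append(toJson(item));
            }
            return sb.append(']').toString();
        }
        // Everything else is written as a JSON string.
        return quote(value.toString());
    }

    // Wrap in double quotes and escape characters JSON does not allow raw.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Made-up event; real Filebeat events carry more fields.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("message", "WARNING disk almost full");
        event.put("offset", 1024);
        System.out.println(toJson(event));
    }
}
```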

Once ingested, you are only limited by your imagination.

Let’s drop the message off onto a JMS queue and retrieve it back (simulating a network hop to somewhere) and ingest it into Elasticsearch. For setup, ActiveMQ is run with default settings and a queue named test_queue was created. (There is a Camel component specifically for ActiveMQ; however, that is outside the scope of this article.) Elasticsearch is also run with default settings.

The main code and route builder:

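import org.apache.camel.main.Main;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.activemq.ActiveMQConnectionFactory;

// JmsComponent comes from camel-jms and ActiveMQConnectionFactory from the
// ActiveMQ client library; both must be on the classpath.
public class MainApp {

  public static void main(String... args) throws Exception {
    // Default ActiveMQ broker address
    ActiveMQConnectionFactory factory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");
    Main main = new Main();
    // Register the JMS component under the "jms" scheme used in the routes
    main.bind("jms", JmsComponent.jmsComponentAutoAcknowledge(factory));
    main.addRouteBuilder(new LumberjackRouteBuilder());
    main.run(args);
  }
}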

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.component.http4.HttpMethods;

import java.util.Map;
import com.google.gson.Gson;

// The http4 endpoint below requires the camel-http4 component on the classpath.
public class LumberjackRouteBuilder extends RouteBuilder {
  private final Gson gson = new Gson();

  @Override
  public void configure() {
    // Route 1: receive Filebeat events, convert them to JSON, queue them
    from("lumberjack:localhost")
      .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
          Map<?, ?> logLine = exchange.getIn().getBody(Map.class);
          String newLine = gson.toJson(logLine);
          exchange.getIn().setBody(newLine);
        }
      })
      .to("jms:test_queue");

    // Route 2: read the JSON back from the queue and POST it to Elasticsearch
    from("jms:test_queue")
      .setHeader(Exchange.HTTP_METHOD, constant(HttpMethods.POST))
      .setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
      .to("http4://localhost:9200/test_index/doc");
  }
}

Fire up this Camel program and then Filebeat. You will see messages being ingested by Camel, JMS messages produced and consumed, and then uploaded into Elasticsearch.
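One way to confirm that documents actually arrived is to query the index over HTTP. The sketch below builds a URI search request against test_index and, only when started with the --fetch argument, sends it with the JDK's HttpURLConnection. It assumes Elasticsearch is reachable at http://localhost:9200 with default settings and that events carry a message field. SearchCheckSketch and the query string are illustrative choices, not part of the original code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Illustration only: a quick manual check, not part of the Camel routes.
class SearchCheckSketch {

    // Build a URI search request such as <base>/<index>/_search?q=<query>.
    static String searchUrl(String baseUrl, String index, String query) {
        try {
            return baseUrl + "/" + index + "/_search?q=" + URLEncoder.encode(query, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }

    // Perform a GET and return the response body as text.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed address and query; adjust to your environment.
        String url = searchUrl("http://localhost:9200", "test_index", "message:WARNING");
        System.out.println(url);
        if (args.length > 0 && "--fetch".equals(args[0])) {
            System.out.println(fetch(url)); // requires a running Elasticsearch
        }
    }
}
```

If the pipeline is working, the response should list the ingested log lines under hits; an empty result usually means Filebeat has not yet shipped any matching lines.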

Happy coding!

Written by: Knowles Atchison
