How to write a Java input plugin

Warning

This functionality is in beta and is subject to change. The design and code are less mature than official GA features and are being provided as-is with no warranties. Beta features are not subject to the support SLA of official GA features.

To develop a new Java input for Logstash, you write a new Java class that conforms to the Logstash Java Inputs API, package it, and install it with the logstash-plugin utility. We’ll go through each of those steps.

Set up your environment

Copy the example repo

Start by copying the example input plugin. The plugin API is currently part of the Logstash codebase, so you must have a local copy of that available. You can obtain a copy of the Logstash codebase with the following git command:

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

The branch_name should correspond to the version of Logstash containing the preferred revision of the Java plugin API.

Note

The beta version of the Java plugin API is available in the 6.7 branch of the Logstash codebase.

Specify the target_folder for your local copy of the Logstash codebase. If you do not specify target_folder, it defaults to a new folder called logstash under your current folder.

Generate the .jar file

After you have obtained a copy of the appropriate revision of the Logstash codebase, you need to compile it to generate the .jar file containing the Java plugin API. From the root directory of your Logstash codebase ($LS_HOME), you can compile it with ./gradlew assemble (or gradlew.bat assemble if you’re running on Windows). This should produce the $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar file, where x, y, and z refer to the version of Logstash.

After you have successfully compiled Logstash, you need to tell your Java plugin where to find the logstash-core-x.y.z.jar file. Create a new file named gradle.properties in the root folder of your plugin project. That file should have a single line:

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

where target_folder is the root folder of your local copy of the Logstash codebase.

Code the plugin

The example input plugin generates a configurable number of simple events before terminating. Let’s look at the main class in the example input.

@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {

    public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
            PluginConfigSpec.numSetting("count", 3);

    public static final PluginConfigSpec<String> PREFIX_CONFIG =
            PluginConfigSpec.stringSetting("prefix", "message");

    private String id;
    private long count;
    private String prefix;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean stopped;

    public JavaInputExample(String id, Configuration config, Context context) {
        this.id = id;
        count = config.get(EVENT_COUNT_CONFIG);
        prefix = config.get(PREFIX_CONFIG);
    }

    @Override
    public void start(Consumer<Map<String, Object>> consumer) {
        int eventCount = 0;
        try {
            while (!stopped && eventCount < count) {
                eventCount++;
                // push one single-field event into the pipeline
                consumer.accept(Collections.singletonMap("message",
                        prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
            }
        } finally {
            stopped = true;
            done.countDown();
        }
    }

    @Override
    public void stop() {
        stopped = true; // set flag to request cooperative stop of input
    }

    @Override
    public void awaitStop() throws InterruptedException {
        done.await(); // blocks until input has stopped
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
    }

    @Override
    public String getId() {
        return this.id;
    }
}

Let’s step through and examine each part of that class.

Class declaration

@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {

Notes about the class declaration:

  • All Java plugins must be annotated with the @LogstashPlugin annotation. Additionally:

    • The name property of the annotation must be supplied and defines the name of the plugin as it will be used in the Logstash pipeline definition. For example, this input would be referenced in the input section of the Logstash pipeline definition as input { java_input_example { .... } }
    • The value of the name property must match the name of the class excluding casing and underscores.
  • The class must implement the co.elastic.logstash.api.Input interface.
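
The naming rule above can be checked mechanically. The following is a minimal sketch of such a check, not the validation that Logstash itself performs; the class PluginNameCheck and its methods are hypothetical names introduced only for illustration.

```java
import java.util.Locale;

public class PluginNameCheck {

    // Hypothetical helper: drops underscores and lowercases the name.
    static String normalize(String name) {
        return name.replace("_", "").toLowerCase(Locale.ROOT);
    }

    // Returns true when the annotation name and the class name agree
    // once casing and underscores are ignored.
    static boolean matches(String pluginName, String classSimpleName) {
        return normalize(pluginName).equals(normalize(classSimpleName));
    }

    public static void main(String[] args) {
        System.out.println(matches("java_input_example", "JavaInputExample")); // true
        System.out.println(matches("java_input_example", "ExampleInput"));     // false
    }
}
```

Under this reading of the rule, a class named JavaInputExample pairs with java_input_example, while a class named ExampleInput would need the annotation name example_input.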

Plugin settings

The snippet below contains both the setting definition and the method referencing it.

public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
        PluginConfigSpec.numSetting("count", 3);

public static final PluginConfigSpec<String> PREFIX_CONFIG =
        PluginConfigSpec.stringSetting("prefix", "message");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
}

The PluginConfigSpec class allows developers to specify the settings that a plugin supports, complete with setting name, data type, deprecation status, required status, and default value. In this example, the count setting defines the number of events that will be generated and the prefix setting defines an optional prefix to include in the event field. Neither setting is required; if they are not explicitly set, they default to 3 and message, respectively.

The configSchema method must return a list of all settings that the plugin supports. In a future phase of the Java plugin project, the Logstash execution engine will validate that all required settings are present and that no unsupported settings are present.

Constructor and initialization

private String id;
private long count;
private String prefix;

public JavaInputExample(String id, Configuration config, Context context) {
    this.id = id;
    count = config.get(EVENT_COUNT_CONFIG);
    prefix = config.get(PREFIX_CONFIG);
}

All Java input plugins must have a constructor taking a String id, a Configuration, and a Context argument. This is the constructor that will be used to instantiate them at runtime. The retrieval and validation of all plugin settings should occur in this constructor. In this example, the values of the two plugin settings are retrieved and stored in instance fields for later use in the start method.

Any additional initialization may occur in the constructor as well. If there are any unrecoverable errors encountered in the configuration or initialization of the input plugin, a descriptive exception should be thrown. The exception will be logged and will prevent Logstash from starting.
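
As an illustration of failing fast on bad configuration, here is a minimal sketch of constructor-time validation. It is self-contained and makes two assumptions that are not part of the example plugin: a plain Map stands in for the Configuration object, and a negative count is treated as an unrecoverable error. The class name ConfigValidationSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigValidationSketch {

    private final long count;
    private final String prefix;

    // Stand-in for the plugin constructor. Assumption: a plain Map replaces
    // the real Configuration object so the sketch runs without Logstash.
    public ConfigValidationSketch(Map<String, Object> settings) {
        Object rawCount = settings.getOrDefault("count", 3L);
        if (!(rawCount instanceof Number)) {
            throw new IllegalArgumentException(
                    "Setting 'count' must be a number, but was: " + rawCount);
        }
        long parsed = ((Number) rawCount).longValue();
        if (parsed < 0) {
            // Descriptive message so the logged error points at the bad setting.
            throw new IllegalArgumentException(
                    "Setting 'count' must be zero or greater, but was: " + parsed);
        }
        this.count = parsed;
        this.prefix = String.valueOf(settings.getOrDefault("prefix", "message"));
    }

    long getCount() { return count; }

    String getPrefix() { return prefix; }

    public static void main(String[] args) {
        Map<String, Object> bad = new HashMap<>();
        bad.put("count", -1L);
        try {
            new ConfigValidationSketch(bad);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Validating in the constructor means a misconfigured input is rejected before the start method ever runs, which matches the behavior described above.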

Start method

@Override
public void start(Consumer<Map<String, Object>> consumer) {
    int eventCount = 0;
    try {
        while (!stopped && eventCount < count) {
            eventCount++;
            // push one single-field event into the pipeline
            consumer.accept(Collections.singletonMap("message",
                    prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
        }
    } finally {
        stopped = true;
        done.countDown();
    }
}

The start method begins the event-producing loop in an input. Inputs are flexible and may produce events through many different mechanisms including:

  • a pull mechanism such as periodic queries of an external database
  • a push mechanism such as events sent from clients to a local network port
  • a timed computation such as a heartbeat
  • any other mechanism that produces a useful stream of events

Event streams may be either finite or infinite. If the input produces an infinite stream of events, this method should loop until a stop request is made through the stop method. If the input produces a finite stream of events, this method should terminate when the last event in the stream is produced or a stop request is made, whichever comes first.

Events should be constructed as instances of Map<String, Object> and pushed into the event pipeline via the Consumer<Map<String, Object>>.accept() method.
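
To make the consumer contract concrete, the following minimal sketch drives a start-style loop with a list-backed Consumer so the produced events can be inspected. It uses only the JDK; the class StartLoopSketch, its collect helper, and the extra count parameter on start are hypothetical and are not part of the plugin API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StartLoopSketch {

    private volatile boolean stopped;

    // Mirrors the shape of start(): each event is built as a Map and
    // handed to the consumer with accept().
    public void start(Consumer<Map<String, Object>> consumer, long count) {
        long produced = 0;
        while (!stopped && produced < count) {
            produced++;
            consumer.accept(Collections.singletonMap(
                    "message", "event " + produced + " of " + count));
        }
    }

    public void stop() {
        stopped = true;
    }

    // Runs the loop against a list-backed consumer and returns what it collected.
    static List<Map<String, Object>> collect(long count) {
        List<Map<String, Object>> events = new ArrayList<>();
        new StartLoopSketch().start(events::add, count);
        return events;
    }

    public static void main(String[] args) {
        collect(3).forEach(System.out::println);
    }
}
```

A list-backed consumer like this one is also a convenient way to exercise an input in a unit test, since no running pipeline is needed.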

Stop and awaitStop methods

private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;

@Override
public void stop() {
    stopped = true; // set flag to request cooperative stop of input
}

@Override
public void awaitStop() throws InterruptedException {
    done.await(); // blocks until input has stopped
}

The stop method notifies the input to stop producing events. The stop mechanism may be implemented in any way that honors the API contract, though a volatile boolean flag works well for many use cases.

Inputs stop both asynchronously and cooperatively. Use the awaitStop method to block until the input has completed the stop process. Note that this method should not signal the input to stop as the stop method does. The awaitStop mechanism may be implemented in any way that honors the API contract, though a CountDownLatch works well for many use cases.
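
The interplay between stop and awaitStop is easiest to see with two threads. The sketch below is a self-contained approximation using only the JDK: a worker thread runs an endless loop, another thread requests the stop, then blocks until the loop has exited. The class StopSketch and the runAndStop helper are hypothetical, and the 50 ms sleep is an arbitrary value chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StopSketch {

    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicLong produced = new AtomicLong();
    private volatile boolean stopped;

    // Stand-in for start(): an endless stream that only ends when stop() is called.
    public void start() {
        try {
            while (!stopped) {
                produced.incrementAndGet();
                Thread.yield();
            }
        } finally {
            done.countDown(); // release anyone blocked in awaitStop()
        }
    }

    // Only requests the stop; it does not wait for the loop to exit.
    public void stop() {
        stopped = true;
    }

    // Only waits; it does not request the stop.
    public void awaitStop() throws InterruptedException {
        done.await();
    }

    // Starts the loop on a worker thread, requests a stop, and waits for completion.
    static boolean runAndStop() {
        StopSketch input = new StopSketch();
        Thread worker = new Thread(input::start, "input-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(50);
            input.stop();
            input.awaitStop();
            worker.join(1000);
            return !worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runAndStop());
    }
}
```

Counting down the latch in a finally block ensures that awaitStop returns even if the loop exits because of an exception.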

getId method

@Override
public String getId() {
    return id;
}

For input plugins, the getId method should always return the id that was provided to the plugin through its constructor at instantiation time.

Unit tests

Last but certainly not least, unit tests are strongly encouraged. The example input plugin includes an example unit test that you can use as a template for your own.

Package and deploy

Java plugins are packaged as Ruby gems for dependency management and interoperability with Ruby plugins.

Note

One of the goals for Java plugin support is to eliminate the need for any knowledge of Ruby or its toolchain for Java plugin development. Future phases of the Java plugin project will automate the packaging of Java plugins as Ruby gems so no direct knowledge of or interaction with Ruby will be required. In the current phase, Java plugins must still be manually packaged as Ruby gems and installed with the logstash-plugin utility.

Compile to JAR file

The Java plugin should be compiled and assembled into a fat jar with the vendor task in the Gradle build file. This will package all Java dependencies into a single jar and write it to the correct folder for later packaging into a Ruby gem.

Manually package as Ruby gem

Several Ruby source files are required to package the jar file as a Ruby gem. These Ruby files are used only at Logstash startup time to identify the Java plugin and are not used during runtime event processing.

Note

These Ruby source files will be automatically generated in a future release.

logstash-input-<input-name>.gemspec

Gem::Specification.new do |s|
  s.name            = 'logstash-input-java_input_example'
  s.version         = PLUGIN_VERSION
  s.licenses        = ['Apache-2.0']
  s.summary         = "Example input using Java plugin API"
  s.description     = ""
  s.authors         = ['Elasticsearch']
  s.email           = 'info@elastic.co'
  s.homepage        = "http://www.elastic.co/guide/en/logstash/current/index.html"
  s.require_paths = ['lib', 'vendor/jar-dependencies']

  # Files
  s.files = Dir["lib/**/*","spec/**/*","*.gemspec","*.md","CONTRIBUTORS","Gemfile","LICENSE","NOTICE.TXT", "vendor/jar-dependencies/**/*.jar", "vendor/jar-dependencies/**/*.rb", "VERSION", "docs/**/*"]

  # Special flag to let us know this is actually a logstash plugin
  s.metadata = { 'logstash_plugin' => 'true', 'logstash_group' => 'input'}

  # Gem dependencies
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'jar-dependencies'

  s.add_development_dependency 'logstash-devutils'
end

You can use this file with the following modifications:

  • s.name must follow the logstash-input-<input-name> pattern
  • s.version must match the project.version specified in the build.gradle file. Both versions should be set to be read from the VERSION file in this example.

lib/logstash/inputs/<input-name>.rb

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash-input-java_input_example_jars"
require "java"

class LogStash::Inputs::JavaInputExample < LogStash::Inputs::Base
  config_name "java_input_example"

  def self.javaClass() org.logstash.javaapi.JavaInputExample.java_class; end
end

Modify these items in the file above:

  • Change the name to correspond with the input name.
  • Change require "logstash-input-java_input_example_jars" to reference the appropriate "jars" file as described below.
  • Change class LogStash::Inputs::JavaInputExample < LogStash::Inputs::Base to provide a unique and descriptive Ruby class name.
  • Change config_name "java_input_example" to match the name of the plugin as specified in the name property of the @LogstashPlugin annotation.
  • Change def self.javaClass() org.logstash.javaapi.JavaInputExample.java_class; end to return the class of the Java input.

lib/logstash-input-<input-name>_jars.rb

require 'jar_dependencies'
require_jar('org.logstash.javaapi', 'logstash-input-java_input_example', '0.0.1')

In the file above:

  • Rename the file to correspond to the input name.
  • Change the require_jar directive to correspond to the group specified in the Gradle build file, the name of the input JAR file, and the version as specified in both the gemspec and Gradle build file.

After you have created the previous files and the plugin JAR file, build the gem using the following command:

gem build logstash-input-<input-name>.gemspec

Installing the Java plugin in Logstash

After you have packaged your Java plugin as a Ruby gem, you can install it in Logstash with this command:

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

For Windows platforms: Substitute backslashes for forward slashes as appropriate in the command.

Running Logstash with the Java input plugin

The following is a minimal Logstash configuration that can be used to test that the Java input plugin is correctly installed and functioning.

input {
  java_input_example {}
}
output {
  stdout { codec => rubydebug }
}

Copy the above Logstash configuration to a file such as java_input.conf. Start Logstash with:

bin/logstash --java-execution -f /path/to/java_input.conf

Note

The --java-execution flag, which enables the Java execution engine, is required because Java plugins are not supported in the Ruby execution engine.

The expected Logstash output (excluding initialization) with the configuration above is:

{
      "@version" => "1",
       "message" => "message        1 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
      "@version" => "1",
       "message" => "message        2 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
      "@version" => "1",
       "message" => "message        3 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
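
The whitespace around "1 of 3" in the message field comes from centering that text in a 20-character field before the prefix is prepended. The sketch below reproduces that arithmetic with the JDK only; the center method here is a hypothetical stand-in written for this illustration, not the StringUtils.center call used by the plugin.

```java
public class CenterSketch {

    // Pads text with spaces on both sides until it is width characters long.
    // In this sketch, any odd leftover space goes on the right.
    static String center(String text, int width) {
        int padding = width - text.length();
        if (padding <= 0) {
            return text;
        }
        int left = padding / 2;
        StringBuilder sb = new StringBuilder(width);
        for (int i = 0; i < left; i++) {
            sb.append(' ');
        }
        sb.append(text);
        while (sb.length() < width) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "1 of 3" is 6 characters, so 14 spaces are split 7 and 7.
        String message = "message" + " " + center("1 of 3", 20);
        System.out.println("[" + message + "]");
    }
}
```

With the default prefix, the resulting field is 28 characters long: 7 for the prefix, 1 for the separating space, and 20 for the centered counter.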

Feedback

If you have any feedback on Java plugin support in Logstash, please comment on our main GitHub issue or post in the Logstash forum.