Azure Event Hubs plugin

  • Plugin version: v1.1.0
  • Released on: 2019-01-24
  • Changelog

For other versions, see the Versioned plugin docs.

Getting Help

For questions about the plugin, open a topic in the Discuss forums. For bugs or feature requests, open an issue in GitHub. For the list of Elastic supported plugins, please consult the Elastic Support Matrix.

Description

This plugin consumes events from Azure Event Hubs, a highly scalable data streaming platform and event ingestion service. Event producers send events to the Azure Event Hub, and this plugin consumes those events for use with Logstash.

Many Azure services integrate with Azure Event Hubs. Azure Monitor, for example, integrates with Azure Event Hubs to provide infrastructure metrics.

Event Hub connection string

The plugin uses the connection string to access Azure Event Hubs. Find the connection string here: Azure Portal -> Event Hub -> Shared access policies. The event_hub_connections option passes the Event Hub connection strings for the basic configuration.

Sample connection string:

Endpoint=sb://logstash.servicebus.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;EntityPath=insights-operational-logs
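A connection string is a set of semicolon-separated Key=Value pairs. As a rough illustration of the format (a Python sketch, not part of the plugin; `parse_connection_string` is a hypothetical helper), the sample above breaks down like this:

```python
def parse_connection_string(conn: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs. Values can themselves contain
    '=' (base64 keys), so split each pair on the first '=' only."""
    parts = {}
    for pair in conn.strip().split(";"):
        if pair:
            key, _, value = pair.partition("=")
            parts[key] = value
    return parts

conn = ("Endpoint=sb://logstash.servicebus.windows.net/;"
        "SharedAccessKeyName=activity-log-read-only;"
        "SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;"
        "EntityPath=insights-operational-logs")
fields = parse_connection_string(conn)
print(fields["EntityPath"])  # insights-operational-logs
```

Note the EntityPath field: it names the Event Hub itself, which is why the basic configuration can take a plain list of connection strings.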

Blob Storage and connection string

An Azure Blob Storage account is an essential part of Azure-to-Logstash configuration. A Blob Storage account is a central location that enables multiple instances of Logstash to work together to process events. It records the offset (location) of processed events. On restart, Logstash resumes processing exactly where it left off.

Configuration notes:

  • A Blob Storage account is highly recommended for use with this plugin, and is likely required for production servers.
  • The storage_connection option passes the blob storage connection string.
  • Configure all Logstash instances to use the same storage_connection to get the benefits of shared processing.

Sample Blob Storage connection string:

DefaultEndpointsProtocol=https;AccountName=logstash;AccountKey=ETOPnkd/hDAWidkEpPZDiXffQPku/SZdXhPSLnfqdRTalssdEuPkZwIcouzXjCLb/xPZjzhmHfwRCGo0SBSw==;EndpointSuffix=core.windows.net

Find the connection string to Blob Storage here: Azure Portal -> Blob Storage account -> Access keys.

Best practices

Here are some guidelines to help you avoid data conflicts that can cause lost events.

  • Create a Logstash consumer group. Create a new consumer group specifically for Logstash. Do not use the $default or any other consumer group that might already be in use. Reusing consumer groups among non-related consumers can cause unexpected behavior and possibly lost events. All Logstash instances should use the same consumer group so that they can work together for processing events.
  • Avoid overwriting offsets with multiple Event Hubs. The offsets (position) of the Event Hubs are stored in the configured Azure Blob store. The Azure Blob store uses paths like a file system to store the offsets. If the paths between multiple Event Hubs overlap, then the offsets may be stored incorrectly. To avoid duplicate file paths, use the advanced configuration model and make sure that at least one of these options is different per Event Hub:

    • storage_connection
    • storage_container (defaults to Event Hub name if not defined)
    • consumer_group
  • Set the number of threads correctly. The number of threads should equal the number of Event Hubs plus one (or more). Each Event Hub needs at least one thread, and an additional thread is needed to help coordinate the other threads. The number of threads should not exceed the number of Event Hubs multiplied by the number of partitions per Event Hub, plus one. Threads are currently available only as a global setting.

    • Sample scenario: Event Hubs = 4. Partitions on each Event Hub = 3. Minimum threads is 5 (4 Event Hubs plus one). Maximum threads is 13 (4 Event Hubs times 3 partitions plus one).
    • If you’re collecting activity logs from one Event Hub instance, then only 2 threads (1 Event Hub plus one) are required.
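The thread guidance above amounts to simple arithmetic. A minimal sketch (`thread_bounds` is a hypothetical helper, not part of the plugin):

```python
def thread_bounds(event_hubs: int, partitions_per_hub: int) -> tuple:
    """Minimum is one thread per Event Hub plus one coordinator;
    maximum is one thread per partition plus one coordinator."""
    minimum = event_hubs + 1
    maximum = event_hubs * partitions_per_hub + 1
    return minimum, maximum

# Sample scenario from above: 4 Event Hubs, 3 partitions each.
print(thread_bounds(4, 3))  # (5, 13)
```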

Configuration models

This plugin supports two configuration models: basic and advanced. Basic configuration is recommended for most use cases, and is illustrated in the examples throughout this topic.

Basic configuration (default)

Basic configuration is the default and supports consuming from multiple Event Hubs. All Event Hubs share the same configuration, except for the connection string.

You supply a list of Event Hub connection strings, complete with the Event Hub EntityPath that defines the Event Hub name. All other configuration settings are shared.

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://example1...EntityPath=insights-logs-errors", "Endpoint=sb://example2...EntityPath=insights-metrics-pt1m"]
      threads => 8
      decorate_events => true
      consumer_group => "logstash"
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   }
}

Advanced configuration

The advanced configuration model accommodates deployments where different Event Hubs require different configurations. Options can be configured per Event Hub. You provide a list of Event Hub names through the event_hubs option. Under each name, specify the configuration for that Event Hub. Options can be defined globally or expressed per Event Hub.

If the same configuration option appears in both the global and event_hub sections, the more specific (event_hub) setting takes precedence.
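This precedence rule behaves like an ordinary dictionary merge. A hedged sketch in Python (`effective_config` is a hypothetical helper; the plugin's actual merge logic is internal):

```python
def effective_config(global_opts: dict, per_hub_opts: dict) -> dict:
    """Per-Event-Hub settings override globals on conflict."""
    merged = dict(global_opts)   # start from the global settings
    merged.update(per_hub_opts)  # per-hub values win
    return merged

global_opts = {"consumer_group": "$Default", "decorate_events": True}
hub_opts = {"consumer_group": "iam_team"}
print(effective_config(global_opts, hub_opts)["consumer_group"])  # iam_team
```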

Note

Advanced configuration is not necessary or recommended for most use cases.

input {
   azure_event_hubs {
     config_mode => "advanced"
     threads => 8
     decorate_events => true
     storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
     event_hubs => [
        {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example1..."
         initial_position => "beginning"
         consumer_group => "iam_team"
        }},
        {"insights-metrics-pt1m" => {
         event_hub_connection => "Endpoint=sb://example2..."
         initial_position => "end"
         consumer_group => "db_team"
        }}
     ]
   }
}

In this example, storage_connection and decorate_events are applied globally. The two Event Hubs each have their own settings for consumer_group and initial_position.

Azure Event Hubs Configuration Options

This plugin supports the following configuration options plus the Common Options described later.

Setting                      Input type                              Required
config_mode                  string (basic or advanced)              No
event_hubs                   array                                   Yes, when config_mode => advanced
event_hub_connections        array                                   Yes, when config_mode => basic
event_hub_connection         string                                  Yes, when config_mode => advanced
checkpoint_interval          number                                  No
consumer_group               string                                  No
decorate_events              boolean                                 No
initial_position             string (beginning, end, or look_back)   No
initial_position_look_back   number                                  No, unless initial_position => look_back
max_batch_size               number                                  No
storage_connection           string                                  No
storage_container            string                                  No
threads                      number                                  No

Also see Common Options for a list of options supported by all input plugins.

Note

All Event Hubs options are common to both basic and advanced configurations, with the following exceptions. The basic configuration uses event_hub_connections to support multiple connections. The advanced configuration uses event_hubs and event_hub_connection (singular).

config_mode

  • Value type is string
  • Valid entries are basic or advanced
  • Default value is basic

Sets configuration to either Basic configuration (default) or Advanced configuration.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1", "Endpoint=sb://example2...;EntityPath=event_hub_name2"]
}

event_hubs

  • Value type is array
  • No default value
  • Ignored for basic configuration
  • Required for advanced configuration

Defines the Event Hubs to be read: an array of hashes, where each entry is a hash of the Event Hub name and its configuration options.

azure_event_hubs {
  config_mode => "advanced"
  event_hubs => [
      { "event_hub_name1" => {
          event_hub_connection => "Endpoint=sb://example1..."
      }},
      { "event_hub_name2" => {
          event_hub_connection => "Endpoint=sb://example2..."
          storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
          storage_container => "my_container"
      }}
   ]
   consumer_group => "logstash" # shared across all Event Hubs
}

event_hub_connections

  • Value type is array
  • No default value
  • Required for basic configuration

List of connection strings that identify the Event Hubs to be read. Connection strings include the EntityPath for the Event Hub.

The event_hub_connections option is defined per Event Hub. All other configuration options are shared among Event Hubs.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1", "Endpoint=sb://example2...;EntityPath=event_hub_name2"]
}

event_hub_connection

  • Value type is string
  • No default value
  • Valid only for advanced configuration

Connection string that identifies the Event Hub to be read. Advanced configuration options can be set per Event Hub. This option modifies event_hub_name, and should be nested under it. (See sample.) This option accepts only one connection string.

azure_event_hubs {
   config_mode => "advanced"
   event_hubs => [
     { "event_hub_name1" => {
        event_hub_connection => "Endpoint=sb://example1...;EntityPath=event_hub_name1"
     }}
   ]
}

checkpoint_interval

  • Value type is number
  • Default value is 5 seconds
  • Set to 0 to disable.

Interval in seconds to write checkpoints during batch processing. Checkpoints tell Logstash where to resume processing after a restart. Checkpoints are automatically written at the end of each batch, regardless of this setting.

Writing checkpoints too frequently can slow down processing unnecessarily.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   checkpoint_interval => 5
}
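The interaction between interval-based checkpoints and the 0-disables behavior can be sketched as a simple predicate (illustrative only; `should_checkpoint` is a hypothetical helper, not plugin code):

```python
def should_checkpoint(last_checkpoint: float, now: float, interval: float) -> bool:
    """Write a mid-batch checkpoint only when `interval` seconds have
    elapsed since the last one; an interval of 0 disables them.
    (End-of-batch checkpoints happen regardless of this setting.)"""
    if interval <= 0:
        return False
    return now - last_checkpoint >= interval

print(should_checkpoint(100.0, 106.0, 5))  # True: 6 seconds elapsed
print(should_checkpoint(100.0, 103.0, 5))  # False: only 3 seconds elapsed
print(should_checkpoint(100.0, 106.0, 0))  # False: interval checkpoints disabled
```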

consumer_group

  • Value type is string
  • Default value is $Default

Consumer group used to read the Event Hub(s). Create a consumer group specifically for Logstash. Then ensure that all instances of Logstash use that consumer group so that they can work together properly.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   consumer_group => "logstash"
}

decorate_events

  • Value type is boolean
  • Default value is false

Adds metadata about the Event Hub, including Event Hub name, consumer_group,processor_host, partition, offset, sequence, timestamp, and event_size.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   decorate_events => true
}

initial_position

  • Value type is string
  • Valid arguments are beginning, end, look_back
  • Default value is beginning

When first reading from an Event Hub, start from this position:

  • beginning reads all pre-existing events in the Event Hub
  • end does not read any pre-existing events in the Event Hub
  • look_back reads end minus a number of seconds worth of pre-existing events. You control the number of seconds using the initial_position_look_back option.

Note: If storage_connection is set, the initial_position value is used only the first time Logstash reads from the Event Hub.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "beginning"
}

initial_position_look_back

  • Value type is number
  • Default value is 86400
  • Used only if initial_position is set to look_back

Number of seconds to look back to find the initial position for pre-existing events. This option is used only if initial_position is set to look_back. If storage_connection is set, this configuration applies only the first time Logstash reads from the Event Hub.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "look_back"
   initial_position_look_back => 86400
}
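In effect, look_back means "start from now minus N seconds". A small sketch under that assumption (`look_back_start` is a hypothetical helper; the plugin computes the position internally):

```python
from datetime import datetime, timedelta, timezone

def look_back_start(now: datetime, look_back_seconds: int) -> datetime:
    """Initial read position implied by look_back: the current time
    minus initial_position_look_back seconds."""
    return now - timedelta(seconds=look_back_seconds)

# With the default of 86400 seconds, reading starts one day back.
now = datetime(2019, 1, 24, 12, 0, 0, tzinfo=timezone.utc)
print(look_back_start(now, 86400))  # 2019-01-23 12:00:00+00:00
```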

max_batch_size

  • Value type is number
  • Default value is 125

Maximum number of events retrieved and processed together. A checkpoint is created after each batch. Increasing this value may help with performance, but requires more memory.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   max_batch_size => 125
}

storage_connection

  • Value type is string
  • No default value

Connection string for blob account storage. Blob account storage persists the offsets between restarts, and ensures that multiple instances of Logstash process different partitions. When this value is set, restarts resume where processing left off. When this value is not set, the initial_position value is used on every restart.

We strongly recommend that you define this value for production environments.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
}

storage_container

  • Value type is string
  • Defaults to the Event Hub name if not defined

Name of the storage container used to persist offsets and allow multiple instances of Logstash to work together.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   storage_container => "my_container"
}

To avoid overwriting offsets, you can use different storage containers. This is particularly important if you are monitoring two Event Hubs with the same name. You can use the advanced configuration model to configure different storage containers.

azure_event_hubs {
   config_mode => "advanced"
   consumer_group => "logstash"
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   event_hubs => [
      {"insights-operational-logs" => {
       event_hub_connection => "Endpoint=sb://example1..."
       storage_container => "insights-operational-logs-1"
      }},
      {"insights-operational-logs" => {
       event_hub_connection => "Endpoint=sb://example2..."
       storage_container => "insights-operational-logs-2"
      }}
   ]
}
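The uniqueness rule from Best practices can be checked mechanically: every Event Hub entry must differ in at least one of storage_connection, storage_container (which defaults to the Event Hub name), or consumer_group. A hedged Python sketch (`has_offset_conflict` is a hypothetical helper, not part of the plugin):

```python
def has_offset_conflict(hubs: list) -> bool:
    """Return True if two Event Hub entries would write offsets to the
    same blob path, i.e. they share connection, container, and group."""
    seen = set()
    for hub in hubs:
        key = (hub.get("storage_connection"),
               # storage_container defaults to the Event Hub name
               hub.get("storage_container", hub["name"]),
               hub.get("consumer_group"))
        if key in seen:
            return True
        seen.add(key)
    return False

# Two Event Hubs with the same name, made safe by distinct containers:
hubs = [
    {"name": "insights-operational-logs", "storage_container": "insights-operational-logs-1"},
    {"name": "insights-operational-logs", "storage_container": "insights-operational-logs-2"},
]
print(has_offset_conflict(hubs))  # False: the containers differ
```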

threads

  • Value type is number
  • Minimum value is 2
  • Default value is 4

Total number of threads used to process events. The value you set here applies to all Event Hubs. Even with advanced configuration, this value is a global setting, and can’t be set per Event Hub.

azure_event_hubs {
   threads => 4
}

The number of threads should be the number of Event Hubs plus one or more. See Best practices for more information.

Common Options

The following configuration options are supported by all input plugins:

Setting         Input type   Required
add_field       hash         No
codec           codec        No
enable_metric   boolean      No
id              string       No
tags            array        No
type            string       No

Details

add_field

  • Value type is hash
  • Default value is {}

Add a field to an event

codec

  • Value type is codec
  • Default value is "plain"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

enable_metric

  • Value type is boolean
  • Default value is true

Disable or enable metric logging for this specific plugin instance. By default we record all the metrics we can, but you can disable metrics collection for a specific plugin.

id

  • Value type is string
  • There is no default value for this setting.

Add a unique ID to the plugin configuration. If no ID is specified, Logstash will generate one. It is strongly recommended to set this ID in your configuration. This is particularly useful when you have two or more plugins of the same type, for example, if you have 2 azure_event_hubs inputs. Adding a named ID in this case will help in monitoring Logstash when using the monitoring APIs.

input {
  azure_event_hubs {
    id => "my_plugin_id"
  }
}

tags

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.

type

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.