Persistent Queues

By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. The size of these in-memory queues is fixed and not configurable. If Logstash experiences a temporary machine failure, the contents of the in-memory queue will be lost. Temporary machine failures are scenarios where Logstash or its host machine are terminated abnormally but are capable of being restarted.

In order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which will store the message queue on disk. Persistent queues provide durability of data within Logstash.

Persistent queues are also useful for Logstash deployments that need large buffers. Instead of deploying and managing a message broker, such as Redis, RabbitMQ, or Apache Kafka, to facilitate a buffered publish-subscriber model, you can enable persistent queues to buffer events on disk and remove the message broker.

In summary, the benefits of enabling persistent queues are as follows:

  • Absorbs bursts of events without needing an external buffering mechanism like Redis or Apache Kafka.
  • Provides an at-least-once delivery guarantee against message loss during a normal shutdown as well as when Logstash is terminated abnormally. If Logstash is restarted while events are in-flight, Logstash will attempt to deliver messages stored in the persistent queue until delivery succeeds at least once.

    Note

    You must set queue.checkpoint.writes: 1 explicitly to guarantee maximum durability for all input events. See Controlling Durability.

Limitations of Persistent Queues

The following are problems not solved by the persistent queue feature:

  • Input plugins that do not use a request-response protocol cannot be protected from data loss. For example: tcp, udp, zeromq push+pull, and many other inputs do not have a mechanism to acknowledge receipt to the sender. Plugins such as beats and http, which do have an acknowledgement capability, are well protected by this queue.
  • It does not handle permanent machine failures such as disk corruption, disk failure, and machine loss. The data persisted to disk is not replicated.
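As an illustration, here is a minimal pipeline configuration sketch using the beats input, which can acknowledge receipt back to its senders and is therefore well protected by the persistent queue. The port and hosts values are placeholders, not recommendations:

```
input {
  beats {
    # Beats senders (e.g. Filebeat) retry until Logstash acknowledges receipt,
    # so events accepted here are protected by the persistent queue.
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
```

By contrast, a udp input in the same position offers no way to tell the sender whether an event was ever written to the queue.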

How Persistent Queues Work

The queue sits between the input and filter stages in the same process:

input → queue → filter + output

When an input has events ready to process, it writes them to the queue. When the write to the queue is successful, the input can send an acknowledgement to its data source.

When processing events from the queue, Logstash acknowledges events as completed, within the queue, only after filters and outputs have completed. The queue keeps a record of events that have been processed by the pipeline. An event is recorded as processed (in this document, called "acknowledged" or "ACKed") if, and only if, the event has been processed completely by the Logstash pipeline.

What does acknowledged mean? This means the event has been handled by all configured filters and outputs. For example, if you have only one output, Elasticsearch, an event is ACKed when the Elasticsearch output has successfully sent this event to Elasticsearch.

During a normal shutdown (CTRL+C or SIGTERM), Logstash will stop reading from the queue and will finish processing the in-flight events being processed by the filters and outputs. Upon restart, Logstash will resume processing the events in the persistent queue as well as accepting new events from inputs.

If Logstash is abnormally terminated, any in-flight events will not have been ACKed and will be reprocessed by filters and outputs when Logstash is restarted. Logstash processes events in batches, so it is possible that for any given batch, some of that batch may have been successfully completed, but not recorded as ACKed, when an abnormal termination occurs.

For more details about the specific behaviors of queue writes and acknowledgement, see Controlling Durability.

Configuring Persistent Queues

To configure persistent queues, you can specify the following options in the Logstash settings file:

  • queue.type: Specify persisted to enable persistent queues. By default, persistent queues are disabled (default: queue.type: memory).
  • path.queue: The directory path where the data files will be stored. By default, the files are stored in path.data/queue.
  • queue.page_capacity: The maximum size of a queue page in bytes. The queue data consists of append-only files called "pages". The default size is 64mb. Changing this value is unlikely to have performance benefits.
  • queue.drain: Specify true if you want Logstash to wait until the persistent queue is drained before shutting down. The amount of time it takes to drain the queue depends on the number of events that have accumulated in the queue. Therefore, you should avoid using this setting unless the queue, even when full, is relatively small and can be drained quickly.
  • queue.max_events: The maximum number of events that are allowed in the queue. The default is 0 (unlimited).
  • queue.max_bytes: The total capacity of the queue in number of bytes. The default is 1024mb (1gb). Make sure the capacity of your disk drive is greater than the value you specify here.

    Tip

    If you are using persistent queues to protect against data loss, but don’t require much buffering, you can set queue.max_bytes to a smaller value, such as 10mb, to produce smaller queues and improve queue performance.

If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever limit is reached first. See Handling Back Pressure for behavior when these queue limits are reached.
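For example, with the following settings (the values here are hypothetical), the queue stops accepting writes once it holds 100000 events or occupies 2gb on disk, whichever happens first:

```yaml
queue.type: persisted
queue.max_events: 100000
queue.max_bytes: 2gb
```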

You can also control when the checkpoint file gets updated by setting queue.checkpoint.writes. See Controlling Durability.

Example configuration:

queue.type: persisted
queue.max_bytes: 4gb

Handling Back Pressure

When the queue is full, Logstash puts back pressure on the inputs to stall data flowing into Logstash. This mechanism helps Logstash control the rate of data flow at the input stage without overwhelming outputs like Elasticsearch.

Use the queue.max_bytes setting to configure the total capacity of the queue on disk. The following example sets the total capacity of the queue to 8gb:

queue.type: persisted
queue.max_bytes: 8gb

With these settings specified, Logstash will buffer events on disk until the size of the queue reaches 8gb. When the queue is full of unACKed events, and the size limit has been reached, Logstash will no longer accept new events.

Each input handles back pressure independently. For example, when the beats input encounters back pressure, it no longer accepts new connections and waits until the persistent queue has space to accept more events. After the filter and output stages finish processing existing events in the queue and ACK them, Logstash automatically starts accepting new events.

Controlling Durability

Durability is a property of storage writes that ensures data will be available after it’s written.

When the persistent queue feature is enabled, Logstash will store events on disk. Logstash commits to disk in a mechanism called checkpointing.

To discuss durability, we need to introduce a few details about how the persistent queue is implemented.

First, the queue itself is a set of pages. There are two kinds of pages: head pages and tail pages. The head page is where new events are written. There is only one head page. When the head page is of a certain size (see queue.page_capacity), it becomes a tail page, and a new head page is created. Tail pages are immutable, and the head page is append-only.

Second, the queue records details about itself (pages, acknowledgements, etc.) in a separate file called a checkpoint file.

When recording a checkpoint, Logstash will:

  • Call fsync on the head page.
  • Atomically write to disk the current state of the queue.

The process of checkpointing is atomic, which means any update to the file is saved in its entirety if successful.

If Logstash is terminated, or if there is a hardware-level failure, any data that is buffered in the persistent queue, but not yet checkpointed, is lost.

You can force Logstash to checkpoint more frequently by setting queue.checkpoint.writes. This setting specifies the maximum number of events that may be written to disk before forcing a checkpoint. The default is 1024. To ensure maximum durability and avoid losing data in the persistent queue, you can set queue.checkpoint.writes: 1 to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to 1 can severely impact performance.
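As a sketch, the settings below trade write throughput for maximum durability by forcing a checkpoint after every event:

```yaml
queue.type: persisted
# Checkpoint (fsync) after every single event write. Maximum durability,
# but each event now costs a disk sync, which can severely reduce throughput.
queue.checkpoint.writes: 1
```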

Disk Garbage Collection

On disk, the queue is stored as a set of pages where each page is one file. Each page can be at most queue.page_capacity in size. Pages are deleted (garbage collected) after all events in that page have been ACKed. If an older page has at least one event that is not yet ACKed, that entire page will remain on disk until all events in that page are successfully processed. Each page containing unprocessed events will count against the queue.max_bytes byte size.