For other versions, see the Versioned plugin docs.
For questions about the plugin, open a topic in the Discuss forums. For bugs or feature requests, open an issue in GitHub. For the list of Elastic supported plugins, please consult the Elastic Support Matrix.
The aim of this filter is to aggregate information available among several events (typically log lines) belonging to the same task, and finally to push the aggregated information into the final task event.
You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly; otherwise events may be processed out of sequence and unexpected results will occur.
First use case: you have both a task start event and a task end event. Given these logs:

INFO - 12345 - TASK_START - start
INFO - 12345 - SQL - sqlQuery1 - 12
INFO - 12345 - SQL - sqlQuery2 - 34
INFO - 12345 - TASK_END - end
filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
  }

  if [logger] == "TASK_START" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] = 0"
      map_action => "create"
    }
  }

  if [logger] == "SQL" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] += event.get('duration')"
      map_action => "update"
    }
  }

  if [logger] == "TASK_END" {
    aggregate {
      task_id => "%{taskid}"
      code => "event.set('sql_duration', map['sql_duration'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}
{
  "message" => "INFO - 12345 - TASK_END - end message",
  "sql_duration" => 46
}

In the final event, the field sql_duration is added and contains the sum of all SQL query durations.
Second use case: the same logs, but this time without a start event:

INFO - 12345 - SQL - sqlQuery1 - 12
INFO - 12345 - SQL - sqlQuery2 - 34
INFO - 12345 - TASK_END - end
filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
  }

  if [logger] == "SQL" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')"
    }
  }

  if [logger] == "TASK_END" {
    aggregate {
      task_id => "%{taskid}"
      code => "event.set('sql_duration', map['sql_duration'])"
      end_of_task => true
      timeout => 120
    }
  }
}
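The ||= operator is what makes this work without a start event: it initializes map['sql_duration'] only on the first SQL event of a task. A minimal plain-Ruby sketch of that accumulation logic, outside Logstash (the events array and its field names are illustrative stand-ins for parsed log events):

```ruby
# Plain-Ruby sketch of the aggregation above, not the plugin itself.
events = [
  { 'logger' => 'SQL',      'duration' => 12 },
  { 'logger' => 'SQL',      'duration' => 34 },
  { 'logger' => 'TASK_END' },
]

map = {}      # per-task aggregate map (in Logstash too, map is a plain Ruby Hash)
final = nil

events.each do |event|
  case event['logger']
  when 'SQL'
    map['sql_duration'] ||= 0                 # create the counter on first use
    map['sql_duration'] += event['duration']  # then accumulate
  when 'TASK_END'
    final = { 'sql_duration' => map['sql_duration'] }
  end
end

puts final['sql_duration']  # => 46
```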
Third use case: You have no specific end event.
A typical case is aggregating or tracking user behaviour. We can track a user by its ID through the events, however once the user stops interacting, the events stop coming in. There is no specific event indicating the end of the user’s interaction.
In this case, we can enable the option push_map_as_event_on_timeout to push the aggregation map as a new event when a timeout occurs. In addition, we can use timeout_code to execute code on the populated timeout event. We can also add timeout_task_id_field so we can correlate the task_id, which in this case would be the user's ID.
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three
filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
  }
  aggregate {
    task_id => "%{user_id}"
    code => "map['clicks'] ||= 0; map['clicks'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "user_id"
    timeout => 600 # 10 minutes timeout
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
  }
}
{
  "user_id": "12345",
  "clicks": 3,
  "several_clicks": true,
  "tags": [ "_aggregatetimeout" ]
}
Fourth use case: like example #3, you have no specific end event, but tasks also come one after the other.
That is to say, tasks are not interlaced: all task1 events come, then all task2 events come, and so on.
In that case, you don't want to wait for the task timeout to flush the aggregation map.
SELECT country_name, town_name FROM town
{ "country_name": "France", "town_name": "Paris" }
{ "country_name": "France", "town_name": "Marseille" }
{ "country_name": "USA", "town_name": "New-York" }
{
  "country_name": "France",
  "towns": [
    { "town_name": "Paris" },
    { "town_name": "Marseille" }
  ]
}
{
  "country_name": "USA",
  "towns": [
    { "town_name": "New-York" }
  ]
}
In that case, you can use the push_previous_map_as_event aggregate plugin option:

filter {
  aggregate {
    task_id => "%{country_name}"
    code => "
      map['country_name'] = event.get('country_name')
      map['towns'] ||= []
      map['towns'] << {'town_name' => event.get('town_name')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
Each time the aggregate plugin detects a new country_name, it pushes the previous aggregate map as a new Logstash event, and then creates a new empty map for the next country (the incoming events themselves are dropped thanks to event.cancel()).
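That push-on-new-task-id behaviour can be sketched in plain Ruby (the rows array is an illustrative stand-in for the SQL result events; this is a simplified model of the documented behaviour, not the plugin's source):

```ruby
# Sketch of push_previous_map_as_event: when the task id changes, the
# previous map is emitted and a fresh map is started for the new task.
rows = [
  { 'country_name' => 'France', 'town_name' => 'Paris' },
  { 'country_name' => 'France', 'town_name' => 'Marseille' },
  { 'country_name' => 'USA',    'town_name' => 'New-York' },
]

pushed = []            # aggregated events pushed so far
map = nil
current_task_id = nil

rows.each do |event|
  task_id = event['country_name']
  if task_id != current_task_id
    pushed << map if map                       # push the previous map as a new event
    map = { 'country_name' => task_id, 'towns' => [] }
    current_task_id = task_id
  end
  map['towns'] << { 'town_name' => event['town_name'] }
end
pushed << map if map                           # the last map is flushed on timeout

puts pushed.length  # => 2
```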
Fifth use case: like example #3, there is no end event.
Events keep coming for an indefinite time and you want to push the aggregation map as soon as possible after the last user interaction, without waiting for the timeout.
This allows the aggregated events to be pushed closer to real time.
A typical case is aggregating or tracking user behaviour.
We can track a user by its ID through the events, however once the user stops interacting, the events stop coming in.
There is no specific event indicating the end of the user’s interaction.
The user interaction will be considered ended when no events for the specified user (task_id) arrive within the specified inactivity_timeout.
If the user continues interacting for longer than timeout seconds (since the first event), the aggregation map will still be deleted and pushed as a new event when the timeout occurs.
The difference with example #3 is that the events will be pushed as soon as the user stops interacting for inactivity_timeout seconds, instead of waiting for the end of timeout seconds since the first event.
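Putting the two limits together: a task's map is flushed when either limit is reached. A small plain-Ruby sketch of that decision, as an illustration of the documented semantics rather than the plugin's actual implementation (all times are seconds):

```ruby
# A map expires when `timeout` seconds have passed since its first event,
# or when `inactivity_timeout` seconds have passed since its last event.
def expired?(now, first_event_at, last_event_at, timeout, inactivity_timeout)
  (now - first_event_at) >= timeout || (now - last_event_at) >= inactivity_timeout
end

TIMEOUT            = 3600  # 1 hour since the first event
INACTIVITY_TIMEOUT = 300   # 5 minutes since the last event

puts expired?(1000, 0, 900,  TIMEOUT, INACTIVITY_TIMEOUT)  # => false (still active, under both limits)
puts expired?(1300, 0, 900,  TIMEOUT, INACTIVITY_TIMEOUT)  # => true  (inactive for 400 s)
puts expired?(3700, 0, 3650, TIMEOUT, INACTIVITY_TIMEOUT)  # => true  (1 hour since the first event)
```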
In this case, we can enable the option push_map_as_event_on_timeout to enable pushing the aggregation map as a new event when inactivity timeout occurs.
In addition, we can enable timeout_code to execute code on the populated timeout event.
We can also add timeout_task_id_field so we can correlate the task_id, which in this case would be the user’s ID.
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three
filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
  }
  aggregate {
    task_id => "%{user_id}"
    code => "map['clicks'] ||= 0; map['clicks'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "user_id"
    timeout => 3600            # 1 hour: user activity is considered finished one hour after the first event, even if events keep coming
    inactivity_timeout => 300  # 5 minutes: user activity is considered finished if no new events arrive 5 minutes after the last event
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
  }
}
{
  "user_id": "12345",
  "clicks": 3,
  "several_clicks": true,
  "tags": [ "_aggregatetimeout" ]
}
When the end event is processed, the aggregate map is deleted (thanks to end_of_task => true).
If code execution raises an exception, the error is logged and the event is tagged _aggregateexception.
This plugin supports the following configuration options plus the Common Options described later.
Setting | Input type | Required |
---|---|---|
aggregate_maps_path | string, a valid filesystem path | No |
code | string | Yes |
end_of_task | boolean | No |
inactivity_timeout | number | No |
map_action | string, one of ["create", "update", "create_or_update"] | No |
push_map_as_event_on_timeout | boolean | No |
push_previous_map_as_event | boolean | No |
task_id | string | Yes |
timeout | number | No |
timeout_code | string | No |
timeout_tags | array | No |
timeout_task_id_field | string | No |
timeout_timestamp_field | string | No |
Also see Common Options for a list of options supported by all filter plugins.
aggregate_maps_path
The path to the file where aggregate maps are stored when Logstash stops, and from which they are loaded when Logstash starts.
If not defined, aggregate maps will not be stored when Logstash stops and will be lost. This option must be defined in only one aggregate filter per pipeline (as aggregate maps are shared at pipeline level).
Example:
filter {
  aggregate {
    aggregate_maps_path => "/path/to/.aggregate_maps"
  }
}
code
The code to execute to update the aggregated map, using the current event; or, conversely, the code to execute to update the event, using the aggregated map.
Available variables are:
- event: the current Logstash event
- map: the aggregated map associated to the task_id, containing key/value pairs. Its data structure is a Ruby Hash.
- map_meta: meta information associated to the aggregate map. It allows you to set a custom timeout or inactivity_timeout. It also allows you to get creation_timestamp, lastevent_timestamp and task_id.

When the option push_map_as_event_on_timeout is set to true, if you set map_meta.timeout=0 in the code block, then the aggregated map is immediately pushed as a new event.
Example:
filter {
  aggregate {
    code => "map['sql_duration'] += event.get('duration')"
  }
}
end_of_task
Default value is false.
Tells the filter that the task is ended, and therefore to delete the aggregate map after code execution.
inactivity_timeout
The amount of seconds (since the last event) after which a task is considered as expired.
When a timeout occurs for a task, its aggregate map is evicted.
If push_map_as_event_on_timeout or push_previous_map_as_event is set to true, the task aggregation map is pushed as a new Logstash event.
inactivity_timeout can be defined for each "task_id" pattern.
inactivity_timeout must be lower than timeout.
map_action
Default value is "create_or_update".
Tells the filter what to do with the aggregate map:
- "create": create the map, and execute the code only if the map wasn't created before
- "update": don't create the map, and execute the code only if the map was created before
- "create_or_update": create the map if it wasn't created before, and execute the code in all cases
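The three values boil down to a simple decision about whether the code block runs, given whether a map already exists for the current task_id. A plain-Ruby sketch of that decision (an illustration of the documented behaviour, not the plugin's source):

```ruby
# Whether the aggregate `code` block runs, given map_action and whether a
# map already exists for the current task_id.
def run_code?(map_exists, map_action)
  case map_action
  when 'create'           then !map_exists  # only on the first event of a task
  when 'update'           then map_exists   # only once the map already exists
  when 'create_or_update' then true         # in all cases
  end
end

puts run_code?(false, 'create')            # => true
puts run_code?(true,  'update')            # => true
puts run_code?(false, 'update')            # => false
puts run_code?(true,  'create_or_update')  # => true
```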
push_map_as_event_on_timeout
Default value is false.
When this option is enabled, each time a task timeout is detected, the task aggregation map is pushed as a new Logstash event. This makes it possible to detect and process task timeouts in Logstash, and also to manage tasks that have no explicit end event.
push_previous_map_as_event
Default value is false.
When this option is enabled, each time the aggregate plugin detects a new task id, it pushes the previous aggregate map as a new Logstash event, and then creates a new empty map for the next task.
This option works fine only if tasks come one after the other, that is: all task1 events, then all task2 events, and so on.
task_id
The expression defining the task ID used to correlate logs.
This value must uniquely identify the task.
Example:
filter {
  aggregate {
    task_id => "%{type}%{my_task_id}"
  }
}
timeout
Default value is 1800.
The amount of seconds (since the first event) after which a task is considered as expired.
When a timeout occurs for a task, its aggregate map is evicted.
If push_map_as_event_on_timeout or push_previous_map_as_event is set to true, the task aggregation map is pushed as a new Logstash event.
The timeout can be defined for each "task_id" pattern.
timeout_code
The code to execute to complete the timeout-generated event, when push_map_as_event_on_timeout or push_previous_map_as_event is set to true. The code block will have access to the newly generated timeout event, which is pre-populated with the aggregation map.
If timeout_task_id_field is set, the event is also populated with the task_id value.
Example:
filter {
  aggregate {
    timeout_code => "event.set('state', 'timeout')"
  }
}
timeout_tags
Default value is [].
Defines the tags to add when a timeout event is generated and yielded.
Example:
filter {
  aggregate {
    timeout_tags => ["aggregate_timeout"]
  }
}
timeout_task_id_field
This option indicates the field of the timeout-generated event where the current "task_id" value will be set. This can help to correlate which tasks have timed out.
By default, if this option is not set, the task id value won't be set in the timeout-generated event.
Example:
filter {
  aggregate {
    timeout_task_id_field => "task_id"
  }
}
timeout_timestamp_field
By default, timeout is computed using system time, where Logstash is running.
When this option is set, timeout is computed using the event timestamp field indicated in this option. This means that when the first event arrives on the aggregate filter and induces a map creation, the map creation time will be equal to this event's timestamp. Then, each time a new event arrives on the aggregate filter, its timestamp is compared to the map creation time to check if a timeout happened.
This option is particularly useful when processing old logs with the option push_map_as_event_on_timeout => true. It makes it possible to generate aggregated events based on timeout on old logs, where system time is inappropriate.
Warning: for this option to work properly, it must be set on the first aggregate filter.
Example:
filter {
  aggregate {
    timeout_timestamp_field => "@timestamp"
  }
}
The following configuration options are supported by all filter plugins:
Setting | Input type | Required |
---|---|---|
add_field | hash | No |
add_tag | array | No |
enable_metric | boolean | No |
id | string | No |
periodic_flush | boolean | No |
remove_field | array | No |
remove_tag | array | No |
add_field
Default value is {}.
If this filter is successful, add any arbitrary fields to this event. Field names can be dynamic and include parts of the event using the %{field} syntax.
Example:
filter {
  aggregate {
    add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
  }
}

# You can also add multiple fields at once:
filter {
  aggregate {
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
  }
}
If the event has field "somefield" == "hello", this filter, on success, would add field foo_hello if it is present, with the value above and the %{host} piece replaced with that value from the event. The second example would also add a hardcoded field.
add_tag
Default value is [].
If this filter is successful, add arbitrary tags to the event. Tags can be dynamic and include parts of the event using the %{field} syntax.
Example:
filter {
  aggregate {
    add_tag => [ "foo_%{somefield}" ]
  }
}

# You can also add multiple tags at once:
filter {
  aggregate {
    add_tag => [ "foo_%{somefield}", "taggedy_tag" ]
  }
}
If the event has field "somefield" == "hello", this filter, on success, would add a tag foo_hello (and the second example would of course add a taggedy_tag tag).
enable_metric
Default value is true.
Disable or enable metric logging for this specific plugin instance. By default we record all the metrics we can, but you can disable metrics collection for a specific plugin.
id
Add a unique ID to the plugin configuration. If no ID is specified, Logstash will generate one. It is strongly recommended to set this ID in your configuration. This is particularly useful when you have two or more plugins of the same type, for example, if you have 2 aggregate filters. Adding a named ID in this case will help in monitoring Logstash when using the monitoring APIs.
filter {
  aggregate {
    id => "ABC"
  }
}
periodic_flush
Default value is false.
Call the filter flush method at regular intervals. Optional.
remove_field
Default value is [].
If this filter is successful, remove arbitrary fields from this event.
Example:
filter {
  aggregate {
    remove_field => [ "foo_%{somefield}" ]
  }
}

# You can also remove multiple fields at once:
filter {
  aggregate {
    remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
  }
}
If the event has field "somefield" == "hello", this filter, on success, would remove the field with name foo_hello if it is present. The second example would remove an additional, non-dynamic field.
remove_tag
Default value is [].
If this filter is successful, remove arbitrary tags from the event. Tags can be dynamic and include parts of the event using the %{field} syntax.
Example:
filter {
  aggregate {
    remove_tag => [ "foo_%{somefield}" ]
  }
}

# You can also remove multiple tags at once:
filter {
  aggregate {
    remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag" ]
  }
}
If the event has field "somefield" == "hello", this filter, on success, would remove the tag foo_hello if it is present. The second example would remove a sad, unwanted tag as well.