Reduce Transform

The Vector reduce transform reduces multiple log events into a single log event based on a set of conditions and merge strategies.

Configuration

[transforms.my_transform_id]
type = "reduce" # required
inputs = ["my-source-or-transform-id", "prefix-*"] # required
group_by = [] # optional, default
  • optional string

    ends_when

    A condition used to distinguish the final event of a transaction. If this condition resolves to true for an event, the current transaction is immediately flushed with this event.

    • Syntax: literal
  • optional uint

    expire_after_ms

    A maximum period of time to wait after the last event is received before a combined event should be considered complete.

    • Default: 30000 (milliseconds)
  • optional uint

    flush_period_ms

    Controls how often Vector checks for (and flushes) expired events.

    • Default: 1000 (milliseconds)
  • common optional [string]

    group_by

    An ordered list of fields by which to group events. Each group is combined independently, allowing you to keep independent events separate. When no fields are specified, all events will be combined in a single group. Events missing a specified field will be combined in their own group.

    • Default: []
  • optional table

    merge_strategies

    A map of field names to custom merge strategies. For each field specified, this strategy is used to combine events instead of the default behavior.

    The default behavior is as follows:

    1. The first value of a string field is kept; subsequent values are discarded.
    2. For timestamp fields, the first value is kept and a new field, [field-name]_end, is added with the last received timestamp value.
    3. Numeric values are summed.
    • enum common required string

      *

      The custom merge strategy to use for a field.

      • Syntax: literal
      • Enum, must be one of: "array" "concat" "concat_newline" "discard" "sum" "max" "min"
  • optional string

    starts_when

    A condition used to distinguish the first event of a transaction. If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.

    • Syntax: literal
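
As a rough sketch of how these options fit together, the fragment below groups events per request, closes a group after 5 seconds of silence, and overrides the default merge behavior for two fields. The component ID my_reduce, the input ID my_source, and the field names request_id, message, and duration_ms are assumptions chosen for illustration; only the option names and strategy values come from the reference above.

```toml
[transforms.my_reduce]
type = "reduce"
inputs = ["my_source"]          # hypothetical upstream component ID

# Combine events separately per request (hypothetical field name).
group_by = ["request_id"]

# Treat a group as complete 5 s after its last event (default: 30000).
expire_after_ms = 5000

# Check for expired groups every 500 ms (default: 1000).
flush_period_ms = 500

# Per-field overrides of the default merge behavior.
[transforms.my_reduce.merge_strategies]
message = "concat_newline"      # join message values with newlines
duration_ms = "max"             # keep the largest value instead of summing
```

Fields without an entry in merge_strategies fall back to the defaults listed above. Because expired groups are only flushed when Vector checks for them, a flush_period_ms shorter than expire_after_ms keeps the extra delay small.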

Telemetry

This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.

  • counter

    stale_events_flushed_total

    The number of stale events that Vector has flushed. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_in_total

    The total number of events accepted by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_events_total

    The total number of events processed by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • file - The file that produced the error.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_out_total

    The total number of events emitted by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_bytes_total

    The total number of bytes processed by the component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.
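
To inspect these counters, one option is to wire an internal_metrics source to a sink. The sketch below prints the metrics to stdout; the component IDs vector_metrics and metrics_console are hypothetical, and the use of the console sink with this encoding syntax is an assumption that may need adjusting for your Vector version.

```toml
# Expose Vector's own metrics, including those listed above.
[sources.vector_metrics]        # hypothetical component ID
type = "internal_metrics"

# Print the metrics to stdout as JSON for inspection.
[sinks.metrics_console]         # hypothetical component ID
type = "console"
inputs = ["vector_metrics"]
encoding.codec = "json"
```

In the printed output, the component_name tag identifies which component each metric belongs to, so metrics from this transform carry its component ID.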

Examples

Merge Ruby exceptions

Given the following Vector log events:

[
{
"timestamp": "2020-10-07T12:33:21.223543Z",
"message": "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
},
{
"timestamp": "2020-10-07T12:33:21.223543Z",
"message": " from foobar.rb:6:in `bar'",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
},
{
"timestamp": "2020-10-07T12:33:21.223543Z",
"message": " from foobar.rb:2:in `foo'",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
},
{
"timestamp": "2020-10-07T12:33:21.223543Z",
"message": " from foobar.rb:9:in `<main>'",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
},
{
"timestamp": "2020-10-07T12:33:22.123528Z",
"message": "Hello world, I am a new log",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
}
]

And the following configuration:

vector.toml
[transforms.reduce]
type = "reduce"
group_by = ["host", "pid", "tid"]
merge_strategies.message = "concat_newline"
starts_when = 'match(.message, /^[^\s]/)'

The following Vector log events will be output:

[
{
"timestamp": "2020-10-07T12:33:21.223543Z",
"message": "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n from foobar.rb:6:in `bar'\n from foobar.rb:2:in `foo'\n from foobar.rb:9:in `<main>'",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
},
{
"timestamp": "2020-10-07T12:33:22.123528Z",
"message": "Hello world, I am a new log",
"host": "host-1.hostname.com",
"pid": 1234,
"tid": 5678
}
]

How It Works

State

This component is stateful, meaning its behavior changes based on previous inputs (events). State is not preserved across restarts, so state-dependent behavior resets on each restart and depends only on the inputs (events) received since the most recent restart.