Reduce

Collapse multiple log events into a single event based on a set of conditions and merge strategies.

status: beta | egress: stream | state: stateful

Configuration

Example configurations

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "group_by": [
        "request_id"
      ]
    }
  }
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "request_id" ]
---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - request_id
{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "ends_when": ".status_code != 200 && !includes([\"info\", \"debug\"], .severity)",
      "expire_after_ms": 30000,
      "flush_period_ms": 1000,
      "group_by": [
        "request_id"
      ],
      "starts_when": ".status_code != 200 && !includes([\"info\", \"debug\"], .severity)"
    }
  }
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
ends_when = '.status_code != 200 && !includes(["info", "debug"], .severity)'
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]
starts_when = '.status_code != 200 && !includes(["info", "debug"], .severity)'
---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    ends_when: .status_code != 200 && !includes(["info", "debug"], .severity)
    expire_after_ms: 30000
    flush_period_ms: 1000
    group_by:
      - request_id
    starts_when: .status_code != 200 && !includes(["info", "debug"], .severity)

ends_when

optional string
A condition used to distinguish the final event of a transaction. If this condition resolves to true for an event, the current transaction is immediately flushed with this event.
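
The flush-with-this-event behavior described above can be modeled in a few lines of Python. This is only an illustrative sketch, not Vector's implementation: `split_on_end` and the `done` field are hypothetical names, and a plain Python predicate stands in for the condition string.

```python
from typing import Callable, Dict, List

Event = Dict[str, object]


def split_on_end(events: List[Event],
                 ends_when: Callable[[Event], bool]) -> List[List[Event]]:
    """Split a stream into transactions; a matching event closes the
    current transaction and is included in it."""
    transactions: List[List[Event]] = []
    current: List[Event] = []
    for event in events:
        current.append(event)  # the final event belongs to the transaction
        if ends_when(event):
            transactions.append(current)
            current = []
    if current:
        # Still open: a real pipeline would hold this until it expires.
        transactions.append(current)
    return transactions


events = [
    {"message": "start", "done": False},
    {"message": "work", "done": False},
    {"message": "finish", "done": True},
    {"message": "next start", "done": False},
]
batches = split_on_end(events, lambda e: e["done"] is True)
print([[e["message"] for e in batch] for batch in batches])
# [['start', 'work', 'finish'], ['next start']]
```

The last batch in the output represents a transaction that is still open when the input ends.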

expire_after_ms

optional uint
The maximum time to wait after the last event is received before a combined event is considered complete.
default: 30000 (milliseconds)

flush_period_ms

optional uint
Controls how often Vector checks for (and flushes) expired events.
default: 1000 (milliseconds)
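
To see how expire_after_ms and flush_period_ms interact, the following Python sketch simulates periodic expiry checks. It is a simplified model under stated assumptions: the function names are hypothetical, checks are assumed to fire at exact multiples of the flush period, and a group is assumed to expire once its idle time reaches expire_after_ms.

```python
from typing import Dict, List


def expired_groups(last_seen_ms: Dict[str, int], now_ms: int,
                   expire_after_ms: int = 30_000) -> List[str]:
    """Return the groups whose last event is at least expire_after_ms old."""
    return sorted(key for key, seen in last_seen_ms.items()
                  if now_ms - seen >= expire_after_ms)


def simulate_flush_ticks(last_seen_ms: Dict[str, int], end_ms: int,
                         expire_after_ms: int = 30_000,
                         flush_period_ms: int = 1_000) -> Dict[str, int]:
    """Map each group to the time of the periodic check that flushes it."""
    pending = dict(last_seen_ms)
    flushed: Dict[str, int] = {}
    for tick in range(flush_period_ms, end_ms + 1, flush_period_ms):
        for key in expired_groups(pending, tick, expire_after_ms):
            flushed[key] = tick
            del pending[key]
    return flushed


# Time (ms) at which each group received its last event.
last_seen = {"req-a": 200, "req-b": 1_500}
print(simulate_flush_ticks(last_seen, end_ms=40_000))
# {'req-a': 31000, 'req-b': 32000}
```

In this model a group is flushed at the first check after it expires, so the delay after its last event falls between expire_after_ms and roughly expire_after_ms plus flush_period_ms.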

group_by

common optional [string]
An ordered list of fields by which to group events. Each group is combined independently, allowing you to keep independent events separate. When no fields are specified, all events will be combined in a single group. Events missing a specified field will be combined in their own group.
Array string literal
Examples
[
  "request_id",
  "user_id",
  "transaction_id"
]
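
The grouping rules above can be sketched in Python as a lookup keyed by the ordered field values. This is an illustration only: `group_key` and `group_events` are hypothetical helpers, and representing a missing field as `None` is an assumption of the sketch.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Dict[str, object]


def group_key(event: Event, group_by: List[str]) -> Tuple[object, ...]:
    """Build the grouping key from the ordered list of fields.
    A missing field becomes None, so such events share their own group."""
    return tuple(event.get(field) for field in group_by)


def group_events(events: List[Event],
                 group_by: List[str]) -> Dict[Tuple[object, ...], List[Event]]:
    """Bucket events by their grouping key, preserving arrival order."""
    groups: Dict[Tuple[object, ...], List[Event]] = defaultdict(list)
    for event in events:
        groups[group_key(event, group_by)].append(event)
    return dict(groups)


events = [
    {"request_id": "a", "n": 1},
    {"request_id": "b", "n": 2},
    {"request_id": "a", "n": 3},
    {"n": 4},  # no request_id
    {"n": 5},  # no request_id
]
by_request = group_events(events, ["request_id"])
print({key: [e["n"] for e in group] for key, group in by_request.items()})
# {('a',): [1, 3], ('b',): [2], (None,): [4, 5]}

# With no fields, every event gets the same empty key: one single group.
print(len(group_events(events, [])))
# 1
```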

inputs

required [string]

A list of upstream source or transform IDs. Wildcards (*) are supported but must be the last character in the ID.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]
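
As a rough illustration of the trailing-wildcard rule, the Python sketch below treats a pattern ending in * as a prefix match. `matches_input` is a hypothetical helper written for this example and says nothing about how Vector resolves inputs internally.

```python
def matches_input(pattern: str, component_id: str) -> bool:
    """Match a component ID against an inputs entry.
    A wildcard is only accepted as the last character of the pattern."""
    if "*" in pattern[:-1]:
        raise ValueError("wildcard (*) must be the last character")
    if pattern.endswith("*"):
        return component_id.startswith(pattern[:-1])  # prefix match
    return pattern == component_id  # exact match


print(matches_input("prefix-*", "prefix-app"))  # True
print(matches_input("prefix-*", "other-app"))   # False
print(matches_input("my-source-or-transform-id", "my-source-or-transform-id"))  # True
```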

merge_strategies

optional object

A map of field names to custom merge strategies. For each field specified, its strategy is used to combine events instead of the default behavior.

The default behavior is as follows:

  1. The first value of a string field is kept; subsequent values are discarded.
  2. For timestamp fields, the first value is kept and a new field, [field-name]_end, is added with the last received timestamp value.
  3. Numeric values are summed.
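
The three default rules can be expressed as a short Python sketch. It is a model for illustration only: `merge_default` is a hypothetical function, timestamps are represented as `datetime` objects, and the `_end` field is only added once a second timestamp arrives, which is an assumption of this sketch.

```python
from datetime import datetime
from typing import Dict, List

Event = Dict[str, object]


def is_number(value: object) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def merge_default(events: List[Event]) -> Event:
    """Combine events using the default per-type rules."""
    merged: Event = {}
    for event in events:
        for field, value in event.items():
            if field not in merged:
                merged[field] = value  # first value of any field is kept
            elif isinstance(value, datetime):
                merged[f"{field}_end"] = value  # track last timestamp
            elif is_number(value) and is_number(merged[field]):
                merged[field] = merged[field] + value  # numbers are summed
            # strings (and anything else): later values are discarded
    return merged


events = [
    {"message": "first", "bytes": 10,
     "timestamp": datetime(2020, 10, 7, 12, 33, 21)},
    {"message": "second", "bytes": 32,
     "timestamp": datetime(2020, 10, 7, 12, 33, 22)},
]
merged = merge_default(events)
print(merged["message"], merged["bytes"])
# first 42
print(merged["timestamp_end"] - merged["timestamp"])
# 0:00:01
```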

merge_strategies.*

required string enum literal
The custom merge strategy to use for a field.
Enum options
Option          Description
array           Each value is appended to an array.
concat          Concatenate each string value (delimited with a space).
concat_newline  Concatenate each string value (delimited with a newline).
discard         Discard all but the first value found.
max             The maximum of all numeric values.
min             The minimum of all numeric values.
sum             Sum all numeric values.
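
For a quick feel of what each option produces, the Python sketch below applies every strategy to a list of values collected for one field. The `STRATEGIES` table and `merge_field` helper are hypothetical names for this example and only mirror the descriptions above.

```python
from typing import Callable, Dict, List

# One reducer per enum option, each taking all values seen for a field.
STRATEGIES: Dict[str, Callable[[List], object]] = {
    "array": lambda values: list(values),
    "concat": lambda values: " ".join(values),
    "concat_newline": lambda values: "\n".join(values),
    "discard": lambda values: values[0],
    "max": max,
    "min": min,
    "sum": sum,
}


def merge_field(values: List, strategy: str) -> object:
    """Reduce the values of a single field with the named strategy."""
    return STRATEGIES[strategy](values)


print(merge_field(["a", "b", "c"], "array"))   # ['a', 'b', 'c']
print(merge_field(["a", "b", "c"], "concat"))  # a b c
print(merge_field(["a", "b", "c"], "discard")) # a
print(merge_field([3, 1, 2], "max"))           # 3
print(merge_field([3, 1, 2], "min"))           # 1
print(merge_field([3, 1, 2], "sum"))           # 6
```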

starts_when

optional string
A condition used to distinguish the first event of a transaction. If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.
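
The flush-without-this-event behavior described above can be sketched in Python as follows. This is an illustrative model rather than Vector's implementation: `split_on_start` is a hypothetical helper, the sample messages are invented, and a Python regex predicate stands in for the condition string.

```python
import re
from typing import Callable, List


def split_on_start(messages: List[str],
                   starts_when: Callable[[str], bool]) -> List[List[str]]:
    """Split a stream into transactions; a matching message closes the
    previous transaction (without itself) and opens a new one."""
    transactions: List[List[str]] = []
    current: List[str] = []
    for message in messages:
        if starts_when(message) and current:
            transactions.append(current)  # flushed without this message
            current = []
        current.append(message)
    if current:
        transactions.append(current)  # still open when the input ends
    return transactions


messages = [
    "Error: boom",
    "    at step_one",
    "    at step_two",
    "Next log line",
]
# A message starting with a non-whitespace character begins a transaction.
batches = split_on_start(messages, lambda m: re.match(r"^[^\s]", m) is not None)
print(["\n".join(batch) for batch in batches])
# ['Error: boom\n    at step_one\n    at step_two', 'Next log line']
```

Joining each batch with a newline corresponds to what the concat_newline strategy does for a single field.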

Telemetry

Metrics


events_in_total

counter
The number of events accepted by this component, either from tagged origins such as file and uri, or cumulatively from other origins.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

events_out_total

counter
The total number of events emitted by this component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

processed_events_total

counter
The total number of events processed by this component. This metric is deprecated in favor of the events_in_total and events_out_total metrics.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

stale_events_flushed_total

counter
The number of stale events that Vector has flushed.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

Examples

Merge Ruby exceptions

Given these events...
[
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "    from foobar.rb:6:in `bar'",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "    from foobar.rb:2:in `foo'",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "    from foobar.rb:9:in `<main>'",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "Hello world, I am a new log",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:22.123528Z"
    }
  }
]
...and this configuration...
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(.message, /^[^\\s]/)"

  [transforms.my_transform_id.merge_strategies]
  message = "concat_newline"
---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - host
      - pid
      - tid
    merge_strategies:
      message: concat_newline
    starts_when: match(.message, /^[^\s]/)
{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "group_by": [
        "host",
        "pid",
        "tid"
      ],
      "merge_strategies": {
        "message": "concat_newline"
      },
      "starts_when": "match(.message, /^[^\\s]/)"
    }
  }
}
...these Vector events are produced:
[
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n    from foobar.rb:6:in `bar'\n    from foobar.rb:2:in `foo'\n    from foobar.rb:9:in `<main>'",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "host": "host-1.hostname.com",
      "message": "Hello world, I am a new log",
      "pid": 1234,
      "tid": 5678,
      "timestamp": "2020-10-07T12:33:22.123528Z"
    }
  }
]

Reduce Rails logs into a single transaction

Given these events...
[
  {
    "log": {
      "message": "Received GET /path",
      "request_id": "abcd1234",
      "request_params": {
        "key": "val"
      },
      "request_path": "/path",
      "timestamp": "2020-10-07T12:33:21.223543Z"
    }
  },
  {
    "log": {
      "message": "Executed query in 5.2ms",
      "query": "SELECT * FROM table",
      "query_duration_ms": 5.2,
      "request_id": "abcd1234",
      "timestamp": "2020-10-07T12:33:21.832345Z"
    }
  },
  {
    "log": {
      "message": "Rendered partial _partial.erb in 2.3ms",
      "render_duration_ms": 2.3,
      "request_id": "abcd1234",
      "template": "_partial.erb",
      "timestamp": "2020-10-07T12:33:22.457423Z"
    }
  },
  {
    "log": {
      "message": "Executed query in 7.8ms",
      "query": "SELECT * FROM table",
      "query_duration_ms": 7.8,
      "request_id": "abcd1234",
      "timestamp": "2020-10-07T12:33:22.543323Z"
    }
  },
  {
    "log": {
      "message": "Sent 200 in 15.2ms",
      "request_id": "abcd1234",
      "response_duration_ms": 5.2,
      "response_status": 200,
      "timestamp": "2020-10-07T12:33:22.742322Z"
    }
  }
]
...and this configuration...
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}
...this Vector event is produced:
{
  "log": {
    "query_duration_ms": 13,
    "render_duration_ms": 2.3,
    "request_id": "abcd1234",
    "request_params": {
      "key": "val"
    },
    "request_path": "/path",
    "response_duration_ms": 5.2,
    "response_status": 200,
    "timestamp": "2020-10-07T12:33:21.223543Z",
    "timestamp_end": "2020-10-07T12:33:22.742322Z"
  }
}

How it works

State

This component is stateful, meaning its behavior changes based on previous inputs (events). State is not preserved across restarts, so state-dependent behavior resets on restart and depends only on the inputs (events) received since the most recent restart.