AMQP

Collect events from AMQP 0.9.1-compatible brokers such as RabbitMQ.

status: beta
role: aggregator
delivery: at-least-once
acknowledgements: no
egress: stream
state: stateless
output: log

Configuration

Example configurations

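{
  "sources": {
    "my_source_id": {
      "type": "amqp",
      "connection": {
        "connection_string": "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
      },
      "group_id": "consumer-group-name",
      "routing_key_field": "routing",
      "exchange_key": "exchange",
      "offset_key": "offset"
    }
  }
}

[sources.my_source_id]
type = "amqp"
group_id = "consumer-group-name"
routing_key_field = "routing"
exchange_key = "exchange"
offset_key = "offset"

[sources.my_source_id.connection]
connection_string = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"

---
sources:
  my_source_id:
    type: amqp
    connection:
      connection_string: "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
    group_id: consumer-group-name
    routing_key_field: routing
    exchange_key: exchange
    offset_key: offset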

connection

required object
Connection options for the AMQP source.

connection.connection_string

required string literal
Connection string to use when connecting to an AMQP server, in the format amqp://user:password@host:port/vhost?timeout=seconds. The default vhost (/) must be percent-encoded as %2f.
Examples
"amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
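
To make the connection string format concrete, the following is a minimal Python sketch that splits the example string into its parts using only the standard library. The helper name parse_amqp_url is hypothetical and is not part of Vector or lapin; it only illustrates how the %2f vhost decodes to "/".

```python
from urllib.parse import parse_qs, unquote, urlsplit


def parse_amqp_url(url: str) -> dict:
    """Split an AMQP connection string into its parts.

    Hypothetical helper for illustration only; Vector does its own parsing.
    """
    parts = urlsplit(url)
    if parts.scheme != "amqp":
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")

    # The path is "/<vhost>"; drop the separator slash, then percent-decode.
    vhost = unquote(parts.path[1:])

    # The timeout is an optional query parameter given in seconds.
    query = parse_qs(parts.query)
    timeout = int(query["timeout"][0]) if "timeout" in query else None

    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "vhost": vhost,
        "timeout": timeout,
    }


parsed = parse_amqp_url("amqp://user:password@127.0.0.1:5672/%2f?timeout=10")
print(parsed)
```

Here the vhost segment %2f decodes to a single slash, which is why the default vhost cannot be written literally in the path.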

exchange_key

common optional string literal
The log field name to use for the AMQP exchange key.
Examples
"exchange"
default: exchange

group_id

required string literal
The consumer group name to be used to consume events from AMQP.
Examples
"consumer-group-name"

offset_key

common optional string literal
The log field name to use for the AMQP offset key.
Examples
"offset"
default: offset

routing_key_field

common optional string literal
The log field name to use for the AMQP routing key.
Examples
"routing"
default: routing

Outputs

<component_id>

Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.

Output Data

Logs

Record

An individual AMQP record.
Fields

exchange

required string literal
The AMQP exchange that the record came from.
Examples
topic

message

required string literal
The raw line from the AMQP record.
Examples
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308

offset

required uint
The AMQP offset at the time the record was retrieved.
Examples
100

timestamp

required timestamp
The timestamp encoded in the AMQP message or the current time if it cannot be fetched.
Examples
2020-10-10T17:07:36.452332Z

Telemetry

Metrics

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

consumer_offset_updates_failed_total

counter
The total number of failures to update a consumer offset.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

events_failed_total

counter
The total number of failures to read an AMQP message.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

processed_events_total

counter
The total number of events processed by this component. This metric is deprecated in favor of the component_received_events_total and component_sent_events_total metrics.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

source_lag_time_seconds

histogram
The difference between the timestamp recorded in each event and the time when it was ingested, expressed as fractional seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
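
As a rough illustration of the quantity source_lag_time_seconds describes, the sketch below computes the difference between an event timestamp and an ingestion time in fractional seconds. The function name and the ingestion time are assumptions made for the example; this is not Vector's implementation, and it does not model the histogram bucketing.

```python
from datetime import datetime, timezone


def source_lag_seconds(event_timestamp: datetime, ingested_at: datetime) -> float:
    """Return ingestion time minus event time, in fractional seconds."""
    return (ingested_at - event_timestamp).total_seconds()


# Event timestamp taken from the record example; the ingestion time is made up.
event_ts = datetime(2020, 10, 10, 17, 7, 36, 452332, tzinfo=timezone.utc)
ingested = datetime(2020, 10, 10, 17, 7, 38, 952332, tzinfo=timezone.utc)

lag = source_lag_seconds(event_ts, ingested)
print(lag)  # 2.5
```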

How it works

Context

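By default, the amqp source augments each event with context keys for the exchange, routing key, and offset. The field names are set by the exchange_key, routing_key_field, and offset_key options.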
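
The effect of the context keys can be sketched in Python as follows. This is an illustration under stated assumptions, not Vector's code: build_log_event is a hypothetical helper, the routing key value "logs.nginx" is invented, and the defaults mirror the exchange_key, routing_key_field, and offset_key options documented above.

```python
from datetime import datetime, timezone


def build_log_event(message, exchange, routing_key, offset, timestamp=None,
                    exchange_key="exchange", routing_key_field="routing",
                    offset_key="offset"):
    """Attach AMQP context to a log event under configurable field names.

    Hypothetical helper; it only mirrors the documented output shape.
    """
    event = {
        "message": message,
        # Fall back to the current time when the message carries no timestamp.
        "timestamp": timestamp or datetime.now(timezone.utc),
    }
    event[exchange_key] = exchange
    event[routing_key_field] = routing_key
    event[offset_key] = offset
    return event


# Default field names.
event = build_log_event("raw line", exchange="topic",
                        routing_key="logs.nginx", offset=100)

# Renaming the exchange field, as exchange_key would.
renamed = build_log_event("raw line", exchange="topic",
                          routing_key="logs.nginx", offset=100,
                          exchange_key="amqp_exchange")
print(sorted(event), sorted(renamed))
```

Changing one of the key options only changes the name of the field that carries the value, not the value itself.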

Lapin

The amqp source and sink use lapin under the hood. This is a reliable pure Rust library that facilitates communication with AMQP servers such as RabbitMQ.

State

This component is stateless, meaning its behavior is consistent across each input.