AWS Kinesis Firehose

Collect logs from AWS Kinesis Firehose

status: beta role: aggregator delivery: at-least-once acknowledgements: yes egress: batch state: stateless output: log

Requirements

AWS Kinesis Firehose can only deliver data over HTTP. You will need to solve TLS termination by fronting Vector with a load balancer or configuring the tls.* options.

Configuration

Example configurations

{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443"
    }
  }
}
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "access_key": "A94A8FE5CCB19BA61C4C08",
      "access_keys": [
        "A94A8FE5CCB19BA61C4C08"
      ],
      "address": "0.0.0.0:443",
      "record_compression": "auto"
    }
  }
}
[sources.my_source_id]
type = "aws_kinesis_firehose"
access_key = "A94A8FE5CCB19BA61C4C08"
access_keys = [ "A94A8FE5CCB19BA61C4C08" ]
address = "0.0.0.0:443"
record_compression = "auto"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    access_key: A94A8FE5CCB19BA61C4C08
    access_keys:
      - A94A8FE5CCB19BA61C4C08
    address: 0.0.0.0:443
    record_compression: auto

access_key

optional string literal

Deprecated

This option has been deprecated, use access_keys instead.

An optional access key to authenticate requests against.

AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If configured, access_key should be set to the same value. Otherwise, all requests will be allowed.

Examples
"A94A8FE5CCB19BA61C4C08"

access_keys

optional [string]

An optional list of access keys to authenticate requests against.

AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If configured, access_keys should be set to the same value. Otherwise, all requests will be allowed.

Array string literal
Examples
[
  "A94A8FE5CCB19BA61C4C08",
  "B94B8FE5CCB19BA61C4C12"
]

acknowledgements

optional object

Deprecated

This field is deprecated.

Controls how acknowledgements are handled by this source.

This setting is deprecated in favor of enabling acknowledgements at the global or sink level.

Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.

See End-to-end Acknowledgements for more information on how event acknowledgement is handled.

Whether or not end-to-end acknowledgements are enabled for this source.

address

required string literal
The socket address to listen for connections on.
Examples
"0.0.0.0:443"
"localhost:443"

decoding

optional object
Configures how events are decoded from raw bytes.

decoding.codec

optional string literal enum
The codec to use for decoding events.
Enum options
OptionDescription
bytesUses the raw bytes as-is.
gelfDecodes the raw bytes as a GELF message.
jsonDecodes the raw bytes as JSON.
native

Decodes the raw bytes as Vector’s native Protocol Buffers format.

This codec is experimental.

native_json

Decodes the raw bytes as Vector’s native JSON format.

This codec is experimental.

syslog

Decodes the raw bytes as a Syslog message.

Will decode either as the RFC 3164-style format (“old” style) or the more modern RFC 5424-style format (“new” style, includes structured data).

default: bytes

framing

optional object

Framing configuration.

Framing deals with how events are separated when encoded in a raw byte form, where each event is a “frame” that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.

Options for the character delimited decoder.
Relevant when: method = "character_delimited"
The character that delimits byte sequences.

The maximum length of the byte buffer.

This length does not include the trailing delimiter.

framing.method

optional string literal enum
The framing method.
Enum options
OptionDescription
bytesByte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments).
character_delimitedByte frames which are delimited by a chosen character.
length_delimitedByte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
newline_delimitedByte frames which are delimited by a newline character.
octet_countingByte frames according to the octet counting format.
default: bytes
Options for the newline delimited decoder.
Relevant when: method = "newline_delimited"

The maximum length of the byte buffer.

This length does not include the trailing delimiter.

framing.octet_counting

optional object
Options for the octet counting decoder.
Relevant when: method = "octet_counting"
The maximum length of the byte buffer.

record_compression

optional string literal enum

The compression scheme to use for decompressing records within the Firehose message.

Some services, like AWS CloudWatch Logs, will compress the events with gzip, before sending them AWS Kinesis Firehose. This option can be used to automatically decompress them before forwarding them to the next component.

Note that this is different from Content encoding option of the Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.

Enum options string literal
OptionDescription
auto

Automatically attempt to determine the compression scheme.

The compression scheme of the object is determined by looking at its file signature, also known as magic bytes.

If the record fails to decompress with the discovered format, the record is forwarded as is. Thus, if you know the records are always gzip encoded (for example, if they are coming from AWS CloudWatch Logs), set gzip in this field so that any records that are not-gzipped are rejected.

gzipGZIP.
noneUncompressed.
default: auto

store_access_key

required bool

Whether or not to store the AWS Firehose Access Key in event secrets.

If set to true, when incoming requests contains an Access Key sent by AWS Firehose, it will be kept in the event secrets as “aws_kinesis_firehose_access_key”.

tls

optional object
Configures the TLS options for incoming/outgoing connections.

tls.alpn_protocols

optional [string]

Sets the list of supported ALPN protocols.

Declare the supported ALPN protocols, which are used during negotiation with peer. Prioritized in the order they are defined.

tls.ca_file

optional string literal

Absolute path to an additional CA certificate file.

The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.

Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal

Absolute path to a certificate file used to identify this server.

The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.

If this is set, and is not a PKCS#12 archive, key_file must also be set.

Examples
"/path/to/host_certificate.crt"

tls.enabled

optional bool

Whether or not to require TLS for incoming/outgoing connections.

When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.

tls.key_file

optional string literal

Absolute path to a private key file used to identify this server.

The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal

Passphrase used to unlock the encrypted key file.

This has no effect unless key_file is set.

Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

Enables certificate verification.

If enabled, certificates must be valid in terms of not being expired, as well as being issued by a trusted issuer. This verification operates in a hierarchical manner, checking that not only the leaf certificate (the certificate presented by the client/server) is valid, but also that the issuer of that certificate is valid, and so on until reaching a root certificate.

Relevant for both incoming and outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.

tls.verify_hostname

optional bool

Enables hostname verification.

If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.

Only relevant for outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the remote hostname.

Outputs

<component_id>

Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.

Output Data

Logs

Line

One event will be published per incoming AWS Kinesis Firehose record.
Fields
message required string literal
The raw record from the incoming payload.
Examples
Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100
request_id required string literal
The AWS Kinesis Firehose request ID, value of the X-Amz-Firehose-Request-Id header.
Examples
ed1d787c-b9e2-4631-92dc-8e7c9d26d804
source_arn required string literal
The AWS Kinises Firehose delivery stream that issued the request, value of the X-Amz-Firehose-Source-Arn header.
Examples
arn:aws:firehose:us-east-1:111111111111:deliverystream/test
source_type required string literal
The name of the source type.
Examples
aws_kinesis_firehose
timestamp required timestamp
The exact time the event was ingested into Vector.
Examples
2020-10-10T17:07:36.452332Z

Telemetry

Metrics

link

component_discarded_events_total

counter
The number of events dropped by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

component_errors_total

counter
The total number of errors encountered by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
error_type
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
stage
The stage within the component at which the error occurred.

component_received_bytes_total

counter
The number of raw bytes accepted by this component from source origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

request_automatic_decode_errors_total

counter
The total number of request errors for this component when it attempted to automatically discover and handle the content-encoding of incoming request data.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

request_read_errors_total

counter
The total number of request read errors for this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

requests_received_total

counter
The total number of requests received by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

source_lag_time_seconds

histogram
The difference between the timestamp recorded in each event and the time when it was ingested, expressed as fractional seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

Examples

AWS CloudWatch Subscription message

Given this event...
{
  "requestId": "ed1d787c-b9e2-4631-92dc-8e7c9d26d804",
  "timestamp": 1600110760138,
  "records": [
	{
	  "data": "H4sIABk1bV8AA52TzW7bMBCE734KQ2db/JdI3QzETS8FAtg91UGgyOuEqCQq5Mqua+TdS8lu0hYNUpQHAdoZDcn9tKfJdJo0EEL5AOtjB0kxTa4W68Xdp+VqtbheJrPB4A4t+EFiv6yzVLuHa+/6blARAr5UV+ihbH4vh/4+VN52aF37wdYIPkTDlyhF8SrabFsOWhIrtz+Dlnto8dV3Gp9RstshXKhMi0xpqk3GpNJccpFRKYw0WvCM5kIbzrVWipm4VK55rrSk44HGHLTx/lg2wxVYRiljVGWGCvPiuPRn2O60Se6P8UKbpOBZrulsk2xLhCEjljYJk2QFHeGU04KxQqpCsumcSko3SfQ+uoBnn8pTJmjKWZYyI0axAXx021G++bweS5136CpXj8WP6/UNYek5ycMOPPhReETsQkHI4XBIO2/bynZlXXkXwryrS9w536TWkab0XwED6e/tU2/R9eGS9NTD5VgEvnWwtQikcu0e/AO0FYyu4HpfwR3Gf2R0Btza9qxgiUNUISiLr30AP7fbyMzu7OWA803ynIzdfJ69B1EZpoVhsWMRZ8a5UVJoRoUyUlDNspxzZWiEnOXiXYiSvQOR5TnN/xsiNalmKZcy5Yr/yfB6+RZD/gbDC0IbOx8wQrMhxGGYx4lBW5X1wJBLkpO981jWf6EXogvIrm+rYYrKOn4Hgbg4b439/s8cFeVvcNwBtHBkOdWvQIdRnTxPfgCXvyEgSQQAAA=="
	}
  ]
}
...and this configuration...
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443"
    }
  }
}
...this Vector event is produced:
[{"log":{"message":"{\"messageType\":\"DATA_MESSAGE\",\"owner\":\"111111111111\",\"logGroup\":\"test\",\"logStream\":\"test\",\"subscriptionFilters\":[\"Destination\"],\"logEvents\":[{\"id\":\"35683658089614582423604394983260738922885519999578275840\",\"timestamp\":1600110569039,\"message\":\"{\\\"bytes\\\":26780,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"157.130.216.193\\\",\\\"method\\\":\\\"PUT\\\",\\\"protocol\\\":\\\"HTTP/1.0\\\",\\\"referer\\\":\\\"https://www.principalcross-platform.io/markets/ubiquitous\\\",\\\"request\\\":\\\"/expedite/convergence\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":301,\\\"user-identifier\\\":\\\"-\\\"}\"},{\"id\":\"35683658089659183914001456229543810359430816722590236673\",\"timestamp\":1600110569041,\"message\":\"{\\\"bytes\\\":17707,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"109.81.244.252\\\",\\\"method\\\":\\\"GET\\\",\\\"protocol\\\":\\\"HTTP/2.0\\\",\\\"referer\\\":\\\"http://www.investormission-critical.io/24/7/vortals\\\",\\\"request\\\":\\\"/scale/functionalities/optimize\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":502,\\\"user-identifier\\\":\\\"feeney1708\\\"}\"}]}","request_id":"ed1d787c-b9e2-4631-92dc-8e7c9d26d804","source_arn":"arn:aws:firehose:us-east-1:111111111111:deliverystream/test","source_type":"aws_kinesis_firehose","timestamp":"2020-09-14T19:12:40.138Z"}}]

How it works

Context

By default, the aws_kinesis_firehose source augments events with helpful context keys.

State

This component is stateless, meaning its behavior is consistent across each input.

Forwarding CloudWatch Log events

This source is the recommended way to ingest logs from AWS CloudWatch logs via AWS CloudWatch Log subscriptions. To set this up:

  1. Deploy vector with a publicly exposed HTTP endpoint using this source. You will likely also want to use the parse_aws_cloudwatch_log_subscription_message function to extract the log events. Make sure to set the access_keys to secure this endpoint. Your configuration might look something like:

     [sources.firehose]
     # General
     type = "aws_kinesis_firehose"
     address = "127.0.0.1:9000"
     access_keys = ["secret"]
    
     [transforms.cloudwatch]
     type = "remap"
     inputs = ["firehose"]
     drop_on_error = false
     source = '''
     parsed = parse_aws_cloudwatch_log_subscription_message!(.message)
     . = unnest(parsed.log_events)
     . = map_values(.) -> |value| {
         event = del(value.log_events)
         value |= event
         message = del(.message)
         . |= object!(parse_json!(message))
     }
     '''
    
     [sinks.console]
     type = "console"
     inputs = ["cloudwatch"]
     encoding.codec = "json"
    
  2. Create a Kinesis Firehose delivery stream in the region where the CloudWatch Logs groups exist that you want to ingest.

  3. Set the stream to forward to your Vector instance via its HTTP Endpoint destination. Make sure to configure the same access_keys you set earlier.

  4. Setup a CloudWatch Logs subscription to forward the events to your delivery stream

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.