AWS Kinesis Streams logs

Publish logs to AWS Kinesis Streams topics

status: stable delivery: at-least-once egress: batch state: stateless

Configuration

Example configurations

{
  "sinks": {
    "my_sink_id": {
      "type": "aws_kinesis_streams",
      "inputs": "my-source-or-transform-id",
      "partition_key_field": "user_id",
      "compression": "none",
      "region": "us-east-1",
      "stream_name": "my-stream"
    }
  }
}
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = "my-source-or-transform-id"
partition_key_field = "user_id"
compression = "none"
region = "us-east-1"
stream_name = "my-stream"
---
sinks:
  my_sink_id:
    type: aws_kinesis_streams
    inputs: my-source-or-transform-id
    partition_key_field: user_id
    compression: none
    encoding: null
    healthcheck: null
    region: us-east-1
    stream_name: my-stream
{
  "sinks": {
    "my_sink_id": {
      "type": "aws_kinesis_streams",
      "inputs": "my-source-or-transform-id",
      "endpoint": "127.0.0.0:5000/path/to/service",
      "partition_key_field": "user_id",
      "compression": "none",
      "region": "us-east-1",
      "stream_name": "my-stream"
    }
  }
}
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = "my-source-or-transform-id"
endpoint = "127.0.0.0:5000/path/to/service"
partition_key_field = "user_id"
compression = "none"
region = "us-east-1"
stream_name = "my-stream"
---
sinks:
  my_sink_id:
    type: aws_kinesis_streams
    inputs: my-source-or-transform-id
    auth: null
    endpoint: 127.0.0.0:5000/path/to/service
    partition_key_field: user_id
    buffer: null
    batch: null
    compression: none
    encoding: null
    healthcheck: null
    request: null
    proxy: null
    region: us-east-1
    stream_name: my-stream

auth

optional object
Options for the authentication strategy.

auth.access_key_id

optional string literal
The AWS access key id. Used for AWS authentication when communicating with AWS services.

auth.assume_role

optional string literal
The ARN of an IAM role to assume at startup.

auth.secret_access_key

optional string literal
The AWS secret access key. Used for AWS authentication when communicating with AWS services.

batch

optional object
Configures the sink batching behavior.

batch.max_bytes

optional uint
The maximum size of a batch, in bytes, before it is flushed.
default: 5e+06 (bytes)

batch.max_events

optional uint
The maximum size of a batch, in events, before it is flushed.
default: 500 (events)

batch.timeout_secs

optional uint
The maximum age of a batch before it is flushed.
default: 1 (seconds)

buffer

optional object
Configures the sink specific buffer behavior.

buffer.max_events

optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500 (events)

buffer.max_size

required uint
The maximum size of the buffer on the disk.
Relevant when: type = "disk"

buffer.type

optional string enum literal
The buffer’s type and storage mechanism.
Enum options
OptionDescription
diskStores the sink’s buffer on disk. This is less performant, but durable. Data will not be lost between restarts. Will also hold data in memory to enhance performance. WARNING: This may stall the sink if disk performance isn’t on par with the throughput. For comparison, AWS gp2 volumes are usually too slow for common cases.
memoryStores the sink’s buffer in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully.
default: memory

buffer.when_full

optional string enum literal
The behavior when the buffer becomes full.
Enum options
OptionDescription
blockApplies back pressure when the buffer is full. This prevents data loss, but will cause data to pile up on the edge.
drop_newestDrops new data as it’s received. This data is lost. This should be used when performance is the highest priority.
default: block

compression

common optional string enum

The compression strategy used to compress the encoded event data before transmission.

Some cloud storage API clients and browsers will handle decompression transparently, so files may not always appear to be compressed depending how they are accessed.

Enum options string literal
OptionDescription
gzipGzip standard DEFLATE compression.
noneNo compression.
syntaxliteral
default: none

encoding

required object
Configures the encoding specific sink behavior.

encoding.codec

required string literal
The encoding codec used to serialize the events before outputting.

encoding.except_fields

optional array
Prevent the sink from encoding the specified fields.

encoding.only_fields

optional array
Makes the sink encode only the specified fields.

encoding.timestamp_format

optional string enum literal
How to format event timestamps.
Enum options
OptionDescription
rfc3339Formats as a RFC3339 string
unixFormats as a unix timestamp
default: rfc3339

endpoint

optional string
Custom endpoint for use with AWS-compatible services. Providing a value for this option will make region moot.

healthcheck

common optional object
Health check options for the sink.

healthcheck.enabled

optional bool
Enables/disables the healthcheck upon Vector boot.
default: true

inputs

required [string]

A list of upstream source or transform IDs. Wildcards (*) are supported but must be the last character in the ID.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

partition_key_field

common optional string
The log field used as the Kinesis record’s partition key value.

proxy

optional object
Configures an HTTP(S) proxy for Vector to use.

proxy.enabled

optional bool
If false the proxy will be disabled.
default: true

proxy.http

optional string literal
The URL to proxy HTTP requests through.

proxy.https

optional string literal
The URL to proxy HTTPS requests through.

proxy.no_proxy

optional array

List of hosts to avoid proxying globally.

Allowed patterns here include: - Domain names. For example, example.com will match requests to to example.com - Wildcard domains. For example, .example.com will match requests to example.com and its subdomains - IP addresses. For example, 127.0.0.1 will match requests to 127.0.0.1 - CIDR blocks. For example, 192.168.0.0./16 will match requests to any IP addresses in this range - * will match all hosts

region

required string
The AWS region of the target service. If endpoint is provided it will override this value since the endpoint includes the region.

request

optional object
Configures the sink request behavior.

request.adaptive_concurrency

optional object
Configure the adaptive concurrency algorithms. These values have been tuned by optimizing simulated results. In general you should not need to adjust these.

request.concurrency

optional uint
The maximum number of in-flight requests allowed at any given time.
default: 5 (requests)

request.rate_limit_duration_secs

optional uint
The time window, in seconds, used for the rate_limit_num option.
default: 1 (seconds)

request.rate_limit_num

optional uint
The maximum number of requests allowed within the rate_limit_duration_secs time window.
default: 5

request.retry_attempts

optional uint
The maximum number of retries to make for failed requests. The default, for all intents and purposes, represents an infinite number of retries.
default: 1.8446744073709552e+19

request.retry_initial_backoff_secs

optional uint
The amount of time to wait before attempting the first retry for a failed request. Once, the first retry has failed the fibonacci sequence will be used to select future backoffs.
default: 1 (seconds)

request.retry_max_duration_secs

optional uint
The maximum amount of time, in seconds, to wait between retries.
default: 10 (seconds)

request.timeout_secs

optional uint
The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service’s internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
default: 30 (seconds)

stream_name

required string
The stream name of the target Kinesis Logs stream.

Environment variables

AWS_ACCESS_KEY_ID

common optional string literal
The AWS access key id. Used for AWS authentication when communicating with AWS services.
Examples
AKIAIOSFODNN7EXAMPLE

AWS_CONFIG_FILE

common optional string literal
Specifies the location of the file that the AWS CLI uses to store configuration profiles.
Default: ~/.aws/config

AWS_CREDENTIAL_EXPIRATION

common optional string literal
Expiration time in RFC 3339 format. If unset, credentials won’t expire.
Examples
1996-12-19T16:39:57-08:00

AWS_DEFAULT_REGION

common optional string literal
The default AWS region.
Examples
/path/to/credentials.json

AWS_PROFILE

common optional string literal
Specifies the name of the CLI profile with the credentials and options to use. This can be the name of a profile stored in a credentials or config file.
Default: default
Examples
my-custom-profile

AWS_ROLE_SESSION_NAME

common optional string literal
Specifies a name to associate with the role session. This value appears in CloudTrail logs for commands performed by the user of this profile.
Examples
vector-session

AWS_SECRET_ACCESS_KEY

common optional string literal
The AWS secret access key. Used for AWS authentication when communicating with AWS services.
Examples
wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

AWS_SESSION_TOKEN

common optional string literal
The AWS session token. Used for AWS authentication when communicating with AWS services.
Examples
AQoEXAMPLEH4aoAH0gNCAPy...truncated...zrkuWJOgQs8IZZaIv2BXIa2R4Olgk

AWS_SHARED_CREDENTIALS_FILE

common optional string literal
Specifies the location of the file that the AWS CLI uses to store access keys.
Default: ~/.aws/credentials

Telemetry

Metrics

link

events_in_total

counter
The number of events accepted by this component either from tagged origin like file and uri, or cumulatively from other origins.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

events_out_total

counter
The total number of events emitted by this component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

processed_events_total

counter
The total number of events processed by this component. This metric is deprecated in place of using events_in_total and events_out_total metrics.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

Permissions

Platform: Amazon Web Services
Relevant policies
PolicyRequired forRequired when
kinesis:DescribeStreamhealthcheck
kinesis:PutRecordswrite

How it works

AWS authentication

Vector checks for AWS credentials in the following order:

  1. The access_key_id and secret_access_key options.
  2. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
  3. The credential_process command in the AWS config file (usually located at ~/.aws/config).
  4. The AWS credentials file (usually located at ~/.aws/credentials).
  5. The IAM instance profile (only works if running on an EC2 instance with an instance profile/role).

If no credentials are found, Vector’s health check fails and an error is logged. If your AWS credentials expire, Vector will automatically search for up-to-date credentials in the places (and order) described above.

Obtaining an access key

In general, we recommend using instance profiles/roles whenever possible. In cases where this is not possible you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this. Such created AWS access keys can be used via access_key_id and secret_access_key options.

Assuming roles

Vector can assume an AWS IAM role via the assume_role option. This is an optional setting that is helpful for a variety of use cases, such as cross account access.

Buffers and batches

This component buffers & batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when 1 of 2 conditions are met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured max_size or max_events.

Buffers are controlled via the buffer.* options.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

Partitioning

By default, Vector issues random 16 byte values for each Kinesis record’s partition key, evenly distributing records across your Kinesis partitions. Depending on your use case this might not be sufficient since random distribution does not preserve order. To override this, you can supply the partition_key_field option. This option presents an alternate field on your event to use as the partition key value instead. This is useful if you have a field already on your event, and it also pairs nicely with the remap transform, which enables you to add partition-related metadata to events.

Missing partition keys

Kinesis requires a value for the partition key. If the key is missing or the value is blank, the event is dropped and a warning-level log event is logged. The field specified in the partition_key_field option should thus always contain a value.

Partition keys that exceed 256 characters

If the value provided exceeds the maximum allowed length of 256 characters Vector will slice the value and use the first 256 characters.

Non-string partition keys

Vector will coerce the value into a string.

Rate limits & adapative concurrency

Adaptive Request Concurrency (ARC)

Adaptive Requst Concurrency is a feature of Vector that does away with static rate limits and automatically optimizes HTTP concurrency limits based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Checkout the announcement blog post,

We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with.

To enable, set the request.concurrency option to adaptive:

[sinks.my-sink]
  request.concurrency = "adaptive"

Static rate limits

If Adaptive Request Concurrency is not for you, you can manually set static rate limits with the request.rate_limit_duration_secs, request.rate_limit_num, and request.concurrency options:

[sinks.my-sink]
  request.rate_limit_duration_secs = 1
  request.rate_limit_num = 10
  request.concurrency = 10

Retry policy

Vector will retry failed requests (status == 429, >= 500, and != 501). Other responses will not be retried. You can control the number of retry attempts and backoff rate with the request.retry_attempts and request.retry_backoff_secs options.

State

This component is stateless, meaning its behavior is consistent across each input.