AWS Kinesis Data Streams Sink
The Vector aws_kinesis_streams sink sends logs to AWS Kinesis Data Streams.
Configuration
[sinks.my_sink_id]
# General
type = "aws_kinesis_streams" # required
inputs = ["my-source-or-transform-id", "prefix-*"] # required
compression = "none" # optional, default
partition_key_field = "user_id" # optional, no default
region = "us-east-1" # required when endpoint = null
stream_name = "my-stream" # required

# Encoding
encoding.codec = "json" # required

# Healthcheck
healthcheck.enabled = true # optional, default
- auth (optional, table)
  Options for the authentication strategy.
- auth.access_key_id (optional, string)
  The AWS access key ID. Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.
  - Syntax: literal
- auth.assume_role (optional, string)
  The ARN of an IAM role to assume at startup. See AWS Authentication for more info.
  - Syntax: literal
- auth.secret_access_key (optional, string)
  The AWS secret access key. Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.
  - Syntax: literal
- batch (optional, table)
  Configures the sink batching behavior.
- batch.max_bytes (common, optional, uint)
  The maximum size of a batch, in bytes, before it is flushed.
  - Default: 5000000 (bytes)
- batch.max_events (common, optional, uint)
  The maximum size of a batch, in events, before it is flushed. See Buffers & batches for more info.
  - Default: 500 (events)
- batch.timeout_secs (common, optional, uint)
  The maximum age of a batch before it is flushed. See Buffers & batches for more info.
  - Default: 1 (seconds)
- buffer (optional, table)
  Configures the sink-specific buffer behavior.
- buffer.max_events (common, optional, uint)
  The maximum number of events allowed in the buffer. See Buffers & batches for more info.
  - Only relevant when: type = "memory"
  - Default: 500 (events)
- buffer.max_size (common, required*, uint)
  The maximum size of the buffer on disk. See Buffers & batches for more info.
  - Only required when: type = "disk"
- buffer.type (enum, common, optional, string)
  The buffer's type and storage mechanism.
  - Syntax: literal
  - Default: "memory"
  - Enum, must be one of: "memory", "disk"
- buffer.when_full (enum, optional, string)
  The behavior when the buffer becomes full.
  - Syntax: literal
  - Default: "block"
  - Enum, must be one of: "block", "drop_newest"
- compression (enum, common, optional, string)
  The compression strategy used to compress the encoded event data before transmission.
  - Syntax: literal
  - Default: "none"
  - Enum, must be one of: "none", "gzip", "syntax"
- encoding (common, required, table)
  Configures the encoding-specific sink behavior.
- encoding.codec (common, required, string)
  The encoding codec used to serialize the events before outputting.
  - Syntax: literal
- encoding.except_fields (optional, [string])
  Prevent the sink from encoding the specified fields.
- encoding.only_fields (optional, [string])
  Limit the sink to encoding only the specified fields.
- encoding.timestamp_format (enum, optional, string)
  How to format event timestamps.
  - Syntax: literal
  - Default: "rfc3339"
  - Enum, must be one of: "rfc3339", "unix"
- endpoint (optional, string)
  Custom endpoint for use with AWS-compatible services. Providing a value for this option will make region moot.
  - Syntax: literal
  - Only relevant when: region = null
- healthcheck (common, optional, table)
  Health check options for the sink. See Health checks for more info.
- healthcheck.enabled (common, optional, bool)
  Enables/disables the healthcheck upon Vector boot.
  - Default: true
- partition_key_field (common, optional, string)
  The log field used as the Kinesis record's partition key value. See Partitioning for more info.
  - Syntax: literal
- region (common, required*, string)
  The AWS region of the target service. If endpoint is provided, it will override this value since the endpoint includes the region.
  - Syntax: literal
  - Only required when: endpoint = null
- request (optional, table)
  Configures the sink request behavior.
- request.adaptive_concurrency (optional, table)
  Configure the adaptive concurrency algorithms. These values have been tuned by optimizing simulated results. In general you should not need to adjust these.
- request.adaptive_concurrency.decrease_ratio (optional, float)
  The fraction of the current value to set the new concurrency limit when decreasing the limit. Valid values are greater than 0 and less than 1. Smaller values cause the algorithm to scale back rapidly when latency increases. Note that the new limit is rounded down after applying this ratio.
  - Default: 0.9
- request.adaptive_concurrency.ewma_alpha (optional, float)
  The adaptive concurrency algorithm uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with the current RTT. This value controls how heavily new measurements are weighted compared to older ones. Valid values are greater than 0 and less than 1. Smaller values cause this reference to adjust more slowly, which may be useful if a service has unusually high response variability.
  - Default: 0.7
- request.adaptive_concurrency.rtt_threshold_ratio (optional, float)
  When comparing the past RTT average to the current measurements, we ignore changes that are less than this ratio higher than the past RTT. Valid values are greater than or equal to 0. Larger values cause the algorithm to ignore larger increases in the RTT.
  - Default: 0.05
- request.concurrency (common, optional, uint)
  The maximum number of in-flight requests allowed at any given time, or "adaptive" to allow Vector to automatically set the limit based on current network and service conditions.
  - Default: 5 (requests)
- request.rate_limit_duration_secs (common, optional, uint)
  The time window, in seconds, used for the rate_limit_num option.
  - Default: 1 (seconds)
- request.rate_limit_num (common, optional, uint)
  The maximum number of requests allowed within the rate_limit_duration_secs time window.
  - Default: 5
- request.retry_attempts (optional, uint)
  The maximum number of retries to make for failed requests. The default, for all intents and purposes, represents an infinite number of retries.
  - Default: 18446744073709552000
- request.retry_initial_backoff_secs (optional, uint)
  The amount of time to wait before attempting the first retry for a failed request. Once the first retry has failed, the Fibonacci sequence will be used to select future backoffs.
  - Default: 1 (seconds)
- request.retry_max_duration_secs (optional, uint)
  The maximum amount of time, in seconds, to wait between retries.
  - Default: 10 (seconds)
- request.timeout_secs (common, optional, uint)
  The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service's internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream. See Buffers & batches for more info.
  - Default: 30 (seconds)
- stream_name (common, required, string)
  The name of the target Kinesis data stream.
  - Syntax: literal
Env Vars
- AWS_ACCESS_KEY_ID (common, optional, string)
  The AWS access key ID. Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.
  - Syntax: literal
- AWS_CONFIG_FILE (common, optional, string)
  Specifies the location of the file that the AWS CLI uses to store configuration profiles.
  - Syntax: literal
  - Default: "~/.aws/config"
- AWS_CREDENTIAL_EXPIRATION (common, optional, string)
  Expiration time in RFC 3339 format. If unset, credentials won't expire.
  - Syntax: literal
- AWS_DEFAULT_REGION (common, optional, string)
  The default AWS region.
  - Syntax: literal
  - Only relevant when: endpoint = null
- AWS_PROFILE (common, optional, string)
  Specifies the name of the CLI profile with the credentials and options to use. This can be the name of a profile stored in a credentials or config file.
  - Syntax: literal
  - Default: "default"
- AWS_ROLE_SESSION_NAME (common, optional, string)
  Specifies a name to associate with the role session. This value appears in CloudTrail logs for commands performed by the user of this profile.
  - Syntax: literal
- AWS_SECRET_ACCESS_KEY (common, optional, string)
  The AWS secret access key. Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.
  - Syntax: literal
- AWS_SESSION_TOKEN (common, optional, string)
  The AWS session token. Used for AWS authentication when communicating with AWS services.
  - Syntax: literal
- AWS_SHARED_CREDENTIALS_FILE (common, optional, string)
  Specifies the location of the file that the AWS CLI uses to store access keys.
  - Syntax: literal
  - Default: "~/.aws/credentials"
Telemetry
This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.
- processed_bytes_total (counter)
  The total number of bytes processed by the component. This metric includes the following tags:
  - component_kind: The Vector component kind.
  - component_name: The Vector component ID.
  - component_type: The Vector component type.
  - instance: The Vector instance identified by host and port.
  - job: The name of the job producing Vector metrics.
- events_in_total (counter)
  The total number of events accepted by this component. This metric includes the following tags:
  - component_kind: The Vector component kind.
  - component_name: The Vector component ID.
  - component_type: The Vector component type.
  - instance: The Vector instance identified by host and port.
  - job: The name of the job producing Vector metrics.
- processed_events_total (counter)
  The total number of events processed by this component. This metric includes the following tags:
  - component_kind: The Vector component kind.
  - component_name: The Vector component ID.
  - component_type: The Vector component type.
  - file: The file that produced the error.
  - instance: The Vector instance identified by host and port.
  - job: The name of the job producing Vector metrics.
- events_out_total (counter)
  The total number of events emitted by this component. This metric includes the following tags:
  - component_kind: The Vector component kind.
  - component_name: The Vector component ID.
  - component_type: The Vector component type.
  - instance: The Vector instance identified by host and port.
  - job: The name of the job producing Vector metrics.
How It Works
AWS Authentication
Vector checks for AWS credentials in the following order:
- The access_key_id and secret_access_key options.
- The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
- The credential_process command in the AWS config file (usually located at ~/.aws/config).
- The AWS credentials file (usually located at ~/.aws/credentials).
- The IAM instance profile (only works when running on an EC2 instance with an instance profile/role).
If credentials are not found, the healthcheck will fail and an error will be logged.
Obtaining an access key
In general, we recommend using instance profiles/roles whenever possible. Where this is not possible, you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this. You can then supply the generated key via the access_key_id and secret_access_key options.
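As a rough sketch of how a generated key might be wired into this sink, assuming the two options sit under the auth table listed in the configuration reference. The sink ID is hypothetical, and the key values are the placeholder credentials AWS uses in its own documentation, not real keys:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Placeholder credentials (assumption): replace with your own key pair.
auth.access_key_id = "AKIAIOSFODNN7EXAMPLE"
auth.secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
```

Keeping long-lived keys out of the config file, for example by relying on the environment variables or an instance profile, is usually the safer choice.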
Assuming roles
Vector can assume an AWS IAM role via the assume_role option. This is an optional setting that is helpful for a variety of use cases, such as cross-account access.
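A minimal sketch of a cross-account setup, assuming the option sits under the auth table as listed in the configuration reference. The sink ID, account number, and role name are hypothetical:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Hypothetical role ARN (assumption): use the ARN of the role in the target account.
auth.assume_role = "arn:aws:iam::123456789012:role/my-kinesis-writer"
```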
Buffers & batches
This component buffers and batches data. Rather than treating buffers and batches as global concepts, Vector treats them as sink-specific concepts. This isolates sinks, ensuring service disruptions are contained and delivery guarantees are honored.
Batches are flushed when either of the following conditions is met:
- The batch age meets or exceeds the configured timeout_secs.
- The batch size meets or exceeds the configured max_bytes or max_events.
Buffers are controlled via the buffer.* options.
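The batch and buffer options described above might be combined as in the following sketch. The sink ID and all values are illustrative rather than recommendations, and the unit of buffer.max_size is assumed here to be bytes:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Flush a batch once it holds 250 events or is 2 seconds old, whichever comes first.
batch.max_events = 250
batch.timeout_secs = 2

# Use a disk buffer and block upstream components when it fills up.
buffer.type = "disk"
buffer.max_size = 104900000 # assumed to be bytes; required when type = "disk"
buffer.when_full = "block"
```

With when_full = "drop_newest", new events would be discarded instead of applying back pressure when the buffer is full.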
Health checks
Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error will be logged and Vector will proceed to start.
Require health checks
If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
If you'd like to disable health checks for this sink, you can set the healthcheck.enabled option to false.
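For instance, a sketch of a sink with its health check turned off, using the healthcheck.enabled option from the configuration reference (the sink ID is hypothetical):

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Skip the health check for this sink at startup.
healthcheck.enabled = false
```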
Partitioning
By default, Vector issues random 16-byte values for each Kinesis record's partition key, evenly distributing records across your Kinesis partitions. Depending on your use case, this might not be sufficient, since random distribution does not preserve order. To override this, you can supply the partition_key_field option, which names a field on your event to use as the partition key value instead. This is useful if your event already has a suitable field, and it also pairs nicely with the add_fields transform.
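A short sketch of the override, assuming events carry a user_id field (the field name comes from the example configuration at the top of this page, and the sink ID is hypothetical):

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Use the value of each event's user_id field as the partition key,
# so events that share a user_id get the same partition key.
partition_key_field = "user_id"
```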
Missing partition keys
Kinesis requires a value for the partition key. If the key is missing or the value is blank, the event will be dropped and a warning-level log event will be logged. As such, the field specified in the partition_key_field option should always contain a value.
Partition keys that exceed 256 characters
If the value provided exceeds the maximum allowed length of 256 characters Vector will slice the value and use the first 256 characters.
Non-string partition keys
Vector will coerce the value into a string.
Rate limits & adaptive concurrency
Adaptive Request Concurrency (ARC)
Adaptive Request Concurrency is a feature of Vector that does away with static rate limits and automatically optimizes HTTP concurrency limits based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Check out the announcement blog post.
We highly recommend enabling this feature as it improves the performance and reliability of Vector and the systems it communicates with.
To enable it, set the request.concurrency option to adaptive:
[sinks.my-sink]
request.concurrency = "adaptive"
Static rate limits
If Adaptive Request Concurrency is not for you, you can manually set static rate limits with the request.rate_limit_duration_secs, request.rate_limit_num, and request.concurrency options:
[sinks.my-sink]
request.rate_limit_duration_secs = 1
request.rate_limit_num = 10
request.concurrency = 10
Retry policy
Vector will retry failed requests (status == 429, >= 500, and != 501). Other responses will not be retried. You can control the number of retry attempts and the backoff rate with the request.retry_attempts, request.retry_initial_backoff_secs, and request.retry_max_duration_secs options.
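As an illustration, the retry options from the configuration reference could be set together as follows. The sink ID and the values are assumptions chosen for the example, not tuned recommendations:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Give up on a request after 10 retries instead of retrying indefinitely.
request.retry_attempts = 10
# Wait 1 second before the first retry.
request.retry_initial_backoff_secs = 1
# Never wait longer than 30 seconds between retries.
request.retry_max_duration_secs = 30
```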
State
This component is stateless, meaning its behavior is consistent across each input.