AWS Kinesis Firehose
Collect logs from AWS Kinesis Firehose
Requirements
AWS Kinesis Firehose only delivers data over HTTPS, so this source generally needs to be exposed with TLS enabled; see the tls.* options.
Configuration
Example configurations
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443"
    }
  }
}
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "access_key": "A94A8FE5CCB19BA61C4C08",
      "access_keys": [
        "A94A8FE5CCB19BA61C4C08"
      ],
      "address": "0.0.0.0:443",
      "record_compression": "auto"
    }
  }
}
[sources.my_source_id]
type = "aws_kinesis_firehose"
access_key = "A94A8FE5CCB19BA61C4C08"
access_keys = [ "A94A8FE5CCB19BA61C4C08" ]
address = "0.0.0.0:443"
record_compression = "auto"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    access_key: A94A8FE5CCB19BA61C4C08
    access_keys:
      - A94A8FE5CCB19BA61C4C08
    address: 0.0.0.0:443
    record_compression: auto
access_key
optional string literal
An access key to authenticate requests against.
AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If configured, access_key should be set to the same value. Otherwise, all requests are allowed.
access_keys
optional [string]
A list of access keys to authenticate requests against.
AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If configured, access_keys should include that value. Otherwise, all requests are allowed.
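Because access_keys accepts multiple values, it can hold both an old and a new key while rotating credentials; a request presenting any listed key is accepted. A minimal sketch (key values are placeholders):
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
# Both keys are accepted while the Firehose destination is migrated
access_keys = ["old-key-being-rotated-out", "new-key-being-rotated-in"]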
acknowledgements
optional object
Controls how acknowledgements are handled by this source.
This setting is deprecated in favor of enabling acknowledgements at the global or sink level. Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
address
required string literal
The socket address to listen for connections on.
decoding
optional object
Configures how events are decoded from raw bytes.
decoding.codec
optional string literal enum
The codec to use for decoding events.
Option | Description |
---|---|
bytes | Uses the raw bytes as-is. |
gelf | Decodes the raw bytes as a GELF message. |
json | Decodes the raw bytes as JSON. |
native | Decodes the raw bytes as Vector’s native Protocol Buffers format. This codec is experimental. |
native_json | Decodes the raw bytes as Vector’s native JSON format. This codec is experimental. |
syslog | Decodes the raw bytes as a Syslog message. Decodes either as the RFC 3164-style format (“old” style) or the RFC 5424-style format (“new” style, includes structured data). |
Default: bytes
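For example, if the delivery stream carries records that are themselves JSON documents, the json codec parses each record into structured fields rather than leaving it as a raw message string. A minimal sketch:
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
# Parse each record as a JSON object instead of raw bytes
decoding.codec = "json"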
framing
optional object
Framing configuration.
Framing handles how events are separated when encoded in a raw byte form, where each event is a frame that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.
framing.character_delimited
required object
Relevant when method = "character_delimited".
framing.character_delimited.delimiter
required uint
The character used to separate frames.
framing.character_delimited.max_length
optional uint
The maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
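As an illustration, the following sketch splits each record on a comma (byte 44) and caps the frame buffer at 1 MiB as the safety net described above (delimiter and limit are example values):
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
framing.method = "character_delimited"
# 44 is the ASCII code for ","
framing.character_delimited.delimiter = 44
# Reject frames larger than 1 MiB (example limit)
framing.character_delimited.max_length = 1048576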
framing.method
optional string literal enum
The framing method.
Option | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
Default: bytes
framing.newline_delimited
optional object
Relevant when method = "newline_delimited".
framing.newline_delimited.max_length
optional uint
The maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.octet_counting
optional object
Relevant when method = "octet_counting".
framing.octet_counting.max_length
optional uint
The maximum length of the byte buffer.
record_compression
optional string literal enum
The compression scheme to use for decompressing records within the Firehose message.
Some services, like AWS CloudWatch Logs, compress the events with gzip before sending them to AWS Kinesis Firehose. This option can be used to automatically decompress them before forwarding them to the next component.
Note that this is different from the Content encoding option of the Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.
Option | Description |
---|---|
auto | Automatically attempt to determine the compression scheme. The compression scheme of the object is determined by looking at its file signature, also known as magic bytes. If the record fails to decompress with the discovered format, the record is forwarded as-is. Thus, if you know the records are always gzip-encoded (for example, if they are coming from AWS CloudWatch Logs), set this option to gzip explicitly. |
gzip | GZIP. |
none | Uncompressed. |
Default: auto
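Following the note above: if every record is known to come from AWS CloudWatch Logs, and is therefore always gzip-compressed, pinning the scheme avoids relying on signature detection. A minimal sketch:
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
# Records are always gzip-compressed (for example, from CloudWatch Logs)
record_compression = "gzip"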
store_access_key
required bool
Whether or not to store the AWS Firehose Access Key in event secrets.
If set to true, when an incoming request contains an access key sent by AWS Firehose, it is kept in the event secrets as “aws_kinesis_firehose_access_key”.
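A sketch of how the stored secret might be read downstream, assuming a remap transform and VRL's get_secret function (the transform name and target field are hypothetical):
[sources.firehose]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
store_access_key = true

[transforms.annotate]
type = "remap"
inputs = ["firehose"]
source = '''
# Hypothetical use: copy the stored request key onto the event for routing
.sender_key = get_secret("aws_kinesis_firehose_access_key")
'''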
tls
optional object
Configures the TLS options for incoming/outgoing connections.
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with the peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file must also be set.
tls.enabled
optional bool
Whether or not to require TLS for incoming or outgoing connections.
When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.
tls.key_file
optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless key_file is set.
tls.verify_certificate
optional bool
Enables certificate verification.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Relevant for both incoming and outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
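Since AWS Kinesis Firehose only delivers to HTTPS endpoints, a typical deployment terminates TLS in this source. A minimal sketch (certificate paths are hypothetical):
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
tls.enabled = true
tls.crt_file = "/etc/vector/tls/firehose.crt"  # hypothetical path
tls.key_file = "/etc/vector/tls/firehose.key"  # hypothetical path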
Outputs
<component_id>
Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.
Output Data
Logs
Line
An individual line from an AWS Kinesis Firehose record.
Field | Description | Example |
---|---|---|
message | The raw line from the decoded Firehose record. | Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100 |
request_id | The AWS Kinesis Firehose request ID, taken from the X-Amz-Firehose-Request-Id header. | ed1d787c-b9e2-4631-92dc-8e7c9d26d804 |
source_arn | The AWS Kinesis Firehose delivery stream ARN, taken from the X-Amz-Firehose-Source-Arn header. | arn:aws:firehose:us-east-1:111111111111:deliverystream/test |
source_type | The name of the source type. | aws_kinesis_firehose |
timestamp | The timestamp of the AWS Kinesis Firehose request. | 2020-10-10T17:07:36.452332Z |
Telemetry
Metrics
Each of the following metrics includes the deprecated component_name tag; use the component_id tag instead. Its value is the same as component_id.
component_discarded_events_total
counter
component_errors_total
counter
component_received_bytes_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.
component_received_events_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
request_automatic_decode_errors_total
counter
request_read_errors_total
counter
requests_received_total
counter
source_lag_time_seconds
histogram
Examples
AWS CloudWatch Subscription message
Given this event:
{
"requestId": "ed1d787c-b9e2-4631-92dc-8e7c9d26d804",
"timestamp": 1600110760138,
"records": [
{
"data": "H4sIABk1bV8AA52TzW7bMBCE734KQ2db/JdI3QzETS8FAtg91UGgyOuEqCQq5Mqua+TdS8lu0hYNUpQHAdoZDcn9tKfJdJo0EEL5AOtjB0kxTa4W68Xdp+VqtbheJrPB4A4t+EFiv6yzVLuHa+/6blARAr5UV+ihbH4vh/4+VN52aF37wdYIPkTDlyhF8SrabFsOWhIrtz+Dlnto8dV3Gp9RstshXKhMi0xpqk3GpNJccpFRKYw0WvCM5kIbzrVWipm4VK55rrSk44HGHLTx/lg2wxVYRiljVGWGCvPiuPRn2O60Se6P8UKbpOBZrulsk2xLhCEjljYJk2QFHeGU04KxQqpCsumcSko3SfQ+uoBnn8pTJmjKWZYyI0axAXx021G++bweS5136CpXj8WP6/UNYek5ycMOPPhReETsQkHI4XBIO2/bynZlXXkXwryrS9w536TWkab0XwED6e/tU2/R9eGS9NTD5VgEvnWwtQikcu0e/AO0FYyu4HpfwR3Gf2R0Btza9qxgiUNUISiLr30AP7fbyMzu7OWA803ynIzdfJ69B1EZpoVhsWMRZ8a5UVJoRoUyUlDNspxzZWiEnOXiXYiSvQOR5TnN/xsiNalmKZcy5Yr/yfB6+RZD/gbDC0IbOx8wQrMhxGGYx4lBW5X1wJBLkpO981jWf6EXogvIrm+rYYrKOn4Hgbg4b439/s8cFeVvcNwBtHBkOdWvQIdRnTxPfgCXvyEgSQQAAA=="
}
]
}
And this configuration:
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443"
    }
  }
}
This log event is produced:
[{"log":{"message":"{\"messageType\":\"DATA_MESSAGE\",\"owner\":\"111111111111\",\"logGroup\":\"test\",\"logStream\":\"test\",\"subscriptionFilters\":[\"Destination\"],\"logEvents\":[{\"id\":\"35683658089614582423604394983260738922885519999578275840\",\"timestamp\":1600110569039,\"message\":\"{\\\"bytes\\\":26780,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"157.130.216.193\\\",\\\"method\\\":\\\"PUT\\\",\\\"protocol\\\":\\\"HTTP/1.0\\\",\\\"referer\\\":\\\"https://www.principalcross-platform.io/markets/ubiquitous\\\",\\\"request\\\":\\\"/expedite/convergence\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":301,\\\"user-identifier\\\":\\\"-\\\"}\"},{\"id\":\"35683658089659183914001456229543810359430816722590236673\",\"timestamp\":1600110569041,\"message\":\"{\\\"bytes\\\":17707,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"109.81.244.252\\\",\\\"method\\\":\\\"GET\\\",\\\"protocol\\\":\\\"HTTP/2.0\\\",\\\"referer\\\":\\\"http://www.investormission-critical.io/24/7/vortals\\\",\\\"request\\\":\\\"/scale/functionalities/optimize\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":502,\\\"user-identifier\\\":\\\"feeney1708\\\"}\"}]}","request_id":"ed1d787c-b9e2-4631-92dc-8e7c9d26d804","source_arn":"arn:aws:firehose:us-east-1:111111111111:deliverystream/test","source_type":"aws_kinesis_firehose","timestamp":"2020-09-14T19:12:40.138Z"}}]
How it works
Forwarding CloudWatch Log events
This source is the recommended way to ingest logs from AWS CloudWatch Logs via AWS CloudWatch Logs subscriptions. To set this up:
1. Deploy Vector with a publicly exposed HTTP endpoint using this source. You will likely also want to use the parse_aws_cloudwatch_log_subscription_message function to extract the log events. Make sure to set the access_keys option to secure this endpoint. Your configuration might look something like:

[sources.firehose]
# General
type = "aws_kinesis_firehose"
address = "127.0.0.1:9000"
access_keys = ["secret"]

[transforms.cloudwatch]
type = "remap"
inputs = ["firehose"]
drop_on_error = false
source = '''
parsed = parse_aws_cloudwatch_log_subscription_message!(.message)
. = unnest(parsed.log_events)
. = map_values(.) -> |value| {
  event = del(value.log_events)
  value |= event
  message = string!(del(.message))
  merge(value, object!(parse_json!(message)))
}
'''

[sinks.console]
type = "console"
inputs = ["cloudwatch"]
encoding.codec = "json"
2. Create a Kinesis Firehose delivery stream in the region where the CloudWatch Logs groups you want to ingest exist.
3. Set the stream to forward to your Vector instance via its HTTP Endpoint destination. Make sure to configure the same access_keys you set earlier.
4. Set up a CloudWatch Logs subscription to forward the events to your delivery stream.
Transport Layer Security (TLS)
Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.