Kafka Source
The Vector kafka source collects logs from Kafka.
Configuration
Example configuration (vector.toml):

[sources.my_source_id]
type = "kafka" # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # required
group_id = "consumer-group-name" # required
key_field = "message_key" # optional, no default
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"] # required
auto_offset_reset (optional, string)
If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for an explanation.
- Syntax: literal
- Default: "largest"
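For example, a minimal sketch that starts reading from the beginning of each partition when no committed offset exists, assuming librdkafka's standard auto.offset.reset values (such as "smallest"/"earliest" and "largest"/"latest") are accepted here:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
auto_offset_reset = "smallest" # start from the oldest available offset when no committed offset exists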
bootstrap_servers (common, required, string)
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.
- Syntax: literal
commit_interval_ms (optional, uint)
The frequency at which consumer offsets are committed (written) to offset storage.
- Default: 5000 (milliseconds)
fetch_wait_max_ms (optional, uint)
Maximum time the broker may wait to fill the response.
- Default: 100 (milliseconds)
group_id (common, required, string)
The consumer group name to be used to consume events from Kafka.
- Syntax: literal
key_field (common, optional, string)
The log field name to use for the Kafka message key. If unspecified, the key is not added to the log event. Likewise, if the message has a null key, this field is not added to the log event.
- Syntax: literal
librdkafka_options (optional, table)
Advanced options passed directly to librdkafka. See the librdkafka documentation for details.
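For example, a minimal sketch passing raw librdkafka consumer settings through this table. The option names shown (fetch.error.backoff.ms, socket.send.buffer.bytes) are illustrative librdkafka settings, not Vector defaults:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]

[sources.my_source_id.librdkafka_options]
"fetch.error.backoff.ms" = "1000"      # back off after a fetch error
"socket.send.buffer.bytes" = "100000"  # enlarge the socket send buffer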
offset_key (optional, string)
The log field name to use for the Kafka offset. If unspecified, the offset is not added to the log event.
- Syntax: literal
partition_key (optional, string)
The log field name to use for the Kafka partition name. If unspecified, the partition is not added to the log event.
- Syntax: literal
sasl (optional, table)
Options for SASL/SCRAM authentication support.

sasl.enabled (common, optional, bool)
Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

sasl.mechanism (common, optional, string)
The Kafka SASL/SCRAM mechanism.
- Syntax: literal

sasl.password (common, optional, string)
The Kafka SASL/SCRAM authentication password.
- Syntax: literal

sasl.username (common, optional, string)
The Kafka SASL/SCRAM authentication username.
- Syntax: literal
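A minimal sketch enabling SASL/SCRAM authentication. The mechanism value and credentials below are placeholders, not defaults:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]

[sources.my_source_id.sasl]
enabled = true
mechanism = "SCRAM-SHA-256"    # placeholder; use the mechanism your brokers expect
username = "vector"            # placeholder credentials
password = "${KAFKA_PASSWORD}" # e.g. injected via environment variable interpolation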
session_timeout_ms (optional, uint)
The Kafka session timeout in milliseconds.
- Default: 10000 (milliseconds)
socket_timeout_ms (optional, uint)
Default timeout for network requests.
- Default: 60000 (milliseconds)
tls (optional, table)
Configures the TLS options for connections to the remote.

tls.ca_file (optional, string)
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
- Syntax: literal

tls.crt_file (common, optional, string)
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.
- Syntax: literal

tls.enabled (common, optional, bool)
Enable TLS during connections to the remote.
- Default: false

tls.key_file (common, optional, string)
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.
- Syntax: literal

tls.key_pass (optional, string)
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.
- Syntax: literal

tls.verify_certificate (optional, bool)
If true (the default), Vector will validate the TLS certificate of the remote host.
- Default: true
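A minimal sketch enabling TLS with a client certificate. All file paths are placeholders:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9093"
group_id = "consumer-group-name"
topics = ["topic-1"]

[sources.my_source_id.tls]
enabled = true
ca_file = "/etc/ssl/certs/ca.pem"   # placeholder CA bundle
crt_file = "/etc/vector/client.crt" # placeholder client certificate
key_file = "/etc/vector/client.key" # placeholder private key; crt_file must also be set
verify_certificate = true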
topic_key (optional, string)
The log field name to use for the Kafka topic. If unspecified, the topic is not added to the log event.
- Syntax: literal
topics (common, required, [string])
The Kafka topic names to read events from. Regex is supported if the topic begins with ^.
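For example, a sketch that maps Kafka metadata into the log event via the optional *_key fields; the field names on the right are illustrative:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["^(prefix1|prefix2)-.+"] # regex: any topic starting with prefix1- or prefix2-
key_field = "message_key"          # add the message key under this field
offset_key = "offset"              # add the record offset under this field
partition_key = "partition"        # add the partition under this field
topic_key = "topic"                # add the topic name under this field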
Output
This component outputs log events with the following fields:
{"message" : "53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308","offset" : 100,"partition" : "partition","timestamp" : "2020-10-10T17:07:36+00:00","topic" : "topic"}
message (common, required, string)
The raw line from the Kafka record.
- Syntax: literal

offset (common, required, uint)
The Kafka offset at the time the record was retrieved.

partition (common, required, string)
The Kafka partition that the record came from.
- Syntax: literal

timestamp (common, required, timestamp)
The timestamp encoded in the Kafka message or the current time if it cannot be fetched.

topic (common, required, string)
The Kafka topic that the record came from.
- Syntax: literal
Telemetry
This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.
consumer_offset_updates_failed_total (counter)
The total number of failures to update a Kafka consumer offset. This metric includes the following tags:
- component_kind - The Vector component kind.
- component_name - The Vector component ID.
- component_type - The Vector component type.
- instance - The Vector instance identified by host and port.
- job - The name of the job producing Vector metrics.

events_failed_total (counter)
The total number of failures to read a Kafka message. This metric includes the following tags:
- component_kind - The Vector component kind.
- component_name - The Vector component ID.
- component_type - The Vector component type.
- instance - The Vector instance identified by host and port.
- job - The name of the job producing Vector metrics.

processed_bytes_total (counter)
The total number of bytes processed by the component. This metric includes the following tags:
- component_kind - The Vector component kind.
- component_name - The Vector component ID.
- component_type - The Vector component type.
- instance - The Vector instance identified by host and port.
- job - The name of the job producing Vector metrics.

events_out_total (counter)
The total number of events emitted by this component. This metric includes the following tags:
- component_kind - The Vector component kind.
- component_name - The Vector component ID.
- component_type - The Vector component type.
- instance - The Vector instance identified by host and port.
- job - The name of the job producing Vector metrics.

processed_events_total (counter)
The total number of events processed by this component. This metric includes the following tags:
- component_kind - The Vector component kind.
- component_name - The Vector component ID.
- component_type - The Vector component type.
- file - The file that produced the error.
- instance - The Vector instance identified by host and port.
- job - The name of the job producing Vector metrics.
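A minimal sketch for collecting these metrics via the internal_metrics source. The console sink is used here only for inspection; any metrics-capable sink works, and the encoding option is an assumption about that sink's configuration:

[sources.vector_metrics]
type = "internal_metrics"

[sinks.metrics_out]
type = "console"            # print metrics to stdout for inspection
inputs = ["vector_metrics"]
encoding.codec = "json"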
How It Works
Context
By default, the kafka source will augment events with helpful context keys as shown in the "Output" section.
State
This component is stateless, meaning its behavior is consistent across each input.
Transport Layer Security (TLS)
Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.
librdkafka
The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.