Kafka Source

The Vector kafka source collects logs from one or more Apache Kafka topics.

Configuration

[sources.my_source_id]
type = "kafka" # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092" # required
group_id = "consumer-group-name" # required
key_field = "message_key" # optional, no default
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"] # required
  • optional, string

    auto_offset_reset

    If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for an explanation.

    • Syntax: literal
    • Default: "largest"
  • common, required, string

    bootstrap_servers

    A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

    • Syntax: literal
  • optional, uint

    commit_interval_ms

    The frequency, in milliseconds, at which consumer offsets are committed (written) to offset storage.

    • Default: 5000 (milliseconds)
  • optional, uint

    fetch_wait_max_ms

    The maximum time, in milliseconds, the broker may wait to fill the fetch response.

    • Default: 100 (milliseconds)
  • common, required, string

    group_id

    The consumer group name to be used to consume events from Kafka.

    • Syntax: literal
  • common, optional, string

    key_field

    The log field name to use for the Kafka message key. If unspecified, the key is not added to the log event. If the message has a null key, this field is also not added to the log event.

    • Syntax: literal
  • optional, table

    librdkafka_options

    Advanced consumer options passed directly to librdkafka. See the librdkafka documentation for details, and the expanded example after this list.

  • optional, string

    offset_key

    The log field name to use for the Kafka offset. If unspecified, the offset is not added to the log event.

    • Syntax: literal
  • optional, string

    partition_key

    The log field name to use for the Kafka partition. If unspecified, the partition is not added to the log event.

    • Syntax: literal
  • optional, table

    sasl

    Options for SASL/SCRAM authentication support.

    • common, optional, bool

      enabled

      Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

    • common, optional, string

      mechanism

      The Kafka SASL/SCRAM mechanisms.

      • Syntax: literal
    • common, optional, string

      password

      The Kafka SASL/SCRAM authentication password.

      • Syntax: literal
    • common, optional, string

      username

      The Kafka SASL/SCRAM authentication username.

      • Syntax: literal
  • optional, uint

    session_timeout_ms

    The Kafka session timeout in milliseconds.

    • Default: 10000 (milliseconds)
  • optional, uint

    socket_timeout_ms

    The default timeout, in milliseconds, for network requests.

    • Default: 60000 (milliseconds)
  • optional, table

    tls

    Configures the TLS options for connections to the Kafka brokers.

    • optional, string

      ca_file

      Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.

      • Syntax: literal
    • common, optional, string

      crt_file

      Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.

      • Syntax: literal
    • common, optional, bool

      enabled

      Enable TLS during connections to the remote.

      • Default: false
    • common, optional, string

      key_file

      Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.

      • Syntax: literal
    • optional, string

      key_pass

      Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

      • Syntax: literal
    • optional, bool

      verify_certificate

      If true (the default), Vector will validate the TLS certificate of the remote host.

      • Default: true
  • optional, string

    topic_key

    The log field name to use for the Kafka topic. If unspecified, the topic is not added to the log event.

    • Syntax: literal
  • common, required, [string]

    topics

    The Kafka topic names to read events from. A regex pattern is supported if the topic name begins with ^.
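
An expanded configuration sketch combining several of the optional settings described above. The values are illustrative only: the SCRAM mechanism, credentials, and librdkafka option keys shown are assumptions for your environment, not defaults.

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
auto_offset_reset = "smallest" # start from the earliest stored message if no committed offset exists
commit_interval_ms = 5000      # commit consumer offsets every 5 seconds
session_timeout_ms = 10000

[sources.my_source_id.sasl]
enabled = true
mechanism = "SCRAM-SHA-512"    # assumed mechanism; use whatever your brokers require
username = "vector-consumer"
password = "${KAFKA_PASSWORD}" # read from the environment rather than hard-coding

[sources.my_source_id.librdkafka_options]
"client.id" = "vector"            # passed through to librdkafka as-is
"fetch.error.backoff.ms" = "1000" # assumed librdkafka option; see its documentation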

Output

This component outputs log events with the following fields:

{
  "message": "53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308",
  "offset": 100,
  "partition": "partition",
  "timestamp": "2020-10-10T17:07:36+00:00",
  "topic": "topic"
}
  • common, required, string

    message

    The raw line from the Kafka record.

    • Syntax: literal
  • common, required, uint

    offset

    The Kafka offset at the time the record was retrieved.

  • common, required, string

    partition

    The Kafka partition that the record came from.

    • Syntax: literal
  • common, required, timestamp

    timestamp

    The timestamp encoded in the Kafka message or the current time if it cannot be fetched.

  • common, required, string

    topic

    The Kafka topic that the record came from.

    • Syntax: literal
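
As a sketch of how the key_field, offset_key, partition_key, and topic_key options above shape this output, the field names below are illustrative choices rather than defaults:

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
key_field = "message_key"         # store the Kafka message key under "message_key"
offset_key = "kafka_offset"       # store the offset under "kafka_offset"
partition_key = "kafka_partition" # store the partition under "kafka_partition"
topic_key = "kafka_topic"         # store the topic under "kafka_topic"

With this configuration, each log event carries the Kafka metadata under the configured field names, alongside the message and timestamp fields shown above.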

Telemetry

This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.

  • counter

    consumer_offset_updates_failed_total

    The total number of failures to update a Kafka consumer offset. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_failed_total

    The total number of failures to read a Kafka message. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_bytes_total

    The total number of bytes processed by the component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_out_total

    The total number of events emitted by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_events_total

    The total number of events processed by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • file - The file that produced the error.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.
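
To actually scrape the metrics above, the internal_metrics source has to be wired to a metrics sink. A minimal sketch, assuming the prometheus_exporter sink and its listen address are appropriate for your Vector version and environment:

[sources.vector_metrics]
type = "internal_metrics"

[sinks.prometheus]
type = "prometheus_exporter" # assumed sink; any metrics sink works here
inputs = ["vector_metrics"]
address = "0.0.0.0:9598"     # assumed listen address for the exporter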

How It Works

Context

By default, the kafka source will augment events with helpful context keys as shown in the "Output" section.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.
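
A minimal sketch of enabling TLS toward the brokers, assuming PEM files at the paths shown (the file paths and the TLS port 9093 are placeholders, not defaults):

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9093"
group_id = "consumer-group-name"
topics = ["topic-1"]

[sources.my_source_id.tls]
enabled = true                           # TLS is disabled by default
ca_file = "/etc/vector/ca.pem"           # additional CA certificate, if your brokers use a private CA
crt_file = "/etc/vector/client-cert.pem" # client certificate for mutual TLS, if required
key_file = "/etc/vector/client-key.pem"  # must be set alongside crt_file
verify_certificate = true                # default; reject remotes with invalid certificates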

librdkafka

The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. Because Vector produces static MUSL builds, this dependency is packaged with Vector, so you do not need to install it separately.