Pulsar
Publish observability events to Apache Pulsar topics
Configuration
Example configurations
{
"sinks": {
"my_sink_id": {
"type": "pulsar",
"inputs": [
"my-source-or-transform-id"
],
"endpoint": "pulsar://127.0.0.1:6650",
"topic": "topic-1234",
"compression": "none",
"encoding": {
"codec": "json"
}
}
}
}
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
compression = "none"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
my_sink_id:
type: pulsar
inputs:
- my-source-or-transform-id
endpoint: pulsar://127.0.0.1:6650
topic: topic-1234
compression: none
encoding:
codec: json
{
"sinks": {
"my_sink_id": {
"type": "pulsar",
"inputs": [
"my-source-or-transform-id"
],
"endpoint": "pulsar://127.0.0.1:6650",
"topic": "topic-1234",
"compression": "none",
"encoding": {
"codec": "json"
},
"partition_key_field": "message"
}
}
}
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
compression = "none"
partition_key_field = "message"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
my_sink_id:
type: pulsar
inputs:
- my-source-or-transform-id
endpoint: pulsar://127.0.0.1:6650
topic: topic-1234
compression: none
encoding:
codec: json
partition_key_field: message
acknowledgements
common optional object
Controls how acknowledgements are handled for this sink.
acknowledgements.enabled
common optional bool
false
auth
optional object
auth.oauth2
optional object
auth.oauth2.credentials_url
required string literal
auth.oauth2.issuer_url
required string literal
auth.token
optional string literal
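As a sketch of the authentication and acknowledgement options above, the TOML below combines them in a single sink; the OAuth2 URLs and credential path are placeholders, not values from this page:
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
# Wait for Pulsar to confirm delivery before acknowledging events upstream.
acknowledgements.enabled = true
# OAuth2 authentication; the issuer and credentials locations are hypothetical.
auth.oauth2.issuer_url = "https://auth.example.com"
auth.oauth2.credentials_url = "file:///etc/vector/pulsar-credentials.json"
# Alternatively, a static token could be supplied via auth.token instead of OAuth2.

[sinks.my_sink_id.encoding]
codec = "json"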
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
Relevant when: type = "memory"
500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
Relevant when: type = "disk"
buffer.type
optional string literal enum
Option | Description
---|---
disk | Events are buffered on disk. (version 2) This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms.
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes.
memory
buffer.when_full
optional string literal enum
Option | Description
---|---
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge.
drop_newest | Drops the event instead of waiting for free space in the buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events.
block
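For example, a disk buffer that applies backpressure rather than dropping events might be configured as in the sketch below; the 1 GiB size is only an illustrative value above the documented minimum:
[sinks.my_sink_id.buffer]
type = "disk"
max_size = 1073741824  # 1 GiB, comfortably above the ~256 MB minimum
when_full = "block"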
compression
common optional string literal enum
The compression strategy used to compress the encoded event data before transmission.
The default compression level of the chosen algorithm is used. Some cloud storage API clients and browsers handle decompression transparently, so files may not always appear to be compressed, depending on how they are accessed.
none
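If trading CPU for bandwidth is worthwhile, a non-default algorithm can be selected. The value below is an assumption for illustration; check the algorithms your Vector release supports for this sink:
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
# Assumed to be supported; verify against your Vector version.
compression = "lz4"

[sinks.my_sink_id.encoding]
codec = "json"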
encoding
required object
encoding.avro
required object
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
encoding.codec
required string literal enum
Option | Description
---|---
avro | Encodes an event as an Apache Avro message.
json | Encodes an event as JSON.
text | Plaintext encoding. This “encoding” simply uses the message field of a log event. Users should take care if they’re modifying their log events (such as by using a remap transform) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event.
encoding.except_fields
optional [string]
encoding.only_fields
optional [string]
encoding.timestamp_format
optional string literal enum
Option | Description
---|---
rfc3339 | Formats as a RFC 3339 string
unix | Formats as a Unix timestamp
rfc3339
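Putting the encoding options together, the sketch below encodes events with the Avro codec while excluding a field and emitting Unix timestamps; the schema and field names are illustrative, not taken from this page:
[sinks.my_sink_id.encoding]
codec = "avro"
timestamp_format = "unix"
except_fields = [ "host" ]  # hypothetical field to drop before encoding

[sinks.my_sink_id.encoding.avro]
# Minimal illustrative Avro schema; replace with your own record definition.
schema = '''
{
  "type": "record",
  "name": "Event",
  "fields": [
    { "name": "message", "type": "string" }
  ]
}
'''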
endpoint
required string literal
healthcheck
optional object
healthcheck.enabled
optional bool
true
inputs
required [string]
A list of upstream source or transform IDs. Wildcards (*) are supported.
See configuration for more info.
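For instance, to route every component whose ID matches a prefix into this sink, a wildcard can be used; the app-* prefix here is hypothetical:
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "app-*" ]  # matches app-logs, app-metrics, and so on
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"

[sinks.my_sink_id.encoding]
codec = "json"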
partition_key_field
optional string literal
Telemetry
Metrics
All metrics are tagged with component_id; the deprecated component_name tag carries the same value as component_id.
buffer_byte_size
gauge
buffer_discarded_events_total
counter
buffer_events
gauge
buffer_received_event_bytes_total
counter
buffer_received_events_total
counter
buffer_sent_event_bytes_total
counter
buffer_sent_events_total
counter
component_discarded_events_total
counter
component_errors_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector due to small internal batches.
component_received_events_total
counter
encode_errors_total
counter
events_in_total
counter
Deprecated: use component_received_events_total instead.
utilization
gauge
How it works
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
If you’d like to disable health checks for this sink, set the healthcheck option to false.
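A minimal sketch of that setting in TOML:
[sinks.my_sink_id.healthcheck]
enabled = false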