In "Hello, WASM world" we learnt how to get started making our first WASM plugin. In this guide, we will learn how to go farther with that skill. We will build a plugin that merges one long message split over multiple lines, into one message.
We will learn how to:
- Maintain state between events.
- Merge messages and perform string interpolation.
- Build a native Rust test suite.
- Build a Vector test suite.
Our mock ticket
Our pretend boss assigned us a new issue! It says we need to turn these 4 messages:
{ "vector": "one tool for all your observability needs","version": "0.10.0" }{ "vic":"the flying squirrel" }
Into these two:
{ "vector": "one tool for all your observability needs", "version": "0.10.0" }{ "vic": "the flying squirrel" }
Such drudgery. 🙄 Oh well, some input feeding us junk does mean we get to write Rust!
In order to make it work, we'll need to:
- Accept n messages as input, storing them concatenated together inside some state.
- Determine after each input if the new state is a parsable JSON.
- If the state is valid JSON, output it.
This sounds fun, so let's get started!
Building a workspace
Let's start from scratch:
PLUGIN_NAME=bananacargo init --lib ${PLUGIN_NAME}cd ${PLUGIN_NAME}
Next, add the following content, setting the crate up as a cdylib
and adding some important libraries:
[lib]crate-type = ["cdylib"][dependencies]vector-wasm = { version = "0.1", git = "https://github.com/timberio/vector/"}serde_json = "1.0"serde = { version = "1.0", features = ["derive"] }anyhow = "1.0"
Now, scaffold out the minimal structure of the crate, this one does nothing, it just passes data on:
#![deny(improper_ctypes)]use std::convert::TryInto;use vector_wasm::{hostcall, Registration, Role};/// Perform one time initialization and registration.////// During this time Vector and the plugin can validate that they can indeed work together,/// do any one-time initialization, or validate configuration settings.////// It's required that the plugin call [`vector_wasm::Registration::register`] before returning.#[no_mangle]pub extern "C" fn init() {// Vector provides you with a [`vector_wasm::WasmModuleConfig`] to validate for yourself.let config = hostcall::config().unwrap();assert_eq!(config.role, Role::Transform);// Finally, pass Vector a [`vector_wasm::Registration`]Registration::transform().register().unwrap();}/// Process data starting from a given point in memory to another point.////// It's not necessary for the plugin to actually read, or parse this data.////// Call [`vector_wasm::hostcall::emit`] to emit a message out.////// # Returns////// This function should return the number of emitted messages.#[no_mangle]pub extern "C" fn process(data: u32, length: u32) -> u32 {// Vector allocates a chunk of memory through the hostcall interface.// You can view the data as a slice of bytes.let data = unsafe {std::ptr::slice_from_raw_parts_mut(data as *mut u8, length.try_into().unwrap()).as_mut().unwrap()};hostcall::emit(data).unwrap();// Hint to Vector how many events you emitted.1}/// Perform one-time optional shutdown events.////// **Note:** There is no guarantee this function will be called before shutdown,/// as we may be forcibly killed.#[no_mangle]pub extern "C" fn shutdown() {}
Let's review what we've written quickly:
- We defined 3 functions,
init
,process
, andshutdown
(with#[no_mangle]
). These are the same as thelua
v2 transform. - In the
init
function, we calledRegistration::register
, registering with the host. - In the
process
function, we calledhostcall::emit
to emit events farther through the pipeline. - We called some
unsafe
code that read fromdata
withlength
as a byte array.
Before going farther, make sure it works (natively and in WASM!):
cargo buildcargo build --target wasm32-wasi
Writing Code
In our example we don't need to concern ourselves with the init()
and shutdown()
functions. Those are already set up
for us.
process
Each time the process
function is called, a new event is arriving at the transform. In the process
function, we get
a pointer to some data, and the length of it. From there, we can decide what we want to do.
Since Vector always gives us a correctly structured JSON representation of the Event
, we first need to parse that, and
extract our partial message.
We can take the pointer and length as a slice of u8
values, we can then parse that into a serde_json::Value
:
let data = unsafe {std::ptr::slice_from_raw_parts_mut(data as *mut u8, length.try_into().unwrap()).as_mut().unwrap()};let value: serde_json::Value = serde_json::de::from_slice(data).unwrap();
From this event, we need to take out the message, containing the partially complete JSON:
let message_field = value.get("message").and_then(serde_json::Value::as_str).unwrap_or(Default::default()); // Fall back to empty.
Next, we'll need to introduce some mutable state. We can use a std::sync::Mutex
and a once_cell::sync::Lazy
for this.
In your global scope at the top of the file, add these.
// A value which is initialized on the first access.use once_cell::sync::Lazy;// A mutual exclusion primitive useful for protecting shared datause std::sync::Mutex;// The working state of the string which represents a partial JSON.static STATE: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));
We'll be able to lock, and then mutate, this STATE
, gradually building it up.
Back over in the process
function, let's sketch out an API that's semantically meaningful and testable.
match transform(&mut *STATE.lock().unwrap(), message_field) {Ok(Some(value)) => {let value_string = value.to_string();hostcall::emit(value_string.into_bytes()).unwrap();1},Ok(None) => 0,Err(e) => {// This is an unexpected error. The state will be reset.hostcall::raise(e).unwrap();0},}
This means we can go write all our plugin-specific code in a function called transform
that takes a mutable borrow
of the state, and an immutable view of the new arrival. If the transform
function returns a value with no error, we
emit it out of Vector. If it returns nothing, we presume it was only a partial of a JSON, and do nothing. If we get an
error, we just pass it up.
Now it's time to write our main logic!
fn transform(state: &mut String, arrival: impl AsRef<str>) -> Result<Option<serde_json::Value>, Error> {// Add the new arrival on.state.push_str(&mut arrival.as_ref());// Try to read from it using a "reader" non-destructively.let mut working_state = state.clone();let de = serde_json::Deserializer::from_str(&working_state);let mut de_stream = de.into_iter::<serde_json::Value>();let output = de_stream.next();match output {Some(Ok(value)) if value.is_object() => {let offset = de_stream.byte_offset();let new_state = working_state.split_off(offset);*state = new_state;Ok(Some(value))},Some(Ok(value)) => {// This is an unexpected error. The most we can do is report it and clear our state.state.clear();Err(anyhow::anyhow!("Was provided {}, not an object", value))}Some(Err(e)) if e.is_eof() => {// Not an error, keep going!Ok(None)},None => {// Not an error, keep going!Ok(None)}Some(Err(e)) => {// This is an unexpected error. The most we can do is report it and clear our state.state.clear();Err(e.into())},}}
Since we cleverly created a new function which returns a Result
we can use the ?
operator inside, allowing us to
easily bubble up errors.
We also structured our function for testability. Since the transform
function doesn't have side effects
(it takes in a mutable state, instead of directly mutating STATE
) we can more easily test it for correctness.
At this point you should try cargo check
or cargo build
to see if our code builds!
Testing (Rust)
For our module, we're actually going to do two phases of testing.
You might have noticed, that in our effort to create a partial JSON parser WASM plugin we also inadvertently created a native one.
Whoops. Guess we'll just have to use it for testing. 🙄
Let's create both a test module, and our first test like so:
#[cfg(test)]mod test {use super::*;#[test]fn single() -> Result<(), Error> {let expected = serde_json::json!({"foo": "bar",});let mut working_buffer = String::from("");let out_result = transform(&mut working_buffer,&r#"{ "foo": "bar" }"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_some());let out_json = out_option.unwrap();assert_eq!(out_json, expected);Ok(())}}
Try to write a few more!
These tests are excellent for making sure that our transform
works as expected, but you've probably noticed
that there's some code we're not testing, and we're not really testing it inside Vector, either.
Let's solve that next! First though, we need to build our wasm
plugin.
cargo build --release --target wasm32-wasi
Our new module is now located at target/wasm32-wasi/release/${PLUGIN_NAME}.wasm
.
Testing (Vector)
As you may recall from the Unit Testing Guide, Vector supports testing configurations right out of the box. We'll use this to test our new WASM plugin.
Using your WASM capable Vector, create a configuration, adding the plugin name as appropriate:
data_dir = "/var/lib/vector/"[sources.source0]max_length = 102400type = "stdin"[transforms.transform0]inputs = ["source0"]type = "wasm"module = "target/wasm32-wasi/release/${PLUGIN_NAME}.wasm"artifact_cache = "tmp"[sinks.sink0]healthcheck = trueinputs = ["transform0"]type = "console"encoding = "json"buffer.type = "memory"buffer.max_events = 500buffer.when_full = "block"
Then, using your WASM capable Vector build take it for a test drive:
ana@autonoma:~/git/timberio/banana$ ../vector/target/release/vector --config banana.tomlAug 25 10:29:19.705 INFO vector: Log level "info" is enabled.Aug 25 10:29:19.706 INFO vector: Loading configs. path=["banana.toml"]Aug 25 10:29:19.708 INFO vector: Vector is starting. version="0.11.0" git_version="v0.9.0-530-g1b9eadd" released="Mon, 17 Aug 2020 20:48:21 +0000" arch="x86_64"Aug 25 10:29:19.708 INFO vector::sources::stdin: Capturing STDIN.Aug 25 10:29:19.708 INFO vector::internal_events::wasm::compilation: WASM Compilation via `lucet` state="beginning" role="transform"Aug 25 10:29:19.708 INFO vector::internal_events::wasm::compilation: WASM Compilation via `lucet` state="cached" role="transform"Aug 25 10:29:19.708 INFO vector::topology: Running healthchecks.Aug 25 10:29:19.708 INFO vector::topology: Starting source "source0"Aug 25 10:29:19.708 INFO vector::topology: Starting transform "transform0"Aug 25 10:29:19.708 INFO vector::topology: Starting sink "sink0"Aug 25 10:29:19.708 INFO vector::topology::builder: Healthcheck: Passed.{ "foo": "bar" }{"foo":"bar"}123Aug 25 10:29:29.478 ERROR transform{name=transform0 type=wasm}: vector::wasm: WASM plugin errored: Was provided 123, not an object{ "foo":"bar" }{"foo":"bar"}Aug 25 10:29:38.135 INFO vector::shutdown: All sources have finished.Aug 25 10:29:38.135 INFO source{name=source0 type=stdin}: vector::sources::stdin: finished sendingAug 25 10:29:38.135 INFO vector: Shutting down.
Good enough to start.
Before you go and deploy it, give it a more thorough testing, try using a file/wasm/file
pipeline and checking
to make sure the results are your expectation. Try to also think of some potential issues!
When you have some ideas of what to test, you can add a new behavior test to Vector:
[[tests]]name = "test"[[tests.inputs]]insert_at = "transform0"type = "log"log_fields.message = "{ \"foo\":"[[tests.inputs]]insert_at = "transform0"type = "log"log_fields.message = "\"bar\" }"[[tests.outputs]]extract_from = "transform0"[[tests.outputs.conditions]]"foo.equals" = "bar"
Running the test:
ana@autonoma:~/git/timberio/banana$ ../vector/target/release/vector test banana.tomlAug 25 10:49:42.370 INFO vector: Log level "info" is enabled.Running banana.toml testsAug 25 10:49:42.374 INFO vector::internal_events::wasm::compilation: WASM Compilation via `lucet` state="beginning" role="transform"Aug 25 10:49:42.374 INFO vector::internal_events::wasm::compilation: WASM Compilation via `lucet` state="cached" role="transform"test banana.toml: test ... passed
Perfect! Great job!
Next steps
You may have noticed, the plugin we made does not persist timestamp
or host
keys. Our plugin also doesn't support
any options, such as changing the key to parse from, or the key to save to.
These would be great next steps, but you are free to soar as you please! 🐦
Worked example
#![deny(improper_ctypes)]use std::{convert::TryInto, sync::Mutex};use once_cell::sync::Lazy;use vector_wasm::{hostcall, Registration, Role};use anyhow::Error;static STATE: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));/// Perform one time initialization and registration.////// During this time Vector and the plugin can validate that they can indeed work together,/// do any one-time initialization, or validate configuration settings.////// It's required that the plugin call [`vector_wasm::Registration::register`] before returning.#[no_mangle]pub extern "C" fn init() {// Vector provides you with a [`vector_wasm::WasmModuleConfig`] to validate for yourself.let config = hostcall::config().unwrap();assert_eq!(config.role, Role::Transform);// Finally, pass Vector a [`vector_wasm::Registration`]Registration::transform().register().unwrap();}/// Process data starting from a given point in memory to another point.////// It's not necessary for the plugin to actually read, or parse this data.////// Call [`vector_wasm::hostcall::emit`] to emit a message out.////// # Returns////// This function should return the number of emitted messages.#[no_mangle]pub extern "C" fn process(data: u32, length: u32) -> u32 {// Vector allocates a chunk of memory through the hostcall interface.// You can view the data as a slice of bytes.let data = unsafe {std::ptr::slice_from_raw_parts_mut(data as *mut u8, length.try_into().unwrap()).as_mut().unwrap()};let value: serde_json::Value = serde_json::de::from_slice(data).unwrap();let message_field = value.get("message").and_then(serde_json::Value::as_str).unwrap_or(Default::default()); // Fall back to empty.match transform(&mut *STATE.lock().unwrap(), message_field) {Ok(Some(value)) => {let value_string = value.to_string();hostcall::emit(value_string.into_bytes()).unwrap();1},Ok(None) => 0,Err(e) => {// This is an unexpected error. The most we can do is report it and clear our state.hostcall::raise(e).unwrap();0},}}/// Perform one-time optional shutdown events.////// **Note:** There is no guarantee this function will be called before shutdown,/// as we may be forcibly killed.#[no_mangle]pub extern "C" fn shutdown() {}fn transform(state: &mut String, arrival: impl AsRef<str>) -> Result<Option<serde_json::Value>, Error> {// Add the new arrival on.state.push_str(&mut arrival.as_ref());// Try to read from it using a "reader" non-destructively.let mut working_state = state.clone();let de = serde_json::Deserializer::from_str(&working_state);let mut de_stream = de.into_iter::<serde_json::Value>();let output = de_stream.next();match output {Some(Ok(value)) if value.is_object() => {let offset = de_stream.byte_offset();let new_state = working_state.split_off(offset);*state = new_state;Ok(Some(value))},Some(Ok(value)) => {// This is an unexpected error. The most we can do is report it and clear our state.state.clear();Err(anyhow::anyhow!("Was provided {}, not an object", value))}Some(Err(e)) if e.is_eof() => {// Not an error, keep going!Ok(None)},None => {// Not an error, keep going!Ok(None)}Some(Err(e)) => {// This is an unexpected error. The most we can do is report it and clear our state.state.clear();Err(e.into())},}}#[cfg(test)]mod test {use super::*;#[test]fn single() -> Result<(), Error> {let expected = serde_json::json!({"foo": "bar",});let mut working_buffer = String::from("");let out_result = transform(&mut working_buffer,&r#"{ "foo": "bar" }"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_some());let out_json = out_option.unwrap();assert_eq!(out_json, expected);Ok(())}#[test]fn errors() -> Result<(), Error> {let mut working_buffer = String::from("");let out_result = transform(&mut working_buffer,&r#"{ "foo" }"#,);assert!(out_result.is_err());Ok(())}#[test]fn double() -> Result<(), Error> {let expected = serde_json::json!({"foo": "bar",});let mut working_buffer = String::from("");let out_result = transform(&mut working_buffer,&r#"{ "foo":"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_none());let out_result = transform(&mut working_buffer,&r#""bar" }"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_some());let out_json = out_option.unwrap();assert_eq!(out_json, expected);Ok(())}#[test]fn multiple_expected() -> Result<(), Error> {let expected = serde_json::json!({"foo": "bar",});let mut working_buffer = String::from("");let out_result = transform(&mut working_buffer,&r#"{ "foo":"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_none());let out_result = transform(&mut working_buffer,&r#""bar" }"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_some());let out_json = out_option.unwrap();assert_eq!(out_json, expected);let expected = serde_json::json!({"baz": "bean",});let out_result = transform(&mut working_buffer,&r#"{ "baz":"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_none());let out_result = transform(&mut working_buffer,&r#""bean" }"#,);assert!(out_result.is_ok());let out_option = out_result?;assert!(out_option.is_some());let out_json = out_option.unwrap();assert_eq!(out_json, expected);Ok(())}}