Getting started (v0.4.0)

J-Loudet edited this page Jan 23, 2023 · 7 revisions

🧑‍💻 The files and code excerpts written below are available in the Zenoh-Flow examples repository.

Table of contents

  1. Bill of Materials
  2. Nodes
    1. Source: zenoh-sub
    2. Operator: greetings-maker
    3. Sink: file-writer
  3. Application descriptor
  4. Starting the data flow

Now that you have a working installation, let us create our first Zenoh-Flow application.

As is traditional in computer science, we will start with a simple "Hello, World!": we read a string and substitute it for "World" in that sentence. For instance, if we read "Alice", our application should produce "Hello, Alice!".

This example is a perfect opportunity to illustrate a key element of Zenoh-Flow: applications are meant to run as background processes. It is not recommended to run a GUI or TUI in a Zenoh-Flow application.

Bill of Materials

To launch a Zenoh-Flow application we need the following:

  • A set of "compatible" types: as data can cross devices or programming languages, a common representation is required. Zenoh-Flow provides this common representation for basic Rust types. We will use these for our "Hello, World!" example.

  • A set of nodes: descriptors paired with shared libraries or Python scripts that implement Zenoh-Flow’s interface. These are the building blocks of any Zenoh-Flow application.

  • The application (or data flow) descriptor: this describes the structure of our application, i.e. the nodes involved, how they are connected and where to find them.

Nodes

Our application will (i) obtain a name from a Zenoh subscriber, (ii) process it and (iii) write the result in a file. We will thus have three nodes:

  • our Zenoh subscriber: a Source,
  • our processing node: an Operator,
  • our node that logs the result to a file: a Sink.

The Source, Operator and Sink are the three types of nodes that Zenoh-Flow supports. A Source fetches data from outside an application, an Operator computes over data and a Sink sends data "outside" an application.

There are two main differences between these nodes: (i) their Input / Output ports and (ii) the order in which Zenoh-Flow starts them.

  • A Source only has Output ports and is started last.
  • An Operator has both Input and Output ports and is started second.
  • A Sink only has Input ports and is started first.
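
To make the three roles and the start order concrete, here is a minimal sketch that uses only the Rust standard library: threads stand in for nodes and channels stand in for links. Nothing in it is Zenoh-Flow API, and `run_pipeline` is a hypothetical name chosen for the illustration. Standard channels buffer their messages, so this sketch would work in any start order; the order is reproduced only to mirror the list above.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a three-stage pipeline over `names` and returns what the sink collected.
/// Illustrative only: threads play the nodes, channels play the links.
pub fn run_pipeline(names: Vec<String>) -> Vec<String> {
    // One channel per link: source -> operator, operator -> sink.
    let (source_out, operator_in) = mpsc::channel::<String>();
    let (operator_out, sink_in) = mpsc::channel::<String>();

    // 1. Sink: only has an input. Started first.
    let sink = thread::spawn(move || sink_in.iter().collect::<Vec<String>>());

    // 2. Operator: one input and one output. Started second.
    let operator = thread::spawn(move || {
        for name in operator_in {
            // Stop quietly if the sink has gone away.
            if operator_out.send(format!("Hello, {}!", name)).is_err() {
                break;
            }
        }
        // `operator_out` is dropped here, which ends the sink's loop.
    });

    // 3. Source: only has an output. Started last.
    let source = thread::spawn(move || {
        for name in names {
            if source_out.send(name).is_err() {
                break;
            }
        }
        // `source_out` is dropped here, which ends the operator's loop.
    });

    source.join().expect("source thread panicked");
    operator.join().expect("operator thread panicked");
    sink.join().expect("sink thread panicked")
}
```

Calling `run_pipeline` with the names "Alice" and "Bob" returns the two greetings in the same order, because each channel preserves the order in which values were sent.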

Let us see next how to implement them and write their matching descriptor files.

Source: zenoh-sub

The zenoh-sub node is a Source and has to expose:

  1. a name,
  2. a uri: where Zenoh-Flow can find the shared library,
  3. its outputs: an exhaustive list of their identifiers and types. Note that two outputs cannot share the same id.

💡 The identifiers provided in the outputs section are important! Zenoh-Flow will create a data structure where the keys are equal to these identifiers.

name: zenoh-sub

# Do not forget to change the extension depending on your operating system!
# Linux  -> .so     { libzenoh_sub.so    }
# Mac OS -> .dylib  { libzenoh_sub.dylib }
uri: file:///absolute/path/to/target/debug/libzenoh_sub.so

outputs:
  - id: out
    type: string

To create a Source two traits must be implemented:

  • Source: how to create an instance of the Source.
  • Node: the implementation of an iteration, the method Zenoh-Flow will call in a loop.

Below is our proposed implementation of the Source node, zenoh-sub. Notice how we provide the same id "out" from the YAML description in the call to outputs.take("out").

use flume::Receiver;
use zenoh::{prelude::r#async::*, subscriber::Subscriber};
use zenoh_flow::prelude::*;

#[export_source]
pub struct ZenohSub<'a> {
    output: Output<String>,
    subscriber: Subscriber<'a, Receiver<Sample>>,
}

#[async_trait::async_trait]
impl<'a> Node for ZenohSub<'a> {
    async fn iteration(&self) -> Result<()> {
        let name = self.subscriber.recv_async().await?;
        self.output.send(name.value.to_string(), None).await
    }
}

#[async_trait::async_trait]
impl<'a> Source for ZenohSub<'a> {
    async fn new(
        context: Context,
        _configuration: Option<Configuration>,
        mut outputs: Outputs,
    ) -> Result<Self> {
        let subscriber = context
            .zenoh_session()
            .declare_subscriber("zf/getting-started/hello")
            .res()
            .await?;

        Ok(ZenohSub {
            output: outputs.take("out").expect("Could not find output 'out'"),
            subscriber,
        })
    }
}
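
To see why the id passed to outputs.take must match the descriptor, the following sketch models a collection of ports keyed by their id. It is a simplified, standard-library-only stand-in: `Port`, `Ports` and `from_descriptor` are hypothetical names, not the types Zenoh-Flow actually uses.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an output port; not the Zenoh-Flow `Output` type.
#[derive(Debug, PartialEq)]
pub struct Port {
    pub id: String,
    pub data_type: String,
}

/// Hypothetical stand-in for the collection handed to a node's constructor.
pub struct Ports {
    by_id: HashMap<String, Port>,
}

impl Ports {
    /// Builds the collection from `(id, type)` pairs, as listed in a descriptor.
    /// Returns an error if two ports share the same id.
    pub fn from_descriptor(entries: &[(&str, &str)]) -> Result<Self, String> {
        let mut by_id = HashMap::new();
        for &(id, data_type) in entries {
            let port = Port {
                id: id.to_string(),
                data_type: data_type.to_string(),
            };
            if by_id.insert(id.to_string(), port).is_some() {
                return Err(format!("duplicate port id '{}'", id));
            }
        }
        Ok(Ports { by_id })
    }

    /// Removes and returns the port registered under `id`, if any.
    pub fn take(&mut self, id: &str) -> Option<Port> {
        self.by_id.remove(id)
    }
}
```

In this model, a collection built from the single entry ("out", "string") yields a port for take("out") exactly once, and yields None for any other id such as "output". That is the situation the expect call in the constructor above guards against.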

Operator: greetings-maker

The greetings-maker node is an Operator and has to expose:

  1. a name;
  2. a uri: where Zenoh-Flow can find the shared library;
  3. its inputs: an exhaustive list of their identifiers and types. Two inputs cannot share the same id;
  4. its outputs: an exhaustive list of their identifiers and types. Again, two outputs cannot share the same id (but an input can have the same id as an output).

id: greetings-maker

# Do not forget to change the extension depending on your operating system!
# Linux  -> .so
# Mac OS -> .dylib
uri: file:///absolute/path/to/target/debug/libgreetings_maker.so

inputs:
  - id: name
    type: string

outputs:
  - id: greeting
    type: string

To create an Operator two traits must be implemented:

  • Operator: how to create an instance of the Operator.
  • Node: the implementation of an iteration, the method Zenoh-Flow will call in a loop.

Below is our proposed implementation of the Operator node, greetings-maker. Notice how we provide the same ids from the YAML description in the calls to inputs.take("name") and outputs.take("greeting").

use zenoh_flow::prelude::*;

#[export_operator]
pub struct GreetingsMaker {
    input: Input<String>,
    output: Output<String>,
}

#[async_trait::async_trait]
impl Operator for GreetingsMaker {
    async fn new(
        _context: Context,
        _configuration: Option<Configuration>,
        mut inputs: Inputs,
        mut outputs: Outputs,
    ) -> Result<Self> {
        Ok(GreetingsMaker {
            input: inputs.take("name").expect("No input 'name' found"),
            output: outputs
                .take("greeting")
                .expect("No output 'greeting' found"),
        })
    }
}

#[async_trait::async_trait]
impl Node for GreetingsMaker {
    async fn iteration(&self) -> Result<()> {
        let (message, _) = self.input.recv().await?;
        if let Message::Data(characters) = message {
            let name = characters.trim_end();

            let greeting = match name {
                "Sofia" | "Leonardo" => format!("Ciao, {}!\n", name),
                "Lucia" | "Martin" => format!("¡Hola, {}!\n", name),
                "Jade" | "Gabriel" => format!("Bonjour, {} !\n", name),
                _ => format!("Hello, {}!\n", name),
            };

            return self.output.send(greeting, None).await;
        }

        Ok(())
    }
}

Feel free to modify the match arms and add more variations!
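
One way to experiment with the match arms without rebuilding and redeploying the node is to extract the logic into a plain function. The sketch below does that; `make_greeting` is a hypothetical helper name, and the "Hans" / "Greta" arm is an invented variation that is not part of the example repository.

```rust
/// Builds the greeting for `raw_name`, mirroring the `match` in `iteration`.
pub fn make_greeting(raw_name: &str) -> String {
    // Same clean-up as in the Operator: drop trailing whitespace and newlines.
    let name = raw_name.trim_end();

    match name {
        "Sofia" | "Leonardo" => format!("Ciao, {}!\n", name),
        "Lucia" | "Martin" => format!("¡Hola, {}!\n", name),
        "Jade" | "Gabriel" => format!("Bonjour, {} !\n", name),
        // Hypothetical extra arm, added here as an example of a variation.
        "Hans" | "Greta" => format!("Hallo, {}!\n", name),
        _ => format!("Hello, {}!\n", name),
    }
}
```

With such a helper, the body of iteration could shrink to a call to make_greeting followed by the send, and the greeting logic can be unit-tested on its own.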

Sink: file-writer

The file-writer node is a Sink and it should thus expose:

  1. a name,
  2. a uri: where Zenoh-Flow can find the shared library,
  3. its inputs: an exhaustive list of their identifiers and types. Two inputs cannot share the same id.

id: file-writer

# Do not forget to change the extension depending on your operating system!
# Linux   -> .so
# Mac OS  -> .dylib
uri: file:///absolute/path/to/target/debug/libfile_writer.so

inputs:
  - id: in
    type: string

In case it was not obvious 😉, to create a Sink two traits must be implemented:

  • Sink: how to create an instance of the Sink.
  • Node: the implementation of an iteration, the method Zenoh-Flow will call in a loop.

Below is our proposed implementation of the Sink node, file-writer. Notice how we provide the same id from the YAML description in the call to inputs.take("in").

use async_std::{fs::File, io::WriteExt, sync::Mutex};
use zenoh_flow::prelude::*;

#[export_sink]
pub struct FileWriter {
    input: Input<String>,
    file: Mutex<File>,
}

#[async_trait::async_trait]
impl Node for FileWriter {
    async fn iteration(&self) -> Result<()> {
        let (message, _) = self.input.recv().await?;

        if let Message::Data(data) = message {
            let mut file = self.file.lock().await;
            file.write_all(data.as_bytes())
                .await
                .map_err(|e| zferror!(ErrorKind::IOError, "{:?}", e))?;
            return file
                .flush()
                .await
                .map_err(|e| zferror!(ErrorKind::IOError, "{:?}", e).into());
        }

        Ok(())
    }
}

#[async_trait::async_trait]
impl Sink for FileWriter {
    async fn new(
        _context: Context,
        _configuration: Option<Configuration>,
        mut inputs: Inputs,
    ) -> Result<Self> {
        Ok(FileWriter {
            file: Mutex::new(
                File::create("/tmp/greetings.txt")
                    .await
                    .expect("Could not create '/tmp/greetings.txt'"),
            ),
            input: inputs.take("in").expect("No Input called 'in' found"),
        })
    }
}
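
The file is wrapped in a Mutex because iteration only receives &self, while writing to a file requires mutable access. The sketch below shows the same pattern with the synchronous standard-library types instead of async_std; `LineWriter` is a hypothetical name used only for this illustration.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;
use std::sync::Mutex;

/// Hypothetical, synchronous stand-in for the `FileWriter` sink.
pub struct LineWriter {
    file: Mutex<File>,
}

impl LineWriter {
    /// Creates (or truncates) the file at `path`.
    pub fn create(path: &Path) -> io::Result<Self> {
        Ok(LineWriter {
            file: Mutex::new(File::create(path)?),
        })
    }

    /// Takes `&self`, like `iteration`: the lock provides the mutable access.
    pub fn write(&self, data: &str) -> io::Result<()> {
        let mut file = self.file.lock().expect("file mutex poisoned");
        file.write_all(data.as_bytes())?;
        // Flush so that a reader such as `tail -f` sees the data right away.
        file.flush()
    }
}
```

Two successive calls to write append their data in call order, since the file handle is kept open between calls.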

And we're set! We have all the building blocks we need to describe our application.

Application descriptor

Let us write the data flow step by step. We first need to give it a name:

flow: hello-world

We then need to specify the nodes that compose it and where we can find their description. Each type of node has a dedicated section. For our example, the declaration can look like this:

vars:
    BASE_DIR: file://todo!


sources:
  - id: zenoh-sub
    descriptor: "{{ BASE_DIR }}/zenoh-sub.yaml"


operators:
  - id: greetings-maker
    descriptor: "{{ BASE_DIR }}/greetings-maker.yaml"


sinks:
  - id: file-writer
    descriptor: "{{ BASE_DIR }}/file-writer.yaml"

💡 The vars section is a special section that Zenoh-Flow uses for pre-processing (more specifically, string replacement). Every time Zenoh-Flow encounters a variable enclosed in two pairs of curly braces, also known as "mustache" syntax, it replaces the whole expression with the value associated with that variable. Thus, for the above declaration, every occurrence of {{ BASE_DIR }} will be replaced with file://todo!. See this page for more information.
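
The replacement step can be pictured with the following standard-library-only sketch. It is not Zenoh-Flow's implementation: `expand_vars` is a hypothetical function, and leaving unknown variables and unterminated braces untouched is an assumption made for the example (the behaviour of Zenoh-Flow in those cases is not described here).

```rust
use std::collections::HashMap;

/// Replaces every `{{ NAME }}` in `text` with `vars["NAME"]`.
/// Assumption: unknown variables and unterminated braces are left untouched.
pub fn expand_vars(text: &str, vars: &HashMap<&str, &str>) -> String {
    let mut expanded = String::with_capacity(text.len());
    let mut rest = text;

    while let Some(start) = rest.find("{{") {
        // Copy everything before the opening braces as-is.
        expanded.push_str(&rest[..start]);
        let after_open = &rest[start + 2..];

        match after_open.find("}}") {
            Some(end) => {
                // Whitespace around the variable name is ignored.
                let name = after_open[..end].trim();
                match vars.get(name) {
                    Some(value) => expanded.push_str(value),
                    // Unknown variable: keep the original placeholder.
                    None => expanded.push_str(&rest[start..start + 2 + end + 2]),
                }
                rest = &after_open[end + 2..];
            }
            None => {
                // No closing braces: keep the remainder verbatim and stop.
                expanded.push_str(&rest[start..]);
                rest = "";
            }
        }
    }

    expanded.push_str(rest);
    expanded
}
```

With BASE_DIR mapped to file://todo!, expanding "{{ BASE_DIR }}/zenoh-sub.yaml" gives "file://todo!/zenoh-sub.yaml".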

To complete our application descriptor we need to specify how the nodes are connected, i.e. the links that exist between them. A link connects an Output port to an Input port of the same type.

links:
  - from:
      node: zenoh-sub
      output: out
    to:
      node: greetings-maker
      input: name

  - from:
      node: greetings-maker
      output: greeting
    to:
      node: file-writer
      input: in

⚠️ The output and input in the links section must match what is declared in their respective YAML descriptor (the values throughout this guide are consistent).
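
The two rules on links (both ends must exist under the declared ids, and both ends must have the same type) can be expressed as a small check. The sketch below is a hypothetical model and not Zenoh-Flow's validation code: `NodePorts`, `Link` and `check_link` are invented names, and comparing types as plain strings is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical model of a node's declared ports: port id -> type name.
#[derive(Default)]
pub struct NodePorts {
    pub inputs: HashMap<String, String>,
    pub outputs: HashMap<String, String>,
}

/// Hypothetical model of one entry of the `links` section.
pub struct Link<'a> {
    pub from_node: &'a str,
    pub output: &'a str,
    pub to_node: &'a str,
    pub input: &'a str,
}

/// Checks that both ends of `link` exist and carry the same type.
pub fn check_link(nodes: &HashMap<String, NodePorts>, link: &Link) -> Result<(), String> {
    let out_type = nodes
        .get(link.from_node)
        .and_then(|node| node.outputs.get(link.output))
        .ok_or_else(|| {
            format!("unknown output '{}' on node '{}'", link.output, link.from_node)
        })?;
    let in_type = nodes
        .get(link.to_node)
        .and_then(|node| node.inputs.get(link.input))
        .ok_or_else(|| {
            format!("unknown input '{}' on node '{}'", link.input, link.to_node)
        })?;

    // Assumption: two ports are compatible when their type names are equal.
    if out_type != in_type {
        return Err(format!("type mismatch: '{}' -> '{}'", out_type, in_type));
    }
    Ok(())
}
```

Against the descriptors of this guide, the link from zenoh-sub / out to greetings-maker / name passes this check, whereas a typo such as output: output would be reported as an unknown output.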

That's it! The application is complete; all that is left to do is launch it.

Starting the data flow

If you do not already have both a Zenoh router (with the storage plugin configured correctly!) and a Zenoh-Flow daemon running, we invite you to read the installation page.

Provided that the paths, ports and links are correct in the data-flow.yaml file (i.e. adapted to your machine), we can ask our running Zenoh-Flow daemon to launch our application:

$ zfctl launch getting-started/data-flow.yaml

After which you should see:

[2022-12-16T14:31:19Z INFO  zenoh_flow_daemon::runtime] Instantiating: hello-world
[…]
[2022-12-16T14:31:20Z INFO  zenoh_flow_daemon::runtime] Done Instantiation Flow hello-world - Instance UUID: 416817a0-0068-412a-afcb-f33d35c21d7d

This indicates that our application is running in the background. Great! To see it producing results, let us continuously follow the contents of the output file /tmp/greetings.txt:

# Launch this command in a separate terminal
$ tail -f /tmp/greetings.txt

Awesome! Now for some live interactions. As our Source, zenoh-sub, is subscribed to "zf/getting-started/hello" we need to publish values on this key expression. Several options are possible:

# If you have compiled the `z_put` example of Zenoh in release mode
$ ./zenoh/target/release/z_put -k "zf/getting-started/hello" -v "Alice"

# If you have enabled the REST plugin of Zenoh
$ curl -X PUT -H "content-type:text/plain" -d 'Bob' http://localhost:8000/zf/getting-started/hello

If you ran both commands in the order above, the terminal following /tmp/greetings.txt should display these lines, indicating that our application is running smoothly:

Hello, Alice!
Hello, Bob!