From 14bff3de5e2a0c9f162bd32e49e1fdbbcf898e4f Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Fri, 29 Nov 2024 17:42:24 +0100 Subject: [PATCH] add querier --- Cargo.lock | 52 ++++++------- Cargo.toml | 4 +- examples/README.md | 18 +++++ examples/z_querier.py | 158 ++++++++++++++++++++++++++++++++++++++++ src/query.rs | 57 ++++++++++++++- src/session.rs | 29 +++++++- tests/examples_check.py | 37 ++++++++++ tests/test_session.py | 51 +++++++++++++ 8 files changed, 376 insertions(+), 30 deletions(-) create mode 100644 examples/z_querier.py diff --git a/Cargo.lock b/Cargo.lock index 14c9efde..716392a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2719,7 +2719,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "ahash", "async-trait", @@ -2765,7 +2765,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "zenoh-collections", ] @@ -2773,7 +2773,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "tracing", "uhlc", @@ -2784,12 +2784,12 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" [[package]] name = "zenoh-config" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "json5", "num_cpus", @@ -2810,7 +2810,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "lazy_static", "tokio", @@ -2821,7 +2821,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "aes", "hmac", @@ -2834,7 +2834,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "bincode", "flume", @@ -2851,7 +2851,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "hashbrown", "keyed-set", @@ -2865,7 +2865,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -2882,7 +2882,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "flume", @@ -2906,7 +2906,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "base64 0.22.1", @@ -2932,7 +2932,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "socket2", @@ -2949,7 +2949,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "base64 0.22.1", @@ -2978,7 +2978,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "socket2", @@ -2997,7 +2997,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "nix", @@ -3015,7 +3015,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "futures-util", @@ -3035,7 +3035,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "proc-macro2", "quote", @@ -3046,7 +3046,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "git-version", "libloading", @@ -3062,7 +3062,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "const_format", "rand", @@ -3089,7 +3089,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "anyhow", ] @@ -3097,7 +3097,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "lazy_static", "ron", @@ -3110,7 +3110,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "event-listener", "futures", @@ -3123,7 +3123,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "futures", "tokio", @@ -3136,7 +3136,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "crossbeam-utils", @@ -3169,7 +3169,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#7e044ad16729af35ee0981c362369c60012cfa11" +source = "git+https://github.com/DenisBiryukov91/zenoh.git?branch=querier#77dde4778fce33680e0f40f6ed0fc96311ee85c0" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index 5669243b..5bfea808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,5 +50,5 @@ pyo3 = { version = "0.21.2", features = [ "experimental-declarative-modules", ] } validated_struct = "2.1.0" -zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["unstable", "internal"], default-features = false } -zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["internal"], optional = true } +zenoh = { version = "1.0.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "querier", features = ["unstable", "internal"], default-features = false } +zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "querier", features = ["internal"], optional = true } diff --git a/examples/README.md b/examples/README.md index d5610577..f488cf53 100644 --- a/examples/README.md +++ b/examples/README.md @@ -106,6 +106,24 @@ or python3 z_get.py -s 'demo/**' ``` +### z_querier + +Continuously sends query messages for a selector. +The queryables with a matching path or selector (for instance [z_queryable](#z_queryable) and [z_storage](#z_storage)) +will receive these queries and reply with paths/payloads that will be received by the querier's query callback. + +Typical usage: + + ```bash + python3 z_querier.py + ``` + +or + + ```bash + python3 z_get.py -s 'demo/**' + ``` + ### z_queryable Creates a queryable function with a key expression. diff --git a/examples/z_querier.py b/examples/z_querier.py new file mode 100644 index 00000000..316f9bbf --- /dev/null +++ b/examples/z_querier.py @@ -0,0 +1,158 @@ +# +# Copyright (c) 2024 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import itertools +import time +from typing import Optional, Tuple + +import zenoh + + +def main( + conf: zenoh.Config, + selector: str, + target: zenoh.QueryTarget, + payload: str, + timeout: float, + iter: int, +): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + keyexpr, params = split_selector(selector) + + print(f"Declaring Querier on '{keyexpr}'...") + querier = session.declare_querier(keyexpr, target=target, timeout=timeout) + + print("Press CTRL-C to quit...") + for idx in itertools.count() if iter is None else range(iter): + time.sleep(1.0) + buf = f"[{idx:4d}] {payload if payload else ''}" + print(f"Querying '{selector}' with payload '{buf}')...") + + replies = querier.get(parameters=params, payload=buf) + for reply in replies: + try: + print( + f">> Received ('{reply.ok.key_expr}': '{reply.ok.payload.to_string()}')" + ) + except: + print(f">> Received (ERROR: '{reply.err.payload.to_string()}')") + +def split_selector(selector: str) -> Tuple[str, Optional[str]]: + res = selector.split("?", 2) + if len(res) == 0: + return "", None + elif len(res) == 1: + return res[0], None + return res[0], res[1] + +if __name__ == "__main__": + # --- Command line argument parsing --- --- --- --- --- --- + import argparse + import json + + parser = argparse.ArgumentParser( + prog="z_querier", description="zenoh querier example" + ) + parser.add_argument( + "--mode", + "-m", + dest="mode", + choices=["peer", "client"], + type=str, + help="The zenoh session mode.", + ) + parser.add_argument( + "--connect", + "-e", + dest="connect", + metavar="ENDPOINT", + action="append", + type=str, + help="Endpoints to connect to.", + ) + parser.add_argument( + "--listen", + "-l", + dest="listen", + metavar="ENDPOINT", + action="append", + type=str, + help="Endpoints to listen on.", + ) + parser.add_argument( + "--selector", + "-s", + dest="selector", + default="demo/example/**", + type=str, + help="The selection of resources to query.", + ) + parser.add_argument( + "--target", + "-t", + dest="target", + choices=["ALL", "BEST_MATCHING", "ALL_COMPLETE", "NONE"], + default="BEST_MATCHING", + type=str, + help="The target queryables of the query.", + ) + parser.add_argument( + "--payload", + "-p", + dest="payload", + type=str, + help="An optional payload to send in the query.", + ) + parser.add_argument( + "--timeout", + "-o", + dest="timeout", + default=10.0, + type=float, + help="The query timeout", + ) + parser.add_argument( + "--config", + "-c", + dest="config", + metavar="FILE", + type=str, + help="A configuration file.", + ) + parser.add_argument( + "--iter", dest="iter", type=int, help="How many puts to perform" + ) + + args = parser.parse_args() + conf = ( + zenoh.Config.from_file(args.config) + if args.config is not None + else zenoh.Config() + ) + if args.mode is not None: + conf.insert_json5("mode", json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5("connect/endpoints", json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5("listen/endpoints", json.dumps(args.listen)) + target = { + "ALL": zenoh.QueryTarget.ALL, + "BEST_MATCHING": zenoh.QueryTarget.BEST_MATCHING, + "ALL_COMPLETE": zenoh.QueryTarget.ALL_COMPLETE, + }.get(args.target) + + main(conf, args.selector, target, args.payload, args.timeout, args.iter) diff --git a/src/query.rs b/src/query.rs index 894fa8f7..f2f0fda7 100644 --- a/src/query.rs +++ b/src/query.rs @@ -21,7 +21,7 @@ use pyo3::{ use crate::{ bytes::{Encoding, ZBytes}, config::ZenohId, - handlers::HandlerImpl, + handlers::{into_handler, HandlerImpl}, key_expr::KeyExpr, macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper}, qos::{CongestionControl, Priority}, @@ -284,6 +284,61 @@ impl Queryable { } } +option_wrapper!(zenoh::query::Querier<'static>, "Undeclared querier"); + +#[pymethods] +impl Querier { + #[classmethod] + fn __class_getitem__(cls: &Bound, args: &Bound) -> PyObject { + generic(cls, args) + } + + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> PyResult<&'a Bound<'py, Self>> { + Self::check(this) + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn key_expr(&self) -> PyResult { + Ok(self.get_ref()?.key_expr().clone().into()) + } + + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None))] + fn get( + &self, + py: Python, + handler: Option<&Bound>, + #[pyo3(from_py_with = "Parameters::from_py_opt")] parameters: Option, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] payload: Option, + #[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, + ) -> PyResult> { + let this = self.get_ref()?; + let (handler, _) = into_handler(py, handler)?; + let builder = build!(this.get(), parameters, payload, encoding, attachment,); + wait(py, builder.with(handler)).map_into() + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("{:?}", self.get_ref()?)) + } +} + wrapper!(zenoh::query::Selector<'static>: Clone); downcast_or_new!(Selector, None); diff --git a/src/session.rs b/src/session.rs index 89920adb..0fe930c1 100644 --- a/src/session.rs +++ b/src/session.rs @@ -28,7 +28,7 @@ use crate::{ macros::{build, wrapper}, pubsub::{Publisher, Subscriber}, qos::{CongestionControl, Priority, Reliability}, - query::{QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, + query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, time::Timestamp, utils::{timeout, wait, IntoPython, MapInto}, }; @@ -225,6 +225,33 @@ impl Session { wait(py, builder).map_into() } + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (key_expr, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None))] + fn declare_querier( + &self, + py: Python, + #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, + target: Option, + #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< + QueryConsolidation, + >, + #[pyo3(from_py_with = "timeout")] timeout: Option, + congestion_control: Option, + priority: Option, + express: Option, + ) -> PyResult { + let builder = build!( + self.0.declare_querier(key_expr), + target, + consolidation, + timeout, + congestion_control, + priority, + express, + ); + wait(py, builder).map_into() + } + fn liveliness(&self) -> Liveliness { Liveliness(self.0.clone()) } diff --git a/tests/examples_check.py b/tests/examples_check.py index cba6c0fc..047e3e38 100644 --- a/tests/examples_check.py +++ b/tests/examples_check.py @@ -149,6 +149,43 @@ def test_z_get_z_queryable(): assert not z_queryable.errors +def test_z_querier_z_queryable(): + """Test z_querier & z_queryable""" + z_queryable = Pyrun("z_queryable.py", ["-k=demo/example/zenoh-python-queryable"]) + time.sleep(3) + ## z_querier: Able to get reply from queryable + z_querier = Pyrun( + "z_querier.py", ["-s=demo/example/zenoh-python-queryable", "-p=value"] + ) + time.sleep(5) + z_queryable.interrupt() + z_querier.interrupt() + + if not ( + "Received ('demo/example/zenoh-python-queryable': 'Queryable from Python!')" + in "".join(z_querier.stdout) + ): + z_querier.dbg() + z_queryable.dbg() + z_querier.errors.append("z_querier didn't get a response from z_queryable") + queryableout = "".join(z_queryable.stdout) + if not ( + "Received Query 'demo/example/zenoh-python-queryable' with payload: [ 0] value" + in queryableout + ): + z_queryable.errors.append("z_queryable didn't catch query [0]") + elif not ( + "Received Query 'demo/example/zenoh-python-queryable' with payload: [ 2] value" + in queryableout + ): + z_queryable.errors.append("z_queryable didn't catch query [2]") + if any(("z_queryable" in error) for error in z_queryable.errors): + z_queryable.dbg() + + assert not z_querier.errors + assert not z_queryable.errors + + def test_z_storage_z_sub(): """Test z_storage & z_sub.""" z_storage = Pyrun("z_storage.py") diff --git a/tests/test_session.py b/tests/test_session.py index d81165a0..13c59e3e 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -93,6 +93,56 @@ def queryable_callback(query: Query): queryable.undeclare() +def run_session_qrrrep(peer01: Session, peer02: Session): + keyexpr = "test/querier" + + for size in MSG_SIZE: + num_requests = 0 + num_replies = 0 + num_errors = 0 + + def queryable_callback(query: Query): + nonlocal num_requests + query.reply(keyexpr, bytes(size)) + num_requests += 1 + + print("[QR][01c] Queryable on peer01 session") + queryable = peer01.declare_queryable( + keyexpr, queryable_callback, complete=False + ) + + time.sleep(SLEEP) + + print(f"[QR][02c] Declaring querier on peer02 session.") + querier = peer02.declare_querier(keyexpr) + print(f"[QR][03c] Sending {MSG_COUNT} queries.") + for _ in range(MSG_COUNT): + replies = querier.get() + for reply in replies: + try: + unwraped_reply = reply.ok + except: + unwraped_reply = None + + if unwraped_reply: + assert len(unwraped_reply.payload) == size + num_replies += 1 + else: + num_errors += 1 + + time.sleep(SLEEP) + print(f"[QR][03c] Got on querier {num_replies}/{MSG_COUNT} replies.") + assert num_replies == MSG_COUNT + assert num_requests == MSG_COUNT + assert num_errors == 0 + + print("[QR][04c] Undeclare querier on peer02 session") + querier.undeclare() + + print("[QR][05c] Unqueryable on peer01 session") + queryable.undeclare() + + def run_session_pubsub(peer01: Session, peer02: Session): keyexpr = "test_pub/session" msg = "Pub Message".encode() @@ -141,4 +191,5 @@ def test_session(): (peer01, peer02) = open_session(["tcp/127.0.0.1:17447"]) run_session_qryrep(peer01, peer02) run_session_pubsub(peer01, peer02) + run_session_qrrrep(peer01, peer02) close_session(peer01, peer02)