From 695619b561616c4eac5e377f5c08f965748e6255 Mon Sep 17 00:00:00 2001 From: Baptiste Jonglez Date: Thu, 18 Apr 2024 18:21:49 +0200 Subject: [PATCH] Bring back asyncio examples The examples were added in #48 and subsequently deleted in 280385977323c411bb6f80358dadae83413237d9 It's not clear why these asyncio examples were removed: they appear to work fine and are important to show that zenoh is compatible with asyncio. Tested with eclipse-zenoh==0.10.1rc0 and Python 3.11 on Debian. --- examples/asyncio/z_delete.py | 76 ++++++++++++++++++++ examples/asyncio/z_get.py | 94 ++++++++++++++++++++++++ examples/asyncio/z_get_parallel.py | 104 +++++++++++++++++++++++++++ examples/asyncio/z_info.py | 71 ++++++++++++++++++ examples/asyncio/z_pub.py | 96 +++++++++++++++++++++++++ examples/asyncio/z_pull.py | 95 ++++++++++++++++++++++++ examples/asyncio/z_put.py | 81 +++++++++++++++++++++ examples/asyncio/z_queryable.py | 112 +++++++++++++++++++++++++++++ examples/asyncio/z_scout.py | 33 +++++++++ examples/asyncio/z_sub.py | 91 +++++++++++++++++++++++ 10 files changed, 853 insertions(+) create mode 100644 examples/asyncio/z_delete.py create mode 100644 examples/asyncio/z_get.py create mode 100644 examples/asyncio/z_get_parallel.py create mode 100644 examples/asyncio/z_info.py create mode 100644 examples/asyncio/z_pub.py create mode 100644 examples/asyncio/z_pull.py create mode 100644 examples/asyncio/z_put.py create mode 100644 examples/asyncio/z_queryable.py create mode 100644 examples/asyncio/z_scout.py create mode 100644 examples/asyncio/z_sub.py diff --git a/examples/asyncio/z_delete.py b/examples/asyncio/z_delete.py new file mode 100644 index 00000000..46ca5cd5 --- /dev/null +++ b/examples/asyncio/z_delete.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_put', + description='zenoh put example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/zenoh-python-put', + type=str, + help='The key expression matching resources to delete.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Deleting resources matching '{}'...".format(key)) + await session.delete(key) + + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_get.py b/examples/asyncio/z_get.py new file mode 100644 index 00000000..0b5056b1 --- /dev/null +++ b/examples/asyncio/z_get.py @@ -0,0 +1,94 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config, QueryTarget + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_get', + description='zenoh get example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--selector', '-s', dest='selector', + default='/demo/example/**', + type=str, + help='The selection of resources to query.') + parser.add_argument('--target', '-t', dest='target', + choices=['ALL', 'BEST_MATCHING', + 'ALL_COMPLETE', 'NONE'], + default='ALL', + type=str, + help='The target queryables of the query.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + selector = args.selector + target = { + 'ALL': QueryTarget.All(), + 'BEST_MATCHING': QueryTarget.BestMatching(), + 'ALL_COMPLETE': QueryTarget.AllComplete(), + 'NONE': QueryTarget.No()}.get(args.target) + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Sending Query '{}'...".format(selector)) + replies = await session.get(selector, target=target) + for reply in replies: + if isinstance(reply.sample, zenoh.Sample): + print(">> Received ('{}': '{}')" + .format(reply.sample.key_expr, reply.sample.payload.decode("utf-8"))) + else: + print(">> Received (ERROR: '{}')" + .format(reply.sample.payload.decode("utf-8"))) + + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_get_parallel.py b/examples/asyncio/z_get_parallel.py new file mode 100644 index 00000000..c9e35a6a --- /dev/null +++ b/examples/asyncio/z_get_parallel.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config, QueryTarget + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_get_parallel', + description='zenoh parallel get example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--selector', '-s', dest='selector', + default='/demo/example/**', + type=str, + help='The selection of resources to query.') + parser.add_argument('--target', '-t', dest='target', + choices=['ALL', 'BEST_MATCHING', + 'ALL_COMPLETE', 'NONE'], + default='ALL', + type=str, + help='The target queryables of the query.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + selector = args.selector + target = { + 'ALL': QueryTarget.All(), + 'BEST_MATCHING': QueryTarget.BestMatching(), + 'ALL_COMPLETE': QueryTarget.AllComplete(), + 'NONE': QueryTarget.No()}.get(args.target) + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + async def do_query(sleep_time): + print("Sending Query '{}?(sleep={})'...".format(selector, sleep_time)) + replies = await session.get("{}?(sleep={})".format(selector, sleep_time), target=target) + for reply in replies: + if isinstance(reply.sample, zenoh.Sample): + print(">> Received ('{}': '{}')" + .format(reply.sample.key_expr, reply.sample.payload.decode("utf-8"))) + else: + print(">> Received (ERROR: '{}')" + .format(reply.sample.payload.decode("utf-8"))) + + start = time.time() + await asyncio.gather( + asyncio.create_task(do_query(1)), + asyncio.create_task(do_query(2)), + asyncio.create_task(do_query(3)), + ) + end = time.time() + print(f'Time: {end-start:.2f} sec') + + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_info.py b/examples/asyncio/z_info.py new file mode 100644 index 00000000..c828e7a4 --- /dev/null +++ b/examples/asyncio/z_info.py @@ -0,0 +1,71 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_info', + description='zenoh info example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + info = await session.info() + for key in info: + print("{} : {}".format(key, info[key])) + + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_pub.py b/examples/asyncio/z_pub.py new file mode 100644 index 00000000..071d1738 --- /dev/null +++ b/examples/asyncio/z_pub.py @@ -0,0 +1,96 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import itertools +import json +import zenoh +from zenoh import config + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_pub', + description='zenoh pub example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/zenoh-python-pub', + type=str, + help='The key expression to publish onto.') + parser.add_argument('--value', '-v', dest='value', + default='Pub from Python!', + type=str, + help='The value to publish.') + parser.add_argument("--iter", dest="iter", type=int, + help="How many puts to perform") + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + value = args.value + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Declaring key expression '{}'...".format(key), end='') + rid = await session.declare_expr(key) + print(" => RId {}".format(rid)) + + print("Declaring publication on '{}'...".format(rid)) + await session.declare_publication(rid) + + for idx in itertools.count() if args.iter is None else range(args.iter): + time.sleep(1) + buf = "[{:4d}] {}".format(idx, value) + print("Putting Data ('{}': '{}')...".format(rid, buf)) + await session.put(rid, bytes(buf, encoding='utf8')) + + await session.undeclare_publication(rid) + await session.undeclare_expr(rid) + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_pull.py b/examples/asyncio/z_pull.py new file mode 100644 index 00000000..f49e9d0d --- /dev/null +++ b/examples/asyncio/z_pull.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +from datetime import datetime +import argparse +import json +import zenoh +from zenoh import Reliability, SubMode + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_pull', + description='zenoh pull example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/**', + type=str, + help='The key expression matching resources to pull.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + async def listener(sample): + time = '(not specified)' if sample.source_info is None or sample.timestamp is None else datetime.fromtimestamp( + sample.timestamp.time) + print(">> [Subscriber] Received {} ('{}': '{}')" + .format(sample.kind, sample.key_expr, sample.payload.decode("utf-8"), time)) + + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Creating Subscriber on '{}'...".format(key)) + + sub = await session.subscribe( + key, listener, reliability=Reliability.Reliable, mode=SubMode.Pull) + + print("Press to pull data...") + c = '\0' + while c != 'q': + c = sys.stdin.read(1) + if c == '': + time.sleep(1) + else: + sub.pull() + + await sub.close() + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_put.py b/examples/asyncio/z_put.py new file mode 100644 index 00000000..d31e8f0d --- /dev/null +++ b/examples/asyncio/z_put.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_put', + description='zenoh put example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/zenoh-python-put', + type=str, + help='The key expression to write.') + parser.add_argument('--value', '-v', dest='value', + default='Put from Python!', + type=str, + help='The value to write.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + value = args.value + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Putting Data ('{}': '{}')...".format(key, value)) + await session.put(key, value) + + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_queryable.py b/examples/asyncio/z_queryable.py new file mode 100644 index 00000000..5840328e --- /dev/null +++ b/examples/asyncio/z_queryable.py @@ -0,0 +1,112 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import json +import zenoh +from zenoh import config, Sample + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_queryable', + description='zenoh queryable example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/zenoh-python-queryable', + type=str, + help='The key expression matching queries to reply to.') + parser.add_argument('--value', '-v', dest='value', + default='Queryable from Python!', + type=str, + help='The value to reply to queries.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + value = args.value + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + # Note: As an example the concrete implementation of the queryable callback is implemented here as a coroutine. + # It checks if the query's value_selector (the substring after '?') is a float, and if yes, sleeps for this number of seconds. + # Run example/asyncio/z_get_parallel.py example to see how 3 concurrent get() are executed in parallel in this z_queryable.py + async def queryable_corouting(query): + selector = query.selector + try: + sleep_time = selector.parse_value_selector().properties.get('sleep') + if sleep_time is not None: + print(" Sleeping {} secs before replying".format( + float(sleep_time))) + await asyncio.sleep(float(sleep_time)) + print(" SLEEP DONE") + except Exception as e: + print(" WARN: error in value selector: {}. Ignore it.".format(e)) + print(" Replying to query on {}".format(selector)) + reply = "{} (this is the reply to query on {})".format(value, selector) + query.reply(Sample(key_expr=key, value=reply.encode())) + + async def queryable_callback(query): + print(">> [Queryable ] Received Query '{}'".format(query.selector)) + # schedule a task that will call queryable_corouting(query) + asyncio.create_task(queryable_corouting(query)) + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Creating Queryable on '{}'...".format(key)) + queryable = await session.queryable(key, queryable_callback) + + print("Enter 'q' to quit......") + c = '\0' + while c != 'q': + c = sys.stdin.read(1) + if c == '': + time.sleep(1) + + await queryable.close() + await session.close() + +asyncio.run(main()) diff --git a/examples/asyncio/z_scout.py b/examples/asyncio/z_scout.py new file mode 100644 index 00000000..4def468b --- /dev/null +++ b/examples/asyncio/z_scout.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +import argparse +import zenoh +from zenoh import WhatAmI + + +async def main(): + # initiate logging + zenoh.init_logger() + + print("Scouting...") + hellos = await zenoh.async_scout(WhatAmI.Peer | WhatAmI.Router, 1.0) + + for hello in hellos: + print(hello) + +asyncio.run(main()) diff --git a/examples/asyncio/z_sub.py b/examples/asyncio/z_sub.py new file mode 100644 index 00000000..006c136c --- /dev/null +++ b/examples/asyncio/z_sub.py @@ -0,0 +1,91 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import asyncio +import sys +import time +from datetime import datetime +import argparse +import json +import zenoh +from zenoh import Reliability, SubMode + + +async def main(): + # --- Command line argument parsing --- --- --- --- --- --- + parser = argparse.ArgumentParser( + prog='z_sub', + description='zenoh sub example') + parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') + parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') + parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') + parser.add_argument('--key', '-k', dest='key', + default='/demo/example/**', + type=str, + help='The key expression to subscribe to.') + parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + + args = parser.parse_args() + conf = zenoh.config_from_file( + args.config) if args.config is not None else zenoh.Config() + if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) + if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) + if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) + key = args.key + + # zenoh-net code --- --- --- --- --- --- --- --- --- --- --- + + async def listener(sample): + print(">> [Subscriber] Received {} ('{}': '{}')" + .format(sample.kind, sample.key_expr, sample.payload.decode("utf-8"))) + + # initiate logging + zenoh.init_logger() + + print("Openning session...") + session = await zenoh.async_open(conf) + + print("Creating Subscriber on '{}'...".format(key)) + + sub = await session.subscribe(key, listener, reliability=Reliability.Reliable, mode=SubMode.Push) + + print("Enter 'q' to quit...") + c = '\0' + while c != 'q': + c = sys.stdin.read(1) + if c == '': + time.sleep(1) + + await sub.close() + await session.close() + + +asyncio.run(main())