forked from eclipse-zenoh/zenoh-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
z_pub_thr.py
96 lines (86 loc) · 2.52 KB
/
z_pub_thr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Copyright (c) 2022 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
import zenoh
def main(conf: zenoh.Config, payload_size: int):
    """Publish one fixed payload on "test/thr" in an endless loop.

    The payload is ``payload_size`` bytes long and holds the repeating
    sequence 0, 1, ..., 9. The publishing loop has no exit condition, so
    the function only ends when an exception propagates out of it (the
    prompt printed below tells the user to press CTRL-C).

    Args:
        conf: Configuration passed to ``zenoh.open``.
        payload_size: Number of bytes in the published payload.
    """
    # initiate logging
    zenoh.init_log_from_env_or("error")

    # Build the whole buffer in one pass, then wrap it once so the same
    # object is reused for every put.
    raw = bytearray(index % 10 for index in range(payload_size))
    payload = zenoh.ZBytes(raw)

    blocking = zenoh.CongestionControl.BLOCK

    with zenoh.open(conf) as session:
        publisher = session.declare_publisher(
            "test/thr", congestion_control=blocking
        )

        print("Press CTRL-C to quit...")
        while True:
            publisher.put(payload)
# --- Command line argument parsing --- --- --- --- --- ---
if __name__ == "__main__":
    import argparse
    import json

    # Keyword arguments shared by the two repeatable endpoint options.
    endpoint_kwargs = dict(metavar="ENDPOINT", action="append", type=str)

    parser = argparse.ArgumentParser(
        prog="z_pub_thr", description="zenoh throughput pub example"
    )
    parser.add_argument(
        "--mode",
        "-m",
        dest="mode",
        choices=["peer", "client"],
        type=str,
        help="The zenoh session mode.",
    )
    parser.add_argument(
        "--connect",
        "-e",
        dest="connect",
        help="Endpoints to connect to.",
        **endpoint_kwargs,
    )
    parser.add_argument(
        "--listen",
        "-l",
        dest="listen",
        help="Endpoints to listen on.",
        **endpoint_kwargs,
    )
    parser.add_argument(
        "payload_size", type=int, help="Sets the size of the payload to publish."
    )
    parser.add_argument(
        "--config",
        "-c",
        dest="config",
        metavar="FILE",
        type=str,
        help="A configuration file.",
    )

    args = parser.parse_args()

    # Load the configuration file when one was named; otherwise start from
    # a freshly constructed Config.
    if args.config is None:
        conf = zenoh.Config()
    else:
        conf = zenoh.Config.from_file(args.config)

    # Insert each option that was actually given on the command line,
    # JSON-encoded, under its configuration key.
    cli_settings = (
        ("mode", args.mode),
        ("connect/endpoints", args.connect),
        ("listen/endpoints", args.listen),
    )
    for config_key, cli_value in cli_settings:
        if cli_value is not None:
            conf.insert_json5(config_key, json.dumps(cli_value))

    main(conf, args.payload_size)