diff --git a/examples/README.md b/examples/README.md index fa933bc..9fe9ab4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,6 +9,16 @@ This directory contains examples on using pynng for different tasks. * [`pair1_async.py`](./pair1_async.py): Demonstrates a polyamorous pair1 connection. +Several more examples are described in [pynng.ReadTheDocs](https://pynng.readthedocs.io/en/latest/core.html#available-protocols) + and implemented here + +* [`pair1_PolyAsync.py`](./pair1_PolyAsync.py): More elaborate demo of + polyamorous pair1 +* [`pubsub_1SingleApp.py`](./pubsub_1SingleApp.py): single app showing publish and subscribe +* [`pubsub_2publishAsync.py`](./pubsub_2publishAsync.py): pub/sub publisher side using Trio async +* [`pubsub_2subscribe.py`](./pubsub_2subscribe.py): pub/sub subscriber side, no threading +* [`pubsub_2subscribeAsync.py`](./pubsub_2subscribeAsync.py): pub/sub subscriber using Trio async + Adding an Example ----------------- diff --git a/examples/pair0Async.py b/examples/pair0Async.py new file mode 100644 index 0000000..8811ede --- /dev/null +++ b/examples/pair0Async.py @@ -0,0 +1,25 @@ +""" +pair0Async.py very simple pair of pair0 sockets doing send/receive in a trio.run + +Demonstrate how to use a pair0 socket asynchronously with Trio +Pair0 is very simple 1:1 bidirectional message passing. +this demo is kinda trivial as it runs both send + receive at same time +""" + + +import pynng +import trio + +# async function that sends a +async def send_and_recv(sender, receiver, message): + await sender.asend(message) + return await receiver.arecv() + +with pynng.Pair0(listen='tcp://127.0.0.1:54321') as s1, \ + pynng.Pair0(dial='tcp://127.0.0.1:54321') as s2: + # simplistic Trio async function + received = trio.run(send_and_recv, s1, s2, b'hello there old pal!') + print("received: ", received) + assert received == b'hello there old pal!' + + diff --git a/examples/pair1_PolyAsync.py b/examples/pair1_PolyAsync.py new file mode 100644 index 0000000..bceea64 --- /dev/null +++ b/examples/pair1_PolyAsync.py @@ -0,0 +1,98 @@ +""" +Demonstrate how to use a pair1 polyamorous sockets with Trio async + derived from the docs page + https://pynng.readthedocs.io/en/latest/core.html#available-protocols + +Pair1 sockets are similar to pair0 sockets. The difference is that while pair0 +supports only a single connection, pair1 sockets support _n_ one-to-one +connections. + +This program demonstrates how to use pair1 sockets. The key differentiator is +that with pair1 sockets, you must always specify the *pipe* that you want to +use for the message. + +""" + +# pynng mod of async polyamorus example in ReadTheDocs +# from the docs page https://pynng.readthedocs.io/en/latest/core.html#available-protocols +# +# Pair1 allows single Server/Listener to connect bi-directionally with multiple Client/Dialers +# it does NOT operate as a Publisher in that a listner.send() goes to ?? +# + +import sys, traceback +from pynng import Pair1, Timeout +print("begin Pair 1 polyamorous test") + +address = 'tcp://127.0.0.1:12343' +with Pair1(listen=address, polyamorous=True, recv_timeout=100) as s0, \ + Pair1(dial=address, polyamorous=True, recv_timeout=100) as s1, \ + Pair1(dial=address, polyamorous=True, recv_timeout=100) as s2: + print("opened all 3") + s0.send(b'hi everybody!') + s1.send(b'hello from s1') + s2.send(b'hello from s2') + print("sent all three") + print("recv_msg on s0") + msg1 = s0.recv_msg() + print(msg1.bytes) # prints b'hello from s1' + + msg2 = s0.recv_msg() + print(msg2.bytes) # prints b'hello from s2' + + print("recv on s1:") + msg01 = s1.recv() + print(msg01) # prints b'hello from s1' + + try: + print("recv on s2") + msg02 = s2.recv() + print(msg02) # prints b'hello from s2' + except Timeout: + print("Timeout on S2 waiting to hear from s0") + + print("send single msg responses") + msg1.pipe.send(b'hey s1') + msg2.pipe.send(b'hey s2') + print(s2.recv()) # prints b'hey s2' + print(s1.recv()) # prints b'hey s1' + + # beyond first msg, repeats will share the Pipe but not data + s1.send(b'more from s1') + morMsg = s0.recv_msg() + print("morMsg: ") + print(morMsg.bytes) + if morMsg.pipe == msg1.pipe: + print ("msg1 and morMsg share pipe") + else: + print ("msg1 and morMsg do NOT share pipe") + print("and msg1 still says:") + print(msg1.bytes) + + print("what if s0 does recv instead of recvMsg?") + s1.send(b'again from s1') + more = s0.recv() + print(more) +# print("It works, we just dont get the Message info") + + print("Pair1 with both listen and dial should throw exception") + # pynng Pair1 has no code to recognize this error, allowing both arguments + # however the underlying Socket should throw an AddressInUse exception + try: + with Pair1(dial=address, listen=address, polyamorous=True, recv_timeout=100) as s3: + s3.send(b'hello out there') + msg = s0.recv_msg() + print("rceve on s0") + print(msg.bytes) + s3.send(b'hello out there') + msg = s3.recv_msg() + print("rceve on s3") + print(msg.bytes) + except: + print("caught something", sys.exc_info()[0]) + traceback.print_exc()#sys.exc_info()[2].print_tb() + #raise + +print("End Pair1 test") + + diff --git a/examples/pubsub_1SingleApp.py b/examples/pubsub_1SingleApp.py new file mode 100644 index 0000000..71188e4 --- /dev/null +++ b/examples/pubsub_1SingleApp.py @@ -0,0 +1,50 @@ +""" +Demonstrates Publisher/Subscriber (Pub/Sub) pattern of pynng as shown in: + https://pynng.readthedocs.io/en/latest/core.html#available-# + +Publisher/Subscriber has one socket that publishes messages and +any number of subscribers that listen to that publisher. +Subscribers can subscribe to a TOPIC, and they will only deliver those topics +to the rest of the application (under hood they still receive all msgs, but +will discard those that are not of right topic) + +""" +import time +from pynng import Pub0, Sub0, Timeout + +address = 'tcp://127.0.0.1:31313' +with Pub0(listen=address) as pub, \ + Sub0(dial=address, recv_timeout=1000) as sub0, \ + Sub0(dial=address, recv_timeout=1000) as sub1, \ + Sub0(dial=address, recv_timeout=1000) as sub2, \ + Sub0(dial=address, recv_timeout=1000) as sub3: + + sub0.subscribe(b'wolf') + sub1.subscribe(b'puppy') + # The empty string matches everything! + sub2.subscribe(b'') + # we're going to send two messages before receiving anything, and this is + # the only socket that needs to receive both messages. + sub2.recv_buffer_size = 2 + # sub3 is not subscribed to anything + # make sure everyone is connected + print("all set, wait a moment then Publish") + time.sleep(2) + + pub.send(b'puppy: that is a cute dog') + pub.send(b'wolf: that is a big dog') + + time.sleep(1) + + print("sub0 rcv:",sub0.recv()) # prints b'wolf...' since that is the matching message + print("sub1 rcv:",sub1.recv()) # prints b'puppy...' since that is the matching message + + # sub2 will receive all messages (since empty string matches everything) + print("sub2 rcv:",sub2.recv()) # prints b'puppy...' since it was sent first + print("sub2 rcv:",sub2.recv()) # prints b'wolf...' since it was sent second + + try: + sub3.recv() + assert False, 'never gets here since sub3 is not subscribed' + except Timeout: + print('got a Timeout since sub3 had no subscriptions') diff --git a/examples/pubsub_2publishAsync.py b/examples/pubsub_2publishAsync.py new file mode 100644 index 0000000..91e9081 --- /dev/null +++ b/examples/pubsub_2publishAsync.py @@ -0,0 +1,58 @@ +""" + PyNNG example of Pub/Sub with both normal and async versions + extended from what is the docs + https://pynng.readthedocs.io/en/latest/core.html#available-# + +Publisher/Subscriber has one socket that publishes messages and +any number of subscribers that listen to that publisher. +Subscribers can subscribe to a TOPIC, and they will only deliver those topics +to the rest of the application (under hood they still receive all msgs, but +will discard those that are not of right topic) + + this is the publisher part and should be run in conjunction with one of the + pynng_subTest/_subTestAsync examples + +""" +import time +from pynng import Pub0, Sub0, Timeout +import trio + +address = 'tcp://127.0.0.1:31313' + +wolfStr = "wolf: that is a big dog " +puppyStr = "puppy: that is a cute puppy " + +def pubLoop_Sync(pub): + print("sync publisher") + for i in range(120): + print("publish %d"%(i)) + sp = puppyStr+str(i) + pub.send(sp.encode()) + sw = wolfStr+str(i) + pub.send(sw.encode()) + time.sleep(1) + + +async def asend(pub, msg): + pub.asend(msg) + +async def pubLoop_Async(pub): + print("Trio Async Publisher") + for i in range(120): + print("async publish %d"%(i)) + sp = puppyStr+str(i) + await pub.asend(sp.encode()) + sw = wolfStr+str(i) + await pub.asend(sw.encode()) + await trio.sleep(0.75) + pass + +if __name__ == "__main__": + + with Pub0(listen=address) as pub: + print("Pub started, wait for subs") + time.sleep(2) + + #pubLoop_Sync(pub) + trio.run(pubLoop_Async, pub) + pub.close() diff --git a/examples/pubsub_2subscribe.py b/examples/pubsub_2subscribe.py new file mode 100644 index 0000000..404acf0 --- /dev/null +++ b/examples/pubsub_2subscribe.py @@ -0,0 +1,49 @@ +""" + PyNNG example of Pub/Sub with both normal single thread subscriber + extended from what is the docs + https://pynng.readthedocs.io/en/latest/core.html#available-# + +Publisher/Subscriber has one socket that publishes messages and +any number of subscribers that listen to that publisher. +Subscribers can subscribe to a TOPIC, and they will only deliver those topics +to the rest of the application (under hood they still receive all msgs, but +will discard those that are not of right topic) + + this is the subscriber part and should be run in conjunction with pynng_pubTest.py + +""" +import time +from pynng import Pub0, Sub0, Timeout + +address = 'tcp://127.0.0.1:31313' +with Sub0(dial=address, recv_timeout=1000) as sub0, \ + Sub0(dial=address, recv_timeout=1000) as sub1, \ + Sub0(dial=address, recv_timeout=1000) as sub2, \ + Sub0(dial=address, recv_timeout=1000) as sub3: + + sub0.subscribe(b'wolf') + sub1.subscribe(b'puppy') + # The empty string matches everything! + sub2.subscribe(b'') + # we're going to send two messages before receiving anything, and this is + # the only socket that needs to receive both messages. + sub2.recv_buffer_size = 2 + # sub3 is not subscribed to anything + # make sure everyone is connected + print("all set, wait a moment then Publish") + time.sleep(1) + + while True: + print("sub0 rcv:",sub0.recv()) # prints b'wolf...' since that is the matching message + print("sub1 rcv:",sub1.recv()) # prints b'puppy...' since that is the matching message + + # sub2 will receive all messages (since empty string matches everything) + print("sub2 rcv:",sub2.recv()) # prints b'puppy...' since it was sent first + print("sub2 rcv:",sub2.recv()) # prints b'wolf...' since it was sent second + + try: + sub3.recv() + assert False, 'never gets here since sub3 is not subscribed' + except Timeout: + print('got a Timeout since sub3 had no subscriptions') + time.sleep(1) diff --git a/examples/pubsub_2subscribeAsync.py b/examples/pubsub_2subscribeAsync.py new file mode 100644 index 0000000..0c7788f --- /dev/null +++ b/examples/pubsub_2subscribeAsync.py @@ -0,0 +1,83 @@ +""" + PyNNG example of Pub/Sub Subscriber with both normal and async loops + extended from what is the docs + https://pynng.readthedocs.io/en/latest/core.html#available-# + +Publisher/Subscriber has one socket that publishes messages and +any number of subscribers that listen to that publisher. +Subscribers can subscribe to a TOPIC, and they will only deliver those topics +to the rest of the application (under hood they still receive all msgs, but +will discard those that are not of right topic) + + this is the subscriber part and should be run in conjunction with pynng_pubTest.py + +""" +import time +from pynng import Pub0, Sub0, Timeout +import trio + + +address = 'tcp://127.0.0.1:31313' +sub0 = Sub0(dial=address, recv_timeout=1000) +sub1 = Sub0(dial=address, recv_timeout=1000) +sub2 = Sub0(dial=address, recv_timeout=1000) +sub3 = Sub0(dial=address, recv_timeout=1000) + +def syncLoop(): + while True: + print("sub0 rcv:",sub0.recv()) # prints b'wolf...' since that is the matching message + print("sub1 rcv:",sub1.recv()) # prints b'puppy...' since that is the matching message + + # sub2 will receive all messages (since empty string matches everything) + print("sub2 rcv:",sub2.recv()) # prints b'puppy...' since it was sent first + print("sub2 rcv:",sub2.recv()) # prints b'wolf...' since it was sent second + + try: + sub3.recv() + assert False, 'never gets here since sub3 is not subscribed' + except Timeout: + print('got a Timeout since sub3 had no subscriptions') + time.sleep(1) + +async def asyncSubLoop( ): + print("Begin Async Sub Loop") + while True: + s = await sub0.arecv() + print("sub0 rcv:",s) # prints b'wolf...' since that is the matching message + s = await sub1.arecv() + print("sub1 rcv:",s) # prints b'puppy...' since that is the matching message + + # sub2 will receive all messages (since empty string matches everything) + #s = await asyncReadSubscriber(sub2) + print("sub2 rcv:",await sub2.arecv()) # prints b'puppy...' since it was sent first + print("sub2 rcv:",await sub2.arecv()) # prints b'wolf...' since it was sent second + + try: + sub3.recv() + assert False, 'never gets here since sub3 is not subscribed' + except Timeout: + print('got a Timeout since sub3 had no subscriptions') + + await trio.sleep(1) + print ("End forever async subscriber loop") + pass + +if __name__ == "__main__": + sub0.subscribe(b'wolf') + sub1.subscribe(b'puppy') + # The empty string matches everything! + sub2.subscribe(b'') + # we're going to send two messages before receiving anything, and this is + # the only socket that needs to receive both messages. + sub2.recv_buffer_size = 2 + # sub3 is not subscribed to anything + # make sure everyone is connected + print("all set, wait a moment then Publish") + time.sleep(1) + + #syncLoop() + trio.run(asyncSubLoop) + sub0.close() + sub1.close() + sub2.close() + sub3.close()