Subscribing and listening to ZeroMQ messages

Have you wondered how a block explorer displays new blocks, or how new transactions show up in wallets [1]?

block explorer

It's not rocket science: they connect to a websocket exposed by the REST server and subscribe to specific 'topics'.

![developer tools](2022-12-06.zmq/01.websocket.png width=600)

Where does the REST server get this information from? Setting details aside, REST acts as a proxy: it connects to a ZeroMQ endpoint exposed by the catapult.broker process.

In this short article, I'll show you how to skip the middleman and listen directly to what the broker has to tell us.

Preparing for the journey

If you're not running a node - the question is - why not?

If you are, you're probably running a typical "Dual" node (peer + api node) and your docker-compose.yml contains at least these 4 services (containers):

  • db - the container running the mongo database
  • node - the container running the actual catapult client, set up in Dual mode
  • broker - runs the broker process
  • rest-gateway - runs the node.js REST layer

Now, REST can talk to the broker because they share the same network within docker-compose.yml, but if you want to talk to the broker yourself, you have two options:

  1. run the tool that we'll discuss later inside the broker container OR
  2. expose the broker port in docker-compose.yml, like so (to be able to connect from the host):
    broker:
       user: '1000:1000'
       container_name: broker
       image: symbolplatform/symbol-server:gcc-1.0.3.5
       working_dir: /symbol-workdir
       ports:
           - '127.0.0.1:7902:7902'  # <== added, so the broker is reachable from the host
       command: /bin/bash /symbol-commands/start.sh /usr/catapult ./data broker server broker NORMAL
       stop_signal: SIGINT
       restart: on-failure:2
       volumes:
           - ../nodes/node:/symbol-workdir:rw
           - ./server:/symbol-commands:ro
       depends_on:
           - db

Note that this exposes the broker only on the loopback (127.0.0.1) interface, as exposing it to the whole world is likely not the greatest idea.
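Before involving ZeroMQ at all, it can be handy to confirm that the published port is actually reachable from the host. The snippet below is only a minimal sketch using the Python standard library; the helper name `is_port_open` is made up for this example, and the host and port are the ones from the compose snippet above.

```python
from socket import create_connection


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a plain TCP connection to host:port can be established."""
    try:
        with create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, unreachable...
        return False


if __name__ == '__main__':
    # 127.0.0.1:7902 is the mapping added to docker-compose.yml above
    print(is_port_open('127.0.0.1', 7902))
```

A successful TCP connect only tells you that something is listening on that port, not that it is the broker, but it quickly rules out a missing port mapping.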

We don't even talk anymore

I'll be using Python, mostly because the current dev branch can nicely read block headers, but any language that has ZMQ bindings will do the trick.

Initialization

ZMQ bindings are in the zmq module (python3 -m pip install pyzmq). Connecting to the broker is straightforward; the broker uses the pub-sub messaging pattern, which means that the created listener needs to be a SUB socket:

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:7902")

And that's all that's needed.

Subscribing to topics

Now it's time to subscribe to different topics.

Note: most likely you'll only be interested in a few of them, not all. I've selected several to give a good overview.

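from binascii import unhexlify

# 8-byte markers are reversed because they travel in little-endian byte order
block_marker = unhexlify('9FF2D8E480CA6A49')[::-1]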
finalized_marker = unhexlify('4D4832A031CE7954')[::-1]
drop_marker = unhexlify('5C20D68AEE25B0B0')[::-1]
transaction_marker = b'a'  # 0x61
ut_add_marker = b'u'  # 0x75

socket.setsockopt(zmq.SUBSCRIBE, block_marker)
socket.setsockopt(zmq.SUBSCRIBE, finalized_marker)
socket.setsockopt(zmq.SUBSCRIBE, drop_marker)

socket.setsockopt(zmq.SUBSCRIBE, transaction_marker)
socket.setsockopt(zmq.SUBSCRIBE, ut_add_marker)

Where are all those markers coming from? I'm glad you asked. We've documented them in the Technical Reference, in chapter 17, Messaging.

| Topic marker name              | Topic marker       |
|--------------------------------|--------------------|
| Block                          | 0x9FF2D8E480CA6A49 |
| Drop blocks                    | 0x5C20D68AEE25B0B0 |
| Finalized block                | 0x4D4832A031CE7954 |
| Transaction                    | 0x61               |
| Unconfirmed transaction add    | 0x75               |
| Unconfirmed transaction remove | 0x72               |
| Partial transaction add        | 0x70               |
| Partial transaction remove     | 0x71               |
| Transaction status             | 0x73               |
| Cosignature                    | 0x63               |
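If you'd rather not hand-write reversed hex strings, the table can be kept as plain integers and turned into topic bytes on demand. This is just a sketch: the dictionary keys and the `marker_to_topic` helper are names invented for this example, and it assumes (as the `[::-1]` reversal earlier suggests) that the 8-byte markers go over the wire in little-endian order.

```python
# Marker values copied from the table above.
TOPIC_MARKERS = {
    'block': 0x9FF2D8E480CA6A49,
    'drop_blocks': 0x5C20D68AEE25B0B0,
    'finalized_block': 0x4D4832A031CE7954,
    'transaction': 0x61,
    'unconfirmed_transaction_add': 0x75,
    'unconfirmed_transaction_remove': 0x72,
    'partial_transaction_add': 0x70,
    'partial_transaction_remove': 0x71,
    'transaction_status': 0x73,
    'cosignature': 0x63,
}


def marker_to_topic(marker: int) -> bytes:
    """Serialize a marker the way it is used as a subscription prefix."""
    # single-byte markers stay one byte, the long ones are 8 bytes, little-endian
    size = 1 if marker <= 0xFF else 8
    return marker.to_bytes(size, byteorder='little')


if __name__ == '__main__':
    for name, marker in TOPIC_MARKERS.items():
        print(f'{name}: {marker_to_topic(marker).hex().upper()}')
```

Each result can then be passed to socket.setsockopt(zmq.SUBSCRIBE, ...) exactly like the hand-written markers.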

Parsing published messages

Parsing is pretty straightforward. Some types and objects from the SDK are used below, but that's just for slightly nicer display.

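# assumed imports from the Symbol Python SDK (symbolchain package); the linked gist has the exact ones
from symbolchain.CryptoTypes import Hash256
from symbolchain.facade.SymbolFacade import SymbolFacade
from symbolchain.sc import BlockFactory

facade = SymbolFacade('mainnet')  # assumption: the node is on mainnet

while True: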
	topic = socket.recv()
	if block_marker == topic:
		block_header = socket.recv()
		entity_hash = Hash256(socket.recv())
		generation_hash = Hash256(socket.recv())
		header = BlockFactory.deserialize(block_header)
		print(f'block height: {header.height} ({header.height.value}) entity_hash {entity_hash} generation_hash {generation_hash}')
		print(f'block harvested by: {header.signer_public_key} {facade.network.public_key_to_address(header.signer_public_key)}')
	elif finalized_marker == topic:
		body_part_1 = socket.recv()
		finalization_round = int.from_bytes(body_part_1[0:8], byteorder='little')
		finalized_height = int.from_bytes(body_part_1[8:16], byteorder='little')
		entity_hash = Hash256(body_part_1[16:])
		print(f'FINALIZED round: {finalization_round} height: {finalized_height} entity_hash {entity_hash}')
	elif drop_marker == topic:
		body_part_1 = socket.recv()
		height = int.from_bytes(body_part_1[0:8], byteorder='little')
		print(f'drop after height: {height}')
	elif ut_add_marker[0] == topic[0] or transaction_marker[0] == topic[0]:  # mind [0]
		message = 'UT add' if ut_add_marker[0] == topic[0] else 'transaction add'
		# rest of the topic contains address
		address = SymbolFacade.Address(topic[1:])
		transaction = socket.recv()
		entity_hash = Hash256(socket.recv())
		merkle_component_hash = Hash256(socket.recv())
		body_part_1 = socket.recv()
		height = int.from_bytes(body_part_1[0:8], byteorder='little')
		print(f'{message} {address} {entity_hash} {height}')
	else:
		print("unknown [ %d %s ]" % (len(topic), topic))

One thing that might not be clear is how many times socket.recv() should be called for a given topic.

It depends on how the actual message is constructed, and we've documented that as well. Instead of explaining it, I'll try to show it using pieces of the documentation:

![block message layout](2022-12-06.zmq/02.block.png width=600)

![finalized block message layout](2022-12-06.zmq/03.finalized.png width=600)

![transaction add message (mempool)](2022-12-06.zmq/04.transaction.png width=600)
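The part counts from the layouts above can also be checked at runtime instead of being baked into a chain of socket.recv() calls. pyzmq sockets have recv_multipart(), which returns all parts of one message as a list. The sketch below works on such a list; `split_message` and `BODY_PARTS` are names made up for this example, and the counts are simply the ones the loop in this article reads.

```python
from binascii import unhexlify

BLOCK = unhexlify('9FF2D8E480CA6A49')[::-1]
FINALIZED = unhexlify('4D4832A031CE7954')[::-1]
DROP = unhexlify('5C20D68AEE25B0B0')[::-1]

# Number of parts that follow the topic, as read by the loop in this article.
BODY_PARTS = {BLOCK: 3, FINALIZED: 1, DROP: 1, b'a': 4, b'u': 4}


def split_message(frames):
    """Return (marker, body) for one multipart message, checking the part count."""
    if not frames:
        raise ValueError('empty message')
    topic, body = frames[0], list(frames[1:])
    # 8-byte markers match the whole topic; 1-byte markers are followed by an address
    marker = topic if topic in BODY_PARTS else topic[:1]
    expected = BODY_PARTS.get(marker)
    if expected is None:
        raise ValueError(f'unknown topic {topic.hex()}')
    if len(body) != expected:
        raise ValueError(f'expected {expected} parts, got {len(body)}')
    return marker, body


# With a connected SUB socket this would be fed straight from pyzmq:
#     marker, body = split_message(socket.recv_multipart())
```

Failing loudly on an unexpected part count keeps the parser from silently drifting out of sync with the stream, which is what happens when one recv() too many or too few is issued.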

One thing worth noting is that all the finalized block message data sits within a single 'packet'.
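Since that single part has a fixed layout, it can be unpacked in one go with the struct module rather than by slicing. This is a sketch that mirrors the offsets used in the loop above (8 bytes of round, 8 bytes of height, then the hash) and assumes the part is exactly 48 bytes long; `parse_finalized` is a name made up for this example.

```python
import struct

# '<' = little-endian without padding: round (8 bytes), height (8 bytes), hash (32 bytes)
FINALIZED_LAYOUT = struct.Struct('<QQ32s')


def parse_finalized(body: bytes):
    """Unpack the single body part of a finalized block message."""
    if len(body) != FINALIZED_LAYOUT.size:
        raise ValueError(f'expected {FINALIZED_LAYOUT.size} bytes, got {len(body)}')
    finalization_round, height, entity_hash = FINALIZED_LAYOUT.unpack(body)
    return finalization_round, height, entity_hash.hex().upper()


if __name__ == '__main__':
    # synthetic payload, only to show the call shape
    sample = (7).to_bytes(8, 'little') + (1815336).to_bytes(8, 'little') + bytes(32)
    print(parse_finalized(sample))
```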

And if you got here, there's one more thing worth mentioning. In the previous section, I subscribed to the "general" transactions topic with:

socket.setsockopt(zmq.SUBSCRIBE, transaction_marker)

But if you take a look at the images (or, even better, the Technical Reference), the address is part of the topic. That means you can subscribe to messages that target a specific account, e.g.:

transaction_marker = b'a'
address = SymbolFacade.Address('NCHVMMCVPZGUWZTWTLNH46OFRM2QIPILE4SKZEA')

scoped_address_marker = transaction_marker + address.bytes
socket.setsockopt(zmq.SUBSCRIBE, scoped_address_marker)
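If the SDK isn't available, the same scoped topic can be built with the standard library alone. The sketch below assumes that an encoded Symbol address is 39 base32 characters that decode to 24 raw bytes, so one padding character is appended before decoding and the extra byte is dropped afterwards; `address_to_bytes` and `scoped_topic` are names made up for this example, and no checksum validation is done.

```python
import base64


def address_to_bytes(encoded: str) -> bytes:
    """Decode a 39-character base32 address into its 24 raw bytes."""
    if len(encoded) != 39:
        raise ValueError('expected a 39-character encoded address')
    # base32 works on groups of 8 characters, so pad to 40 and drop the extra byte
    return base64.b32decode(encoded + 'A')[:-1]


def scoped_topic(marker: bytes, encoded_address: str) -> bytes:
    """Build a subscription prefix made of a 1-byte marker and a raw address."""
    return marker + address_to_bytes(encoded_address)


if __name__ == '__main__':
    topic = scoped_topic(b'a', 'NCHVMMCVPZGUWZTWTLNH46OFRM2QIPILE4SKZEA')
    print(len(topic), topic.hex().upper())
```

The resulting bytes can be passed to socket.setsockopt(zmq.SUBSCRIBE, ...) in place of scoped_address_marker.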

Finally, a rather important detail: I'm using BlockFactory to deserialize block_header, but there's

Code, output and some more random comments

You can find the whole code here: https://gist.github.com/gimre-xymcity/718cc15d9e9c3ff48a493bcfb7986834

Let's take a look at a piece of the output:

  block height: 0x00000000001BB327 (1815335) entity_hash C8E617712A81D5DC13DC36C699E5DCA42C11972120E7A4DB37187E9C22941FE3 generation_hash FFFED500BCB2522FA892265D4F379AFC55CD1A9155F8FBB2DB43D30C5C99DD82
  block harvested by: 44E0DB9EC1FF08C392AAC8A2A787E68C2C8F36324E1065D90D976576580E7EA6 NCE5QOGVUM6ZHJIYXTA6NHZYJUHMBNRNVMG2L4I
+ transaction add NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
  block height: 0x00000000001BB328 (1815336) entity_hash 9EE91BB4BFE409A11789D5FDD398384BC86C528208C50E21884B6139E14E874E generation_hash FFED290A09BA44F1352F93BE6AA5FEF6400C7D1D6A69EC2662B98F4B12A9EC28
  block harvested by: 68ADEC7660181CD266F97BABAB6C9905D0DD7F669C2B107BBFB68B98074CCB9B NDGGHWO5PXID32IPA2C3EZCIMC7WHOZTXZDLTYY

What you can observe here is that a single notification resulted in 3 published messages (marked with + for readability). If you've read the previous part to the end, you might already know why. All 3 messages are sent with 3 different topics. Why?

The answer is sort of obvious: the transaction in question is an aggregate transaction, and the broker must notify all accounts involved:

  • multisig account: NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA
  • sender (cosignatory): NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA
  • destination account: NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q

This gets a little noisy with larger aggregates (or more cosignatories). That's also the reason why the REST layer is supposed to subscribe only to 'scoped' topics (that is, ones including addresses).
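If you do subscribe to the unscoped topic, one way to tame the noise is to fold messages that share an entity hash into a single entry. This is a sketch, not something the broker or REST does for you; `group_by_entity_hash` is a name made up for this example and it expects (address, entity_hash) pairs like the ones printed by the loop earlier.

```python
from collections import defaultdict


def group_by_entity_hash(notifications):
    """Map each entity hash to the list of addresses it was announced for."""
    grouped = defaultdict(list)
    for address, entity_hash in notifications:
        # keep first-seen order and skip repeated addresses
        if address not in grouped[entity_hash]:
            grouped[entity_hash].append(address)
    return dict(grouped)


if __name__ == '__main__':
    tx_hash = '09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89'
    seen = [
        ('NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q', tx_hash),
        ('NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA', tx_hash),
        ('NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA', tx_hash),
    ]
    for entity_hash, addresses in group_by_entity_hash(seen).items():
        print(entity_hash, len(addresses), 'addresses')
```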

Mongo-less broker (optional / fun stuff)

As we've repeated multiple times, catapult is very flexible when it comes to setting it up.

To snoop on zmq messages, you can pretty easily set up a node that runs the server and the broker but no mongo (and no REST, as the REST layer is quite useless without mongo).

  1. catapult.server needs to be configured as in the usual Dual mode setup; this means that the filespooling and partialtransaction extensions should be set to true in config-extensions-server.properties
[extensions]
# api extensions
extension.filespooling = true
extension.partialtransaction = true
# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = false
extension.mongo = false
extension.zeromq = false
...
  2. the broker needs zeromq but not mongo; in config-extensions-broker.properties:
# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = true
extension.mongo = false
extension.zeromq = true

extension.hashcache = true
  3. last but not least, docker-compose.yml should only contain entries for the server and broker services (containers)

Sidenote: if anyone would like to push data to a different database in the future, the only thing needed is writing a proper extension that will be loaded by the broker.



Footnotes

  1. Not all wallets support this, but they usually support at least showing multisig transactions.