Skip to content

Commit

Permalink
Merge branch 'topic/christian/fix-python-unhashables'
Browse files Browse the repository at this point in the history
* topic/christian/fix-python-unhashables:
  Add testcases for new Python SafeSubscriber class
  Provide a SafeSubscriber class in Python to accommodate non-hashable types
  Add immutable values to Python data type testsuite
  Python cleanup: remove unneeded semicolons and trailing whitespace
  • Loading branch information
ckreibich committed Dec 21, 2021
2 parents 36339ad + 73cc21e commit 07f42c7
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 53 deletions.
17 changes: 17 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
Broker 2.2.0
============

- The Python bindings now provide a SafeSubscriber variant of Subscriber as well
as a new Endpoint.make_safe_subscriber() method. Both avoid potential problems
when deserializing values that accommodate the Broker data model but not
Python's. Specifically, Broker allows complex types inside others (e.g., a set
of tables), but Python cannot use unhashable types as set members or dict keys, which would
be required in this scenario. SafeSubscriber employs immutable (hashable)
types when translating to Python objects and returns objects that are
read-only.

If you haven't encountered problems with the Subscriber class, you don't need
to change existing code. Broker 3.0 will make this new behavior the default
and deprecate the new APIs. In the meantime you can replace make_subscriber()
with make_safe_subscriber() to be on the safe side.

Broker 2.1.0
============

Expand Down
138 changes: 105 additions & 33 deletions bindings/python/broker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import datetime
import time
import types
import ipaddress
import collections

Expand Down Expand Up @@ -37,7 +38,7 @@ def __eq__(self, other):
if other.dst(None) is None:
return True

return self.dst(None) == other.dst(None);
return self.dst(None) == other.dst(None)

except:
return False
Expand Down Expand Up @@ -134,7 +135,7 @@ def get(self, *args, **kwargs):
if not msg.is_set():
return None

msg = msg.get();
msg = msg.get()

if isinstance(msg, tuple):
return (msg[0].string(), Data.to_py(msg[1]))
Expand All @@ -160,6 +161,36 @@ def add_topic(self, topic, block=False):
def remove_topic(self, topic, block=False):
return self._subscriber.remove_topic(_make_topic(topic), block)

class SafeSubscriber(Subscriber):
    """Subscriber subclass that makes returned messages safe to process.

    "Safe" here means safe to Python's type system, particularly regarding
    hashable types. Broker's data model permits nested complex types, such as
    sets of tables, but those don't directly work in Python (for example,
    constructing a set of dicts will complain that dicts aren't hashable). To
    work around this SafeSubscriber relies on ImmutableData rather than Data
    (used by regular Subscribers).
    """

    def get(self, *args, **kwargs):
        """Fetch message(s) and convert their payloads via ImmutableData.

        All arguments are forwarded unchanged to the wrapped subscriber's
        get() method.

        Returns None if the wrapped subscriber yields None or an unset
        optional, a (topic_string, value) tuple for a single message, or a
        list of such tuples when a vector of messages comes back.

        Raises AssertionError if the wrapped subscriber returns a value of
        any other type.
        """
        msg = self._subscriber.get(*args, **kwargs)

        if msg is None:
            return None

        if isinstance(msg, _broker.OptionalSubscriberBaseValueType):
            if not msg.is_set():
                return None

            # Unwrap the optional; the result is handled by the checks below.
            msg = msg.get()

        if isinstance(msg, tuple):
            return (msg[0].string(), ImmutableData.to_py(msg[1]))

        if isinstance(msg, _broker.VectorPairTopicData):
            return [(d[0].string(), ImmutableData.to_py(d[1])) for d in msg]

        # Raise explicitly instead of "assert False": assert statements are
        # removed under "python -O", which would turn this into a silent
        # "return None" that callers cannot tell apart from "no message".
        raise AssertionError("unexpected message type: " + type(msg).__name__)

class StatusSubscriber():
def __init__(self, internal_subscriber):
self._subscriber = internal_subscriber
Expand Down Expand Up @@ -365,10 +396,17 @@ def _to_expiry(self, e):
return (_broker.OptionalTimespan(_broker.Timespan(float(e))) if e is not None else _broker.OptionalTimespan())

class Endpoint(_broker.Endpoint):
def make_subscriber(self, topics, qsize = 20):
def make_subscriber(self, topics, qsize = 20, subscriber_class=Subscriber):
topics = _make_topics(topics)
s = _broker.Endpoint.make_subscriber(self, topics, qsize)
return Subscriber(s)
return subscriber_class(s)

def make_safe_subscriber(self, topics, qsize=20):
    """Create a subscriber whose messages use immutable, hashable values.

    This is make_subscriber() with SafeSubscriber selected as the wrapper
    class; topics and qsize are passed through as given. In contrast to the
    Subscriber class, messages retrieved from SafeSubscribers use immutable,
    hashable values to ensure Python can represent them. When in doubt, use
    make_safe_subscriber().
    """
    return self.make_subscriber(
        topics=topics,
        qsize=qsize,
        subscriber_class=SafeSubscriber,
    )

def make_status_subscriber(self, receive_statuses=False):
s = _broker.Endpoint.make_status_subscriber(self, receive_statuses)
Expand Down Expand Up @@ -473,11 +511,11 @@ def __init__(self, x = None):
v = _broker.Vector([Data(i) for i in x])
_broker.Data.__init__(self, v)

elif isinstance(x, set):
elif isinstance(x, set) or isinstance(x, frozenset):
s = _broker.Set(([Data(i) for i in x]))
_broker.Data.__init__(self, s)

elif isinstance(x, dict):
elif isinstance(x, dict) or isinstance(x, types.MappingProxyType):
t = _broker.Table()
for (k, v) in x.items():
t[Data(k)] = Data(v)
Expand Down Expand Up @@ -538,69 +576,103 @@ def _try_bytes_decode(b):
Data.Type.Timespan: lambda: datetime.timedelta(seconds=d.as_timespan()),
Data.Type.Timestamp: lambda: datetime.datetime.fromtimestamp(d.as_timestamp(), utc),
Data.Type.Vector: lambda: to_vector(d.as_vector())
}
}

try:
return converters[d.get_type()]()
except KeyError:
raise TypeError("unsupported data type: " + str(d.get_type()))

class ImmutableData(Data):
    """A Data specialization that uses immutable complex types for returned
    Python objects.

    For sets, the return type is frozenset; for tables it's a
    MappingProxyType wrapping a dict with a straightforward hashing
    implementation; and for vectors it's Python tuples. All other types are
    converted exactly as Data.to_py() converts them.
    """

    class HashableDict(dict):
        """A dict that hashes to the hash of the frozenset of its items."""

        def __hash__(self):
            return hash(frozenset(self.items()))

    @staticmethod
    def to_py(d):
        """Convert the Broker data value d to an immutable Python object.

        Sets, tables, and vectors are converted recursively into frozenset,
        MappingProxyType, and tuple, respectively. Any other type is
        delegated to Data.to_py().
        """
        def to_set(s):
            return frozenset(ImmutableData.to_py(i) for i in s)

        def to_table(t):
            # NOTE(review): mappingproxy objects appear to be hashable only
            # from Python 3.12 on (hash() delegates to the wrapped mapping).
            # On older interpreters a table nested inside a set, or used as a
            # table key, would presumably still raise TypeError here --
            # confirm against the supported Python versions.
            entries = ImmutableData.HashableDict(
                (ImmutableData.to_py(k), ImmutableData.to_py(v))
                for (k, v) in t.items())
            return types.MappingProxyType(entries)

        def to_vector(v):
            return tuple(ImmutableData.to_py(i) for i in v)

        converters = {
            Data.Type.Set: lambda: to_set(d.as_set()),
            Data.Type.Table: lambda: to_table(d.as_table()),
            Data.Type.Vector: lambda: to_vector(d.as_vector()),
        }

        # Only the lookup is guarded: a KeyError raised while a converter
        # runs must propagate rather than be mistaken for "no converter" and
        # retried through the mutable Data.to_py() path.
        try:
            convert = converters[d.get_type()]
        except KeyError:
            # Fall back on the Data class for types we handle identically.
            return Data.to_py(d)

        return convert()

####### TODO: Updated to new Broker API until here.

# # TODO: complete interface
# class Store:
# def __init__(self, handle):
# self.store = handle
#
#
# def name(self):
# return self.store.name()
#
#
# class Mailbox:
# def __init__(self, handle):
# self.mailbox = handle
#
#
# def descriptor(self):
# return self.mailbox.descriptor()
#
#
# def empty(self):
# return self.mailbox.empty()
#
#
# def count(self, n = -1):
# return self.mailbox.count(n)
#
#
#
#
# class Message:
# def __init__(self, handle):
# self.message = handle
#
#
# def topic(self):
# return self.message.topic().string()
#
#
# def data(self):
# return self.message.data() # TODO: unwrap properly
#
#
# def __str__(self):
# return "%s -> %s" % (self.topic(), str(self.data()))
#
#
#
#
# class BlockingEndpoint(Endpoint):
# def __init__(self, handle):
# super(BlockingEndpoint, self).__init__(handle)
#
#
# def subscribe(self, topic):
# self.endpoint.subscribe(topic)
#
#
# def unsubscribe(self, topic):
# self.endpoint.unsubscribe(topic)
#
#
# def receive(self, x):
# if x == Status:
# return self.endpoint.receive()
# elif x == Message:
# return Message(self.endpoint.receive())
# else:
# raise BrokerError("invalid receive type")
#
#
# #def receive(self):
# # if fun1 is None:
# # return Message(self.endpoint.receive())
Expand All @@ -611,34 +683,34 @@ def _try_bytes_decode(b):
# # return self.endpoint.receive_msg(fun1)
# # raise BrokerError("invalid receive callback arity; must be 1 or 2")
# # return self.endpoint.receive_msg_or_status(fun1, fun2)
#
#
# def mailbox(self):
# return Mailbox(self.endpoint.mailbox())
#
#
#
#
# class NonblockingEndpoint(Endpoint):
# def __init__(self, handle):
# super(NonblockingEndpoint, self).__init__(handle)
#
#
# def subscribe(self, topic, fun):
# self.endpoint.subscribe_msg(topic, fun)
#
#
# def on_status(fun):
# self.endpoint.subscribe_status(fun)
#
#
# def unsubscribe(self, topic):
# self.endpoint.unsubscribe(topic)
#
#
#
#
# class Context:
# def __init__(self):
# self.context = _broker.Context()
#
#
# def spawn(self, api):
# if api == Blocking:
# return BlockingEndpoint(self.context.spawn_blocking())
# elif api == Nonblocking:
# return NonblockingEndpoint(self.context.spawn_nonblocking())
# else:
# raise BrokerError("invalid API flag: " + str(api))
#
#
32 changes: 32 additions & 0 deletions tests/python/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,38 @@ def test_messages(self):
self.assertEqual(msgs[1], msg1)
self.assertEqual(msgs[2], msg2)

# These results are not (all) immutable: try modifying the third
# value (the dict) of the last message above.
dict_data = msgs[2][1][2]
self.assertEqual(len(dict_data), 2)
dict_data["c"] = "not immutable"
self.assertEqual(len(dict_data), 3)

def test_immutable_messages(self):
    """Check that values received through a SafeSubscriber reject mutation."""
    with broker.Endpoint() as ep1, \
         broker.Endpoint() as ep2, \
         ep1.make_safe_subscriber("/test") as s1:

        listen_port = ep1.listen("127.0.0.1", 0)
        ep2.peer("127.0.0.1", listen_port, 1.0)

        # One message carrying a table, a set, and a vector.
        payload = ({"a": "A"}, {1, 2, 3}, ('a', 'b', 'c'))
        ep2.publish("/test/1", payload)

        _topic, (dict_data, set_data, tuple_data) = s1.get()

        # Each received container is immutable, so every mutation attempt
        # below fails with the exception specific to its type.

        # 'mappingproxy' object does not support item assignment
        with self.assertRaises(TypeError):
            dict_data["b"] = "B"

        # 'frozenset' object has no attribute 'add'
        with self.assertRaises(AttributeError):
            set_data.add(4)

        # 'tuple' object does not support item assignment
        with self.assertRaises(TypeError):
            tuple_data[3] = 'd'

def test_publisher(self):
with broker.Endpoint() as ep1, \
broker.Endpoint() as ep2, \
Expand Down
Loading

0 comments on commit 07f42c7

Please sign in to comment.