Skip to content

Commit

Permalink
Add support for multipart messages
Browse files Browse the repository at this point in the history
  • Loading branch information
wirg_mo committed Oct 8, 2021
1 parent 2e5443c commit b9ce74b
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 9 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 1.0.0-dev.2

### Add support for multipart messages
- Rename `Message` to `ZFrame`
- Add `ZMessage` as a queue of `ZFrame`'s
- Receive messages as `ZMessage` instead of `Message`(`ZFrame`)
- Reduce minimum SDK version to `2.13.0`


## 1.0.0-dev.1

### Add crude implementation of libzmq
Expand Down
1 change: 1 addition & 0 deletions lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const int ZMQ_POLLOUT = 2;
const int ZMQ_POLLERR = 4;
const int ZMQ_POLLPRI = 8;

const int ZMQ_DONTWAIT = 1;
const int ZMQ_SNDMORE = 2;

const int ZMQ_CURVE_PUBLICKEY = 48;
Expand Down
171 changes: 165 additions & 6 deletions lib/src/zeromq.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
library dartzmq;

import 'dart:async';
import 'dart:collection';
import 'dart:ffi';
import 'dart:io';
import 'dart:typed_data';
Expand Down Expand Up @@ -85,6 +86,7 @@ class ZeroMQ {
final socket = _createdSockets[event.socket]!;

// Receive multiple message parts
final zMessage = ZMessage();
while (true) {
var rc = _bindings.zmq_msg_init(msg);
_checkSuccess(rc);
Expand All @@ -98,9 +100,11 @@ class ZeroMQ {
final copyOfData = Uint8List.fromList(data.asTypedList(size));
final hasNext = _bindings.zmq_msg_more(msg) != 0;

socket._controller.add(Message(copyOfData, hasMore: hasNext));
zMessage.add(ZFrame(copyOfData, hasMore: hasNext));
if (!hasNext) break;
}
// TODO need to check if zMessage.isEmpty ?
socket._controller.add(zMessage);
_bindings.zmq_msg_close(msg);
}

Expand Down Expand Up @@ -190,11 +194,165 @@ enum SocketMode {
stream
}

class Message {
/// ZFrame
///
/// A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
/// When you read a frame from a socket, the [hasMore] member indicates
/// if the frame is part of an unfinished multipart message.
class ZFrame {
/// The payload that was received or is to be sent
final Uint8List payload;

/// Is this frame part of an unfinished multipart message?
final bool hasMore;

Message(this.payload, {this.hasMore = false});
ZFrame(this.payload, {this.hasMore = false});
}

/// ZMessage
///
/// This class provides a list-like container interface,
/// with methods to work with the overall container. ZMessage messages are
/// composed of zero or more ZFrame objects.
class ZMessage implements Queue<ZFrame> {
final DoubleLinkedQueue<ZFrame> _frames = DoubleLinkedQueue();

@override
Iterator<ZFrame> get iterator => _frames.iterator;

@override
void add(ZFrame value) => _frames.add;

@override
void addAll(Iterable<ZFrame> iterable) => _frames.addAll(iterable);

@override
void addFirst(ZFrame value) => _frames.addFirst(value);

@override
void addLast(ZFrame value) => _frames.addLast(value);

@override
void clear() => _frames.clear();

@override
bool remove(Object? value) => _frames.remove(value);

@override
ZFrame removeFirst() => _frames.removeFirst();

@override
ZFrame removeLast() => _frames.removeLast();

@override
void removeWhere(bool Function(ZFrame element) test) =>
_frames.removeWhere(test);

@override
void retainWhere(bool Function(ZFrame element) test) =>
_frames.retainWhere(test);

@override
bool any(bool Function(ZFrame element) test) => _frames.any(test);

@override
Queue<R> cast<R>() => _frames.cast<R>();

@override
bool contains(Object? element) => _frames.contains(element);

@override
ZFrame elementAt(int index) => _frames.elementAt(index);

@override
bool every(bool Function(ZFrame element) test) => _frames.every(test);

@override
Iterable<T> expand<T>(Iterable<T> Function(ZFrame element) toElements) =>
_frames.expand(toElements);

@override
ZFrame get first => _frames.first;

@override
ZFrame firstWhere(bool Function(ZFrame element) test,
{ZFrame Function()? orElse}) =>
_frames.firstWhere(test, orElse: orElse);

@override
T fold<T>(T initialValue,
T Function(T previousValue, ZFrame element) combine) =>
_frames.fold(initialValue, combine);

@override
Iterable<ZFrame> followedBy(Iterable<ZFrame> other) =>
_frames.followedBy(other);

@override
void forEach(void Function(ZFrame element) action) => _frames.forEach(action);

@override
bool get isEmpty => _frames.isEmpty;

@override
bool get isNotEmpty => _frames.isNotEmpty;

@override
String join([String separator = ""]) => _frames.join(separator);

@override
ZFrame get last => _frames.last;

@override
ZFrame lastWhere(bool Function(ZFrame element) test,
{ZFrame Function()? orElse}) =>
_frames.lastWhere(test, orElse: orElse);

@override
int get length => _frames.length;

@override
Iterable<T> map<T>(T Function(ZFrame e) toElement) => _frames.map(toElement);

@override
ZFrame reduce(ZFrame Function(ZFrame value, ZFrame element) combine) =>
_frames.reduce(combine);

@override
ZFrame get single => _frames.single;

@override
ZFrame singleWhere(bool Function(ZFrame element) test,
{ZFrame Function()? orElse}) =>
_frames.singleWhere(test, orElse: orElse);

@override
Iterable<ZFrame> skip(int count) => _frames.skip(count);

@override
Iterable<ZFrame> skipWhile(bool Function(ZFrame value) test) =>
_frames.skipWhile(test);

@override
Iterable<ZFrame> take(int count) => _frames.take(count);

@override
Iterable<ZFrame> takeWhile(bool Function(ZFrame value) test) =>
_frames.takeWhile(test);

@override
List<ZFrame> toList({bool growable = true}) =>
_frames.toList(growable: growable);

@override
Set<ZFrame> toSet() => _frames.toSet();

@override
Iterable<ZFrame> where(bool Function(ZFrame element) test) =>
_frames.where(test);

@override
Iterable<T> whereType<T>() => _frames.whereType<T>();
}

class ZmqSocket {
Expand All @@ -203,9 +361,10 @@ class ZmqSocket {

bool _closed = false;

late final StreamController<Message> _controller;
Stream<Message> get messages => _controller.stream;
Stream<Uint8List> get payloads => messages.map((m) => m.payload);
late final StreamController<ZMessage> _controller;
Stream<ZMessage> get messages => _controller.stream;
Stream<Uint8List> get payloads =>
messages.expand((element) => element._frames.map((e) => e.payload));

ZmqSocket(this._handle, this._zmq) {
_controller = StreamController(onListen: () {
Expand Down
6 changes: 3 additions & 3 deletions pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: dartzmq
description: A wrapper for libzmq
version: 1.0.0-dev.1
description: A simple dart zeromq implementation/wrapper around the libzmq C++ library
version: 1.0.0-dev.2
homepage: https://github.com/enwi/dartzmq

environment:
sdk: ">=2.15.0-116.0.dev <3.0.0"
sdk: ">=2.13.0 <3.0.0"
flutter: ">=1.17.0"

dependencies:
Expand Down

0 comments on commit b9ce74b

Please sign in to comment.