Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade to ZMQ 6.0 using dashd-zmq client #70

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
language: node_js
sudo: false
node_js:
- '8'
- '10'
- '12'
- '14'
env:
- CXX=g++-4.8 CC=gcc-4.8
- CXX=g++-8 CC=gcc-8
addons:
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-4.8
- gcc-4.8
- g++-8
- gcc-8
- libzmq3-dev

script:
Expand Down
97 changes: 56 additions & 41 deletions lib/services/dashd.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ var spawn = require('child_process').spawn;
var util = require('util');
var mkdirp = require('mkdirp');
var dashcore = require('@dashevo/dashcore-lib');
var zmq = require('zeromq');
var async = require('async');
var LRU = require('lru-cache');
var DashdZMQ = require('@dashevo/dashd-zmq');
var DashdRPC = require('@dashevo/dashd-rpc');
var $ = dashcore.util.preconditions;
var _ = dashcore.deps._;
Expand Down Expand Up @@ -776,46 +776,57 @@ Dash.prototype._checkSyncedAndSubscribeZmqEvents = function(node) {

Dash.prototype._subscribeZmqEvents = function(node) {
var self = this;
node.zmqSubSocket.subscribe('hashblock');
node.zmqSubSocket.subscribe('rawtx');
node.zmqSubSocket.subscribe('rawtxlock');
node.zmqSubSocket.on('message', function(topic, message) {
var topicString = topic.toString('utf8');
if (topicString === 'rawtxlock') {
self._zmqTransactionLockHandler(node, message);
} else if (topicString === 'rawtx') {
self._zmqTransactionHandler(node, message);
} else if (topicString === 'hashblock') {
self._zmqBlockHandler(node, message);
}
node.zmqSubSocket.subscribe(DashdZMQ.TOPICS.hashblock);
node.zmqSubSocket.on(DashdZMQ.TOPICS.hashblock, (message) => {
self._zmqBlockHandler(node, Buffer.from(message, 'hex'));
});

node.zmqSubSocket.subscribe(DashdZMQ.TOPICS.rawtx);
node.zmqSubSocket.on(DashdZMQ.TOPICS.rawtx, (message) => {
self._zmqTransactionHandler(node, Buffer.from(message, 'hex'));
});

node.zmqSubSocket.subscribe(DashdZMQ.TOPICS.rawtxlock);
node.zmqSubSocket.on(DashdZMQ.TOPICS.rawtxlock, (message) => {
self._zmqTransactionLockHandler(node, Buffer.from(message, 'hex'));
});
};

Dash.prototype._initZmqSubSocket = function(node, zmqUrl) {
var self = this;
node.zmqSubSocket = zmq.socket('sub');
return new Promise((resolve) => {
const [protocol, path] = zmqUrl.split('//');
Alex-Werner marked this conversation as resolved.
Show resolved Hide resolved
const [host, port] = path.split(':');

node.zmqSubSocket = new DashdZMQ({
protocol: protocol.slice(0, -1),
host,
port
});

node.zmqSubSocket.on('connect', function(fd, endPoint) {
log.info('ZMQ connected to:', endPoint);
});
node.zmqSubSocket.on('connect', function (info) {
log.info('ZMQ connected to:', info.address);
});

node.zmqSubSocket.on('connect_delay', function(fd, endPoint) {
Alex-Werner marked this conversation as resolved.
Show resolved Hide resolved
log.warn('ZMQ connection delay:', endPoint);
});
node.zmqSubSocket.on('connect_delay', function(fd, endPoint) {
log.warn('ZMQ connection delay:', endPoint);
});

node.zmqSubSocket.on('disconnect', function(fd, endPoint) {
log.warn('ZMQ disconnect:', endPoint);
});
node.zmqSubSocket.on('disconnect', function(fd, endPoint) {
log.warn('ZMQ disconnect:', endPoint);
});

node.zmqSubSocket.on('monitor_error', function(err) {
log.error('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
setTimeout(function() {
self.zmqSubSocket.monitor(500, 0);
}, 5000);
});
node.zmqSubSocket.on('monitor_error', function(err) {
log.error('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
setTimeout(function() {
self.zmqSubSocket.monitor(500, 0);
}, 5000);
});

node.zmqSubSocket.monitor(500, 0);
node.zmqSubSocket.connect(zmqUrl);
node.zmqSubSocket
.connect()
.then(resolve);
});
};

Dash.prototype._checkReindex = function(node, callback) {
Expand Down Expand Up @@ -980,16 +991,19 @@ Dash.prototype._spawnChildProcess = function(callback) {
return callback(new Error('Stopping while trying to spawn dashd.'));
}

self._initZmqSubSocket(node, self.spawn.config.zmqpubrawtx);
self._initZmqSubSocket(node, self.spawn.config.zmqpubrawtx)
.then(() => {
self._checkReindex(node, function (err) {

self._checkReindex(node, function(err) {
if (err) {
return callback(err);
}
self._checkSyncedAndSubscribeZmqEvents(node);
callback(null, node);
});
if (err) {
return callback(err);
}

self._checkSyncedAndSubscribeZmqEvents(node);

callback(null, node);
});
});
});

});
Expand Down Expand Up @@ -1026,8 +1040,9 @@ Dash.prototype._connectProcess = function(config, callback) {
return callback(new Error('Stopping while trying to connect to dashd.'));
}

self._initZmqSubSocket(node, config.zmqpubrawtx);
self._subscribeZmqEvents(node);
self._initZmqSubSocket(node, config.zmqpubrawtx).then(() => {
self._subscribeZmqEvents(node);
});

callback(null, node);
});
Expand Down
Loading