-
Notifications
You must be signed in to change notification settings - Fork 24
/
service.js
78 lines (64 loc) · 2.06 KB
/
service.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
let debug = require('debug')('service')
let debugZmq = require('debug')('service:zmq')
let debugZmqTx = require('debug')('service:zmq:tx')
let Indexd = require('indexd')
let leveldown = require('leveldown')
let rpc = require('./rpc')
let zmq = require('zeromq')
if (!process.env.INDEXDB) {
console.log("INDEXDB is not set");
process.exit(-1);
}
let db = leveldown(process.env.INDEXDB)
let indexd = new Indexd(db, rpc)
module.exports = function initialize (callback) {
function errorSink (err) {
if (err) debug(err)
}
debug(`Opening leveldb @ ${process.env.INDEXDB}`)
db.open({
writeBufferSize: 1 * 1024 * 1024 * 1024 // 1 GiB
}, (err) => {
if (err) return callback(err)
debug(`Opened leveldb @ ${process.env.INDEXDB}`)
if (!process.env.ZMQ) {
console.log(`ZMQ is not set`);
process.exit(-1);
}
let zmqSock = zmq.socket('sub')
zmqSock.connect(process.env.ZMQ)
zmqSock.subscribe('hashblock')
zmqSock.subscribe('hashtx')
let sequences = {}
zmqSock.on('message', (topic, message, sequence) => {
topic = topic.toString('utf8')
message = message.toString('hex')
sequence = sequence.readUInt32LE()
if (sequences[topic] === undefined) sequences[topic] = sequence
else sequences[topic] += 1
if (sequence !== sequences[topic]) {
if (sequence < sequences[topic]) debugZmq(`bitcoind may have restarted`)
else debugZmq(`${sequence - sequences[topic]} messages lost`)
sequences[topic] = sequence
indexd.tryResync(errorSink)
}
switch (topic) {
case 'hashblock': {
debugZmq(topic, message)
return indexd.tryResync(errorSink)
}
case 'hashtx': {
debugZmqTx(topic, message)
return indexd.notify(message, errorSink)
}
}
})
setInterval(() => indexd.tryResync(errorSink), 60000) // attempt every minute
indexd.tryResync(errorSink)
indexd.tryResyncMempool(errorSink) // only necessary once
callback()
})
}
module.exports.get = function get () {
return indexd
}