-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
215 lines (187 loc) · 7.47 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Copyright (c) 2022, NuoDB, Inc.
// All rights reserved.
//
// Redistribution and use permitted under the terms of the 3-clause BSD license.
const { Pool } = require('node-nuodb');
class ShardMultiplexerValidationError extends Error {
constructor(msg){
super(msg);
this.name = 'ShardMultiplexerValidationError';
}
}
class ShardMultiplexerNotFoundError extends Error {
constructor(msg) {
super(msg);
this.name = 'ShardMultiplexerNotFoundError';
}
}
const REQUIRED_INITIAL_ARGUMENTS = ["shards", "shardMapper"];
class ShardMultiplexer {
static STATE_UNINITIALIZED = "uninitialized";
static STATE_INITIALIZING = "initializing";
static STATE_RUNNING = "running";
static STATE_CLOSING = "closing";
static STATE_CLOSED = "closed";
/**
*
* @param {Object} args
* @param {Function} args.shardMapper - A function that takes in arguments and returns an identifier for the shards object
* The first argument is a this argument referencing the current multiplexer object
* @param {Object[]} args.shards - A list of objects of the form {id: any, poolConfig: Driver.Pool.Config}
* @param {Function} poll - A function executed at interval pollInterval, accepting on argument which is the "this" argument for the multiplexer.
* @param {Number} pollInterval - the interval in ms that the poll function should be executed.
*
*/
constructor(args) {
this.state = ShardMultiplexer.STATE_UNINITIALIZED;
REQUIRED_INITIAL_ARGUMENTS.forEach(
r => {
if(!(r in args)){
throw new ShardMultiplexerValidationError(`Cannot find required argument ${r} in constructor arguments`);
}
});
this.shards = {};
this.shardMapper = args.shardMapper;
// define curried init function
this.init = this._init(args.shards);
if(ShardMultiplexer._pollIntervalIsValid(args.pollInterval)
&& ShardMultiplexer._pollIsValid(args.poll)
) {
this._poll = args.poll;
this._pollInterval = args.pollInterval;
this._pollAndSetTimer();
}
}
static _pollIsValid(p){
// this could potentially b
return p instanceof Function;
}
static _pollIntervalIsValid(pi) {
return Number.isSafeInteger(pi) && pi > 0;
}
/**
* Call the poll function, and start/continue the cadence for polling
*/
async _pollAndSetTimer() {
if(this.state === ShardMultiplexer.STATE_CLOSING || this.state === ShardMultiplexer.STATE_CLOSED)
throw new ShardMultiplexerValidationError(`poll still active when state is ${this.state}`);
this._pollResult = await this._poll(this);
this.pollTimer = setTimeout(() => this._pollAndSetTimer(),this._pollInterval);
}
get pollResult() {
return this._pollResult;
}
set pollResult(any){
throw new Error("The poll result can only be set by the ShardMultiplexer internal functions.");
}
get pollInterval() {
return this._pollInterval;
}
set pollInterval(interval){
if(!ShardMultiplexer._pollIntervalIsValid(interval)
|| !ShardMultiplexer._pollIsValid(this._poll)
){
throw new ShardMultiplexerValidationError(`The poll interval ${interval} is invalid, or there is no poll function set.`);
}
this._pollInterval = interval;
// clear the old interval, and start with the new interval
clearTimeout(this.pollTimer);
this.pollTimer = setTimeout(this._pollAndSetTimer, this._pollInterval);
}
get poll() {
return this._poll;
}
set poll(p) {
if(!ShardMultiplexer._pollIsValid(p)){
throw new ShardMultiplexerValidationError("The poll attribute must be a valid function.");
}
// We don't need to clear the old interval, because the timeout references a wrapper which calls this._poll
this._poll = p;
}
/**
* Used by the constructor to create the init method
* init method is an asynchronous method that accepts no arguments, and returns no values, but commissions shards and sets the state to running
* @param {Object[]} shards -- an array of shard config objects. Passed in via the constructor
* @param {String | Number} shard.id -- an identifier for the shard
* @param {Object} shard.config -- a configuration to be passed to the connection pool
*/
_init(shards) {
return async () => {
if( this.state !== ShardMultiplexer.STATE_UNINITIALIZED){
throw new ShardMultiplexerValidationError(`Cannot initialize a multiplexer in state ${this.state}`);
}
await Promise.all(
shards.map((s) => this.commissionShard(s))
);
this.state = ShardMultiplexer.STATE_RUNNING;
}
}
async close() {
this.state = ShardMultiplexer.STATE_CLOSING
clearTimeout(this.pollTimer);
await Object.keys(this.shards).reduce( async (acc, curr) => {
await acc;
await this.decommissionShard(curr);
}, Promise.resolve());
this.state = ShardMultiplexer.STATE_CLOSED;
}
/**
* Remove the multiplexer's tracking of the shard, and gracefully close its pool
* @param {String | Number} id -- an identifier for the shard to decommission
*/
async decommissionShard(id) {
// ? How should we be handling pools with a connection out there?
const shard = this.shards[id];
if(!shard){
throw new ShardMultiplexerNotFoundError(`Cannot find shard with id ${id}`);
}
await this.shards[id].pool.closePool();
delete this.shards[id]
}
/**
* Create a new shard, with a matching pool object
* @param {Object} shard -- an object describing the shard to commission a pool for
* @param {String | Number} shard.id -- an identifier for the shard
* @param {Object} shard.config -- a configuration to be passed to the connection pool
*/
async commissionShard(shard) {
if(this.shards?.[shard.id]?.pool){
throw new ShardMultiplexerValidationError(`There is already a shard commissioned with this id`);
}
// create the pool
const shardPool = new Pool(shard.poolConfig);
await shardPool.init();
// destructured assign and add the pool in
this.shards[shard.id] = {...shard, pool: shardPool, activeConnections: []};
}
/**
*
* @param {Object} connection -- a connection object, requested from a pool via the multiplexer
* @param {String | Number} conneciton.shardId -- the shard id the denotes which shard the connection object came from (assigned by the multiplexer)
*/
async releaseConnection(connection) {
// ? What happens if the pool for this connection no longer exists?
const shard = this.shards[connection?.shardId];
if(!shard){
throw new ShardMultiplexerNotFoundError(`Cannot find associated shard for connection with shard id ${connection?.shardId}`);
}
// The connection's shard has been found, we can release the connection back to the pool
await shard.pool.releaseConnection(connection);
}
/**
*
* @param {any} args -- Arguments matching the arguments as defined by the user during multiplexer initialization in the supplied shardMapper function
* @returns -- A connection object, with a shard id attribute.
*/
async requestConnection(...args) {
const shardId = this.shardMapper(...args);
const pool = this.shards[shardId]?.pool;
if(pool === undefined){
throw new ShardMultiplexerNotFoundError(`shardMapper returned shard id ${shardId}, which does not exist or has no associated connection pool`);
}
const connection = await pool.requestConnection();
connection.shardId = shardId;
return connection;
}
}
module.exports = {ShardMultiplexer, ShardMultiplexerValidationError, ShardMultiplexerNotFoundError};