-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js.old
101 lines (100 loc) · 2.59 KB
/
index.js.old
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"use strict";
const {
functions: {delay, timeout}
} = require("@chumager/promise-helpers");
class localPromise extends Promise {}
delay(localPromise);
timeout(localPromise);
function create({db, model = "Mutex", collection = "__mutexes", clean = false, chainable = false, TTL = 0}) {
//base schema
const schema = new db.Schema(
{
_id: {
type: "String"
}
},
{
collection,
validateBeforeSave: false
}
);
//in case of using TTL
if (TTL) {
schema.index(
{
expire: 1
},
{
expireAfterSeconds: 0
}
);
schema.add({
expire: {
type: Date,
default() {
return Date.now() + TTL * 1000;
}
}
});
}
//model creation
const Model = db.model(model, schema);
//delete Mutex on start by setting
let cleaned;
if (clean) cleaned = Model.deleteMany();
//returning object
let res = {
//function for locking mutex
lock({lockName = "mutex", fn, maxTries = 1, timeout, delay = 100}) {
const start = Date.now();
//stop helps to stop lock loop it timeout.
let stop = false;
//release function
const free = () => Model.deleteOne({_id: lockName});
let lock = [...Array(maxTries).keys()].reduce((p, id) => {
//reverse logic, assuming catch
return p.catch(() => {
//if first try just create
const bulk = [
{
insertOne: {
document: {_id: lockName}
}
}
];
const bulkOptions = {
w: "majority",
j: true
};
if (id === 0) {
return Model.bulkWrite(bulk, bulkOptions);
}
//id not first wait until next attempt
return localPromise.delay(delay).then(() => {
return !stop && Model.bulkWrite(bulk, bulkOptions);
});
});
}, localPromise.reject());
//in case of timeout
lock = timeout ? lock.timeout(timeout) : lock;
return lock.then(
() => {
if (fn) return localPromise.resolve(fn()).finally(free);
return free;
},
err => {
err.timeout = Date.now() - start;
stop = true;
if (/E11000/.test(err.message) || err.name === "ValidationError") {
const error = new Error(`unable to acquire lock ${lockName}`);
error.name = "MutexLockError";
throw error;
}
throw err;
}
);
}
};
return chainable ? localPromise(cleaned).then(() => res) : res;
}
module.exports = create;