-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
283 lines (255 loc) · 11.1 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/**
* @author Roman Decker
* Open sourced with permission from innovation.rocks consulting gmbh
*
* @module cafs
*/
'use strict';
const _ = require('lodash');
const Promise = require('bluebird');
const stream = require('stream');
const fs = require('fs');
const uuid = require('uuid');
const getHash = Promise.promisify(require('hash-stream'));
const log = require('debug')('cafs');
const streamBuffers = require('stream-buffers');
const path = require('path');
const os = require('os');
/**
* Factory for creating a new cafs
*
* @param options: Options for cafs
* @param options.store: The underlying store to use
* @param [options.hashAlgorithm='sha1'] The underlying hashing algorithm to use (any of `crypto.getHashes()`)
* @param [options.mapKey=info => (info.hash ? info.hash : uuid.v4())] A function used to generate
* the key under which to put data into the underlying store. This function is called whenever data
* is put into the store data is retrieved from the store. It will receive an object holding two
* keys: `hash` (optional) and `meta`. The hash is optional because cafs will stream data directly
* into the store without buffering anything in between, in which case the hash of the data is not
* necessarily known. `meta` is whatever you pass to the function manipulating the store as metadata
* (see `put`, `stream`, etc...). By default, this will just return a UUID for everything that does
* not yet have a hash und returns just the hash otherwise.
* @param [options.log] A custom logging function, should behave like `console.log()`
*
* @return {Fs} A new cafs-instance
*/
module.exports = function(options) {
return new Fs(options);
};
// Expose the store back-ends shipped in ./lib/backends as properties of the exported factory, so
// that callers can reach them through the same `require` as the factory itself.
module.exports.DirectoryStore = require('./lib/backends/DirectoryStore.js');
module.exports.MemoryStore = require('./lib/backends/MemoryStore.js');
module.exports.CacheStore = require('./lib/backends/CacheStore.js');
/**
 * Constructor for a cafs instance. Merges the given options with the defaults and keeps the
 * result on `this.options`; `store` and `log` are additionally exposed directly on the instance.
 *
 * @param {Object} options Options for cafs
 * @param {Object} options.store The underlying store to use
 * @param {String} [options.hashAlgorithm='sha1'] The hashing algorithm to use (any of
 * `crypto.getHashes()`)
 * @param {Function} [options.mapKey] Maps `{ hash, meta }` to a store key. The default returns
 * `info.hash` when it is truthy and a fresh `uuid.v4()` otherwise.
 * @param {Function} [options.log] A custom logging function, should behave like `console.log()`.
 * Defaults to the module's `debug('cafs')` logger.
 */
function Fs(options) {
  const fallbacks = {
    hashAlgorithm: 'sha1',
    mapKey: info => (info.hash ? info.hash : uuid.v4()),
    log
  };
  // The empty object comes first so that the caller's `options` object is never mutated.
  this.options = _.defaults({}, options, fallbacks);
  this.log = this.options.log;
  this.store = this.options.store;
}
/**
 * Put a blob into the cafs. Runs `preparePut()` and hands its result to `finalizePut()`.
 *
 * @param {String|Readable|Buffer} source If passed a buffer, will store the contents of the given
 * buffer. If passed a stream, will store the whole stream once it has ended. If passed a string,
 * will store the file pointed to by the given path.
 * @param {*} meta Meta-data, handed to `options.mapKey` when the keys are computed.
 * @return {Promise<Object>} Whatever `finalizePut()` resolves with.
 */
Fs.prototype.put = function(source, meta) {
  const prepared = this.preparePut(source, meta);
  return prepared.then(info => this.finalizePut(info));
};
/**
 * Prepare storing a new blob in the cafs. The underlying store receives a call to `ensure()` with
 * a temporary key (generated by calling `options.mapKey` without a hash). The put operation can
 * be finalized by calling `cafs#finalizePut` with the object this promise resolves with.
 *
 * @param {String|Readable|Buffer} source If passed a buffer, will store the contents of the given
 * buffer. If passed a stream, will store the whole stream once it has ended. If passed a string,
 * will store the file pointed to by the given path.
 * @param {*} meta Meta-data, handed to `options.mapKey` and echoed back in the result.
 * @return {Promise<Object>} Resolves with `{ hash, size, key, meta }`, where `key` is the
 * temporary key and `size` is the number of bytes seen on the source stream.
 */
Fs.prototype.preparePut = function(source, meta) {
  const input = getStream(source);
  const temporaryKey = this.options.mapKey({ meta });
  let byteCount = 0;

  // count the bytes as they flow past
  input.on('data', chunk => {
    byteCount += chunk.length;
  });

  this.log(`Storing intermediate file at ${temporaryKey}`);

  // Hashing and storing consume the same stream at the same time.
  const hashed = this.getHash(input);
  const stored = this.store.ensure(temporaryKey, input);
  // Tracks stream errors on our side too, in case the store does not handle them.
  const drained = new Promise((resolve, reject) => {
    input.on('error', reject);
    input.on('end', resolve);
  });

  return Promise.join(hashed, stored, drained).spread(hash => {
    this.log(`Prepared put for ${hash}, size: ${byteCount} bytes`);
    return { hash, size: byteCount, key: temporaryKey, meta };
  });
};
/**
 * Finalize a put-operation prepared by `cafs#preparePut()`. Takes an info-object as returned by
 * `cafs#preparePut()`; alternatively, just the temporary `key` can be passed. Uses
 * `options.mapKey` to determine the key under which to store the final blob and then calls
 * `store#move()` to move the temporary blob created by `cafs#preparePut()` there.
 *
 * When only a key is passed, neither hash nor meta-data are known, so `options.mapKey` is called
 * with both set to `undefined`.
 *
 * @param {Object|String} info The info returned by `cafs#preparePut()`, or just the temporary key.
 * @param {String} info.key Used for retrieving the current temporary-key.
 * @param {String} [info.hash] Used for calculating the new key.
 * @param {*} [info.meta] Used for calculating the new key.
 * @return {Promise<Object>} Resolves with a shallow copy of `info` whose `key` is the final key
 * (or with `{ key }` when only a key was passed).
 */
Fs.prototype.finalizePut = function(info) {
  // A bare key has no properties to read or clone; treating it like an object used to pass
  // `undefined` to `store#move()` and then fail when assigning `key` onto a string.
  const isInfoObject = typeof info === 'object' && info !== null;
  const key = isInfoObject ? info.key : info;
  const hash = isInfoObject ? info.hash : undefined;
  const meta = isInfoObject ? info.meta : undefined;
  const dest = this.options.mapKey({ hash, meta });
  this.log(`Move ${key} to ${dest}`);
  return this.store.move(key, dest).then(() => {
    const ret = isInfoObject ? _.clone(info) : {};
    ret.key = dest;
    this.log(`Moved ${key} to ${dest}`);
    return ret;
  });
};
/**
 * Asks the underlying store whether it already holds a blob under the given key.
 *
 * @param {Object|String} info A key (or key holding object as returned by `cafs#preparePut()`)
 * @return {Promise<Boolean>} Resolves with the result of `store#exists()` for that key. Errors
 * thrown while resolving the key or calling the store surface as rejections.
 */
Fs.prototype.has = function(info) {
  this.log('Checking if %o exists', info);
  const lookup = () => this.store.exists(getKey(info));
  return Promise.try(lookup);
};
/**
 * Just like `cafs#has()` but takes the same arguments as `cafs#put` and checks whether or not this
 * blob is already present in the underlying store. Nothing is stored: the source is only hashed
 * and the resulting key (via `options.mapKey`) is looked up. Also available as `cafs#hasFile()`.
 *
 * @param {String|Readable|Buffer} source If passed a buffer, hashes the contents of that buffer.
 * If passed a stream, hashes the stream (which is read in the process).
 * If passed a string, hashes the file pointed to by the given path.
 * @param {*} [meta] Meta-data, handed to `options.mapKey` together with the hash.
 *
 * @return {Promise<Boolean>} True if the underlying store already holds this blob.
 */
Fs.prototype.hasContent = Fs.prototype.hasFile = function(source, meta) {
  return this.getHash(source).then(hash => {
    const key = this.options.mapKey({ hash, meta });
    return this.has(key);
  });
};
/**
 * Stream the contents of a blob into `dest`. This is the preferred way of reading from the store.
 *
 * @param {Object|String} info A key (or key holding object as returned by `cafs#preparePut()`)
 * @param {Writable} dest Destination to stream to; handed to `store#stream()` as is.
 * @param {Object} [options] Will be passed along to the underlying call to `store#stream()`
 * @return {Promise<Any>} A bluebird promise for whatever `store#stream()` returns.
 */
Fs.prototype.stream = function(info, dest, options) {
  const key = getKey(info);
  const pending = this.store.stream(key, dest, options);
  // Wrap the store's result so that callers always get a bluebird promise.
  return Promise.resolve(pending);
};
/**
 * Just like `cafs#stream()` but resolves with a buffer holding the blob. Uses `cafs#stream()`
 * under the hood and collects the output in memory.
 *
 * @see `cafs#stream()`
 *
 * @param {Object|String} info A key (or key holding object as returned by `cafs#preparePut()`)
 * @param {Object} [options] Will be passed along to the underlying call to `store#stream()`
 * @return {Promise<Buffer>} A buffer holding the contents of the requested blob; an empty buffer
 * for an empty blob.
 */
Fs.prototype.readFile = function(info, options) {
  this.log('Reading whole file %o', info);
  const buf = new streamBuffers.WritableStreamBuffer();
  return this.stream(info, buf, options).then(() => {
    // `getContents()` yields `false` when nothing was written; keep the promised Buffer type.
    return buf.getContents() || Buffer.alloc(0);
  });
};
/**
 * Convenience function for getting the blob at `info` from the store as a temporary file. Returns
 * a bluebird disposer, so that the file is removed again when used with bluebird's
 * `Promise.using()`, which hands the path of the created file to its callback. Whenever possible,
 * prefer streams over file access when interacting with data in the cafs for performance reasons.
 *
 * @param {Object|String} info A key (or key holding object as returned by `cafs#preparePut()`)
 * @param {Object|String} [options=path.join(os.tmpdir(), uuid.v4())] Describes the path of the
 * temporary file to create. If passed a string, that string is used as the destination file name.
 * Anything else results in a randomly named file in `os.tmpdir()`.
 * @param {String} [options.suffix] Appended to the random file name. The file is still placed in
 * `os.tmpdir()`.
 * @return {Disposer<String>} For use with `Promise.using()`; provides the full path of the
 * temporary file and unlinks that file on disposal.
 */
Fs.prototype.getTemporaryFile = function(info, options = path.join(os.tmpdir(), uuid.v4())) {
  let dest;
  if (typeof options === 'string') {
    dest = options;
  } else {
    // `null`, objects without a suffix and other values used to leave `dest` undefined (or throw
    // on `'suffix' in null`); fall back to a plain random name in those cases.
    const hasSuffix = typeof options === 'object' && options !== null && options.suffix != null;
    const suffix = hasSuffix ? options.suffix : '';
    dest = path.join(os.tmpdir(), uuid.v4() + suffix);
  }
  return this.stream(info, fs.createWriteStream(dest))
    .thenReturn(dest)
    .disposer(() => Promise.fromCallback(cb => fs.unlink(dest, cb)));
};
/**
 * Removes the given blob from the cafs by calling `store#unlink()`. Also available as
 * `cafs#remove()`.
 *
 * @param {Object|String} info The key, or an object holding the key under `key` (such as the
 * object returned by `preparePut`).
 * @return {Promise} A bluebird promise for whatever `store#unlink()` returns. Errors thrown while
 * resolving the key or calling the store surface as rejections.
 */
Fs.prototype.unlink = Fs.prototype.remove = function(info) {
  const removeFromStore = () => this.store.unlink(getKey(info));
  return Promise.try(removeFromStore);
};
/**
 * Compute the hash of the given source, using this instance's `options.hashAlgorithm`.
 *
 * @param {Buffer|Stream|String} source If given a string, will interpret it as a file name.
 * @return {Promise<String>} The hash of the given source, encoded as a hex-string
 */
Fs.prototype.getHash = function(source) {
  const input = getStream(source);
  const algorithm = this.options.hashAlgorithm;
  // `getHash` here is the module-level promisified `hash-stream`, not this method.
  const digest = getHash(input, algorithm);
  return digest.then(raw => raw.toString('hex'));
};
// helper functions
/**
 * Extract the store key from `info`.
 *
 * @param {Object|String} info Either the key itself or an object carrying it under `key`.
 * @return {*} `info.key` when that is truthy, `info` itself otherwise.
 */
function getKey(info) {
  const candidate = info.key;
  return candidate ? candidate : info;
}
/**
 * Turn the supported kinds of source into a readable stream.
 *
 * @param {Buffer|String|*} source A buffer (its contents are streamed), a string (interpreted as
 * the path of a file to read) or anything else, which is handed back untouched.
 * @return {*} A `PassThrough` already ended with the buffer, a file read stream for a path, or
 * `source` itself.
 */
function getStream(source) {
  if (typeof source === 'string') {
    return fs.createReadStream(source);
  }
  if (!(source instanceof Buffer)) {
    return source;
  }
  const passThrough = new stream.PassThrough();
  passThrough.end(source);
  return passThrough;
}