diff --git a/.github/workflows/npmPublish.yml b/.github/workflows/npmPublish.yml new file mode 100644 index 0000000..c67953d --- /dev/null +++ b/.github/workflows/npmPublish.yml @@ -0,0 +1,39 @@ +name: Publish to npm + +on: + release: + types: [published] + +# Allow one concurrent deployment +concurrency: + group: "npm" + cancel-in-progress: true + +env: + # 7 GiB by default on GitHub, setting to 6 GiB + # https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#supported-runners-and-hardware-resources + NODE_OPTIONS: --max-old-space-size=6144 + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Node + uses: actions/setup-node@v4 + with: + node-version: 18 + cache: "npm" + - name: Install dependencies + run: npm install --package-lock-only && npm ci + - name: Build + run: npm run build + - name: Configure npm for publishing + run: echo "//registry.npmjs.org/:_authToken=${{ secrets.NPM_PUBLISH_TOKEN }}" > ~/.npmrc + - name: Publish + run: npm publish --access public + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_PUBLISH_TOKEN }} + - name: Cleanup + run: rm -rf ~/.npmrc diff --git a/package-lock.json b/package-lock.json index a009f5c..d60f294 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,11 +9,10 @@ "version": "0.1.1", "license": "AGPLv3", "dependencies": { - "@filen/sdk": "^0.1.92", + "@filen/sdk": "^0.1.128", "@parcel/watcher": "^2.4.1", "fs-extra": "^11.2.0", - "msgpackr": "^1.10.1", - "uuid": "^9.0.1" + "uuid": "^10.0.0" }, "devDependencies": { "@jest/globals": "^29.7.0", @@ -1034,9 +1033,9 @@ } }, "node_modules/@filen/sdk": { - "version": "0.1.92", - "resolved": "https://registry.npmjs.org/@filen/sdk/-/sdk-0.1.92.tgz", - "integrity": "sha512-wMcHX2H8BAybb13U1M6nCMkqVvBo5S2zwrgYxAiQFcbWrWel2RFSp6Ecv70A2eA8IHnNc84/Skgt9fjwRm0Kvw==", + "version": "0.1.128", + "resolved": "https://registry.npmjs.org/@filen/sdk/-/sdk-0.1.128.tgz", + "integrity": "sha512-Qw0aAO0nopZuMKs6nZyBNcsDUNWz/2oknoUSP0n6nELcbc9mY69259MjZCSztVcSsZY+8skUlRqr1a4i3R6KhA==", "dependencies": { "agentkeepalive": "^4.5.0", "axios": "^1.6.7", @@ -1054,6 +1053,18 @@ "node": ">=18" } }, + "node_modules/@filen/sdk/node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/@hapi/hoek": { "version": "9.3.0", "resolved": "https://registry.npmjs.org/@hapi/hoek/-/hoek-9.3.0.tgz", @@ -2010,78 +2021,6 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, - "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.2.tgz", - "integrity": "sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==", - "cpu": [ - "arm64" - ], - "optional": true, - "os": [ - "darwin" - ] - }, - "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.2.tgz", - "integrity": "sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw==", - "cpu": [ - "x64" - ], - "optional": true, - "os": [ - "darwin" - ] - }, - "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.2.tgz", - "integrity": "sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA==", - "cpu": [ - "arm" - ], - "optional": true, - "os": [ - "linux" - ] - }, - "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.2.tgz", - "integrity": "sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg==", - "cpu": [ - "arm64" - ], - "optional": true, - "os": [ - "linux" - ] - }, - "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.2.tgz", - "integrity": "sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA==", - "cpu": [ - "x64" - ], - "optional": true, - "os": [ - "linux" - ] - }, - "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.2.tgz", - "integrity": "sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ==", - "cpu": [ - "x64" - ], - "optional": true, - "os": [ - "win32" - ] - }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -3302,11 +3241,11 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -3885,9 +3824,9 @@ "dev": true }, "node_modules/engine.io-client": { - "version": "3.5.3", - "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-3.5.3.tgz", - "integrity": "sha512-qsgyc/CEhJ6cgMUwxRRtOndGVhIu5hpL5tR4umSpmX/MvkFoIxUTM7oFMDQumHNzlNLwSVy6qhstFPoWTf7dOw==", + "version": "3.5.4", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-3.5.4.tgz", + "integrity": "sha512-ydc8uuMMDxC5KCKNJN3zZKYJk2sgyTuTZQ7Aj1DJSsLKAcizA/PzWivw8fZMIjJVBo2CJOYzntv4FSjY/Lr//g==", "dependencies": { "component-emitter": "~1.3.0", "component-inherit": "0.0.3", @@ -3897,7 +3836,7 @@ "indexof": "0.0.1", "parseqs": "0.0.6", "parseuri": "0.0.6", - "ws": "~7.4.2", + "ws": "~7.5.10", "xmlhttprequest-ssl": "~1.6.2", "yeast": "0.1.2" } @@ -4415,9 +4354,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dependencies": { "to-regex-range": "^5.0.1" }, @@ -7190,35 +7129,6 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, - "node_modules/msgpackr": { - "version": "1.10.1", - "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.10.1.tgz", - "integrity": "sha512-r5VRLv9qouXuLiIBrLpl2d5ZvPt8svdQTl5/vMvE4nzDMyEX4sgW5yWhuBBj5UmgwOTWj8CIdSXn5sAfsHAWIQ==", - "optionalDependencies": { - "msgpackr-extract": "^3.0.2" - } - }, - "node_modules/msgpackr-extract": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.2.tgz", - "integrity": "sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==", - "hasInstallScript": true, - "optional": true, - "dependencies": { - "node-gyp-build-optional-packages": "5.0.7" - }, - "bin": { - "download-msgpackr-prebuilds": "bin/download-prebuilds.js" - }, - "optionalDependencies": { - "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.2" - } - }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -7233,17 +7143,6 @@ "node": "^16 || ^18 || >= 20" } }, - "node_modules/node-gyp-build-optional-packages": { - "version": "5.0.7", - "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.7.tgz", - "integrity": "sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w==", - "optional": true, - "bin": { - "node-gyp-build-optional-packages": "bin.js", - "node-gyp-build-optional-packages-optional": "optional.js", - "node-gyp-build-optional-packages-test": "build-test.js" - } - }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -8352,9 +8251,9 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" }, "node_modules/uuid": { - "version": "9.0.1", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", - "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==", "funding": [ "https://github.com/sponsors/broofa", "https://github.com/sponsors/ctavan" @@ -8547,9 +8446,9 @@ } }, "node_modules/ws": { - "version": "7.4.6", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.4.6.tgz", - "integrity": "sha512-YmhHDO4MzaDLB+M9ym/mDA5z0naX8j7SIlT8f8z+I0VtzsRbekxEutHSme7NPS2qE8StCYQNUnfWdXta/Yu85A==", + "version": "7.5.10", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.10.tgz", + "integrity": "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ==", "engines": { "node": ">=8.3.0" }, diff --git a/package.json b/package.json index b817392..d4ea616 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@filen/sync", - "version": "0.1.1", + "version": "0.1.2", "description": "Filen Sync", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -49,10 +49,9 @@ "wait-on": "^7.2.0" }, "dependencies": { - "@filen/sdk": "^0.1.92", + "@filen/sdk": "^0.1.128", "@parcel/watcher": "^2.4.1", "fs-extra": "^11.2.0", - "msgpackr": "^1.10.1", - "uuid": "^9.0.1" + "uuid": "^10.0.0" } } diff --git a/src/index.ts b/src/index.ts index d1a083f..06fa0f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -import type { SyncPair } from "./types" +import { type SyncPair } from "./types" import Sync from "./lib/sync" import { type FilenSDKConfig } from "@filen/sdk" diff --git a/src/lib/deltas.ts b/src/lib/deltas.ts index 28f243f..6dfbb97 100644 --- a/src/lib/deltas.ts +++ b/src/lib/deltas.ts @@ -1,6 +1,6 @@ import type Sync from "./sync" -import type { LocalTree } from "./filesystems/local" -import type { RemoteTree } from "./filesystems/remote" +import { type LocalTree, type LocalTreeError } from "./filesystems/local" +import { type RemoteTree } from "./filesystems/remote" import pathModule from "path" export type Delta = { path: string } & ( @@ -101,7 +101,6 @@ export class Deltas { /** * Process the directory trees and return all sync deltas. - * @date 3/2/2024 - 8:42:25 AM * * @public * @async @@ -109,27 +108,36 @@ export class Deltas { * currentLocalTree: LocalTree * currentRemoteTree: RemoteTree * previousLocalTree: LocalTree - * previousRemoteTree: RemoteTree + * previousRemoteTree: RemoteTree, + * currentLocalTreeErrors: LocalTreeError[] * }} param0 * @param {LocalTree} param0.currentLocalTree * @param {RemoteTree} param0.currentRemoteTree * @param {LocalTree} param0.previousLocalTree * @param {RemoteTree} param0.previousRemoteTree + * @param {{}} param0.currentLocalTreeErrors * @returns {Promise} */ public async process({ currentLocalTree, currentRemoteTree, previousLocalTree, - previousRemoteTree + previousRemoteTree, + currentLocalTreeErrors }: { currentLocalTree: LocalTree currentRemoteTree: RemoteTree previousLocalTree: LocalTree previousRemoteTree: RemoteTree + currentLocalTreeErrors: LocalTreeError[] }): Promise { const deltas: Delta[] = [] const pathsAdded: Record = {} + const erroredLocalPaths: Record = {} + + for (const error of currentLocalTreeErrors) { + erroredLocalPaths[error.relativePath] = true + } // Local file/directory move/rename @@ -137,7 +145,14 @@ export class Deltas { const currentItem = currentLocalTree.inodes[inode] const previousItem = previousLocalTree.inodes[inode] - if (!currentItem || !previousItem || pathsAdded[currentItem.path] || pathsAdded[previousItem.path]) { + if ( + !currentItem || + !previousItem || + pathsAdded[currentItem.path] || + pathsAdded[previousItem.path] || + erroredLocalPaths[currentItem.path] || + erroredLocalPaths[previousItem.path] + ) { continue } @@ -228,7 +243,7 @@ export class Deltas { // Local deletions for (const path in previousLocalTree.tree) { - if (pathsAdded[path]) { + if (pathsAdded[path] || erroredLocalPaths[path]) { continue } @@ -265,10 +280,10 @@ export class Deltas { } } - // Local additions/changes + // Local additions/filemodifications for (const path in currentLocalTree.tree) { - if (pathsAdded[path]) { + if (pathsAdded[path] || erroredLocalPaths[path]) { continue } diff --git a/src/lib/filesystems/local.ts b/src/lib/filesystems/local.ts index f3bb621..ac9971a 100644 --- a/src/lib/filesystems/local.ts +++ b/src/lib/filesystems/local.ts @@ -8,7 +8,8 @@ import { SYNC_INTERVAL } from "../../constants" import crypto from "crypto" import { pipeline } from "stream" import { promisify } from "util" -import type { CloudItem } from "@filen/sdk" +import { type CloudItem, PauseSignal } from "@filen/sdk" +import { postMessageToMain } from "../ipc" const pipelineAsync = promisify(pipeline) @@ -23,7 +24,15 @@ export type LocalItem = { export type LocalDirectoryTree = Record export type LocalDirectoryINodes = Record -export type LocalTree = { tree: LocalDirectoryTree; inodes: LocalDirectoryINodes } +export type LocalTree = { + tree: LocalDirectoryTree + inodes: LocalDirectoryINodes +} +export type LocalTreeError = { + localPath: string + relativePath: string + error: Error +} /** * LocalFileSystem @@ -36,7 +45,11 @@ export type LocalTree = { tree: LocalDirectoryTree; inodes: LocalDirectoryINodes export class LocalFileSystem { private readonly sync: Sync public lastDirectoryChangeTimestamp = Date.now() - SYNC_INTERVAL * 2 - public getDirectoryTreeCache: { timestamp: number; tree: LocalDirectoryTree; inodes: LocalDirectoryINodes } = { + public getDirectoryTreeCache: { + timestamp: number + tree: LocalDirectoryTree + inodes: LocalDirectoryINodes + } = { timestamp: 0, tree: {}, inodes: {} @@ -59,66 +72,74 @@ export class LocalFileSystem { /** * Get the local directory tree. - * @date 3/2/2024 - 12:38:13 PM * * @public * @async - * @returns {Promise} + * @returns {Promise<{ + * result: LocalTree + * errors: LocalTreeError[] + * }>} */ - public async getDirectoryTree(): Promise { + public async getDirectoryTree(): Promise<{ + result: LocalTree + errors: LocalTreeError[] + }> { if ( this.lastDirectoryChangeTimestamp > 0 && this.getDirectoryTreeCache.timestamp > 0 && this.lastDirectoryChangeTimestamp < this.getDirectoryTreeCache.timestamp ) { return { - tree: this.getDirectoryTreeCache.tree, - inodes: this.getDirectoryTreeCache.inodes + result: { + tree: this.getDirectoryTreeCache.tree, + inodes: this.getDirectoryTreeCache.inodes + }, + errors: [] } } + const isWindows = process.platform === "win32" const tree: LocalDirectoryTree = {} const inodes: LocalDirectoryINodes = {} + const errors: LocalTreeError[] = [] const dir = await fs.readdir(this.sync.syncPair.localPath, { recursive: true, encoding: "utf-8" }) - const promises: Promise[] = [] - for (const entry of dir) { - promises.push( - new Promise((resolve, reject) => { - if (entry.startsWith(".filen.trash.local")) { - resolve() + await promiseAllChunked( + dir.map(async entry => { + if (entry.startsWith(".filen.trash.local")) { + return + } - return + const itemPath = pathModule.join(this.sync.syncPair.localPath, entry) + const entryPath = `/${isWindows ? entry.replace(/\\/g, "/") : entry}` + + try { + const stats = await fs.stat(itemPath) + const item: LocalItem = { + lastModified: parseInt(stats.mtimeMs as unknown as string), // Sometimes comes as a float, but we need an int + type: stats.isDirectory() ? "directory" : "file", + path: entryPath, + creation: parseInt(stats.birthtimeMs as unknown as string), // Sometimes comes as a float, but we need an int + size: stats.size, + inode: stats.ino } - const itemPath = pathModule.join(this.sync.syncPair.localPath, entry) - const entryPath = `/${process.platform === "win32" ? entry.replace(/\\/g, "/") : entry}` - - fs.stat(itemPath) - .then(stats => { - const item: LocalItem = { - lastModified: parseInt(stats.mtimeMs as unknown as string), // Sometimes comes as a float, but we need an int - type: stats.isDirectory() ? "directory" : "file", - path: entryPath, - creation: parseInt(stats.birthtimeMs as unknown as string), // Sometimes comes as a float, but we need an int - size: stats.size, - inode: stats.ino - } - - tree[entryPath] = item - inodes[stats.ino] = item - - resolve() + tree[entryPath] = item + inodes[stats.ino] = item + } catch (e) { + if (e instanceof Error) { + errors.push({ + localPath: itemPath, + relativePath: entryPath, + error: e }) - .catch(reject) - }) - ) - } - - await promiseAllChunked(promises) + } + } + }) + ) this.getDirectoryTreeCache = { timestamp: Date.now(), @@ -126,7 +147,13 @@ export class LocalFileSystem { inodes } - return { tree, inodes } + return { + result: { + tree, + inodes + }, + errors + } } /** @@ -314,21 +341,147 @@ export class LocalFileSystem { */ public async upload({ relativePath }: { relativePath: string }): Promise { const localPath = pathModule.join(this.sync.syncPair.localPath, relativePath) - const parentPath = pathModule.posix.dirname(relativePath) - - await this.sync.remoteFileSystem.mkdir({ relativePath: parentPath }) + let readStream: fs.ReadStream | null = null + const signalKey = `upload:${relativePath}` - const parentUUID = await this.sync.remoteFileSystem.pathToItemUUID({ relativePath: parentPath }) + if (!this.sync.pauseSignals[signalKey]) { + this.sync.pauseSignals[signalKey] = new PauseSignal() + } - if (!parentUUID) { - throw new Error(`Could not upload ${relativePath}: Parent path not found.`) + if (!this.sync.abortControllers[signalKey]) { + this.sync.abortControllers[signalKey] = new AbortController() } - const hash = await this.createFileHash({ relativePath, algorithm: "sha512" }) + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "queued", + relativePath, + localPath + } + }) - this.sync.localFileHashes[relativePath] = hash + try { + const parentPath = pathModule.posix.dirname(relativePath) + + readStream = fs.createReadStream(localPath) + + await this.sync.remoteFileSystem.mkdir({ relativePath: parentPath }) + + const parentUUID = await this.sync.remoteFileSystem.pathToItemUUID({ relativePath: parentPath }) + + if (!parentUUID) { + throw new Error(`Could not upload ${relativePath}: Parent path not found.`) + } - return await this.sync.sdk.cloud().uploadLocalFile({ source: localPath, parent: parentUUID }) + const hash = await this.createFileHash({ relativePath, algorithm: "sha512" }) + + this.sync.localFileHashes[relativePath] = hash + + const item = await this.sync.sdk.cloud().uploadLocalFileStream({ + source: readStream, + parent: parentUUID, + name: pathModule.basename(localPath), + pauseSignal: this.sync.pauseSignals[signalKey], + abortSignal: this.sync.abortControllers[signalKey]?.signal, + onError: err => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "error", + relativePath, + localPath, + error: err + } + }) + }, + onProgress: bytes => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "progress", + relativePath, + localPath, + bytes + } + }) + }, + onStarted: () => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "started", + relativePath, + localPath + } + }) + } + }) + + await this.sync.remoteFileSystem.itemsMutex.acquire() + + this.sync.remoteFileSystem.getDirectoryTreeCache.tree[relativePath] = { + ...item, + path: relativePath + } + + this.sync.remoteFileSystem.getDirectoryTreeCache.uuids[item.uuid] = { + ...item, + path: relativePath + } + + this.sync.remoteFileSystem.itemsMutex.release() + + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "finished", + relativePath, + localPath + } + }) + + return item + } catch (e) { + if (e instanceof Error) { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "upload", + type: "error", + relativePath, + localPath, + error: e + } + }) + } + + throw e + } finally { + if (readStream) { + try { + if (!readStream.closed && !readStream.destroyed) { + readStream.destroy() + } + } catch { + // Noop + } + } + + delete this.sync.pauseSignals[signalKey] + delete this.sync.abortControllers[signalKey] + } } } diff --git a/src/lib/filesystems/remote.ts b/src/lib/filesystems/remote.ts index 5b5f0eb..c012e6f 100644 --- a/src/lib/filesystems/remote.ts +++ b/src/lib/filesystems/remote.ts @@ -1,24 +1,34 @@ import type Sync from "../sync" -import type { CloudItemTree, FSItemType, FileMetadata, FolderMetadata } from "@filen/sdk" +import { type CloudItemTree, type FSItemType, type FileMetadata, type FolderMetadata, PauseSignal } from "@filen/sdk" import pathModule from "path" import { Semaphore } from "../../semaphore" import fs from "fs-extra" -import type { DistributiveOmit, Prettify } from "../../types" +import { type DistributiveOmit, type Prettify } from "../../types" +import { postMessageToMain } from "../ipc" +import { convertTimestampToMs } from "../../utils" export type RemoteItem = Prettify & { path: string }> export type RemoteDirectoryTree = Record export type RemoteDirectoryUUIDs = Record -export type RemoteTree = { tree: RemoteDirectoryTree; uuids: RemoteDirectoryUUIDs } +export type RemoteTree = { + tree: RemoteDirectoryTree + uuids: RemoteDirectoryUUIDs +} export class RemoteFileSystem { private readonly sync: Sync - public getDirectoryTreeCache: { timestamp: number; tree: RemoteDirectoryTree; uuids: RemoteDirectoryUUIDs } = { + public getDirectoryTreeCache: { + timestamp: number + tree: RemoteDirectoryTree + uuids: RemoteDirectoryUUIDs + } = { timestamp: 0, tree: {}, uuids: {} } private readonly mutex = new Semaphore(1) private readonly mkdirMutex = new Semaphore(1) + public readonly itemsMutex = new Semaphore(1) public constructor({ sync }: { sync: Sync }) { this.sync = sync @@ -56,7 +66,10 @@ export class RemoteFileSystem { uuids } - return { tree, uuids } + return { + tree, + uuids + } } /** @@ -118,6 +131,8 @@ export class RemoteFileSystem { if (parentPath === "/" || parentPath === "." || parentPath.length <= 0) { const uuid = await this.sync.sdk.cloud().createDirectory({ name: basename, parent: this.sync.syncPair.remoteParentUUID }) + await this.itemsMutex.acquire() + this.getDirectoryTreeCache.tree[relativePath] = { type: "directory", uuid, @@ -126,6 +141,16 @@ export class RemoteFileSystem { path: relativePath } + this.getDirectoryTreeCache.uuids[uuid] = { + type: "directory", + uuid, + name: basename, + size: 0, + path: relativePath + } + + this.itemsMutex.release() + return uuid } @@ -152,6 +177,8 @@ export class RemoteFileSystem { const parentUUID = parentIsBase ? this.sync.syncPair.remoteParentUUID : parentItem.uuid const uuid = await this.sync.sdk.cloud().createDirectory({ name: partBasename, parent: parentUUID }) + await this.itemsMutex.acquire() + this.getDirectoryTreeCache.tree[relativePath] = { type: "directory", uuid, @@ -159,6 +186,16 @@ export class RemoteFileSystem { size: 0, path: relativePath } + + this.getDirectoryTreeCache.uuids[uuid] = { + type: "directory", + uuid, + name: partBasename, + size: 0, + path: relativePath + } + + this.itemsMutex.release() } } @@ -197,18 +234,19 @@ export class RemoteFileSystem { try { const uuid = await this.pathToItemUUID({ relativePath }) + const item = this.getDirectoryTreeCache.tree[relativePath] - if (!uuid || !this.getDirectoryTreeCache.tree[relativePath]) { + if (!uuid || !item) { return } const acceptedTypes: FSItemType[] = !type ? ["directory", "file"] : type === "directory" ? ["directory"] : ["file"] - if (!acceptedTypes.includes(this.getDirectoryTreeCache.tree[relativePath]!.type)) { + if (!acceptedTypes.includes(item.type)) { return } - if (this.getDirectoryTreeCache.tree[relativePath]!.type === "directory") { + if (item.type === "directory") { if (permanent) { await this.sync.sdk.cloud().deleteDirectory({ uuid }) } else { @@ -222,13 +260,24 @@ export class RemoteFileSystem { } } + await this.itemsMutex.acquire() + delete this.getDirectoryTreeCache.tree[relativePath] + delete this.getDirectoryTreeCache.uuids[uuid] for (const entry in this.getDirectoryTreeCache.tree) { - if (entry.startsWith(relativePath + "/")) { + if (entry.startsWith(relativePath + "/") || entry === relativePath) { + const entryItem = this.getDirectoryTreeCache.tree[entry] + + if (entryItem) { + delete this.getDirectoryTreeCache.uuids[entryItem.uuid] + } + delete this.getDirectoryTreeCache.tree[entry] } } + + this.itemsMutex.release() } finally { this.mutex.release() } @@ -286,7 +335,10 @@ export class RemoteFileSystem { } if (item.type === "directory") { - await this.sync.sdk.cloud().renameDirectory({ uuid, name: newBasename }) + await this.sync.sdk.cloud().renameDirectory({ + uuid, + name: newBasename + }) } else { await this.sync.sdk.cloud().renameFile({ uuid, @@ -294,21 +346,17 @@ export class RemoteFileSystem { name: newBasename }) } - - const oldItem = this.getDirectoryTreeCache.tree[fromRelativePath] - - if (oldItem) { - this.getDirectoryTreeCache.tree[toRelativePath] = { - ...oldItem, - name: newBasename - } + } else { + if (toRelativePath.startsWith(fromRelativePath)) { + return } - delete this.getDirectoryTreeCache.tree[fromRelativePath] - } else { if (oldBasename !== newBasename) { if (item.type === "directory") { - await this.sync.sdk.cloud().renameDirectory({ uuid, name: newBasename }) + await this.sync.sdk.cloud().renameDirectory({ + uuid, + name: newBasename + }) } else { await this.sync.sdk.cloud().renameFile({ uuid, @@ -320,13 +368,17 @@ export class RemoteFileSystem { if (newParentPath === "/" || newParentPath === "." || newParentPath === "") { if (item.type === "directory") { - await this.sync.sdk - .cloud() - .moveDirectory({ uuid, to: this.sync.syncPair.remoteParentUUID, metadata: itemMetadata as FolderMetadata }) + await this.sync.sdk.cloud().moveDirectory({ + uuid, + to: this.sync.syncPair.remoteParentUUID, + metadata: itemMetadata as FolderMetadata + }) } else { - await this.sync.sdk - .cloud() - .moveFile({ uuid, to: this.sync.syncPair.remoteParentUUID, metadata: itemMetadata as FileMetadata }) + await this.sync.sdk.cloud().moveFile({ + uuid, + to: this.sync.syncPair.remoteParentUUID, + metadata: itemMetadata as FileMetadata + }) } } else { await this.mkdir({ relativePath: newParentPath }) @@ -338,40 +390,64 @@ export class RemoteFileSystem { } if (item.type === "directory") { - await this.sync.sdk - .cloud() - .moveDirectory({ uuid, to: newParentItem.uuid!, metadata: itemMetadata as FolderMetadata }) + await this.sync.sdk.cloud().moveDirectory({ + uuid, + to: newParentItem.uuid!, + metadata: itemMetadata as FolderMetadata + }) } else { - await this.sync.sdk.cloud().moveFile({ uuid, to: newParentItem.uuid, metadata: itemMetadata as FileMetadata }) + await this.sync.sdk.cloud().moveFile({ + uuid, + to: newParentItem.uuid, + metadata: itemMetadata as FileMetadata + }) } } - const oldItem = this.getDirectoryTreeCache.tree[fromRelativePath] + await this.itemsMutex.acquire() - if (oldItem) { - this.getDirectoryTreeCache.tree[toRelativePath] = { - ...oldItem, - name: newBasename - } + this.getDirectoryTreeCache.tree[toRelativePath] = { + ...item, + name: pathModule.basename(toRelativePath), + path: toRelativePath + } + + this.getDirectoryTreeCache.uuids[item.uuid] = { + ...item, + name: pathModule.basename(toRelativePath), + path: toRelativePath } delete this.getDirectoryTreeCache.tree[fromRelativePath] for (const oldPath in this.getDirectoryTreeCache.tree) { - if (oldPath.startsWith(fromRelativePath + "/")) { + if (oldPath.startsWith(fromRelativePath + "/") && oldPath !== fromRelativePath) { const newPath = oldPath.split(fromRelativePath).join(toRelativePath) const oldItem = this.getDirectoryTreeCache.tree[oldPath] if (oldItem) { this.getDirectoryTreeCache.tree[newPath] = { ...oldItem, - name: newBasename + name: pathModule.basename(newPath), + path: newPath } - } - delete this.getDirectoryTreeCache.tree[oldPath] + delete this.getDirectoryTreeCache.tree[oldPath] + + const oldItemUUID = this.getDirectoryTreeCache.uuids[oldItem.uuid] + + if (oldItemUUID) { + this.getDirectoryTreeCache.uuids[oldItem.uuid] = { + ...oldItemUUID, + name: pathModule.basename(newPath), + path: newPath + } + } + } } } + + this.itemsMutex.release() } } finally { this.mutex.release() @@ -390,35 +466,127 @@ export class RemoteFileSystem { */ public async download({ relativePath }: { relativePath: string }): Promise { const localPath = pathModule.posix.join(this.sync.syncPair.localPath, relativePath) + const signalKey = `upload:${relativePath}` - const uuid = await this.pathToItemUUID({ relativePath }) - const item = this.getDirectoryTreeCache.tree[relativePath] - - if (!uuid || !item) { - throw new Error(`Could not download ${relativePath}: File not found.`) + if (!this.sync.pauseSignals[signalKey]) { + this.sync.pauseSignals[signalKey] = new PauseSignal() } - if (item.type === "directory") { - throw new Error(`Could not download ${relativePath}: Not a file.`) + if (!this.sync.abortControllers[signalKey]) { + this.sync.abortControllers[signalKey] = new AbortController() } - const tmpPath = await this.sync.sdk.cloud().downloadFileToLocal({ - uuid, - bucket: item.bucket, - region: item.region, - chunks: item.chunks, - version: item.version, - key: item.key, - size: item.size + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "queued", + relativePath, + localPath + } }) - await fs.move(tmpPath, localPath, { - overwrite: true - }) + try { + const uuid = await this.pathToItemUUID({ relativePath }) + const item = this.getDirectoryTreeCache.tree[relativePath] + + if (!uuid || !item) { + throw new Error(`Could not download ${relativePath}: File not found.`) + } + + if (item.type === "directory") { + throw new Error(`Could not download ${relativePath}: Not a file.`) + } - await fs.utimes(localPath, Date.now(), item.lastModified) + const tmpPath = await this.sync.sdk.cloud().downloadFileToLocal({ + uuid, + bucket: item.bucket, + region: item.region, + chunks: item.chunks, + version: item.version, + key: item.key, + size: item.size, + pauseSignal: this.sync.pauseSignals[signalKey], + abortSignal: this.sync.abortControllers[signalKey]?.signal, + onError: err => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "error", + relativePath, + localPath, + error: err + } + }) + }, + onProgress: bytes => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "progress", + relativePath, + localPath, + bytes + } + }) + }, + onStarted: () => { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "started", + relativePath, + localPath + } + }) + } + }) + + await fs.move(tmpPath, localPath, { + overwrite: true + }) + + await fs.utimes(localPath, Date.now(), convertTimestampToMs(item.lastModified)) + + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "finished", + relativePath, + localPath + } + }) + + return await fs.stat(localPath) + } catch (e) { + if (e instanceof Error) { + postMessageToMain({ + type: "transfer", + syncPair: this.sync.syncPair, + data: { + of: "download", + type: "error", + relativePath, + localPath, + error: e + } + }) + } - return await fs.stat(localPath) + throw e + } finally { + delete this.sync.pauseSignals[signalKey] + delete this.sync.abortControllers[signalKey] + } } } diff --git a/src/lib/ipc.ts b/src/lib/ipc.ts new file mode 100644 index 0000000..64ed3b8 --- /dev/null +++ b/src/lib/ipc.ts @@ -0,0 +1,78 @@ +import { isMainThread, parentPort } from "worker_threads" +import { type SyncMessage } from "../types" + +const postMessageToMainProgressThrottle: Record< + string, + { + next: number + storedBytes: number + } +> = {} + +// We have to throttle the "progress" events of the "download"/"upload" message type. The SDK sends too many events for the IPC to handle properly. +// It freezes the main process if we don't throttle it. +export function throttlePostMessageToMain(message: SyncMessage, callback: (message: SyncMessage) => void) { + const now = Date.now() + let key = "" + + if (message.type === "transfer" && message.data.type === "progress") { + key = `${message.type}:${message.data.relativePath}:${message.data.localPath}:${message.data.type}` + + if (!postMessageToMainProgressThrottle[key]) { + postMessageToMainProgressThrottle[key] = { + next: 0, + storedBytes: 0 + } + } + + postMessageToMainProgressThrottle[key]!.storedBytes += message.data.bytes + + if (postMessageToMainProgressThrottle[key]!.next > now) { + return + } + + message = { + ...message, + data: { + ...message.data, + bytes: postMessageToMainProgressThrottle[key]!.storedBytes + } + } + } + + callback(message) + + if (key.length > 0 && postMessageToMainProgressThrottle[key] && message.type === "transfer") { + postMessageToMainProgressThrottle[key]!.storedBytes = 0 + postMessageToMainProgressThrottle[key]!.next = now + 100 + + if ( + message.data.type === "error" || + message.data.type === "queued" || + // TODO: Stopped event (message.data.type === "stopped") + message.data.type === "finished" + ) { + delete postMessageToMainProgressThrottle[key] + } + } +} + +/** + * Send a message to the main thread if we are running inside a worker thread, otherwise send it to the process interface. + * + * @export + * @param {SyncMessage} message + */ +export function postMessageToMain(message: SyncMessage): void { + throttlePostMessageToMain(message, throttledMessage => { + if (!isMainThread || !parentPort) { + if (process.send) { + process.send(throttledMessage) + } + + return + } + + parentPort.postMessage(throttledMessage) + }) +} diff --git a/src/lib/state.ts b/src/lib/state.ts index 9b4d302..afc0e2c 100644 --- a/src/lib/state.ts +++ b/src/lib/state.ts @@ -1,10 +1,10 @@ import type Sync from "./sync" import pathModule from "path" import fs from "fs-extra" -import { unpack, pack } from "msgpackr" -import type { RemoteTree, RemoteItem } from "./filesystems/remote" -import type { LocalTree, LocalItem } from "./filesystems/local" -import type { DoneTask } from "./tasks" +import { serialize, deserialize } from "v8" +import { type RemoteTree, type RemoteItem } from "./filesystems/remote" +import { type LocalTree, type LocalItem } from "./filesystems/local" +import { type DoneTask } from "./tasks" const STATE_VERSION = 1 @@ -261,7 +261,7 @@ export class State { public async saveLocalFileHashes(): Promise { const path = pathModule.join(this.statePath, "localFileHashes") - const serialized = pack(this.sync.localFileHashes) + const serialized = serialize(this.sync.localFileHashes) await fs.ensureDir(this.statePath) await fs.writeFile(path, serialized) @@ -278,7 +278,7 @@ export class State { const buffer = await fs.readFile(path) - this.sync.localFileHashes = unpack(buffer) + this.sync.localFileHashes = deserialize(buffer) } public async initialize(): Promise { @@ -301,15 +301,15 @@ export class State { const [localBuffer, remoteBuffer] = await Promise.all([fs.readFile(localPath), fs.readFile(remotePath)]) - this.sync.previousLocalTree = unpack(localBuffer) - this.sync.previousRemoteTree = unpack(remoteBuffer) + this.sync.previousLocalTree = deserialize(localBuffer) + this.sync.previousRemoteTree = deserialize(remoteBuffer) } public async savePreviousTrees(): Promise { const localPath = pathModule.join(this.statePath, "previousLocalTree") const remotePath = pathModule.join(this.statePath, "previousRemoteTree") - const localSerialized = pack(this.sync.previousLocalTree) - const remoteSerialized = pack(this.sync.previousRemoteTree) + const localSerialized = serialize(this.sync.previousLocalTree) + const remoteSerialized = serialize(this.sync.previousRemoteTree) await fs.ensureDir(this.statePath) await Promise.all([fs.writeFile(localPath, localSerialized), fs.writeFile(remotePath, remoteSerialized)]) diff --git a/src/lib/sync.ts b/src/lib/sync.ts index 1ac15bb..c66c704 100644 --- a/src/lib/sync.ts +++ b/src/lib/sync.ts @@ -1,11 +1,13 @@ -import SDK, { type FilenSDKConfig } from "@filen/sdk" -import type { SyncPair } from "../types" +import SDK, { type FilenSDKConfig, type PauseSignal } from "@filen/sdk" +import { type SyncPair, type SyncMessage } from "../types" import { SYNC_INTERVAL } from "../constants" import { LocalFileSystem, LocalTree } from "./filesystems/local" import { RemoteFileSystem, RemoteTree } from "./filesystems/remote" import Deltas from "./deltas" import Tasks from "./tasks" import State from "./state" +import { postMessageToMain } from "./ipc" +import { isMainThread, parentPort } from "worker_threads" /** * Sync @@ -27,6 +29,8 @@ export class Sync { public readonly tasks: Tasks public readonly state: State public readonly dbPath: string + public readonly abortControllers: Record = {} + public readonly pauseSignals: Record = {} /** * Creates an instance of Sync. @@ -47,6 +51,49 @@ export class Sync { this.deltas = new Deltas({ sync: this }) this.tasks = new Tasks({ sync: this }) this.state = new State({ sync: this }) + + this.setupMainThreadListeners() + } + + /** + * Sets up receiving message from the main thread. + * + * @private + */ + private setupMainThreadListeners(): void { + if (isMainThread || !parentPort) { + return + } + + parentPort.removeAllListeners() + + parentPort.on("message", (message: SyncMessage) => { + if (message.type === "stopTransfer" && message.syncPair.uuid === this.syncPair.uuid) { + const abortController = this.abortControllers[`${message.data.of}:${message.data.relativePath}`] + + if (!abortController || abortController.signal.aborted) { + return + } + + abortController.abort() + } else if (message.type === "pauseTransfer" && message.syncPair.uuid === this.syncPair.uuid) { + const pauseSignal = this.pauseSignals[`${message.data.of}:${message.data.relativePath}`] + + if (!pauseSignal || pauseSignal.isPaused()) { + return + } + + pauseSignal.pause() + } else if (message.type === "resumeTransfer" && message.syncPair.uuid === this.syncPair.uuid) { + const pauseSignal = this.pauseSignals[`${message.data.of}:${message.data.relativePath}`] + + if (!pauseSignal || !pauseSignal.isPaused()) { + return + } + + pauseSignal.resume() + } + }) } public async initialize(): Promise { @@ -59,8 +106,7 @@ export class Sync { try { //local/remote smoke test - await this.localFileSystem.startDirectoryWatcher() - await this.state.initialize() + await Promise.all([this.localFileSystem.startDirectoryWatcher(), this.state.initialize()]) this.run() } catch (e) { @@ -71,44 +117,159 @@ export class Sync { } private async run(): Promise { + postMessageToMain({ + type: "cycleStarted", + syncPair: this.syncPair + }) + try { + postMessageToMain({ + type: "cycleWaitingForLocalDirectoryChangesStarted", + syncPair: this.syncPair + }) + await this.localFileSystem.waitForLocalDirectoryChanges() + postMessageToMain({ + type: "cycleWaitingForLocalDirectoryChangesDone", + syncPair: this.syncPair + }) + + postMessageToMain({ + type: "cycleGettingTreesStarted", + syncPair: this.syncPair + }) + + // eslint-disable-next-line prefer-const let [currentLocalTree, currentRemoteTree] = await Promise.all([ this.localFileSystem.getDirectoryTree(), this.remoteFileSystem.getDirectoryTree() ]) + postMessageToMain({ + type: "cycleGettingTreesDone", + syncPair: this.syncPair + }) + + postMessageToMain({ + type: "localTreeErrors", + syncPair: this.syncPair, + data: currentLocalTree.errors + }) + + postMessageToMain({ + type: "cycleProcessingDeltasStarted", + syncPair: this.syncPair + }) + const deltas = await this.deltas.process({ - currentLocalTree, + currentLocalTree: currentLocalTree.result, currentRemoteTree, previousLocalTree: this.previousLocalTree, - previousRemoteTree: this.previousRemoteTree + previousRemoteTree: this.previousRemoteTree, + currentLocalTreeErrors: currentLocalTree.errors + }) + + postMessageToMain({ + type: "cycleProcessingDeltasDone", + syncPair: this.syncPair + }) + + postMessageToMain({ + type: "deltas", + syncPair: this.syncPair, + data: deltas }) - console.log(deltas) + console.log({ deltas, localErrors: currentLocalTree.errors }) - const doneTasks = await this.tasks.process({ deltas }) + postMessageToMain({ + type: "cycleProcessingTasksStarted", + syncPair: this.syncPair + }) + + const { doneTasks, errors } = await this.tasks.process({ deltas }) - console.log(doneTasks) + console.log({ doneTasks, errors }) + + postMessageToMain({ + type: "cycleProcessingTasksDone", + syncPair: this.syncPair + }) + + postMessageToMain({ + type: "doneTasks", + syncPair: this.syncPair, + data: { + tasks: doneTasks, + errors + } + }) if (doneTasks.length > 0) { - const applied = this.state.applyDoneTasksToState({ doneTasks, currentLocalTree, currentRemoteTree }) + postMessageToMain({ + type: "cycleApplyingStateStarted", + syncPair: this.syncPair + }) + + const applied = this.state.applyDoneTasksToState({ + doneTasks, + currentLocalTree: currentLocalTree.result, + currentRemoteTree + }) - currentLocalTree = applied.currentLocalTree + currentLocalTree.result = applied.currentLocalTree currentRemoteTree = applied.currentRemoteTree + + postMessageToMain({ + type: "cycleApplyingStateDone", + syncPair: this.syncPair + }) } - this.previousLocalTree = currentLocalTree + postMessageToMain({ + type: "cycleSavingStateStarted", + syncPair: this.syncPair + }) + + this.previousLocalTree = currentLocalTree.result this.previousRemoteTree = currentRemoteTree await this.state.save() + + postMessageToMain({ + type: "cycleSavingStateDone", + syncPair: this.syncPair + }) + + postMessageToMain({ + type: "cycleSuccess", + syncPair: this.syncPair + }) } catch (e) { console.error(e) // TODO: Proper debugger + + if (e instanceof Error) { + postMessageToMain({ + type: "cycleError", + syncPair: this.syncPair, + data: e + }) + } } finally { + postMessageToMain({ + type: "cycleFinished", + syncPair: this.syncPair + }) + setTimeout(() => { this.run() }, SYNC_INTERVAL) + + postMessageToMain({ + type: "cycleRestarting", + syncPair: this.syncPair + }) } } } diff --git a/src/lib/tasks.ts b/src/lib/tasks.ts index c501ff9..ba996dd 100644 --- a/src/lib/tasks.ts +++ b/src/lib/tasks.ts @@ -1,9 +1,32 @@ import type Sync from "./sync" -import type { Delta } from "./deltas" -import { promiseAllSettledChunked } from "../utils" -import type { CloudItem } from "@filen/sdk" +import { type Delta } from "./deltas" +import { promiseAllChunked } from "../utils" +import { type CloudItem } from "@filen/sdk" import fs from "fs-extra" +export type TaskError = { + path: string + error: Error + type: + | "uploadFile" + | "createRemoteDirectory" + | "createLocalDirectory" + | "deleteLocalFile" + | "deleteRemoteFile" + | "deleteLocalDirectory" + | "deleteRemoteDirectory" + | "downloadFile" + | "moveLocalFile" + | "renameLocalFile" + | "moveRemoteFile" + | "renameRemoteFile" + | "renameRemoteDirectory" + | "renameLocalDirectory" + | "moveRemoteDirectory" + | "moveLocalFile" + | "moveLocalDirectory" +} + export type DoneTask = { path: string } & ( | { type: "uploadFile"; item: CloudItem } | { @@ -192,38 +215,48 @@ export class Tasks { /** * Process all deltas. - * @date 3/5/2024 - 3:59:51 PM * * @public * @async * @param {{ deltas: Delta[] }} param0 * @param {{}} param0.deltas - * @returns {Promise} + * @returns {Promise<{ + * doneTasks: DoneTask[] + * errors: TaskError[] + * }>} */ - public async process({ deltas }: { deltas: Delta[] }): Promise { - // Work on deltas from "left to right" (ascending order, path length). - deltas = deltas.sort((a, b) => a.path.split("/").length - b.path.split("/").length) - + public async process({ deltas }: { deltas: Delta[] }): Promise<{ + doneTasks: DoneTask[] + errors: TaskError[] + }> { const executed: DoneTask[] = [] - const promises: Promise[] = [] - - for (const delta of deltas) { - promises.push( - new Promise((resolve, reject) => { - this.processTask({ delta }) - .then(doneTask => { - executed.push(doneTask) - - resolve() - }) - .catch(reject) - }) - ) - } + const errors: TaskError[] = [] + + await promiseAllChunked( + // Work on deltas from "left to right" (ascending order, path length). + deltas + .sort((a, b) => a.path.split("/").length - b.path.split("/").length) + .map(async delta => { + try { + const doneTask = await this.processTask({ delta }) - await promiseAllSettledChunked(promises) + executed.push(doneTask) + } catch (e) { + if (e instanceof Error) { + errors.push({ + path: delta.path, + type: delta.type, + error: e + }) + } + } + }) + ) - return executed + return { + doneTasks: executed, + errors + } } } diff --git a/src/types.ts b/src/types.ts index 6bb7609..346ee85 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,7 @@ +import { type LocalTreeError } from "./lib/filesystems/local" +import { type Delta } from "./lib/deltas" +import { type DoneTask, type TaskError } from "./lib/tasks" + export type SyncMode = "twoWay" | "localToCloud" | "localBackup" | "cloudToLocal" | "cloudBackup" export type SyncPair = { @@ -15,3 +19,149 @@ export type Prettify = { [K in keyof T]: T[K] // eslint-disable-next-line @typescript-eslint/ban-types } & {} + +export type CycleState = + | "cycleStarted" + | "cycleFinished" + | "cycleError" + | "cycleSuccess" + | "cycleWaitingForLocalDirectoryChangesStarted" + | "cycleWaitingForLocalDirectoryChangesDone" + | "cycleGettingTreesStarted" + | "cycleGettingTreesDone" + | "cycleProcessingDeltasStarted" + | "cycleProcessingDeltasDone" + | "cycleProcessingTasksStarted" + | "cycleProcessingTasksDone" + | "cycleApplyingStateStarted" + | "cycleApplyingStateDone" + | "cycleSavingStateStarted" + | "cycleSavingStateDone" + | "cycleRestarting" + +export type SyncMessage = { syncPair: SyncPair } & ( + | { + type: "transfer" + data: + | { + of: "upload" | "download" + type: "progress" + relativePath: string + localPath: string + bytes: number + } + | { + of: "upload" | "download" + type: "queued" + relativePath: string + localPath: string + } + | { + of: "upload" | "download" + type: "started" + relativePath: string + localPath: string + } + | { + of: "upload" | "download" + type: "finished" + relativePath: string + localPath: string + } + | { + of: "upload" | "download" + type: "error" + relativePath: string + localPath: string + error: Error + } + } + | { + type: "localTreeErrors" + data: LocalTreeError[] + } + | { + type: "deltas" + data: Delta[] + } + | { + type: "doneTasks" + data: { + tasks: DoneTask[] + errors: TaskError[] + } + } + | { + type: "cycleStarted" + } + | { + type: "cycleFinished" + } + | { + type: "cycleError" + data: Error + } + | { + type: "cycleSuccess" + } + | { + type: "cycleWaitingForLocalDirectoryChangesStarted" + } + | { + type: "cycleWaitingForLocalDirectoryChangesDone" + } + | { + type: "cycleGettingTreesStarted" + } + | { + type: "cycleGettingTreesDone" + } + | { + type: "cycleProcessingDeltasStarted" + } + | { + type: "cycleProcessingDeltasDone" + } + | { + type: "cycleProcessingTasksStarted" + } + | { + type: "cycleProcessingTasksDone" + } + | { + type: "cycleApplyingStateStarted" + } + | { + type: "cycleApplyingStateDone" + } + | { + type: "cycleSavingStateStarted" + } + | { + type: "cycleSavingStateDone" + } + | { + type: "cycleRestarting" + } + | { + type: "stopTransfer" + data: { + of: "upload" | "download" + relativePath: string + } + } + | { + type: "pauseTransfer" + data: { + of: "upload" | "download" + relativePath: string + } + } + | { + type: "resumeTransfer" + data: { + of: "upload" | "download" + relativePath: string + } + } +) diff --git a/src/utils.ts b/src/utils.ts index 437ce9d..5a3cfbb 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -52,3 +52,20 @@ export async function promiseAllSettledChunked(promises: Promise[], chunkS return results } + +/** + * Convert a timestamp from seconds to milliseconds. + * + * @export + * @param {number} timestamp + * @returns {number} + */ +export function convertTimestampToMs(timestamp: number): number { + const now = Date.now() + + if (Math.abs(now - timestamp) < Math.abs(now - timestamp * 1000)) { + return timestamp + } + + return Math.floor(timestamp * 1000) +}