Skip to content

Commit

Permalink
Add ability to resubscribe to a subscription (#1192)
Browse files Browse the repository at this point in the history
  • Loading branch information
scott-rc authored Jan 27, 2024
1 parent 685858a commit 32c89b2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 33 deletions.
8 changes: 4 additions & 4 deletions src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export const command = (async (ctx, firstRun = true) => {

// subscribes to the graphql subscription that will listen and send
// back the server contract status
const unsubscribe = filesync.edit.subscribe({
const subscription = filesync.edit.subscribe({
subscription: REMOTE_SERVER_CONTRACT_STATUS_SUBSCRIPTION,
variables: () => ({ localFilesVersion: String(filesync.filesVersion), force: ctx.args["--force"] }),
onError: (error) => {
Expand All @@ -150,7 +150,7 @@ export const command = (async (ctx, firstRun = true) => {
}
}

unsubscribe();
subscription.unsubscribe();
return;
},
onData: async ({ publishStatus }): Promise<void> => {
Expand Down Expand Up @@ -183,14 +183,14 @@ export const command = (async (ctx, firstRun = true) => {
await confirm(ctx, { message: "Do you want to continue?" });
}

unsubscribe();
subscription.unsubscribe();
ctx.args["--force"] = true;
await command(ctx, false);
return;
}

const handleCompletion = (message: string | null | undefined, color: "red" | "green"): void => {
unsubscribe();
subscription.unsubscribe();

if (color === "red") {
spinner.fail();
Expand Down
4 changes: 2 additions & 2 deletions src/commands/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export const command: Command<SyncArgs> = async (ctx) => {
* Subscribe to file changes on Gadget and apply them to the local
* filesystem.
*/
const unsubscribeFromGadgetChanges = filesync.subscribeToGadgetChanges({
const filesyncSubscription = filesync.subscribeToGadgetChanges({
onError: (error) => ctx.abort(error),
beforeChanges: ({ changed, deleted }) => {
// add all the files and directories we're about to touch to
Expand Down Expand Up @@ -305,7 +305,7 @@ export const command: Command<SyncArgs> = async (ctx) => {
ctx.onAbort(async (reason) => {
ctx.log.info("stopping", { reason });

unsubscribeFromGadgetChanges();
filesyncSubscription.unsubscribe();
fileWatcher.close();
clearInterval(clearRecentWritesInterval);
sendChangesToGadget.flush();
Expand Down
77 changes: 55 additions & 22 deletions src/services/app/edit/edit.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ExecutionResult } from "graphql-ws";
import type { Promisable } from "type-fest";
import type { Context } from "../../command/context.js";
import { type HttpOptions } from "../../http/http.js";
Expand Down Expand Up @@ -68,7 +69,7 @@ export class Edit {
* Execute a GraphQL mutation.
*
* @param request - The query and variables to send to the server.
* @param request.mutation - The GraphQL query to execute.
* @param request.mutation - The GraphQL mutation to execute.
* @param request.variables - The variables to send to the server.
* @param request.http - {@linkcode HttpOptions} to pass to http.
* @returns The data returned by the server.
Expand Down Expand Up @@ -108,9 +109,9 @@ export class Edit {
* @param options - The query and variables to send to the server.
* @param options.subscription - The GraphQL subscription to subscribe to.
* @param options.variables - The variables to send to the server.
* @param options.onData - A callback that will be called with the data returned by the server.
* @param options.onError - A callback that will be called with any errors returned by the server.
* @param options.onComplete - A callback that will be called when the subscription is complete.
* @param options.onData - A callback that will be called when data is received from the server.
* @param options.onError - A callback that will be called when an error is received from the server.
* @param options.onComplete - A callback that will be called when the subscription ends.
* @returns A function to unsubscribe from the subscription.
*/
subscribe<Subscription extends GraphQLSubscription>({
Expand All @@ -122,34 +123,50 @@ export class Edit {
onData: (data: Subscription["Data"]) => Promisable<void>;
onError: (error: EditError) => Promisable<void>;
onComplete?: () => Promisable<void>;
}): () => void {
}): EditSubscription<Subscription> {
const name = options.subscription.split(/ |\(/, 2)[1];
const ctx = this.ctx.child({
let ctx = this.ctx.child({
fields: { edit: { subscription: name } },
devFields: { edit: { subscription: name, variables: unthunk(options.variables) } },
});

const onResponse = async (response: ExecutionResult<Subscription["Data"], Subscription["Extensions"]>): Promise<void> => {
if (response.errors) {
unsubscribe();
await options.onError(new EditError(options.subscription, response.errors));
return;
}

if (!response.data) {
unsubscribe();
await options.onError(new EditError(options.subscription, "Subscription response did not contain data"));
return;
}

await onData(response.data);
};

ctx.log.info("subscribing to graphql subscription");
const unsubscribe = this.#client.subscribe(ctx, {
...options,
onResponse: async (response) => {
if (response.errors) {
unsubscribe();
await options.onError(new EditError(options.subscription, response.errors));
return;
}
let unsubscribe = this.#client.subscribe(ctx, { ...options, onResponse });

if (!response.data) {
unsubscribe();
await options.onError(new EditError(options.subscription, "Subscription response did not contain data"));
return;
return {
unsubscribe,
resubscribe: (variables) => {
unsubscribe();

if (variables !== undefined) {
options.variables = variables;
}

await onData(response.data);
},
});
ctx = this.ctx.child({
fields: { edit: { subscription: name } },
devFields: { edit: { subscription: name, variables: unthunk(options.variables) } },
});

return unsubscribe;
ctx.log.info("re-subscribing to graphql subscription");
unsubscribe = this.#client.subscribe(ctx, { ...options, onResponse });
},
};
}

/**
Expand All @@ -159,3 +176,19 @@ export class Edit {
await this.#client.dispose();
}
}

/**
* An object that can be used to unsubscribe and resubscribe to an
* ongoing Edit GraphQL subscription.
*/
export type EditSubscription<Subscription extends GraphQLSubscription> = {
/**
* Unsubscribe from the subscription.
*/
unsubscribe(): void;

/**
* Resubscribe to the subscription.
*/
resubscribe(variables?: Thunk<Subscription["Variables"]> | null): void;
};
10 changes: 5 additions & 5 deletions src/services/filesync/filesync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { FileSyncEncoding, type FileSyncChangedEventInput, type FileSyncDeletedE
import type { App } from "../app/app.js";
import { getApps } from "../app/app.js";
import { AppArg } from "../app/arg.js";
import { Edit } from "../app/edit/edit.js";
import { Edit, type EditSubscription } from "../app/edit/edit.js";
import { EditError } from "../app/edit/error.js";
import {
FILE_SYNC_COMPARISON_HASHES_QUERY,
Expand Down Expand Up @@ -260,10 +260,10 @@ export class FileSync {
Unknown environment:
${environment}
Did you mean one of these?
`.concat(` • ${similarEnvironments.join("\n • ")}`),
);
}
Expand Down Expand Up @@ -389,7 +389,7 @@ export class FileSync {
beforeChanges?: (data: { changed: string[]; deleted: string[] }) => Promisable<void>;
afterChanges?: (data: { changes: Changes }) => Promisable<void>;
onError: (error: unknown) => void;
}): () => void {
}): EditSubscription<REMOTE_FILE_SYNC_EVENTS_SUBSCRIPTION> {
return this.edit.subscribe({
subscription: REMOTE_FILE_SYNC_EVENTS_SUBSCRIPTION,
// the reason this is a function rather than a static value is
Expand Down

0 comments on commit 32c89b2

Please sign in to comment.