Skip to content

Commit

Permalink
net: gracefully disconnect better from remote (microsoft#218972)
Browse files Browse the repository at this point in the history
* net: gracefully disconnect better from remote

This adds in a step to gracefully disconnect (send a "disconnect"
message and then await the flush) before closing the workbench. In some
cases, like Remote - SSH, sending messages is more async than others. In
the exec server connection we handle a `zlib.flate` stream to compress
data to and from the VS Code server, and its API is asynchronous, so
this lets us ensure the stream is drained before giving the go-ahead
to close up shop

This lifecycle phase is a little awkward and I ended it putting it
directly in the lifecycle service: we don't want to do this in
`onWillShutdown` because some contributions want to save data to the
remote workspace, and if shutdown is vetoed it would leave a broken
state. But `onDidShutdown` is synchronous and already depended upon by
several other points, and changing that also felt a bit risky.

cc @alexdima

Refs microsoft#211462, will require some small adoption in Remote - SSH to close.

* undo unrelated change

* add priority to joiners

* some cleanup

* cleanup

* tweaks

* `runWithFakedTimers`

* comment

* 💄

---------

Co-authored-by: Benjamin Pasero <[email protected]>
  • Loading branch information
connor4312 and bpasero authored Jul 2, 2024
1 parent 48e40d3 commit cacaccb
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 21 deletions.
12 changes: 9 additions & 3 deletions src/vs/base/parts/ipc/common/ipc.net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ export class Client<TContext = string> extends IPCClient<TContext> {
override dispose(): void {
super.dispose();
const socket = this.protocol.getSocket();
// should be sent gracefully with a .flush(), but try to send it out as a
// last resort here if nothing else:
this.protocol.sendDisconnect();
this.protocol.dispose();
socket.end();
Expand Down Expand Up @@ -808,6 +810,7 @@ export interface PersistentProtocolOptions {
export class PersistentProtocol implements IMessagePassingProtocol {

private _isReconnecting: boolean;
private _didSendDisconnect?: boolean;

private _outgoingUnackMsg: Queue<ProtocolMessage>;
private _outgoingMsgId: number;
Expand Down Expand Up @@ -910,9 +913,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
}

sendDisconnect(): void {
const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
this._socketWriter.flush();
if (!this._didSendDisconnect) {
this._didSendDisconnect = true;
const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
this._socketWriter.flush();
}
}

sendPause(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import { ExtensionMessageCollector, ExtensionPoint, ExtensionsRegistry, IExtensi
import { LazyCreateExtensionHostManager } from 'vs/workbench/services/extensions/common/lazyCreateExtensionHostManager';
import { ResponsiveState } from 'vs/workbench/services/extensions/common/rpcProtocol';
import { IExtensionActivationHost as IWorkspaceContainsActivationHost, checkActivateWorkspaceContainsExtension, checkGlobFileExists } from 'vs/workbench/services/extensions/common/workspaceContains';
import { ILifecycleService } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { ILifecycleService, WillShutdownJoinerOrder } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { IExtensionHostExitInfo, IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService';

const hasOwnProperty = Object.hasOwnProperty;
Expand Down Expand Up @@ -195,6 +195,16 @@ export abstract class AbstractExtensionService extends Disposable implements IEx
}
}));

this._register(this._lifecycleService.onWillShutdown(event => {
if (this._remoteAgentService.getConnection()) {
event.join(() => this._remoteAgentService.endConnection(), {
id: 'join.disconnectRemote',
label: nls.localize('disconnectRemote', "Disconnect Remote Agent"),
order: WillShutdownJoinerOrder.Last // after others have joined that might depend on a remote connection
});
}
}));

this._register(this._lifecycleService.onDidShutdown(() => {
// We need to disconnect the management connection before killing the local extension host.
// Otherwise, the local extension host might terminate the underlying tunnel before the
Expand Down
42 changes: 39 additions & 3 deletions src/vs/workbench/services/lifecycle/common/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,34 @@ export interface BeforeShutdownErrorEvent {
readonly error: Error;
}

export enum WillShutdownJoinerOrder {

/**
* Joiners to run before the `Last` joiners. This is the default order and best for
* most cases. You can be sure that services are still functional at this point.
*/
Default = 1,

/**
* The joiners to run last. This should ONLY be used in rare cases when you have no
* dependencies to workbench services or state. The workbench may be in a state where
* resources can no longer be accessed or changed.
*/
Last
}

export interface IWillShutdownEventJoiner {
id: string;
label: string;
readonly id: string;
readonly label: string;
readonly order?: WillShutdownJoinerOrder;
}

export interface IWillShutdownEventDefaultJoiner extends IWillShutdownEventJoiner {
readonly order?: WillShutdownJoinerOrder.Default;
}

export interface IWillShutdownEventLastJoiner extends IWillShutdownEventJoiner {
readonly order: WillShutdownJoinerOrder.Last;
}

/**
Expand Down Expand Up @@ -95,10 +120,21 @@ export interface WillShutdownEvent {
* Allows to join the shutdown. The promise can be a long running operation but it
* will block the application from closing.
*
* @param promise the promise to join the shutdown event.
* @param joiner to identify the join operation in case it takes very long or never
* completes.
*/
join(promise: Promise<void>, joiner: IWillShutdownEventDefaultJoiner): void;

/**
* Allows to join the shutdown at the end. The promise can be a long running operation but it
* will block the application from closing.
*
* @param promiseFn the promise to join the shutdown event.
* @param joiner to identify the join operation in case it takes very long or never
* completes.
*/
join(promise: Promise<void>, joiner: IWillShutdownEventJoiner): void;
join(promiseFn: (() => Promise<void>), joiner: IWillShutdownEventLastJoiner): void;

/**
* Allows to access the joiners that have not finished joining this event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/

import { handleVetos } from 'vs/platform/lifecycle/common/lifecycle';
import { ShutdownReason, ILifecycleService, IWillShutdownEventJoiner } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { ShutdownReason, ILifecycleService, IWillShutdownEventJoiner, WillShutdownJoinerOrder } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { IStorageService } from 'vs/platform/storage/common/storage';
import { ipcRenderer } from 'vs/base/parts/sandbox/electron-sandbox/globals';
import { ILogService } from 'vs/platform/log/common/log';
Expand Down Expand Up @@ -155,19 +155,24 @@ export class NativeLifecycleService extends AbstractLifecycleService {

protected async handleWillShutdown(reason: ShutdownReason): Promise<void> {
const joiners: Promise<void>[] = [];
const lastJoiners: (() => Promise<void>)[] = [];
const pendingJoiners = new Set<IWillShutdownEventJoiner>();
const cts = new CancellationTokenSource();

this._onWillShutdown.fire({
reason,
token: cts.token,
joiners: () => Array.from(pendingJoiners.values()),
join(promise, joiner) {
joiners.push(promise);

// Track promise completion
join(promiseOrPromiseFn, joiner) {
pendingJoiners.add(joiner);
promise.finally(() => pendingJoiners.delete(joiner));

if (joiner.order === WillShutdownJoinerOrder.Last) {
const promiseFn = typeof promiseOrPromiseFn === 'function' ? promiseOrPromiseFn : () => promiseOrPromiseFn;
lastJoiners.push(() => promiseFn().finally(() => pendingJoiners.delete(joiner)));
} else {
const promise = typeof promiseOrPromiseFn === 'function' ? promiseOrPromiseFn() : promiseOrPromiseFn;
promise.finally(() => pendingJoiners.delete(joiner));
joiners.push(promise);
}
},
force: () => {
cts.dispose(true);
Expand All @@ -181,10 +186,16 @@ export class NativeLifecycleService extends AbstractLifecycleService {
try {
await raceCancellation(Promises.settled(joiners), cts.token);
} catch (error) {
this.logService.error(`[lifecycle]: Error during will-shutdown phase (error: ${toErrorMessage(error)})`); // this error will not prevent the shutdown
} finally {
longRunningWillShutdownWarning.dispose();
this.logService.error(`[lifecycle]: Error during will-shutdown phase in default joiners (error: ${toErrorMessage(error)})`);
}

try {
await raceCancellation(Promises.settled(lastJoiners.map(lastJoiner => lastJoiner())), cts.token);
} catch (error) {
this.logService.error(`[lifecycle]: Error during will-shutdown phase in last joiners (error: ${toErrorMessage(error)})`);
}

longRunningWillShutdownWarning.dispose();
}

shutdown(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
*--------------------------------------------------------------------------------------------*/

import assert from 'assert';
import { timeout } from 'vs/base/common/async';
import { DisposableStore } from 'vs/base/common/lifecycle';
import { runWithFakedTimers } from 'vs/base/test/common/timeTravelScheduler';
import { ensureNoDisposablesAreLeakedInTestSuite } from 'vs/base/test/common/utils';
import { ShutdownReason } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { ShutdownReason, WillShutdownJoinerOrder } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { NativeLifecycleService } from 'vs/workbench/services/lifecycle/electron-sandbox/lifecycleService';
import { workbenchInstantiationService } from 'vs/workbench/test/electron-sandbox/workbenchTestServices';

Expand Down Expand Up @@ -155,5 +157,34 @@ suite('Lifecycleservice', function () {
assert.strictEqual(joinCalled, true);
});

test('onWillShutdown - join order', async function () {
return runWithFakedTimers({ useFakeTimers: true }, async () => {
const order: string[] = [];

disposables.add(lifecycleService.onWillShutdown(e => {
e.join(async () => {
order.push('disconnect start');
await timeout(1);
order.push('disconnect end');
}, { id: 'test', label: 'test', order: WillShutdownJoinerOrder.Last });

e.join((async () => {
order.push('default start');
await timeout(1);
order.push('default end');
})(), { id: 'test', label: 'test', order: WillShutdownJoinerOrder.Default });
}));

await lifecycleService.testHandleWillShutdown(ShutdownReason.QUIT);

assert.deepStrictEqual(order, [
'default start',
'default end',
'disconnect start',
'disconnect end'
]);
});
});

ensureNoDisposablesAreLeakedInTestSuite();
});
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ export abstract class AbstractRemoteAgentService extends Disposable implements I
);
}

async endConnection(): Promise<void> {
if (this._connection) {
await this._connection.end();
this._connection.dispose();
}
}

private _withChannel<R>(callback: (channel: IChannel, connection: IRemoteAgentConnection) => Promise<R>, fallback: R): Promise<R> {
const connection = this.getConnection();
if (!connection) {
Expand Down Expand Up @@ -159,6 +166,8 @@ class RemoteAgentConnection extends Disposable implements IRemoteAgentConnection
this._connection = null;
}

end: () => Promise<void> = () => Promise.resolve();

getChannel<T extends IChannel>(channelName: string): T {
return <T>getDelayedChannel(this._getOrCreateConnection().then(c => c.getChannel(channelName)));
}
Expand Down Expand Up @@ -222,6 +231,10 @@ class RemoteAgentConnection extends Disposable implements IRemoteAgentConnection
connection.protocol.onDidDispose(() => {
connection.dispose();
});
this.end = () => {
connection.protocol.sendDisconnect();
return connection.protocol.drain();
};
this._register(connection.onDidStateChange(e => this._onDidStateChange.fire(e)));
return connection.client;
}
Expand Down
6 changes: 6 additions & 0 deletions src/vs/workbench/services/remote/common/remoteAgentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ export interface IRemoteAgentService {
*/
getRoundTripTime(): Promise<number | undefined>;

/**
* Gracefully ends the current connection, if any.
*/
endConnection(): Promise<void>;

getDiagnosticInfo(options: IDiagnosticInfoOptions): Promise<IDiagnosticInfo | undefined>;
updateTelemetryLevel(telemetryLevel: TelemetryLevel): Promise<void>;
logTelemetry(eventName: string, data?: ITelemetryData): Promise<void>;
Expand All @@ -54,6 +59,7 @@ export interface IRemoteAgentConnection {
readonly onReconnecting: Event<void>;
readonly onDidStateChange: Event<PersistentConnectionEvent>;

end(): Promise<void>;
dispose(): void;
getChannel<T extends IChannel>(channelName: string): T;
withChannel<T extends IChannel, R>(channelName: string, callback: (channel: T) => Promise<R>): Promise<R>;
Expand Down
7 changes: 4 additions & 3 deletions src/vs/workbench/test/browser/workbenchTestServices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ export class TestLifecycleService extends Disposable implements ILifecycleServic

this._onWillShutdown.fire({
join: p => {
this.shutdownJoiners.push(p);
this.shutdownJoiners.push(typeof p === 'function' ? p() : p);
},
joiners: () => [],
force: () => { /* No-Op in tests */ },
Expand Down Expand Up @@ -1405,8 +1405,8 @@ export class TestWillShutdownEvent implements WillShutdownEvent {
reason = ShutdownReason.CLOSE;
token = CancellationToken.None;

join(promise: Promise<void>, joiner: IWillShutdownEventJoiner): void {
this.value.push(promise);
join(promise: Promise<void> | (() => Promise<void>), joiner: IWillShutdownEventJoiner): void {
this.value.push(typeof promise === 'function' ? promise() : promise);
}

force() { /* No-Op in tests */ }
Expand Down Expand Up @@ -2117,6 +2117,7 @@ export class TestRemoteAgentService implements IRemoteAgentService {
async logTelemetry(eventName: string, data?: ITelemetryData): Promise<void> { }
async flushTelemetry(): Promise<void> { }
async getRoundTripTime(): Promise<number | undefined> { return undefined; }
async endConnection(): Promise<void> { }
}

export class TestRemoteExtensionsScannerService implements IRemoteExtensionsScannerService {
Expand Down

0 comments on commit cacaccb

Please sign in to comment.