Skip to content

Commit

Permalink
Fixes duplicated messages with fifo and dispose
Browse files Browse the repository at this point in the history
  • Loading branch information
karthiknadig committed Oct 8, 2024
1 parent f44ff1f commit cbea3a0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 85 deletions.
67 changes: 39 additions & 28 deletions src/client/common/pipes/namedPipes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

import * as cp from 'child_process';
import * as crypto from 'crypto';
import * as fs from 'fs';
import * as fs from 'fs-extra';
import * as net from 'net';
import * as os from 'os';
import * as path from 'path';
import * as rpc from 'vscode-jsonrpc/node';
import { CancellationError, CancellationToken } from 'vscode';
import { CancellationError, CancellationToken, Disposable } from 'vscode';
import { traceVerbose } from '../../logging';
import { isWindows } from '../platform/platformService';
import { createDeferred } from '../utils/async';
Expand Down Expand Up @@ -73,6 +73,9 @@ export async function createWriterPipe(pipeName: string, token?: CancellationTok
}
// linux implementation of FIFO
await mkfifo(pipeName);
try {
await fs.chmod(pipeName, 0o666);
} catch {}
const writer = fs.createWriteStream(pipeName, {
encoding: 'utf-8',
});
Expand All @@ -86,14 +89,14 @@ class CombinedReader implements rpc.MessageReader {

private _onPartialMessage = new rpc.Emitter<rpc.PartialMessageInfo>();

private _listeners = new rpc.Emitter<rpc.NotificationMessage>();

private _readers: rpc.MessageReader[] = [];
private _callback: rpc.DataCallback = () => {};

private _disposables: rpc.Disposable[] = [];

private _readers: rpc.MessageReader[] = [];

constructor() {
this._disposables.push(this._onClose, this._onError, this._onPartialMessage, this._listeners);
this._disposables.push(this._onClose, this._onError, this._onPartialMessage);
}

onError: rpc.Event<Error> = this._onError.event;
Expand All @@ -103,34 +106,41 @@ class CombinedReader implements rpc.MessageReader {
onPartialMessage: rpc.Event<rpc.PartialMessageInfo> = this._onPartialMessage.event;

listen(callback: rpc.DataCallback): rpc.Disposable {
return this._listeners.event(callback);
this._callback = callback;
return new Disposable(() => (this._callback = () => {}));
}

add(reader: rpc.MessageReader): void {
this._readers.push(reader);
this._disposables.push(
reader.onError((error) => this._onError.fire(error)),
reader.onClose(() => this.dispose()),
reader.onPartialMessage((info) => this._onPartialMessage.fire(info)),
reader.listen((msg) => {
this._listeners.fire(msg as rpc.NotificationMessage);
}),
);
reader.listen((msg) => {
this._callback(msg as rpc.NotificationMessage);
});
this._disposables.push(reader);
reader.onClose(() => {
this.remove(reader);
if (this._readers.length === 0) {
this._onClose.fire();
}
});
reader.onError((e) => {
this.remove(reader);
this._onError.fire(e);
});
}

error(error: Error): void {
this._onError.fire(error);
remove(reader: rpc.MessageReader): void {
const found = this._readers.find((r) => r === reader);
if (found) {
this._readers = this._readers.filter((r) => r !== reader);
reader.dispose();
}
}

dispose(): void {
this._onClose.fire();
this._disposables.forEach((disposable) => {
try {
disposable.dispose();
} catch (e) {
/* noop */
}
});
this._readers.forEach((r) => r.dispose());
this._readers = [];
this._disposables.forEach((disposable) => disposable.dispose());
this._disposables = [];
}
}

Expand Down Expand Up @@ -168,8 +178,9 @@ export async function createReaderPipe(pipeName: string, token?: CancellationTok
}
// mac/linux implementation of FIFO
await mkfifo(pipeName);
const reader = fs.createReadStream(pipeName, {
encoding: 'utf-8',
});
try {
await fs.chmod(pipeName, 0o666);
} catch {}
const reader = fs.createReadStream(pipeName, { encoding: 'utf-8' });
return new rpc.StreamMessageReader(reader, 'utf-8');
}
96 changes: 49 additions & 47 deletions src/client/testing/testController/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,44 +230,47 @@ export async function startRunResultNamedPipe(
dataReceivedCallback: (payload: ExecutionTestPayload | EOTTestPayload) => void,
deferredTillServerClose: Deferred<void>,
cancellationToken?: CancellationToken,
): Promise<{ name: string } & Disposable> {
): Promise<string> {
traceVerbose('Starting Test Result named pipe');
const pipeName: string = '/Users/eleanorboyd/testingFiles/inc_dec_example/temp.txt'; // generateRandomPipeName('python-test-results');
const pipeName: string = generateRandomPipeName('python-test-results');

let disposeOfServer: () => void = () => {
deferredTillServerClose.resolve();
/* noop */
};
const reader = await createReaderPipe(pipeName, cancellationToken);
traceVerbose(`Test Discovery named pipe ${pipeName} connected`);
let perConnectionDisposables: (Disposable | undefined)[] = [reader];

// create a function to dispose of the server
disposeOfServer = () => {
// dispose of all data listeners and cancelation listeners
perConnectionDisposables.forEach((d) => d?.dispose());
perConnectionDisposables = [];
traceVerbose(`Test Results named pipe ${pipeName} connected`);
let disposables: Disposable[] = [];
const disposable = new Disposable(() => {
traceVerbose(`Test Results named pipe ${pipeName} disposed`);
disposables.forEach((d) => d.dispose());
disposables = [];
deferredTillServerClose.resolve();
};
perConnectionDisposables.push(
cancellationToken?.onCancellationRequested(() => {
console.log(`Test Result named pipe ${pipeName} cancelled`);
// if cancel is called on one connection, dispose of all connections
disposeOfServer();
}),
});

if (cancellationToken) {
disposables.push(
cancellationToken?.onCancellationRequested(() => {
console.log(`Test Result named pipe ${pipeName} cancelled`);
disposable.dispose();
}),
);
}
disposables.push(
reader,
reader.listen((data: Message) => {
traceVerbose(`Test Result named pipe ${pipeName} received data`);
// if EOT, call decrement connection count (callback)
dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload);
}),
reader.onClose(() => {
// this is called once the server close, once per run instance
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
// dispose of all data listeners and cancelation listeners
disposable.dispose();
}),
reader.onError((error) => {
traceError(`Test Results named pipe ${pipeName} error:`, error);
}),
);
reader.onClose(() => {
// this is called once the server close, once per run instance
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
// dispose of all data listeners and cancelation listeners
disposeOfServer();
});
return { name: pipeName, dispose: disposeOfServer };

return pipeName;
}

interface DiscoveryResultMessage extends Message {
Expand All @@ -277,46 +280,45 @@ interface DiscoveryResultMessage extends Message {
export async function startDiscoveryNamedPipe(
callback: (payload: DiscoveredTestPayload | EOTTestPayload) => void,
cancellationToken?: CancellationToken,
): Promise<{ name: string } & Disposable> {
): Promise<string> {
traceVerbose('Starting Test Discovery named pipe');
// const pipeName: string = '/Users/eleanorboyd/testingFiles/inc_dec_example/temp33.txt';
const pipeName: string = generateRandomPipeName('python-test-discovery');
let dispose: () => void = () => {
/* noop */
};
const reader = await createReaderPipe(pipeName, cancellationToken);

reader.listen((data: Message) => {
traceVerbose(`Test Discovery named pipe ${pipeName} received data`);
callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload);
});
traceVerbose(`Test Discovery named pipe ${pipeName} connected`);
let disposables: (Disposable | undefined)[] = [reader];
dispose = () => {
let disposables: Disposable[] = [];
const disposable = new Disposable(() => {
traceVerbose(`Test Discovery named pipe ${pipeName} disposed`);
disposables.forEach((d) => d?.dispose());
disposables.forEach((d) => d.dispose());
disposables = [];
};
});

if (cancellationToken) {
disposables.push(
cancellationToken.onCancellationRequested(() => {
traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`);
disposable.dispose();
}),
);
}

disposables.push(
cancellationToken?.onCancellationRequested(() => {
traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`);
dispose();
}),
reader,
reader.listen((data: Message) => {
traceVerbose(`Test Discovery named pipe ${pipeName} received data`);
callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload);
}),
reader.onClose(() => {
callback(createEOTPayload(false));
traceVerbose(`Test Discovery named pipe ${pipeName} closed`);
dispose();
disposable.dispose();
}),
reader.onError((error) => {
traceError(`Test Discovery named pipe ${pipeName} error:`, error);
dispose();
}),
);
return { name: pipeName, dispose };
return pipeName;
}

export async function startTestIdServer(testIds: string[]): Promise<number> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class PytestTestDiscoveryAdapter implements ITestDiscoveryAdapter {
async discoverTests(uri: Uri, executionFactory?: IPythonExecutionFactory): Promise<DiscoveredTestPayload> {
const deferredTillEOT: Deferred<void> = createDeferred<void>();

const { name, dispose } = await startDiscoveryNamedPipe((data: DiscoveredTestPayload | EOTTestPayload) => {
const name = await startDiscoveryNamedPipe((data: DiscoveredTestPayload | EOTTestPayload) => {
this.resultResolver?.resolveDiscovery(data, deferredTillEOT);
});

Expand All @@ -48,7 +48,6 @@ export class PytestTestDiscoveryAdapter implements ITestDiscoveryAdapter {
} finally {
await deferredTillEOT.promise;
traceVerbose('deferredTill EOT resolved');
dispose();
}
// this is only a placeholder to handle function overloading until rewrite is finished
const discoveryPayload: DiscoveredTestPayload = { cwd: uri.fsPath, status: 'success' };
Expand Down
19 changes: 11 additions & 8 deletions src/client/testing/testController/pytest/pytestExecutionAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { TestRun, Uri } from 'vscode';
import { CancellationToken, CancellationTokenSource, TestRun, Uri } from 'vscode';
import * as path from 'path';
import { ChildProcess } from 'child_process';
import { IConfigurationService, ITestOutputChannel } from '../../../common/types';
Expand Down Expand Up @@ -48,16 +48,18 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
}
};
const { name, dispose: serverDispose } = await utils.startRunResultNamedPipe(
const cSource = new CancellationTokenSource();
runInstance?.token.onCancellationRequested(() => cSource.cancel());

const name = await utils.startRunResultNamedPipe(
dataReceivedCallback, // callback to handle data received
deferredTillServerClose, // deferred to resolve when server closes
runInstance?.token, // token to cancel
cSource.token, // token to cancel
);
runInstance?.token.onCancellationRequested(() => {
traceInfo(`Test run cancelled, resolving 'till EOT' deferred for ${uri.fsPath}.`);
// if canceled, stop listening for results
deferredTillEOT.resolve();
serverDispose(); // this will resolve deferredTillServerClose

const executionPayload: ExecutionTestPayload = {
cwd: uri.fsPath,
Expand All @@ -73,7 +75,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
testIds,
name,
deferredTillEOT,
serverDispose,
cSource,
runInstance,
debugBool,
executionFactory,
Expand All @@ -100,7 +102,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
testIds: string[],
resultNamedPipeName: string,
deferredTillEOT: Deferred<void>,
serverDispose: () => void,
serverCancel: CancellationTokenSource,
runInstance?: TestRun,
debugBool?: boolean,
executionFactory?: IPythonExecutionFactory,
Expand Down Expand Up @@ -167,7 +169,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
};
traceInfo(`Running DEBUG pytest with arguments: ${testArgs} for workspace ${uri.fsPath} \r\n`);
await debugLauncher!.launchDebugger(launchOptions, () => {
serverDispose(); // this will resolve deferredTillServerClose
serverCancel.cancel();
deferredTillEOT?.resolve();
});
} else {
Expand Down Expand Up @@ -238,11 +240,12 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
}
// this doesn't work, it instead directs us to the noop one which is defined first
// potentially this is due to the server already being close, if this is the case?
serverDispose(); // this will resolve deferredTillServerClose
}

// deferredTillEOT is resolved when all data sent on stdout and stderr is received, close event is only called when this occurs
// due to the sync reading of the output.
deferredTillExecClose.resolve();
serverCancel.cancel();
});
await deferredTillExecClose.promise;
}
Expand Down

0 comments on commit cbea3a0

Please sign in to comment.