Skip to content

Commit

Permalink
renamed vars
Browse files Browse the repository at this point in the history
  • Loading branch information
eleanorjboyd committed Jan 8, 2024
1 parent 9da43cf commit 500deb6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 71 deletions.
50 changes: 33 additions & 17 deletions src/client/common/pipes/namedPipes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,63 @@ import * as path from 'path';
import * as rpc from 'vscode-jsonrpc/node';
import { traceVerbose } from '../../logging';

export interface NamesPipeConnected {
onClosed(): Promise<void>;
export interface ConnectedServerObj {
serverOnCloseCallback(): Promise<void>;
}

export function createNamedPipeServer(
pipeName: string,
connectedCallback: (value: [rpc.MessageReader, rpc.MessageWriter]) => void,
): Promise<NamesPipeConnected> {
onConnectionCallback: (value: [rpc.MessageReader, rpc.MessageWriter]) => void,
): Promise<ConnectedServerObj> {
traceVerbose(`Creating named pipe server on ${pipeName}`);
let closedResolve: () => void;
const closed = new Promise<void>((resolve, _reject) => {
closedResolve = resolve;
});

let connectionCount = 0;
return new Promise((resolve, reject) => {
// create a server, resolves and returns server on listen
const server = net.createServer((socket) => {
// this lambda function is called whenever a client connects to the server
console.log('new client is connected to the socket: ', socket);
connectionCount += 1;
console.log('connectionCount +1 = ', connectionCount);
socket.on('close', () => {
// close event is emitted by client to the server
connectionCount -= 1;
console.log('connectionCount -1 = ', connectionCount);
console.log('client emitted close event, connectionCount -1 = ', connectionCount);
if (connectionCount <= 0) {
console.log('all sockets are now closed 0 on the count!, closing resolver?');
// if all clients are closed, close the server
console.log('all clients connected to server are now closed, closing the server');
server.close();
// this closedResolve calls the dispose method in the clients
closedResolve();
}
});
// not recieving the reader writer for all connections
connectedCallback([

// upon connection create a reader and writer and pass it to the callback
onConnectionCallback([
new rpc.SocketMessageReader(socket, 'utf-8'),
new rpc.SocketMessageWriter(socket, 'utf-8'),
]);
});
const closedServerCallback = new Promise<void>((resolveOnServerClose, _reject) => {
// get executed on connection close
console.log('connection closed');
server.on('close', () => {
// server closed occurs when all clients are closed
console.log('server signal closing');
resolveOnServerClose();
});
// is not resolved until the server is closed
});
server.on('error', reject);

server.listen(pipeName, () => {
// this function is called when the server is listening
server.removeListener('error', reject);
resolve({
onClosed: () => closed,
});
const connectedServer = {
// when onClosed event is called, so is closed function
// goes backwards up the chain, when resolve2 is called, so is onClosed that means server.onClosed() on the other end can work
// event C
serverOnCloseCallback: () => closedServerCallback,
};
resolve(connectedServer);
});
});
}
Expand Down
59 changes: 27 additions & 32 deletions src/client/testing/testController/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,58 +192,52 @@ interface ExecutionResultMessage extends Message {
}

export async function startRunResultNamedPipe(
callback: (payload: ExecutionTestPayload | EOTTestPayload) => void,
deferredTillAllServerClose: Deferred<void>,
dataReceivedCallback: (payload: ExecutionTestPayload | EOTTestPayload) => void,
deferredTillServerClose: Deferred<void>,
cancellationToken?: CancellationToken,
): Promise<{ name: string } & Disposable> {
const pipeName: string = generateRandomPipeName('python-test-results');
let dispose: () => void = () => {
let disposeOfServer: () => void = () => {
/* noop */
};
const server = await createNamedPipeServer(pipeName, ([reader, _writer]) => {
// this lambda function is: onConnectionCallback
// this is called once per client connecting to the server
traceVerbose(`Test Result named pipe ${pipeName} connected`);
let disposables: (Disposable | undefined)[] = [reader];
let perConnectionDisposables: (Disposable | undefined)[] = [reader];

dispose = () => {
// create a function to dispose of the server
disposeOfServer = () => {
traceVerbose(`Test Result named pipe ${pipeName} disposed`);
console.log(`Test Result named pipe ${pipeName} disposed`);
disposables.forEach((d) => d?.dispose());
disposables = [];
deferredTillAllServerClose.resolve();
// dispose of all data listeners and cancelation listeners
perConnectionDisposables.forEach((d) => d?.dispose());
perConnectionDisposables = [];
deferredTillServerClose.resolve();
};
disposables.push(
perConnectionDisposables.push(
// per connection, add a listener for the cancellation token and the data
cancellationToken?.onCancellationRequested(() => {
traceVerbose(`Test Result named pipe ${pipeName} cancelled`);
dispose();
// if cancel is called on one connection, dispose of all connections
disposeOfServer();
}),
reader.listen((data: Message) => {
traceVerbose(`Test Result named pipe ${pipeName} received data`);
callback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload);
}),
reader.onClose(() => {
// reader is still hitting on close, I don't think we want this to happen?

// connectionCount = server.getConnectionCount();
// connectionCount[0] -= 1;
// server.setCurrentConnectionCount(connectionCount);
// if (connectionCount[0] === 0) {
// callback(createEOTPayload(true));
// traceVerbose(`Test Result named pipe ${pipeName} closed? idk how many tuimes tho`);
console.log('reader.onClose');
// dispose();
// } else {
// traceVerbose('Test Result NOT closed, there are still connections');
// }
// if EOT, call decrement connection count (callback)
dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload);
}),
);
});
server.onClosed().then(() => {
traceVerbose(`Test Result named pipe ${pipeName} closed`);
console.log('server on close from utils');
dispose();
server.serverOnCloseCallback().then(() => {
// this is called once the server close, once per run instance
traceVerbose(`Test Result named pipe ${pipeName} closed`);
console.log('server on close from utils');
// dispose of all data listeners and cancelation listeners
disposeOfServer();
});
});

return { name: pipeName, dispose };
return { name: pipeName, dispose: disposeOfServer };
}

interface DiscoveryResultMessage extends Message {
Expand Down Expand Up @@ -276,6 +270,7 @@ export async function startDiscoveryNamedPipe(
callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload);
}),
reader.onClose(() => {
console.log('EJFB.1: reader on close event occurred!');
callback(createEOTPayload(true));
traceVerbose(`Test Discovery named pipe ${pipeName} closed`);
dispose();
Expand Down
42 changes: 20 additions & 22 deletions src/client/testing/testController/pytest/pytestExecutionAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,26 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
): Promise<ExecutionTestPayload> {
// deferredTillEOT is resolved when all data sent over payload is received
const deferredTillEOT: Deferred<void> = utils.createTestingDeferred();
const deferredTillAllServerClose: Deferred<void> = utils.createTestingDeferred();

const deferredTillServerClose: Deferred<void> = utils.createTestingDeferred();

const dataReceivedCallback = (data: ExecutionTestPayload | EOTTestPayload) => {
if ('eot' in data && data.eot === true) {
// eot sent once per connection
deferredTillEOT.resolve();
console.log('eot reached');
} else if (runInstance && !runInstance.token.isCancellationRequested) {
this.resultResolver?.resolveExecution(data, runInstance);
console.log('resolve data', data);
} else {
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
}
};
const { name, dispose } = await utils.startRunResultNamedPipe(
(data: ExecutionTestPayload | EOTTestPayload) => {
if ('eot' in data && data.eot === true) {
// this resolves deferredTillEOT after single connection closed
// is there even a way to confirm all data has been sent from all connections?
// this would require tracking EOT # and comparing to connectionCount which seems too hard / unneeded
deferredTillEOT.resolve();
console.log('eot reached');
} else if (runInstance && !runInstance.token.isCancellationRequested) {
this.resultResolver?.resolveExecution(data, runInstance);
console.log('resolve data', data);
} else {
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
}
},
deferredTillAllServerClose,
dataReceivedCallback,
deferredTillServerClose,
runInstance?.token,
);
// does it get here?? Does it get stuck
runInstance?.token.onCancellationRequested(() => {
traceInfo(`Test run cancelled, resolving 'till EOT' deferred for ${uri.fsPath}.`);
// if canceled, stop listening for results
Expand All @@ -78,13 +78,11 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
deferredTillEOT,
);
} finally {
// wait for data and all connections to close
// wait for to send EOT
await deferredTillEOT.promise;
await deferredTillAllServerClose.promise;
console.log("past 'till EOT' promise, going for disposal");
// connectionCount;

traceVerbose('deferredTill EOT resolved');
await deferredTillServerClose.promise;
traceVerbose('Server closed await now resolved');
}

// placeholder until after the rewrite is adopted
Expand Down

0 comments on commit 500deb6

Please sign in to comment.