Skip to content
This repository has been archived by the owner on Dec 13, 2019. It is now read-only.

Remove legacy events from Node #2381

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions packages/node/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,3 @@ export const methodNameToImplementation = controllers.reduce(

export const createRpcRouter = (requestHandler: RequestHandler) =>
new RpcRouter({ controllers, requestHandler });

export const eventNameToImplementation = {
[NODE_EVENTS.PROTOCOL_MESSAGE_EVENT]: handleReceivedProtocolMessage,
[NODE_EVENTS.REJECT_INSTALL]: handleRejectProposalMessage,
[NODE_EVENTS.REJECT_INSTALL_VIRTUAL]: handleRejectProposalMessage
};
28 changes: 27 additions & 1 deletion packages/node/src/message-handling/handle-protocol-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import { NO_PROPOSED_APP_INSTANCE_FOR_APP_INSTANCE_ID } from "../methods/errors"
import { StateChannel } from "../models";
import { RequestHandler } from "../request-handler";
import RpcRouter from "../rpc-router";
import { NODE_EVENTS, NodeMessageWrappedProtocolMessage } from "../types";
import {
DepositConfirmationMessage,
NODE_EVENTS,
NodeMessageWrappedProtocolMessage
} from "../types";
import { bigNumberifyJson, getCreate2MultisigAddress } from "../utils";

/**
Expand All @@ -43,6 +47,16 @@ export async function handleReceivedProtocolMessage(

if (seq === UNASSIGNED_SEQ_NO) return;

// FIXME: Very ugly hack for this one-off "event" style use case
let thisIsADepositConfirmed = false;
if (protocol === Protocol.Uninstall) {
const appInstanceId = (data.params as UninstallParams).appIdentityHash;
const appInstance = await store.getAppInstance(appInstanceId);
if (appInstance.appInterface.addr === networkContext.CoinBalanceRefundApp) {
thisIsADepositConfirmed = true;
}
}

await protocolRunner.runProtocolWithMessage(data);

const outgoingEventData = getOutgoingEventDataFromProtocol(
Expand Down Expand Up @@ -83,6 +97,18 @@ export async function handleReceivedProtocolMessage(
}
}

if (thisIsADepositConfirmed) {
router.emit(
NODE_EVENTS.DEPOSIT_CONFIRMED,
{
from: publicIdentifier,
type: NODE_EVENTS.DEPOSIT_CONFIRMED,
data: {} // TODO: Validate correct values for the amount, token, etc
} as DepositConfirmationMessage,
"outgoing"
);
}

if (outgoingEventData) {
await emitOutgoingNodeMessage(router, outgoingEventData);
}
Expand Down
20 changes: 3 additions & 17 deletions packages/node/src/methods/state-channel/deposit/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export default class DepositController extends NodeController {
requestHandler: RequestHandler,
params: Node.DepositParams
): Promise<Node.DepositResult> {
const { outgoing, provider } = requestHandler;
const { outgoing, provider, publicIdentifier } = requestHandler;
const { multisigAddress, tokenAddress } = params;

params.tokenAddress = tokenAddress || CONVENTION_FOR_ETH_TOKEN_ADDRESS;
Expand All @@ -87,25 +87,11 @@ export default class DepositController extends NodeController {
await makeDeposit(requestHandler, params);
await uninstallBalanceRefundApp(requestHandler, params);

// send deposit confirmation to counter party _after_ the balance refund
// app is installed so as to prevent needing to handle the case of
// the counter party hitting the issue of
// "Cannot deposit while another deposit is occurring in the channel."
const { messagingService, publicIdentifier, store } = requestHandler;
const [counterpartyAddress] = await StateChannel.getPeersAddressFromChannel(
publicIdentifier,
store,
multisigAddress
);

const payload: DepositConfirmationMessage = {
outgoing.emit(NODE_EVENTS.DEPOSIT_CONFIRMED, {
from: publicIdentifier,
type: NODE_EVENTS.DEPOSIT_CONFIRMED,
data: params
};

await messagingService.send(counterpartyAddress, payload);
outgoing.emit(NODE_EVENTS.DEPOSIT_CONFIRMED, payload);
});

return {
multisigBalance: await provider.getBalance(multisigAddress)
Expand Down
15 changes: 8 additions & 7 deletions packages/node/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,26 @@ export class Node {
* solely to the deffered promise's resolve callback.
*/
private async handleReceivedMessage(msg: NodeTypes.NodeMessage) {
if (!Object.values(NODE_EVENTS).includes(msg.type)) {
console.error(`Received message with unknown event type: ${msg.type}`);
if (!this.requestHandler.hasMessageHandler(msg.type)) {
throw new Error(`Received message with unknown type: ${msg.type}.}`);
}

const isProtocolMessage = (msg: NodeTypes.NodeMessage) =>
msg.type === NODE_EVENTS.PROTOCOL_MESSAGE_EVENT;

const isExpectingResponse = (msg: NodeMessageWrappedProtocolMessage) =>
this.ioSendDeferrals.has(msg.data.processID);

if (
isProtocolMessage(msg) &&
isExpectingResponse(msg as NodeMessageWrappedProtocolMessage)
) {
await this.handleIoSendDeferral(msg as NodeMessageWrappedProtocolMessage);
} else if (this.requestHandler.isLegacyEvent(msg.type)) {
await this.requestHandler.callEvent(msg.type, msg);
} else {
await this.rpcRouter.emit(msg.type, msg);
return await this.handleIoSendDeferral(
msg as NodeMessageWrappedProtocolMessage
);
}

return await this.requestHandler.callMessageHandler(msg);
}

private async handleIoSendDeferral(msg: NodeMessageWrappedProtocolMessage) {
Expand Down
78 changes: 33 additions & 45 deletions packages/node/src/request-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ import { NetworkContext, Node } from "@counterfactual/types";
import { Signer } from "ethers";
import { BaseProvider, JsonRpcProvider } from "ethers/providers";
import EventEmitter from "eventemitter3";
import log from "loglevel";

import { eventNameToImplementation, methodNameToImplementation } from "./api";
import { methodNameToImplementation } from "./api";
import { ProtocolRunner } from "./engine";
import { handleRejectProposalMessage } from "./message-handling/handle-node-message";
import { handleReceivedProtocolMessage } from "./message-handling/handle-protocol-message";
import ProcessQueue from "./process-queue";
import RpcRouter from "./rpc-router";
import { Store } from "./store";
import { NODE_EVENTS, NodeEvents } from "./types";
import {
NODE_EVENTS,
NodeEvents,
NodeMessageWrappedProtocolMessage,
RejectProposalMessage
} from "./types";
import { prettyPrintObject } from "./utils";

/**
Expand All @@ -18,7 +24,6 @@ import { prettyPrintObject } from "./utils";
*/
export class RequestHandler {
private readonly methods = new Map();
private readonly events = new Map();

router!: RpcRouter;

Expand All @@ -33,13 +38,12 @@ export class RequestHandler {
readonly provider: BaseProvider,
readonly wallet: Signer,
readonly blocksNeededForConfirmation: number,
public readonly processQueue: ProcessQueue
readonly processQueue: ProcessQueue
) {}

injectRouter(router: RpcRouter) {
this.router = router;
this.mapPublicApiMethods();
this.mapEventHandlers();
}

/**
Expand Down Expand Up @@ -82,51 +86,35 @@ export class RequestHandler {
}
}

/**
* This maps the Node event names to their respective handlers.
*
* These are the events being listened on to detect requests from peer Nodes.
* https://github.com/counterfactual/monorepo/blob/master/packages/cf.js/API_REFERENCE.md#events
*/
private mapEventHandlers() {
for (const eventName of Object.values(NODE_EVENTS)) {
this.events.set(eventName, eventNameToImplementation[eventName]);
}
}

/**
* This is internally called when an event is received from a peer Node.
* Node consumers can separately setup their own callbacks for incoming events.
* @param event
* @param msg
*/
public async callEvent(event: NodeEvents, msg: Node.NodeMessage) {
const controllerExecutionMethod = this.events.get(event);
const controllerCount = this.router.eventListenerCount(event);

if (!controllerExecutionMethod && controllerCount === 0) {
if (event === NODE_EVENTS.DEPOSIT_CONFIRMED) {
log.info(
`No event handler for counter depositing into channel: ${JSON.stringify(
msg,
undefined,
4
)}`
public async callMessageHandler(msg: Node.NodeMessage) {
switch (msg.type) {
case NODE_EVENTS.PROTOCOL_MESSAGE_EVENT:
await handleReceivedProtocolMessage(
this,
// TODO: Replace type cast with input validation
msg as NodeMessageWrappedProtocolMessage
);
} else {
throw Error(`Recent ${event} which has no event handler`);
}
}
break;

case NODE_EVENTS.REJECT_INSTALL:
case NODE_EVENTS.REJECT_INSTALL_VIRTUAL:
// TODO: Replace type cast with input validation
await handleRejectProposalMessage(this, msg as RejectProposalMessage);
break;

if (controllerExecutionMethod) {
await controllerExecutionMethod(this, msg);
default:
throw new Error(`Received unknown message ${msg.type}`);
}

this.router.emit(event, msg);
this.router.emit(msg.type, msg);
}

public async isLegacyEvent(event: NodeEvents) {
return this.events.has(event);
public async hasMessageHandler(event: NodeEvents) {
return [
NODE_EVENTS.PROTOCOL_MESSAGE_EVENT,
NODE_EVENTS.REJECT_INSTALL,
NODE_EVENTS.REJECT_INSTALL_VIRTUAL
].includes(event);
}

public async getSigner(): Promise<Signer> {
Expand Down