Skip to content

Commit

Permalink
Client support for targeted signals (microsoft#22321)
Browse files Browse the repository at this point in the history
## Description
Client side changes needed to support targeting signals to a specific
client id.

Signals are now sent with v2 signals protocol (`ISentSignalMessage`)

Unnecessary override of `submitSignal` function is removed from
localDocumentDeltaConnection. This is handled in documentDeltaConnection
of base driver

These changes follow the server changes to support targeted signals
microsoft#19519

[ADO Task
7026](https://dev.azure.com/fluidframework/internal/_workitems/edit/7026)
  • Loading branch information
WillieHabi authored Sep 14, 2024
1 parent da0c9fa commit f8d3fed
Show file tree
Hide file tree
Showing 12 changed files with 648 additions and 103 deletions.
6 changes: 5 additions & 1 deletion packages/common/core-interfaces/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@
"typescript": "~5.4.5"
},
"typeValidation": {
"broken": {},
"broken": {
"Interface_ISignalEnvelope": {
"backCompat": false
}
},
"entrypoint": "internal"
}
}
11 changes: 9 additions & 2 deletions packages/common/core-interfaces/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@

/**
* @internal
*
* @privateRemarks
* `IRuntimeSignalEnvelope` is an interface that mirrors `ISignalEnvelope` for signals that come from an external
* caller (not sent by a client—so no `clientBroadcastSignalSequenceNumber`) and are always addressed
* to the Container (so no `address`).
*
* See at `server/routerlicious/packages/lambdas/src/utils/messageGenerator.ts`.
*/
export interface ISignalEnvelope {
/**
Expand All @@ -13,9 +20,9 @@ export interface ISignalEnvelope {
address?: string;

/**
* Identifier for the signal being submitted.
* Signal tracking identifier for self submitted broadcast signals.
*/
clientSignalSequenceNumber: number;
clientBroadcastSignalSequenceNumber?: number;

/**
* The contents of the envelope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ declare type old_as_current_for_Interface_ISignalEnvelope = requireAssignableTo<
* typeValidation.broken:
* "Interface_ISignalEnvelope": {"backCompat": false}
*/
// @ts-expect-error compatibility expected to be broken
declare type current_as_old_for_Interface_ISignalEnvelope = requireAssignableTo<TypeOnly<current.ISignalEnvelope>, TypeOnly<old.ISignalEnvelope>>

/*
Expand Down
44 changes: 38 additions & 6 deletions packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
IConnect,
IConnected,
IDocumentMessage,
type ISentSignalMessage,
ISignalClient,
ITokenClaims,
ScopeType,
Expand All @@ -39,6 +40,8 @@ import type { Socket } from "socket.io-client";
// For now, this package is versioned and released in unison with the specific drivers
import { pkgVersion as driverVersion } from "./packageVersion.js";

const feature_submit_signals_v2 = "submit_signals_v2";

/**
* Represents a connection to a stream of delta updates.
* @internal
Expand Down Expand Up @@ -312,9 +315,24 @@ export class DocumentDeltaConnection
this.checkNotDisposed();
return this.details.initialClients;
}

/**
* Emits 'submitOp' messages.
* @param type - Must be 'submitOp'.
* @param messages - An array of document messages to submit.
*/
protected emitMessages(type: "submitOp", messages: IDocumentMessage[][]): void;
protected emitMessages(type: "submitSignal", messages: string[][]): void;

/**
* Emits 'submitSignal' messages.
*
* **Note:** When using `ISentSignalMessage[]`, the service must support the `submit_signals_v2` feature.
* @param type - Must be 'submitSignal'.
* @param messages - An array of signals to submit. Can be either `string[][]` or `ISentSignalMessage[]`.
*/
protected emitMessages(
type: "submitSignal",
messages: string[][] | ISentSignalMessage[],
): void;
protected emitMessages(type: string, messages: unknown): void {
// Although the implementation here disconnects the socket and does not reuse it, other subclasses
// (e.g. OdspDocumentDeltaConnection) may reuse the socket. In these cases, we need to avoid emitting
Expand Down Expand Up @@ -343,11 +361,20 @@ export class DocumentDeltaConnection
public submitSignal(content: string, targetClientId?: string): void {
this.checkNotDisposed();

if (targetClientId && this.details.supportedFeatures?.submit_signals_v2 !== true) {
throw new UsageError("Sending signals to specific client ids is not supported.");
// Check for server-side support of v2 signals
if (this.details.supportedFeatures?.submit_signals_v2 === true) {
const signal: ISentSignalMessage = { content };
if (targetClientId !== undefined) {
signal.targetClientId = targetClientId;
}
this.emitMessages("submitSignal", [signal]);
} else if (targetClientId !== undefined) {
throw new UsageError(
"Sending signals to specific client ids is not supported with this service.",
);
} else {
this.emitMessages("submitSignal", [[content]]);
}

this.emitMessages("submitSignal", [[content]]);
}

/**
Expand Down Expand Up @@ -433,6 +460,11 @@ export class DocumentDeltaConnection
this.socket.on("signal", this.earlySignalHandler);
this.earlyOpHandlerAttached = true;

connectMessage.supportedFeatures = {
...connectMessage.supportedFeatures,
[feature_submit_signals_v2]: true,
};

// Socket.io's reconnect_attempt event is unreliable, so we track connect_error count instead.
let internalSocketConnectionFailureCount: number = 0;
const isInternalSocketReconnectionEnabled = (): boolean => this.socket.io.reconnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ export class LocalDocumentDeltaConnection extends DocumentDeltaConnection {
});
}

/**
* Submits a new signal to the server
*/
public submitSignal(message: string): void {
this.emitMessages("submitSignal", [[message]]);
}

/**
* Send a "disconnect" message on the socket.
* @param disconnectReason - The reason of the disconnection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import { SocketIOClientStatic } from "./socketModule.js";
const protocolVersions = ["^0.4.0", "^0.3.0", "^0.2.0", "^0.1.0"];
const feature_get_ops = "api_get_ops";
const feature_flush_ops = "api_flush_ops";
const feature_submit_signals_v2 = "submit_signals_v2";

export interface FlushResult {
lastPersistedSequenceNumber?: number;
Expand Down Expand Up @@ -296,9 +295,7 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
relayUserAgent: [client.details.environment, ` driverVersion:${pkgVersion}`].join(";"),
};

connectMessage.supportedFeatures = {
[feature_submit_signals_v2]: true,
};
connectMessage.supportedFeatures = {};

// Reference to this client supporting get_ops flow.
if (mc.config.getBoolean("Fluid.Driver.Odsp.GetOpsEnabled") !== false) {
Expand Down
3 changes: 2 additions & 1 deletion packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
if (this.handler === undefined) {
throw new Error("Attempted to process an inbound signal without a handler attached");
}

this.handler.processSignal({
clientId: message.clientId,
...message,
content: JSON.parse(message.content as string),
});
});
Expand Down
23 changes: 16 additions & 7 deletions packages/runtime/container-runtime/src/connectionTelemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,24 @@ class OpPerfTelemetry {
}
export interface IPerfSignalReport {
/**
* Identifier for the signal being submitted in order to
* Identifier to track broadcast signals being submitted in order to
* allow collection of data around the roundtrip of signal messages.
*/
signalSequenceNumber: number;
broadcastSignalSequenceNumber: number;

/**
* Accumulates the total number of broadcast signals sent during the current signal latency measurement window.
* This value represents the total number of signals sent since the latency measurement began and is used
* logged in telemetry when the latency measurement completes.
*/
totalSignalsSentInLatencyWindow: number;

/**
* Counts the number of broadcast signals sent since the last latency measurement was initiated.
* This counter increments with each broadcast signal sent. When a new latency measurement starts,
* this counter is added to `totalSignalsSentInLatencyWindow` and then reset to zero.
*/
signalsSentSinceLastLatencyMeasurement: number;

/**
* Number of signals that were expected but not received.
Expand All @@ -465,11 +479,6 @@ export interface IPerfSignalReport {
*/
roundTripSignalSequenceNumber: number | undefined;

/**
* The lower bound of the upcoming (now current) tracking group
*/
baseSignalTrackingGroupSequenceNumber: number | undefined;

/**
* Next expected signal sequence number to be received.
*/
Expand Down
Loading

0 comments on commit f8d3fed

Please sign in to comment.