Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyoshiaki committed Jan 1, 2024
1 parent 8ffeb12 commit 50777ee
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 75 deletions.
86 changes: 46 additions & 40 deletions packages/ice/src/ice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { InterfaceAddresses } from "../../common/src/network";
import { Candidate, candidateFoundation, candidatePriority } from "./candidate";
import { DnsLookup } from "./dns/lookup";
import { TransactionError } from "./exceptions";
import { Future, PQueue, difference, future, randomString } from "./helper";
import { Future, PQueue, future, randomString } from "./helper";
import { classes, methods } from "./stun/const";
import { Message, parseMessage } from "./stun/message";
import { StunProtocol } from "./stun/protocol";
Expand All @@ -24,28 +24,29 @@ import { getHostAddresses, normalizeFamilyNodeV18 } from "./utils";
const log = debug("werift-ice : packages/ice/src/ice.ts : log");

export class Connection {
readonly stunServer?: Address;
readonly turnServer?: Address;
readonly useIpv4: boolean;
readonly useIpv6: boolean;
readonly options: IceOptions;

iceControlling: boolean;
localUserName = randomString(4);
localPassword = randomString(22);
remotePassword: string = "";
remoteUsername: string = "";
remoteIsLite = false;
checkList: CandidatePair[] = [];
localCandidates: Candidate[] = [];
stunServer?: Address;
turnServer?: Address;
useIpv4: boolean;
useIpv6: boolean;
options: IceOptions;
remoteCandidatesEnd = false;
_localCandidatesEnd = false;
_tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString());
state: IceState = "new";
dnsLookup?: DnsLookup;
restarted = false;

readonly onData = new Event<[Buffer, number]>();
readonly stateChanged = new Event<[IceState]>();
restarting = false;

/**@private */
_localCandidatesEnd = false;
/**@private */
_tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString());
private dnsLookup?: DnsLookup;
private _remoteCandidates: Candidate[] = [];
// P2P接続完了したソケット
private nominated?: CandidatePair;
Expand All @@ -58,7 +59,11 @@ export class Connection {
private queryConsentHandle?: Future;
private promiseGatherCandidates?: Event<[]>;

constructor(public iceControlling: boolean, options?: Partial<IceOptions>) {
readonly onData = new Event<[Buffer, number]>();
readonly stateChanged = new Event<[IceState]>();

constructor(iceControlling: boolean, options?: Partial<IceOptions>) {
this.iceControlling = iceControlling;
this.options = {
...defaultOptions,
...options,
Expand Down Expand Up @@ -308,6 +313,7 @@ export class Connection {
this.queryConsentHandle = future(this.queryConsent());

this.setState("connected");
this.restarting = false;
}

private unfreezeInitial() {
Expand Down Expand Up @@ -511,33 +517,14 @@ export class Connection {
}

send = async (data: Buffer) => {
// """
// Send a datagram on the first component.

// If the connection is not established, a `ConnectionError` is raised.

// :param data: The data to be sent.
// """
await this.sendTo(data);
};

private async sendTo(data: Buffer) {
// """
// Send a datagram on the specified component.

// If the connection is not established, a `ConnectionError` is raised.

// :param data: The data to be sent.
// :param component: The component on which to send the data.
// """
const activePair = this.nominated;
if (activePair) {
await activePair.protocol.sendData(data, activePair.remoteAddr);
} else {
// log("Cannot send data, ice not connected");
return;
}
}
};

getDefaultCandidate() {
const candidates = this.localCandidates.sort(
Expand Down Expand Up @@ -675,12 +662,6 @@ export class Connection {
this.sortCheckList();
}

resetNominatedPair() {
log("resetNominatedPair");
this.nominated = undefined;
this.nominating = false;
}

private checkComplete(pair: CandidatePair) {
pair.handle = undefined;
if (pair.state === CandidatePairState.SUCCEEDED) {
Expand Down Expand Up @@ -963,6 +944,31 @@ export class Connection {
log("sendStun error", e);
});
}

restart() {
this.restarting = true;

this.localUserName = randomString(4);
this.localPassword = randomString(22);
this.remotePassword = "";
this.remoteUsername = "";
this.remoteIsLite = false;
this.checkList = [];
this.localCandidates = [];
this.remoteCandidatesEnd = false;
this.state = "new";
this._localCandidatesEnd = false;
this._remoteCandidates = [];
this.nominated = undefined;
this.nominating = false;
this.checkListDone = false;
this.checkListState = new PQueue<number>();
this.earlyChecks = [];
this.localCandidatesStart = false;
this.protocols = [];
this.queryConsentHandle = undefined;
this.promiseGatherCandidates = undefined;
}
}

export class CandidatePair {
Expand Down
66 changes: 43 additions & 23 deletions packages/webrtc/src/peerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ export class RTCPeerConnection extends EventTarget {
this.transceivers[index] = t;
}

candidatesSent = new Set<string>();

readonly iceGatheringStateChange = new Event<[IceGathererState]>();
readonly iceConnectionStateChange = new Event<[RTCIceConnectionState]>();
readonly signalingStateChange = new Event<[RTCSignalingState]>();
Expand Down Expand Up @@ -147,6 +145,12 @@ export class RTCPeerConnection extends EventTarget {
return this.router.extIdUriMap;
}

get dataChannels() {
const channels =
Object.values(this.sctpTransport?.dataChannels ?? {}) ?? [];
return channels;
}

constructor(config: Partial<PeerConfig> = {}) {
super();

Expand Down Expand Up @@ -419,6 +423,7 @@ export class RTCPeerConnection extends EventTarget {
});

const channel = new RTCDataChannel(this.sctpTransport, parameters);
this.sctpTransport.dataChannels[channel.id] = channel;
return channel;
}

Expand Down Expand Up @@ -528,16 +533,6 @@ export class RTCPeerConnection extends EventTarget {

candidate.foundation = "candidate:" + candidate.foundation;

// prevent ice candidates that have already been sent from being being resent
// when the connection is renegotiated during a later setLocalDescription call.
if (candidate.sdpMid) {
const candidateKey = `${candidate.foundation}:${candidate.sdpMid}`;
if (this.candidatesSent.has(candidateKey)) {
return;
}
this.candidatesSent.add(candidateKey);
}

this.onIceCandidate.execute(candidate.toJSON());
if (this.onicecandidate) {
this.onicecandidate({ candidate: candidate.toJSON() });
Expand Down Expand Up @@ -595,8 +590,7 @@ export class RTCPeerConnection extends EventTarget {
this.setSignalingState("stable");
}

// # assign MID
description.media.forEach((media, i) => {
for (const [i, media] of enumerate(description.media)) {
const mid = media.rtp.muxId!;
this.seenMid.add(mid);
if (["audio", "video"].includes(media.kind)) {
Expand All @@ -608,10 +602,13 @@ export class RTCPeerConnection extends EventTarget {
if (media.kind === "application" && this.sctpTransport) {
this.sctpTransport.mid = mid;
}
});
}

const setupRole = (dtlsTransport: RTCDtlsTransport) => {
const iceTransport = dtlsTransport.iceTransport;
if (iceTransport.connection.restarting) {
return;
}

// # set ICE role
if (description.type === "offer") {
Expand All @@ -634,14 +631,17 @@ export class RTCPeerConnection extends EventTarget {
}
}
};
this.dtlsTransports.forEach((d) => setupRole(d));

for (const d of this.dtlsTransports) {
setupRole(d);
}

// # configure direction
if (["answer", "pranswer"].includes(description.type)) {
this.transceivers.forEach((t) => {
for (const t of this.transceivers) {
const direction = andDirection(t.direction, t.offerDirection);
t.setCurrentDirection(direction);
});
}
}

// for trickle ice
Expand Down Expand Up @@ -670,11 +670,12 @@ export class RTCPeerConnection extends EventTarget {
);
}

description.media
.filter((m) => ["audio", "video"].includes(m.kind))
.forEach((m, i) => {
addTransportDescription(m, this.transceivers[i].dtlsTransport);
});
for (const [i, m] of enumerate(
description.media.filter((m) => ["audio", "video"].includes(m.kind)),
)) {
addTransportDescription(m, this.transceivers[i].dtlsTransport);
}

const sctpMedia = description.media.find((m) => m.kind === "application");
if (this.sctpTransport && sctpMedia) {
addTransportDescription(sctpMedia, this.sctpTransport.dtlsTransport);
Expand Down Expand Up @@ -751,6 +752,18 @@ export class RTCPeerConnection extends EventTarget {

private async connect() {
if (this.transportEstablished) {
await Promise.allSettled(
this.dtlsTransports.map(async (dtlsTransport) => {
const { iceTransport } = dtlsTransport;
if (iceTransport.connection.restarting) {
await iceTransport.start().catch((err) => {
log("iceTransport.start failed", err);
throw err;
});
}
}),
);

return;
}
log("start connect");
Expand Down Expand Up @@ -784,6 +797,13 @@ export class RTCPeerConnection extends EventTarget {
this.setConnectionState("connected");
}

restartIce() {
for (const ice of this.iceTransports) {
ice.restart();
}
this.needNegotiation();
}

private getLocalRtpParams(transceiver: RTCRtpTransceiver): RTCRtpParameters {
if (transceiver.mid == undefined) throw new Error("mid not assigned");

Expand Down
48 changes: 36 additions & 12 deletions packages/webrtc/src/transport/ice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ export class RTCIceTransport {

readonly onStateChange = new Event<[RTCIceConnectionState]>();

private waitStart?: Event<[]>;

constructor(private gather: RTCIceGatherer) {
this.connection = this.gather.connection;
this.connection.stateChanged.subscribe((state) => {
Expand Down Expand Up @@ -65,18 +63,21 @@ export class RTCIceTransport {
this.connection.remotePassword !== remoteParameters.password)
) {
log("restartIce", remoteParameters);
this.connection.resetNominatedPair();
this.gather.restart();
}
this.connection.setRemoteParams(remoteParameters);
}

async start() {
if (this.state === "closed") throw new Error("RTCIceTransport is closed");
if (!this.connection.remotePassword || !this.connection.remoteUsername)
if (this.state === "closed") {
throw new Error("RTCIceTransport is closed");
}
if (!this.connection.remotePassword || !this.connection.remoteUsername) {
throw new Error("remoteParams missing");
}

if (this.waitStart) await this.waitStart.asPromise();
this.waitStart = new Event();
if (this.gather.waitStart) await this.gather.waitStart.asPromise();
this.gather.waitStart = new Event();

this.setState("checking");

Expand All @@ -87,7 +88,11 @@ export class RTCIceTransport {
throw error;
}

this.waitStart.complete();
this.gather.waitStart.complete();
}

restart() {
this.gather.restart();
}

async stop() {
Expand All @@ -113,8 +118,10 @@ export const IceGathererStates = ["new", "gathering", "complete"] as const;
export type IceGathererState = (typeof IceGathererStates)[number];

export class RTCIceGatherer {
candidatesSent = new Set<string>();
onIceCandidate: (candidate: IceCandidate) => void = () => {};
gatheringState: IceGathererState = "new";
waitStart?: Event<[]>;

readonly onGatheringStateChange = new Event<[IceGathererState]>();
readonly connection: Connection;
Expand All @@ -126,9 +133,18 @@ export class RTCIceGatherer {
async gather() {
if (this.gatheringState === "new") {
this.setState("gathering");
await this.connection.gatherCandidates((candidate) =>
this.onIceCandidate(candidateFromIce(candidate)),
);
await this.connection.gatherCandidates((candidate) => {
// prevent ice candidates that have already been sent from being being resent
// when the connection is renegotiated during a later setLocalDescription call.

const candidateKey = candidate.foundation;
if (this.candidatesSent.has(candidateKey)) {
return;
}

this.candidatesSent.add(candidateKey);
this.onIceCandidate(candidateFromIce(candidate));
});
this.setState("complete");
}
}
Expand All @@ -146,7 +162,15 @@ export class RTCIceGatherer {
return params;
}

private setState(state: IceGathererState) {
restart() {
this.setState("new");
this.candidatesSent.clear();
this.waitStart = undefined;

this.connection.restart();
}

setState(state: IceGathererState) {
if (state !== this.gatheringState) {
this.gatheringState = state;
this.onGatheringStateChange.execute(state);
Expand Down
Loading

0 comments on commit 50777ee

Please sign in to comment.