Skip to content

Commit

Permalink
feat: handle device disconnection (#1174)
Browse files Browse the repository at this point in the history
This PRs adds the last missing piece to the new device management API:
handling device disconnections.

## What's new?

### Audio, video manager
- If a device is disconnected, we deselect that device (not available in
RN)
- If a track ends, we update the device manager status to `disabled`
- If the "default" device is replaced, we restart the device (not
available in RN)

### Screenshare manager
- If a track ends, we update the device manager status to `disabled`

### Speaker manager
- If a device is disconnected, we deselect that device (not available in
RN)

## Implementation details

### Watching for disconnected devices
The `deviceIds$` Observable notifies the manager about device
connections/disconnections. This Observable uses `enumerateDevices`, so
it will never ask for permissions (we only need to ask for permissions
if we need device labels, but we don't need them for watching
disconnected devices).

### Handling track ending
We add the `ended` event handler to all tracks of a media stream, if any
track ends, we update the manager's status to `disabled`.

### Handling new default device
If a device's `deviceId` stays the same, but it's `groupId` is changed,
a device is replaced. We restart the stream if the device was previously
enabled.

⚠️ The PR also removed unused device helper methods, that are remainings
of the old device API that's already removed, on the offchance a
customer uses any of these helpers they should upgrade to the new,
simlified device management API:
- For JS client:
https://getstream.io/video/docs/javascript/guides/camera-and-microphone/
- For React:
https://getstream.io/video/docs/react/guides/camera-and-microphone/
- For RN:
https://getstream.io/video/docs/reactnative/core/camera-and-microphone/
  • Loading branch information
szuperaz authored Nov 13, 2023
1 parent b5006bf commit ae3779f
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 142 deletions.
11 changes: 10 additions & 1 deletion packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,11 @@ export class Call {

this.clientStore.unregisterCall(this);
this.state.setCallingState(CallingState.LEFT);

this.camera.removeSubscriptions();
this.microphone.removeSubscriptions();
this.screenShare.removeSubscriptions();
this.speaker.removeSubscriptions();
};

/**
Expand Down Expand Up @@ -1004,7 +1009,11 @@ export class Call {
await this.initCamera({ setStatus: true });
await this.initMic({ setStatus: true });
} catch (error) {
this.logger('warn', 'Camera and/or mic init failed during join call');
this.logger(
'warn',
'Camera and/or mic init failed during join call',
error,
);
}

// 3. once we have the "joinResponse", and possibly reconciled the local state
Expand Down
102 changes: 98 additions & 4 deletions packages/client/src/devices/InputMediaDeviceManager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Observable } from 'rxjs';
import { Observable, Subscription, combineLatest, pairwise } from 'rxjs';
import { Call } from '../Call';
import { CallingState } from '../store';
import { InputMediaDeviceManagerState } from './InputMediaDeviceManagerState';
import { isReactNative } from '../helpers/platforms';
import { Logger } from '../coordinator/connection/types';
import { getLogger } from '../logger';
import { TrackType } from '../gen/video/sfu/models/models';
import { deviceIds$ } from './devices';

export abstract class InputMediaDeviceManager<
T extends InputMediaDeviceManagerState<C>,
Expand All @@ -20,13 +21,22 @@ export abstract class InputMediaDeviceManager<
*/
disablePromise?: Promise<void>;
logger: Logger;
private subscriptions: Subscription[] = [];
private isTrackStoppedDueToTrackEnd = false;

protected constructor(
protected readonly call: Call,
public readonly state: T,
protected readonly trackType: TrackType,
) {
this.logger = getLogger([`${TrackType[trackType].toLowerCase()} manager`]);
if (
deviceIds$ &&
!isReactNative() &&
(this.trackType === TrackType.AUDIO || this.trackType === TrackType.VIDEO)
) {
this.handleDisconnectedOrReplacedDevices();
}
}

/**
Expand Down Expand Up @@ -125,6 +135,10 @@ export abstract class InputMediaDeviceManager<
await this.applySettingsToStream();
}

removeSubscriptions = () => {
this.subscriptions.forEach((s) => s.unsubscribe());
};

protected async applySettingsToStream() {
if (this.state.status === 'enabled') {
await this.muteStream();
Expand Down Expand Up @@ -204,9 +218,6 @@ export abstract class InputMediaDeviceManager<
stream = this.state.mediaStream;
this.unmuteTracks();
} else {
if (this.state.mediaStream) {
this.stopTracks();
}
const defaultConstraints = this.state.defaultConstraints;
const constraints: MediaTrackConstraints = {
...defaultConstraints,
Expand All @@ -219,6 +230,89 @@ export abstract class InputMediaDeviceManager<
}
if (this.state.mediaStream !== stream) {
this.state.setMediaStream(stream);
this.getTracks().forEach((track) => {
track.addEventListener('ended', async () => {
if (this.enablePromise) {
await this.enablePromise;
}
if (this.disablePromise) {
await this.disablePromise;
}
if (this.state.status === 'enabled') {
this.isTrackStoppedDueToTrackEnd = true;
setTimeout(() => {
this.isTrackStoppedDueToTrackEnd = false;
}, 2000);
await this.disable();
}
});
});
}
}

private get mediaDeviceKind() {
if (this.trackType === TrackType.AUDIO) {
return 'audioinput';
}
if (this.trackType === TrackType.VIDEO) {
return 'videoinput';
}
return '';
}

private handleDisconnectedOrReplacedDevices() {
this.subscriptions.push(
combineLatest([
deviceIds$!.pipe(pairwise()),
this.state.selectedDevice$,
]).subscribe(async ([[prevDevices, currentDevices], deviceId]) => {
if (!deviceId) {
return;
}
if (this.enablePromise) {
await this.enablePromise;
}
if (this.disablePromise) {
await this.disablePromise;
}

let isDeviceDisconnected = false;
let isDeviceReplaced = false;
const currentDevice = this.findDeviceInList(currentDevices, deviceId);
const prevDevice = this.findDeviceInList(prevDevices, deviceId);
if (!currentDevice && prevDevice) {
isDeviceDisconnected = true;
} else if (
currentDevice &&
prevDevice &&
currentDevice.deviceId === prevDevice.deviceId &&
currentDevice.groupId !== prevDevice.groupId
) {
isDeviceReplaced = true;
}

if (isDeviceDisconnected) {
await this.disable();
this.select(undefined);
}
if (isDeviceReplaced) {
if (
this.isTrackStoppedDueToTrackEnd &&
this.state.status === 'disabled'
) {
await this.enable();
this.isTrackStoppedDueToTrackEnd = false;
} else {
await this.applySettingsToStream();
}
}
}),
);
}

private findDeviceInList(devices: MediaDeviceInfo[], deviceId: string) {
return devices.find(
(d) => d.deviceId === deviceId && d.kind === this.mediaDeviceKind,
);
}
}
28 changes: 27 additions & 1 deletion packages/client/src/devices/SpeakerManager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
import { Subscription, combineLatest } from 'rxjs';
import { isReactNative } from '../helpers/platforms';
import { SpeakerState } from './SpeakerState';
import { getAudioOutputDevices } from './devices';
import { deviceIds$, getAudioOutputDevices } from './devices';

export class SpeakerManager {
public readonly state = new SpeakerState();
private subscriptions: Subscription[] = [];

constructor() {
if (deviceIds$ && !isReactNative()) {
this.subscriptions.push(
combineLatest([deviceIds$!, this.state.selectedDevice$]).subscribe(
([devices, deviceId]) => {
if (!deviceId) {
return;
}
const device = devices.find(
(d) => d.deviceId === deviceId && d.kind === 'audiooutput',
);
if (!device) {
this.select('');
}
},
),
);
}
}

/**
* Lists the available audio output devices
Expand All @@ -30,6 +52,10 @@ export class SpeakerManager {
this.state.setDevice(deviceId);
}

removeSubscriptions = () => {
this.subscriptions.forEach((s) => s.unsubscribe());
};

/**
* Set the volume of the audio elements
* @param volume a number between 0 and 1
Expand Down
8 changes: 7 additions & 1 deletion packages/client/src/devices/__tests__/CameraManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { StreamClient } from '../../coordinator/connection/client';
import { CallingState, StreamVideoWriteableStateStore } from '../../store';

import { afterEach, beforeEach, describe, expect, it, Mock, vi } from 'vitest';
import { mockCall, mockVideoDevices, mockVideoStream } from './mocks';
import {
mockCall,
mockDeviceIds$,
mockVideoDevices,
mockVideoStream,
} from './mocks';
import { getVideoStream } from '../devices';
import { TrackType } from '../../gen/video/sfu/models/models';
import { CameraManager } from '../CameraManager';
Expand All @@ -17,6 +22,7 @@ vi.mock('../devices.ts', () => {
return of(mockVideoDevices);
}),
getVideoStream: vi.fn(() => Promise.resolve(mockVideoStream())),
deviceIds$: mockDeviceIds$(),
};
});

Expand Down
107 changes: 105 additions & 2 deletions packages/client/src/devices/__tests__/InputMediaDeviceManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import { StreamClient } from '../../coordinator/connection/client';
import { CallingState, StreamVideoWriteableStateStore } from '../../store';

import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { mockCall, mockVideoDevices, mockVideoStream } from './mocks';
import {
MockTrack,
emitDeviceIds,
mockCall,
mockDeviceIds$,
mockVideoDevices,
mockVideoStream,
} from './mocks';
import { InputMediaDeviceManager } from '../InputMediaDeviceManager';
import { InputMediaDeviceManagerState } from '../InputMediaDeviceManagerState';
import { of } from 'rxjs';
Expand All @@ -16,8 +23,17 @@ vi.mock('../../Call.ts', () => {
};
});

vi.mock('../devices.ts', () => {
console.log('MOCKING devices API');
return {
deviceIds$: mockDeviceIds$(),
};
});

class TestInputMediaDeviceManagerState extends InputMediaDeviceManagerState {
public getDeviceIdFromStream = vi.fn(() => 'mock-device-id');
public getDeviceIdFromStream = vi.fn(
(stream) => stream.getVideoTracks()[0].getSettings().deviceId,
);
}

class TestInputMediaDeviceManager extends InputMediaDeviceManager<TestInputMediaDeviceManagerState> {
Expand Down Expand Up @@ -217,6 +233,93 @@ describe('InputMediaDeviceManager.test', () => {
});
});

it('should set status to disabled if track ends', async () => {
vi.useFakeTimers();

await manager.enable();

vi.spyOn(manager, 'enable');
vi.spyOn(manager, 'listDevices').mockImplementationOnce(() =>
of(mockVideoDevices.slice(1)),
);
await (
(manager.state.mediaStream?.getTracks()[0] as MockTrack).eventHandlers[
'ended'
] as Function
)();
await vi.runAllTimersAsync();

expect(manager.state.status).toBe('disabled');
expect(manager.enable).not.toHaveBeenCalled();

vi.useRealTimers();
});

it('should restart track if the default device is replaced and status is enabled', async () => {
vi.useFakeTimers();
emitDeviceIds(mockVideoDevices);

await manager.enable();
const device = mockVideoDevices[0];
await manager.select(device.deviceId);

//@ts-expect-error
vi.spyOn(manager, 'applySettingsToStream');

emitDeviceIds([
{ ...device, groupId: device.groupId + 'new' },
...mockVideoDevices.slice(1),
]);

await vi.runAllTimersAsync();

expect(manager['applySettingsToStream']).toHaveBeenCalledOnce();
expect(manager.state.status).toBe('enabled');

vi.useRealTimers();
});

it('should do nothing if default device is replaced and status is disabled', async () => {
vi.useFakeTimers();
emitDeviceIds(mockVideoDevices);

const device = mockVideoDevices[0];
await manager.select(device.deviceId);
await manager.disable();

emitDeviceIds([
{ ...device, groupId: device.groupId + 'new' },
...mockVideoDevices.slice(1),
]);

await vi.runAllTimersAsync();

expect(manager.state.status).toBe('disabled');
expect(manager.disablePromise).toBeUndefined();
expect(manager.enablePromise).toBeUndefined();
expect(manager.state.selectedDevice).toBe(device.deviceId);

vi.useRealTimers();
});

it('should disable stream and deselect device if selected device is disconnected', async () => {
vi.useFakeTimers();
emitDeviceIds(mockVideoDevices);

await manager.enable();
const device = mockVideoDevices[0];
await manager.select(device.deviceId);

emitDeviceIds(mockVideoDevices.slice(1));

await vi.runAllTimersAsync();

expect(manager.state.selectedDevice).toBe(undefined);
expect(manager.state.status).toBe('disabled');

vi.useRealTimers();
});

afterEach(() => {
vi.clearAllMocks();
vi.resetModules();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { StreamClient } from '../../coordinator/connection/client';
import { CallingState, StreamVideoWriteableStateStore } from '../../store';

import { afterEach, beforeEach, describe, expect, it, Mock, vi } from 'vitest';
import { mockAudioDevices, mockAudioStream, mockCall } from './mocks';
import {
mockAudioDevices,
mockAudioStream,
mockCall,
mockDeviceIds$,
} from './mocks';
import { getAudioStream } from '../devices';
import { TrackType } from '../../gen/video/sfu/models/models';
import { MicrophoneManager } from '../MicrophoneManager';
Expand All @@ -22,6 +27,7 @@ vi.mock('../devices.ts', () => {
return of(mockAudioDevices);
}),
getAudioStream: vi.fn(() => Promise.resolve(mockAudioStream())),
deviceIds$: mockDeviceIds$(),
};
});

Expand Down
Loading

0 comments on commit ae3779f

Please sign in to comment.