Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/state/CallViewModel/localMember/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
type LocalTrack,
type LocalTrackPublication,
ParticipantEvent,
PublishTrackError,
RoomEvent,
Track,
} from "livekit-client";
import { BehaviorSubject } from "rxjs";
Expand Down Expand Up @@ -357,4 +359,50 @@ describe("Bug fix", () => {
expect(track!.isMuted).toBe(true);
}
});

// When the connection is created, we call createAndSetupTracks immediately.
// But the livekit room connection is not yet established: `Connection#start` must
// first get the sfu config, and then connect to the livekit room.
// Livekit has support for this case by queuing publications until the room is connected,
// but this can time out if the room connection takes too long (15s)
it("Recovers failed publication due to room connection timeout", async () => {
// setLogLevel(`debug`);
const publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
constant({ supported: false, processor: undefined }),
logger,
);
audioEnabled$.next(true);
videoEnabled$.next(true);

const enableCameraAndMicrophoneSpy = vi.spyOn(
localParticipant,
"enableCameraAndMicrophone",
);

enableCameraAndMicrophoneSpy
.mockImplementationOnce(() => {
throw new PublishTrackError(
"publishing rejected as engine not connected within timeout",
408,
);
})
.mockImplementationOnce(async () => {
return Promise.resolve();
});

// call createAndSetupTracks which will attempt to publish and fail with the simulated timeout
await publisher.createAndSetupTracks();

// Now simulate the connection finally beeing connected
connection.livekitRoom.emit(RoomEvent.Connected);

await flushPromises();

// Should have called enableCameraAndMicrophone again to retry publication
expect(enableCameraAndMicrophoneSpy).toHaveBeenCalledTimes(2);
});
});
41 changes: 36 additions & 5 deletions src/state/CallViewModel/localMember/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
LocalVideoTrack,
ParticipantEvent,
type Room as LivekitRoom,
RoomEvent as LivekitRoomEvent,
Track,
} from "livekit-client";
import {
Expand Down Expand Up @@ -89,6 +90,23 @@ export class Publisher {
ParticipantEvent.LocalTrackPublished,
this.onLocalTrackPublished.bind(this),
);

this.connection.livekitRoom.once(LivekitRoomEvent.Connected, () => {
Comment on lines 90 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm noticing some leaks: both the LocalTrackPublished callback and the Connected callback should not remain set after the scope ends

// When `createAndSetupTracks` is called before the connection is established,
// there is an internal timeout in livekit that could be triggered if it takes
// too long to connect.
// It is no-op if tracks are already created, so it is safe to call it again.
const audio = this.muteStates.audio.enabled$.value;
const video = this.muteStates.video.enabled$.value;
this.enableTracks(audio, video, this.connection.livekitRoom).catch(
(e) => {
this.logger.error(
"Failed to enable tracks after connection established",
e,
);
},
);
});
}

// LiveKit will publish the tracks as soon as they are created
Expand Down Expand Up @@ -157,16 +175,29 @@ export class Publisher {
// We are using the `ParticipantEvent.LocalTrackPublished` to be notified
// when tracks are actually published, and at that point
// we can pause upstream if needed (depending on if startPublishing has been called).
this.enableTracks(audio, video, lkRoom).catch((e) => {
// If it is PublisherTrackError (408), i.e the publishing timed out,
// because it took too long to connet to the room, we are safe because
// we registered to the RoomEvent.Connected to create the tracks once connected.
this.logger.error("Failed to enable tracks", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to inform the console log about that as well:
because it took too long to connet to the room, we are safe

and (very minor) connet -> connect`

});

return Promise.resolve();
}

private async enableTracks(
audio: boolean,
video: boolean,
lkRoom: LivekitRoom,
): Promise<void> {
if (audio && video) {
// Enable both at once in order to have a single permission prompt!
void lkRoom.localParticipant.enableCameraAndMicrophone();
await lkRoom.localParticipant.enableCameraAndMicrophone();
} else if (audio) {
void lkRoom.localParticipant.setMicrophoneEnabled(true);
await lkRoom.localParticipant.setMicrophoneEnabled(true);
} else if (video) {
void lkRoom.localParticipant.setCameraEnabled(true);
await lkRoom.localParticipant.setCameraEnabled(true);
}

return Promise.resolve();
}

private async pauseUpstreams(
Expand Down
2 changes: 2 additions & 0 deletions src/utils/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ export function withTestScheduler(
interface EmitterMock<T> {
on: (...args: unknown[]) => T;
off: (...args: unknown[]) => T;
once: (...args: unknown[]) => T;
addListener: (...args: unknown[]) => T;
removeListener: (...args: unknown[]) => T;
emit: (event: string | symbol, ...args: unknown[]) => boolean;
Expand All @@ -186,6 +187,7 @@ export function mockEmitter<T>(): EmitterMock<T> {
return {
on: ee.on.bind(ee) as unknown as (...args: unknown[]) => T,
off: ee.off.bind(ee) as unknown as (...args: unknown[]) => T,
once: ee.once.bind(ee) as unknown as (...args: unknown[]) => T,
addListener: ee.addListener.bind(ee) as unknown as (
...args: unknown[]
) => T,
Expand Down
Loading