diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index ee4ab9c75e..72a2bb5a75 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -2078,10 +2078,12 @@ export class Call { mediaStream: MediaStream | undefined, ...trackTypes: TrackType[] ) => { - if (!this.sfuClient || !this.sfuClient.sessionId) return; + const sessionId = this.sfuClient?.sessionId; + if (!sessionId) return; + await this.notifyTrackMuteState(!mediaStream, ...trackTypes); + if (this.sfuClient?.sessionId !== sessionId) return; - const { sessionId } = this.sfuClient; for (const trackType of trackTypes) { const streamStateProp = trackTypeToParticipantStreamKey(trackType); if (!streamStateProp) continue; diff --git a/packages/client/src/__tests__/Call.publishing.test.ts b/packages/client/src/__tests__/Call.publishing.test.ts index b4a4239462..3950383090 100644 --- a/packages/client/src/__tests__/Call.publishing.test.ts +++ b/packages/client/src/__tests__/Call.publishing.test.ts @@ -290,6 +290,109 @@ describe('Publishing and Unpublishing tracks', () => { expect(participant!.screenShareStream).toBeUndefined(); expect(participant!.screenShareAudioStream).toBeUndefined(); }); + + it('does not throw if sfuClient is cleared while the mute-state RPC is in flight', async () => { + let releaseMuteUpdate!: () => void; + let signalMuteUpdateEntered!: () => void; + const muteUpdateEntered = new Promise( + (resolve) => (signalMuteUpdateEntered = resolve), + ); + sfuClient.updateMuteStates = vi.fn().mockImplementation(() => { + signalMuteUpdateEntered(); + return new Promise((resolve) => (releaseMuteUpdate = resolve)); + }); + + const track = new MediaStreamTrack() as MediaStreamAudioTrack; + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getAudioTracks').mockReturnValue([track]); + + const inflight = call.publish(mediaStream, TrackType.AUDIO); + + await muteUpdateEntered; + + call['sfuClient'] = undefined; + releaseMuteUpdate(); + + await inflight; + }); + + it('updates local stream state when sfuClient is replaced with the same session id', async () => { + let releaseMuteUpdate!: () => void; + let signalMuteUpdateEntered!: () => void; + const muteUpdateEntered = new Promise( + (resolve) => (signalMuteUpdateEntered = resolve), + ); + sfuClient.updateMuteStates = vi.fn().mockImplementation(() => { + signalMuteUpdateEntered(); + return new Promise((resolve) => (releaseMuteUpdate = resolve)); + }); + + const track = new MediaStreamTrack() as MediaStreamAudioTrack; + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getAudioTracks').mockReturnValue([track]); + + const inflight = call.publish(mediaStream, TrackType.AUDIO); + + await muteUpdateEntered; + + const replacementSfuClient = vi.fn() as unknown as StreamSfuClient; + // @ts-expect-error sessionId is readonly + replacementSfuClient['sessionId'] = sessionId; + replacementSfuClient.updateMuteStates = vi.fn(); + call['sfuClient'] = replacementSfuClient; + releaseMuteUpdate(); + + await inflight; + + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant?.publishedTracks).toEqual([TrackType.AUDIO]); + expect(participant?.audioStream).toBe(mediaStream); + }); + + it('skips local stream state update when sfuClient is replaced with a new session id', async () => { + let releaseMuteUpdate!: () => void; + let signalMuteUpdateEntered!: () => void; + const muteUpdateEntered = new Promise( + (resolve) => (signalMuteUpdateEntered = resolve), + ); + sfuClient.updateMuteStates = vi.fn().mockImplementation(() => { + signalMuteUpdateEntered(); + return new Promise((resolve) => (releaseMuteUpdate = resolve)); + }); + + const track = new MediaStreamTrack() as MediaStreamAudioTrack; + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getAudioTracks').mockReturnValue([track]); + + const inflight = call.publish(mediaStream, TrackType.AUDIO); + + await muteUpdateEntered; + + const replacementSessionId = 'replacement-session-id'; + // @ts-expect-error partial data + call.state.updateOrAddParticipant(replacementSessionId, { + sessionId: replacementSessionId, + publishedTracks: [], + }); + + const replacementSfuClient = vi.fn() as unknown as StreamSfuClient; + // @ts-expect-error sessionId is readonly + replacementSfuClient['sessionId'] = replacementSessionId; + replacementSfuClient.updateMuteStates = vi.fn(); + call['sfuClient'] = replacementSfuClient; + releaseMuteUpdate(); + + await inflight; + + const originalParticipant = + call.state.findParticipantBySessionId(sessionId); + const replacementParticipant = + call.state.findParticipantBySessionId(replacementSessionId); + expect(originalParticipant?.publishedTracks).toEqual([]); + expect(originalParticipant?.audioStream).toBeUndefined(); + expect(replacementParticipant?.publishedTracks).toEqual([]); + expect(replacementParticipant?.audioStream).toBeUndefined(); + }); }); describe('Deprecated methods', () => { diff --git a/packages/client/src/rpc/retryable.ts b/packages/client/src/rpc/retryable.ts index e0281c000a..dce6a5e1f6 100644 --- a/packages/client/src/rpc/retryable.ts +++ b/packages/client/src/rpc/retryable.ts @@ -44,7 +44,6 @@ export const retryable = async < let result: FinishedUnaryCall | undefined = undefined; do { if (attempt > 0) await sleep(retryInterval(attempt)); - if (signal?.aborted) throw new Error(signal.reason); try { result = await rpc({ attempt });