Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,20 @@ internal class WebSocketCoreImpl(
return connectDeferred.await()
}

/**
* Closes the WebSocket connection with the given code and reason.
*
* New connection attempts can be made after this call.
*/
private suspend fun close(code: Int, reason: String?) = connectedMutex.withLock {
// Cancel this so new connection attempts will create a new deferred and not await this one
pendingConnectDeferred?.cancel()
pendingConnectDeferred = null

connectionHolder.get()?.webSocket?.close(code, reason)
connectionHolder.set(null)
Comment thread
kmod-midori marked this conversation as resolved.
Outdated
}

/**
* Handles a new connection attempt when not yet connected.
* This is called when the URL flow emits before [connectDeferred] is completed.
Expand Down Expand Up @@ -627,7 +641,9 @@ internal class WebSocketCoreImpl(

override fun shutdown() {
Timber.i("Shutting down websocket")
connectionHolder.get()?.webSocket?.close(1001, "Session removed from app.")
wsScope.launch {
close(1001, "Session removed from app.")
}
}

// ----- WebSocketListener section
Expand Down Expand Up @@ -763,7 +779,7 @@ internal class WebSocketCoreImpl(
}
if (activeMessages.isEmpty()) {
Timber.i("No more subscriptions, closing connection.")
connectionHolder.get()?.webSocket?.close(1001, "Done listening to subscriptions.")
close(1001, "Done listening to subscriptions.")
} else {
Timber.i("Still ${activeMessages.size} messages in the queue, not closing connection.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
Expand Down Expand Up @@ -870,6 +871,38 @@ shutdown()
verify { mockConnection.close(1001, "Session removed from app.") }
}

@Test
fun reconnectsAfterShutdown() = runTest {
Comment thread
kmod-midori marked this conversation as resolved.
Outdated
setupServer(backgroundScope = backgroundScope)
every {
mockConnection.close(1001, "Session removed from app.")
} answers {
backgroundScope.launch {
// Simulate queue delay before onClosed is called after close
delay(100)
webSocketListener.onClosed(mockConnection, 1001, "Session removed from app.")
}
true
}

prepareAuthenticationAnswer()
assertTrue(webSocketCore.connect())
// Should connect for the first time
verify(exactly = 1) { mockOkHttpClient.newWebSocket(any(), webSocketListener) }

advanceTimeBy(100)

webSocketCore.shutdown()
advanceUntilIdle()
verify(exactly = 1) { mockConnection.close(1001, "Session removed from app.") }

advanceTimeBy(50)

assertTrue(webSocketCore.connect())
advanceUntilIdle()
verify(exactly = 2) { mockOkHttpClient.newWebSocket(any(), webSocketListener) }
}

/*
misc
*/
Expand Down
Loading