Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,21 @@ internal class WebSocketCoreImpl(
return connectDeferred.await()
}

/**
* Intentionally closes the WebSocket connection with the given code and reason.
*
* New connection attempts can be made after this call.
*/
private fun close(code: Int, reason: String?) = wsScope.launch {
connectedMutex.withLock {
val holder = connectionHolder.getAndSet(null)
holder?.webSocket?.close(code, reason)
holder?.urlObserverJob?.cancel()

cleanupClosingSocket()
}
}

/**
* Handles a new connection attempt when not yet connected.
* This is called when the URL flow emits before [connectDeferred] is completed.
Expand Down Expand Up @@ -627,7 +642,7 @@ internal class WebSocketCoreImpl(

override fun shutdown() {
Timber.i("Shutting down websocket")
connectionHolder.get()?.webSocket?.close(1001, "Session removed from app.")
close(1001, "Session removed from app.")
}

// ----- WebSocketListener section
Expand Down Expand Up @@ -763,7 +778,7 @@ internal class WebSocketCoreImpl(
}
if (activeMessages.isEmpty()) {
Timber.i("No more subscriptions, closing connection.")
connectionHolder.get()?.webSocket?.close(1001, "Done listening to subscriptions.")
close(1001, "Done listening to subscriptions.")
} else {
Timber.i("Still ${activeMessages.size} messages in the queue, not closing connection.")
}
Expand Down Expand Up @@ -960,6 +975,30 @@ internal class WebSocketCoreImpl(
)
}

/**
* Clean up deferred and messages when the socket is being closed.
*/
@GuardedBy("connectedMutex")
private fun cleanupClosingSocket() {
// Complete connected and pending connect deferred
pendingConnectDeferred?.complete(false)
pendingConnectDeferred = null
authCompleted.completeExceptionally(IOException("Connection closed"))
authCompleted = CompletableDeferred()
Comment thread
TimoPtr marked this conversation as resolved.

// Complete all pending simple messages with error
activeMessages
.filterValues { it is ActiveMessage.Simple }
.forEach { (key, activeMessage) ->
val completed =
activeMessage.responseDeferred.completeExceptionally(IOException("Connection closed"))
if (!completed) {
Timber.w("Response deferred was already completed, skipping IOException")
}
activeMessages.remove(key)
}
}

private fun handleClosingSocket() {
val previousState = connectionState
val closeReason = pendingCloseReason ?: WebSocketState.Closed.Reason.OTHER
Expand Down Expand Up @@ -990,23 +1029,7 @@ internal class WebSocketCoreImpl(
// Cancel URL observer - connect() will recreate it if needed
holder?.urlObserverJob?.cancel()

// Complete the connected deferred if still active
if (authCompleted.isActive) {
authCompleted.completeExceptionally(IOException("Connection closed"))
}
authCompleted = CompletableDeferred()

// Complete all pending simple messages with error
activeMessages
.filterValues { it is ActiveMessage.Simple }
.forEach { (key, activeMessage) ->
val completed =
activeMessage.responseDeferred.completeExceptionally(IOException("Connection closed"))
if (!completed) {
Timber.w("Response deferred was already completed, skipping IOException")
}
activeMessages.remove(key)
}
cleanupClosingSocket()
hasSubscriptions = activeMessages.any { it.value is ActiveMessage.Subscription }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
Expand Down Expand Up @@ -867,9 +868,45 @@ shutdown()

webSocketCore.shutdown()

// Wait for close() to be called
advanceUntilIdle()

verify { mockConnection.close(1001, "Session removed from app.") }
}

@Test
fun `Given a shutdown connection When connect is invoked before onClosed Then it reconnects`() = runTest {
setupServer(backgroundScope = backgroundScope)
every {
mockConnection.close(1001, "Session removed from app.")
} answers {
backgroundScope.launch {
// Simulate queue delay before onClosed is called after close
delay(100)
webSocketListener.onClosed(mockConnection, 1001, "Session removed from app.")
}
true
}

prepareAuthenticationAnswer()
assertTrue(webSocketCore.connect())
// Should connect for the first time
verify(exactly = 1) { mockOkHttpClient.newWebSocket(any(), webSocketListener) }

advanceTimeBy(100)

webSocketCore.shutdown()
runCurrent()
verify(exactly = 1) { mockConnection.close(1001, "Session removed from app.") }

advanceTimeBy(50)

assertTrue(webSocketCore.connect())
advanceUntilIdle()
// Should connect again
verify(exactly = 2) { mockOkHttpClient.newWebSocket(any(), webSocketListener) }
}

/*
misc
*/
Expand Down
Loading