diff --git a/common/src/main/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImpl.kt b/common/src/main/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImpl.kt index 5e202d068e4..53f40214222 100644 --- a/common/src/main/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImpl.kt +++ b/common/src/main/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImpl.kt @@ -363,6 +363,21 @@ internal class WebSocketCoreImpl( return connectDeferred.await() } + /** + * Intentionally closes the WebSocket connection with the given code and reason. + * + * New connection attempts can be made after this call. + */ + private fun close(code: Int, reason: String?) = wsScope.launch { + connectedMutex.withLock { + val holder = connectionHolder.getAndSet(null) + holder?.webSocket?.close(code, reason) + holder?.urlObserverJob?.cancel() + + cleanupClosingSocket() + } + } + /** * Handles a new connection attempt when not yet connected. * This is called when the URL flow emits before [connectDeferred] is completed. @@ -627,7 +642,7 @@ internal class WebSocketCoreImpl( override fun shutdown() { Timber.i("Shutting down websocket") - connectionHolder.get()?.webSocket?.close(1001, "Session removed from app.") + close(1001, "Session removed from app.") } // ----- WebSocketListener section @@ -763,7 +778,7 @@ internal class WebSocketCoreImpl( } if (activeMessages.isEmpty()) { Timber.i("No more subscriptions, closing connection.") - connectionHolder.get()?.webSocket?.close(1001, "Done listening to subscriptions.") + close(1001, "Done listening to subscriptions.") } else { Timber.i("Still ${activeMessages.size} messages in the queue, not closing connection.") } @@ -960,6 +975,30 @@ internal class WebSocketCoreImpl( ) } + /** + * Clean up deferred and messages when the socket is being closed. + */ + @GuardedBy("connectedMutex") + private fun cleanupClosingSocket() { + // Complete connected and pending connect deferred + pendingConnectDeferred?.complete(false) + pendingConnectDeferred = null + authCompleted.completeExceptionally(IOException("Connection closed")) + authCompleted = CompletableDeferred() + + // Complete all pending simple messages with error + activeMessages + .filterValues { it is ActiveMessage.Simple } + .forEach { (key, activeMessage) -> + val completed = + activeMessage.responseDeferred.completeExceptionally(IOException("Connection closed")) + if (!completed) { + Timber.w("Response deferred was already completed, skipping IOException") + } + activeMessages.remove(key) + } + } + private fun handleClosingSocket() { val previousState = connectionState val closeReason = pendingCloseReason ?: WebSocketState.Closed.Reason.OTHER @@ -990,23 +1029,7 @@ internal class WebSocketCoreImpl( // Cancel URL observer - connect() will recreate it if needed holder?.urlObserverJob?.cancel() - // Complete the connected deferred if still active - if (authCompleted.isActive) { - authCompleted.completeExceptionally(IOException("Connection closed")) - } - authCompleted = CompletableDeferred() - - // Complete all pending simple messages with error - activeMessages - .filterValues { it is ActiveMessage.Simple } - .forEach { (key, activeMessage) -> - val completed = - activeMessage.responseDeferred.completeExceptionally(IOException("Connection closed")) - if (!completed) { - Timber.w("Response deferred was already completed, skipping IOException") - } - activeMessages.remove(key) - } + cleanupClosingSocket() hasSubscriptions = activeMessages.any { it.value is ActiveMessage.Subscription } } diff --git a/common/src/test/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImplTest.kt b/common/src/test/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImplTest.kt index 47045569a6d..902e30e1f2f 100644 --- a/common/src/test/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImplTest.kt +++ b/common/src/test/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImplTest.kt @@ -39,6 +39,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.advanceTimeBy @@ -867,9 +868,45 @@ shutdown() webSocketCore.shutdown() + // Wait for close() to be called + advanceUntilIdle() + verify { mockConnection.close(1001, "Session removed from app.") } } + @Test + fun `Given a shutdown connection When connect is invoked before onClosed Then it reconnects`() = runTest { + setupServer(backgroundScope = backgroundScope) + every { + mockConnection.close(1001, "Session removed from app.") + } answers { + backgroundScope.launch { + // Simulate queue delay before onClosed is called after close + delay(100) + webSocketListener.onClosed(mockConnection, 1001, "Session removed from app.") + } + true + } + + prepareAuthenticationAnswer() + assertTrue(webSocketCore.connect()) + // Should connect for the first time + verify(exactly = 1) { mockOkHttpClient.newWebSocket(any(), webSocketListener) } + + advanceTimeBy(100) + + webSocketCore.shutdown() + runCurrent() + verify(exactly = 1) { mockConnection.close(1001, "Session removed from app.") } + + advanceTimeBy(50) + + assertTrue(webSocketCore.connect()) + advanceUntilIdle() + // Should connect again + verify(exactly = 2) { mockOkHttpClient.newWebSocket(any(), webSocketListener) } + } + /* misc */