diff --git a/console/src/api/materialize/SubscribeManager.ts b/console/src/api/materialize/SubscribeManager.ts index b47ac2bd0cbbc..83544ee5b143a 100644 --- a/console/src/api/materialize/SubscribeManager.ts +++ b/console/src/api/materialize/SubscribeManager.ts @@ -141,6 +141,10 @@ export class SubscribeManager implements Connectable { ); }; + setRequest = (request: SqlRequest) => { + this.sqlRequest = request; + }; + disconnect = () => { clearInterval(this.flushIntervalHandle); this.socket.disconnect(); diff --git a/console/src/api/materialize/WebsocketConnectionManager.test.ts b/console/src/api/materialize/WebsocketConnectionManager.test.ts index 1076136cc49eb..f71bd63f37659 100644 --- a/console/src/api/materialize/WebsocketConnectionManager.test.ts +++ b/console/src/api/materialize/WebsocketConnectionManager.test.ts @@ -205,6 +205,33 @@ describe("WebsocketConnectionManager", () => { vi.advanceTimersByTime(5000); expect(mockTarget.reconnect).not.toHaveBeenCalled(); }); + + it("does not start a second connect while a handshake is in progress", () => { + const store = getStore(); + store.set( + environmentsWithHealth, + new Map([ + ["aws/us-east-1", createHealthyEnvironment("localhost:6875")], + ]) as unknown as EnvironmentsWithHealth, + ); + + // Constructor starts a connect; handshake is still in progress. + manager = new WebsocketConnectionManager( + mockTarget, + store, + reconnectionStateAtom, + ); + expect(mockTarget.reconnect).toHaveBeenCalledTimes(1); + + // Overlapping reconnect is dropped while the handshake is in progress. + manager.reconnect(); + expect(mockTarget.reconnect).toHaveBeenCalledTimes(1); + + // Once the handshake completes, the next reconnect proceeds. + mockTarget.simulateOpen(); + manager.reconnect(); + expect(mockTarget.reconnect).toHaveBeenCalledTimes(2); + }); }); describe("destroy", () => { diff --git a/console/src/api/materialize/WebsocketConnectionManager.ts b/console/src/api/materialize/WebsocketConnectionManager.ts index e416c98932321..e6884a62a46cd 100644 --- a/console/src/api/materialize/WebsocketConnectionManager.ts +++ b/console/src/api/materialize/WebsocketConnectionManager.ts @@ -81,6 +81,8 @@ export class WebsocketConnectionManager { private hasEverConnected = false; private retryTimer: ReturnType | undefined; private initialized = false; + /** Set while a connect is mid-handshake. Reentry would strand the CONNECTING socket on Safari. */ + private connectInFlight = false; private unsubscribeFromClose: (() => void) | undefined; private unsubscribeFromOpen: (() => void) | undefined; @@ -148,17 +150,15 @@ export class WebsocketConnectionManager { private handleEnvironmentChange = () => { const currentEnvironment = this.getCurrentEnvironment(); const nowHealthy = this.isEnvironmentHealthy(currentEnvironment); - const prevHealthy = this.isHealthy; this.isHealthy = nowHealthy; - // Update http address if environment is enabled if (currentEnvironment?.state === "enabled") { this.currentHttpAddress = currentEnvironment.httpAddress; } if (nowHealthy) { - if (!prevHealthy || !this.target.isConnected()) { + if (!this.target.isConnected()) { this.resumeConnection(); } } else { @@ -192,6 +192,7 @@ export class WebsocketConnectionManager { // --- Target event handlers --- private handleTargetClose = () => { + this.connectInFlight = false; if (this.isHealthy) { this.scheduleRetry(); } @@ -199,12 +200,18 @@ export class WebsocketConnectionManager { }; private handleTargetOpen = () => { + this.connectInFlight = false; this.hasEverConnected = true; this.retryAttempt = 0; this.clearRetryTimer(); this.notifyStateChange(); }; + /** Tear down and reopen the socket. Used on SQL request changes. */ + reconnect() { + this.attemptConnection(); + } + // --- Retry scheduling --- private scheduleRetry() { @@ -225,18 +232,18 @@ export class WebsocketConnectionManager { } private attemptConnection() { - if (this.target.isConnected()) return; - if (this.currentHttpAddress) { - const sessionVariables = this.options.getSessionVariables?.({ - hasEverConnected: this.hasEverConnected, - }); - try { - this.target.reconnect(this.currentHttpAddress, sessionVariables); - } catch { - // If the WebSocket constructor throws (e.g. network blocked), - // schedule another retry - this.scheduleRetry(); - } + if (this.connectInFlight) return; + if (!this.currentHttpAddress) return; + + const sessionVariables = this.options.getSessionVariables?.({ + hasEverConnected: this.hasEverConnected, + }); + this.connectInFlight = true; + try { + this.target.reconnect(this.currentHttpAddress, sessionVariables); + } catch { + this.connectInFlight = false; + this.scheduleRetry(); } } diff --git a/console/src/api/materialize/useSubscribe.ts b/console/src/api/materialize/useSubscribe.ts index 8fc625cef59e5..706a3a15b67e0 100644 --- a/console/src/api/materialize/useSubscribe.ts +++ b/console/src/api/materialize/useSubscribe.ts @@ -141,7 +141,13 @@ export function useGlobalUpsertSubscribe>( React.useEffect(() => { const cleanup = subscribe.onChange(() => { const snapshot = subscribe.getSnapshot(); - if (getStore().get(options.atom) === snapshot) return; + const current = getStore().get(options.atom); + if (current === snapshot) return; + + // Hold cached atom data through a fresh manager's empty pre-snapshot state. + const snapshotIsEmptyPreload = + !snapshot.snapshotComplete && !snapshot.data.length && !snapshot.error; + if (snapshotIsEmptyPreload && current.data.length) return; setValue(snapshot); }); diff --git a/console/src/hooks/useAutomaticallyConnectSocket.ts b/console/src/hooks/useAutomaticallyConnectSocket.ts index b1626797911ae..4e125b9b718ee 100644 --- a/console/src/hooks/useAutomaticallyConnectSocket.ts +++ b/console/src/hooks/useAutomaticallyConnectSocket.ts @@ -19,7 +19,6 @@ import { ReconnectionState, WebsocketConnectionManager, } from "~/api/materialize/WebsocketConnectionManager"; -import { currentEnvironmentState } from "~/store/environments"; // Atom for reconnection state - can be shared across components if needed export const reconnectionStateAtom = atom({ @@ -74,27 +73,16 @@ export const useAutomaticallyConnectSocket = ({ }; }, [target, store, getSessionVariablesRef]); - // Handle request changes for subscribe queries - const currentEnvironment = useAtomValue(currentEnvironmentState); const previousRequest = usePrevious(request); React.useEffect(() => { if (!subscribe || !request) return; if (previousRequest === request) return; - if (currentEnvironment?.state !== "enabled") return; - - subscribe.connect( - request, - currentEnvironment.httpAddress, - getSessionVariablesRef.current?.({ hasEverConnected: false }), - ); - }, [ - subscribe, - request, - previousRequest, - currentEnvironment, - getSessionVariablesRef, - ]); + subscribe.setRequest(request); + // The manager owns the initial connect; only force a reconnect on changes. + if (previousRequest === undefined) return; + managerRef.current?.reconnect(); + }, [subscribe, request, previousRequest]); return { reconnectionState }; };