diff --git a/src/adapter/src/coord/statement_logging.rs b/src/adapter/src/coord/statement_logging.rs index 4f8a9d165a324..33eeb8f50acca 100644 --- a/src/adapter/src/coord/statement_logging.rs +++ b/src/adapter/src/coord/statement_logging.rs @@ -526,6 +526,11 @@ impl Coordinator { /// Set the `cluster_id` for a statement, once it's known. /// + /// Swallows any errors that occur during cluster resolution. This is fine, because if the + /// cluster went away during sequencing, then the sequencing will error out anyway. (We don't + /// want to make this synchronous just to report the error back, because we want these + /// statement logging messages to be pipelined.) + /// /// TODO(peek-seq): We could do cluster resolution and packing in the frontend task, and just /// send over the rows. pub(crate) fn set_statement_execution_cluster( @@ -533,11 +538,13 @@ impl Coordinator { id: StatementLoggingId, cluster_id: ClusterId, ) { - let cluster_name = self.catalog().get_cluster(cluster_id).name.clone(); - self.mutate_record(id, |record| { - record.cluster_name = Some(cluster_name); - record.cluster_id = Some(cluster_id); - }); + if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) { + let cluster_name = cluster.name.clone(); + self.mutate_record(id, |record| { + record.cluster_name = Some(cluster_name); + record.cluster_id = Some(cluster_id); + }); + } } /// Set the `execution_timestamp` for a statement, once it's known diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index 4348bb0b9925f..94ee5bc403f2d 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -4109,6 +4109,63 @@ def subscribe(): assert not thread.is_alive(), f"Thread {thread.name} is still running" +def workflow_test_drop_cluster_during_peeks(c: Composition) -> None: + """Race peeks against DROP/CREATE of their target cluster; environmentd + must not panic in `set_statement_execution_cluster`.""" + + with c.override(Materialized()): + c.up("materialized") + + c.sql( + """ + ALTER SYSTEM SET statement_logging_max_sample_rate = 1.0; + ALTER SYSTEM SET statement_logging_default_sample_rate = 1.0; + """, + port=6877, + user="mz_system", + ) + c.sql("CREATE CLUSTER victim SIZE 'scale=1,workers=1'") + + stop = [False] + + def peek_loop() -> None: + while not stop[0]: + try: + with c.sql_cursor() as cur: + cur.execute("SET auto_route_catalog_queries = false") + cur.execute("SET cluster = victim") + while not stop[0]: + cur.execute("SELECT 1") + cur.fetchone() + except DatabaseError: + pass + + def churn_loop() -> None: + while not stop[0]: + try: + with c.sql_cursor() as cur: + cur.execute("DROP CLUSTER IF EXISTS victim CASCADE") + cur.execute("CREATE CLUSTER victim SIZE 'scale=1,workers=1'") + except DatabaseError: + pass + + threads = [ + PropagatingThread(target=peek_loop, name=f"peek-{i}") for i in range(8) + ] + [PropagatingThread(target=churn_loop, name="churn")] + for t in threads: + t.start() + try: + time.sleep(30) + finally: + stop[0] = True + for t in threads: + t.join(timeout=10) + + with c.sql_cursor() as cur: + cur.execute("SELECT 1") + assert cur.fetchone() == (1,) + + def workflow_test_refresh_mv_warmup( c: Composition, parser: WorkflowArgumentParser ) -> None: