From 8d574dccb8ccb6dd4c272e7db3237ca0d7fd065d Mon Sep 17 00:00:00 2001 From: Kyle Montemayor Date: Thu, 7 May 2026 21:01:26 +0000 Subject: [PATCH] Guard data preprocessor against empty preprocessing specs Clamp ThreadPoolExecutor max_workers to >=1 in the enumerator and short-circuit __preprocess_all_data_references when both node and edge spec dicts are empty, so an empty DataPreprocessorConfig no longer crashes with `max_workers must be greater than 0`. Co-Authored-By: Claude Opus 4.7 (1M context) --- gigl/src/data_preprocessor/data_preprocessor.py | 7 +++++++ gigl/src/data_preprocessor/lib/enumerate/utils.py | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/gigl/src/data_preprocessor/data_preprocessor.py b/gigl/src/data_preprocessor/data_preprocessor.py index 95d063aaf..216b06b21 100644 --- a/gigl/src/data_preprocessor/data_preprocessor.py +++ b/gigl/src/data_preprocessor/data_preprocessor.py @@ -352,6 +352,13 @@ def __build_data_reference_str(references: Iterable[DataReference]) -> str: edge_ref_to_preprocessing_spec ) + if num_dataflow_jobs == 0: + logger.info("No data references to preprocess; skipping Dataflow.") + return PreprocessedMetadataReferences( + node_data=node_refs_and_results, + edge_data=edge_refs_and_results, + ) + with concurrent.futures.ThreadPoolExecutor( max_workers=num_dataflow_jobs ) as executor: diff --git a/gigl/src/data_preprocessor/lib/enumerate/utils.py b/gigl/src/data_preprocessor/lib/enumerate/utils.py index b606d7edb..e26c14db0 100644 --- a/gigl/src/data_preprocessor/lib/enumerate/utils.py +++ b/gigl/src/data_preprocessor/lib/enumerate/utils.py @@ -251,7 +251,7 @@ def __enumerate_all_node_references( f"Launch {len(node_data_references)} node enumeration jobs in parallel." ) with concurrent.futures.ThreadPoolExecutor( - max_workers=len(node_data_references) + max_workers=max(1, len(node_data_references)) ) as executor: futures: list[concurrent.futures.Future] = list() for node_data_ref in node_data_references: @@ -278,7 +278,7 @@ def __enumerate_all_edge_references( f"Launch {len(edge_data_references)} edge enumeration jobs in parallel." ) with concurrent.futures.ThreadPoolExecutor( - max_workers=len(edge_data_references) + max_workers=max(1, len(edge_data_references)) ) as executor: futures: list[concurrent.futures.Future] = list() for edge_data_ref in edge_data_references: