diff --git a/gigl/src/data_preprocessor/data_preprocessor.py b/gigl/src/data_preprocessor/data_preprocessor.py index 95d063aaf..216b06b21 100644 --- a/gigl/src/data_preprocessor/data_preprocessor.py +++ b/gigl/src/data_preprocessor/data_preprocessor.py @@ -352,6 +352,13 @@ def __build_data_reference_str(references: Iterable[DataReference]) -> str: edge_ref_to_preprocessing_spec ) + if num_dataflow_jobs == 0: + logger.info("No data references to preprocess; skipping Dataflow.") + return PreprocessedMetadataReferences( + node_data=node_refs_and_results, + edge_data=edge_refs_and_results, + ) + with concurrent.futures.ThreadPoolExecutor( max_workers=num_dataflow_jobs ) as executor: diff --git a/gigl/src/data_preprocessor/lib/enumerate/utils.py b/gigl/src/data_preprocessor/lib/enumerate/utils.py index b606d7edb..e26c14db0 100644 --- a/gigl/src/data_preprocessor/lib/enumerate/utils.py +++ b/gigl/src/data_preprocessor/lib/enumerate/utils.py @@ -251,7 +251,7 @@ def __enumerate_all_node_references( f"Launch {len(node_data_references)} node enumeration jobs in parallel." ) with concurrent.futures.ThreadPoolExecutor( - max_workers=len(node_data_references) + max_workers=max(1, len(node_data_references)) ) as executor: futures: list[concurrent.futures.Future] = list() for node_data_ref in node_data_references: @@ -278,7 +278,7 @@ def __enumerate_all_edge_references( f"Launch {len(edge_data_references)} edge enumeration jobs in parallel." ) with concurrent.futures.ThreadPoolExecutor( - max_workers=len(edge_data_references) + max_workers=max(1, len(edge_data_references)) ) as executor: futures: list[concurrent.futures.Future] = list() for edge_data_ref in edge_data_references: