From f792dd0aabbf087a7266b6bddc41bf3d5adb6a01 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 8 Apr 2026 18:25:31 +0000 Subject: [PATCH 01/13] change runinference yaml name to vertexai --- .../yaml/tests/{runinference.yaml => runinference_vertexai.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sdks/python/apache_beam/yaml/tests/{runinference.yaml => runinference_vertexai.yaml} (100%) diff --git a/sdks/python/apache_beam/yaml/tests/runinference.yaml b/sdks/python/apache_beam/yaml/tests/runinference_vertexai.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/tests/runinference.yaml rename to sdks/python/apache_beam/yaml/tests/runinference_vertexai.yaml From 129e3f95cf23095ea723c26835f24ec04da8cce3 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 8 Apr 2026 18:26:05 +0000 Subject: [PATCH 02/13] add huggingface support --- sdks/python/apache_beam/yaml/yaml_ml.py | 46 +++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 51f18c733046..88ac2b52d3bc 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -282,6 +282,52 @@ def inference_output_type(self): ('model_id', Optional[str])]) +@ModelHandlerProvider.register_handler_type('HuggingFacePipeline') +class HuggingFacePipelineProvider(ModelHandlerProvider): + def __init__( + self, + task: str = "", + model: str = "", + preprocess: Optional[dict[str, str]] = None, + postprocess: Optional[dict[str, str]] = None, + device: Optional[str] = None, + inference_fn: Optional[dict[str, str]] = None, + load_pipeline_args: Optional[dict[str, Any]] = None, + **kwargs): + try: + from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler + except ImportError: + raise ValueError( + 'Unable to import HuggingFacePipelineModelHandler. Please ' + 'install transformers dependencies.') + + kwargs = {k: v for k, v in kwargs.items() if not k.startswith('_')} + + inference_fn_obj = self.parse_processing_transform( + inference_fn, 'inference_fn') if inference_fn else None + + handler_kwargs = {} + if inference_fn_obj: + handler_kwargs['inference_fn'] = inference_fn_obj + + _handler = HuggingFacePipelineModelHandler( + task=task, + model=model, + device=device, + load_pipeline_args=load_pipeline_args, + **handler_kwargs, + **kwargs) + + super().__init__(_handler, preprocess, postprocess) + + @staticmethod + def validate(model_handler_spec): + pass + + def inference_output_type(self): + return Any + + @beam.ptransform.ptransform_fn def run_inference( pcoll, From 6c62366349e94537a8473a94c39f97bca3cd4f4c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 8 Apr 2026 18:27:20 +0000 Subject: [PATCH 03/13] add huggingface test --- .../yaml/tests/runinference_huggingface.yaml | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml diff --git a/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml new file mode 100644 index 000000000000..7c429e6067a1 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +fixtures: + - name: mock_pipeline + type: unittest.mock.patch + config: + target: transformers.pipeline + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - text: "Hello world" + - text: "Bye world" + - type: RunInference + config: + model_handler: + type: "HuggingFacePipeline" + config: + task: "text-classification" + model: "unused" + inference_fn: + callable: | + def mock_inference(batch, pipeline, inference_args): + return [[dict(label='POSITIVE', score=0.9)] for _ in batch] + preprocess: + callable: 'lambda x: x.text' + - type: MapToFields + config: + language: python + fields: + text: text + inference: + callable: | + def get_json(x): + import json + return json.dumps(x.inference.inference, indent=0).strip() + - type: AssertEqual + config: + elements: + - text: "Hello world" + inference: "[\n{\n\"label\": \"POSITIVE\",\n\"score\": 0.9\n}\n]" + - text: "Bye world" + inference: "[\n{\n\"label\": \"POSITIVE\",\n\"score\": 0.9\n}\n]" + options: + yaml_experimental_features: ['ML'] From e0218462ec5de6eb6ccd303c61756c008f1fa03c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 15 Apr 2026 11:59:44 +0000 Subject: [PATCH 04/13] address gemini --- sdks/python/apache_beam/yaml/yaml_ml.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 88ac2b52d3bc..8110b8da5259 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -286,11 +286,11 @@ def inference_output_type(self): class HuggingFacePipelineProvider(ModelHandlerProvider): def __init__( self, - task: str = "", - model: str = "", + task: Optional[str] = None, + model: Optional[str] = None, preprocess: Optional[dict[str, str]] = None, postprocess: Optional[dict[str, str]] = None, - device: Optional[str] = None, + device: Optional[Any] = None, inference_fn: Optional[dict[str, str]] = None, load_pipeline_args: Optional[dict[str, Any]] = None, **kwargs): @@ -322,7 +322,11 @@ def __init__( @staticmethod def validate(model_handler_spec): - pass + config = model_handler_spec.get('config', {}) + if not config.get('task') and not config.get('model'): + raise ValueError( + "HuggingFacePipeline requires either 'task' or " + "'model' to be specified.") def inference_output_type(self): return Any From c9d1c2acea9a0e960abf71894e0b24797c46207d Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 15 Apr 2026 14:32:20 +0000 Subject: [PATCH 05/13] fix lint issues --- sdks/python/apache_beam/yaml/yaml_ml.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 8110b8da5259..f213db425ade 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -311,8 +311,8 @@ def __init__( handler_kwargs['inference_fn'] = inference_fn_obj _handler = HuggingFacePipelineModelHandler( - task=task, - model=model, + task=task or "", + model=model or "", device=device, load_pipeline_args=load_pipeline_args, **handler_kwargs, From 52dccba5d4c2dad9caf1237d29c5b626f7625659 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 15:29:50 +0000 Subject: [PATCH 06/13] try adding transformers to testenv --- sdks/python/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index ffc39a086efe..e3eb1f808207 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -33,7 +33,7 @@ pip_pre = True # allow apps that support color to use it. passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD # Set [] options for pip installation of apache-beam tarball. -extras = test,dataframe,hadoop,redis,tfrecord,yaml +extras = test,dataframe,hadoop,redis,tfrecord,yaml,transformers # Don't warn that these commands aren't installed. allowlist_externals = false From 1132860d4bfaad752434ed8ea7fef71f805ce687 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 18:46:50 +0000 Subject: [PATCH 07/13] revert tox change and apply transformers to only the workflow needed --- .github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml | 2 +- sdks/python/tox.ini | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml index 7d17fd2140c9..4ad25d72e991 100644 --- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml @@ -91,7 +91,7 @@ jobs: - name: run PreCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test,yaml + gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test,yaml,transformers - name: Archive Python Test Results uses: actions/upload-artifact@v7 if: failure() diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index e3eb1f808207..ffc39a086efe 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -33,7 +33,7 @@ pip_pre = True # allow apps that support color to use it. passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD # Set [] options for pip installation of apache-beam tarball. -extras = test,dataframe,hadoop,redis,tfrecord,yaml,transformers +extras = test,dataframe,hadoop,redis,tfrecord,yaml # Don't warn that these commands aren't installed. allowlist_externals = false From c4dad0a0f59c1d98f2ced1d5bf82b4e8407acdd5 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 4 May 2026 17:12:49 +0000 Subject: [PATCH 08/13] minor import and tweaks to get to work --- .../python/apache_beam/ml/inference/huggingface_inference.py | 5 ++++- sdks/python/apache_beam/yaml/yaml_ml.py | 3 +-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py index a9893ea9290c..6ba69db2c8c8 100644 --- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py @@ -32,7 +32,10 @@ import torch from transformers import AutoModel from transformers import Pipeline -from transformers import TFAutoModel +try: + from transformers import TFAutoModel +except ImportError: + TFAutoModel = Any from transformers import pipeline from apache_beam.ml.inference import utils diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index f213db425ade..5cbf17b222cd 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -321,8 +321,7 @@ def __init__( super().__init__(_handler, preprocess, postprocess) @staticmethod - def validate(model_handler_spec): - config = model_handler_spec.get('config', {}) + def validate(config): if not config.get('task') and not config.get('model'): raise ValueError( "HuggingFacePipeline requires either 'task' or " From d311bb826af2130735ccb842386885d4395c41ab Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 4 May 2026 19:50:16 +0000 Subject: [PATCH 09/13] fix lint --- sdks/python/apache_beam/ml/inference/huggingface_inference.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py index 6ba69db2c8c8..bc956b2035ac 100644 --- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py @@ -32,6 +32,7 @@ import torch from transformers import AutoModel from transformers import Pipeline + try: from transformers import TFAutoModel except ImportError: From c93432fe0715803cc6450918c6d3589edce8b21f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 5 May 2026 02:35:48 +0000 Subject: [PATCH 10/13] updated test logic and model logic --- .../yaml/tests/runinference_huggingface.yaml | 40 +++++++++---------- sdks/python/apache_beam/yaml/yaml_ml.py | 4 +- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml index 7c429e6067a1..8728a6f544ad 100644 --- a/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml +++ b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml @@ -13,12 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -fixtures: - - name: mock_pipeline - type: unittest.mock.patch - config: - target: transformers.pipeline - pipelines: - pipeline: type: chain @@ -26,19 +20,27 @@ pipelines: - type: Create config: elements: - - text: "Hello world" - - text: "Bye world" + - text: "I love Apache Beam!" + - text: "I hate this error." - type: RunInference config: model_handler: type: "HuggingFacePipeline" config: task: "text-classification" - model: "unused" inference_fn: callable: | - def mock_inference(batch, pipeline, inference_args): - return [[dict(label='POSITIVE', score=0.9)] for _ in batch] + def real_inference(batch, pipeline, inference_args): + predictions = pipeline(batch, **inference_args) + + # If it's a single dictionary (batch size of 1), wrap it in a list + if isinstance(predictions, dict): + predictions = [predictions] + + return { + 'label': [p['label'] for p in predictions], + 'score': [p['score'] for p in predictions] + } preprocess: callable: 'lambda x: x.text' - type: MapToFields @@ -46,17 +48,15 @@ pipelines: language: python fields: text: text - inference: - callable: | - def get_json(x): - import json - return json.dumps(x.inference.inference, indent=0).strip() + sentiment: + callable: 'lambda x: x.inference.inference["label"]' - type: AssertEqual config: elements: - - text: "Hello world" - inference: "[\n{\n\"label\": \"POSITIVE\",\n\"score\": 0.9\n}\n]" - - text: "Bye world" - inference: "[\n{\n\"label\": \"POSITIVE\",\n\"score\": 0.9\n}\n]" + - text: "I love Apache Beam!" + sentiment: "POSITIVE" + - text: "I hate this error." + sentiment: "NEGATIVE" + options: yaml_experimental_features: ['ML'] diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 5cbf17b222cd..05cbed3bd456 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -311,8 +311,8 @@ def __init__( handler_kwargs['inference_fn'] = inference_fn_obj _handler = HuggingFacePipelineModelHandler( - task=task or "", - model=model or "", + task=task, + model=model, device=device, load_pipeline_args=load_pipeline_args, **handler_kwargs, From 9e1e9e3967e2b02735d3a26ec7a907b8da73b851 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 6 May 2026 15:23:47 +0000 Subject: [PATCH 11/13] add new task with transformer dependency supplied --- sdks/python/build.gradle | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 5f09dff57e8f..e676fd110433 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -124,10 +124,20 @@ tasks.register("generateYamlDocs") { outputs.file "${buildDir}/yaml-examples.html" } +tasks.register("installYamlIntegrationTestDeps") { + dependsOn installGcpTest + doLast { + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && pip install --pre --retries 10 ${buildDir}/apache-beam.tar.gz[ml_test,yaml,transformers]" + } + } +} + tasks.register("yamlIntegrationTests") { description "Runs precommit integration tests for yaml pipelines." - dependsOn installGcpTest + dependsOn installYamlIntegrationTestDeps // Need to build all expansion services referenced in apache_beam/yaml/*.* // grep -oh 'sdk.*Jar' sdks/python/apache_beam/yaml/*.yaml | sort | uniq dependsOn ":sdks:java:extensions:schemaio-expansion-service:shadowJar" @@ -146,7 +156,7 @@ tasks.register("yamlIntegrationTests") { tasks.register("postCommitYamlIntegrationTests") { description "Runs postcommit integration tests for yaml pipelines - parameterized by yamlTestSet." - dependsOn installGcpTest + dependsOn installYamlIntegrationTestDeps // Need to build all expansion services referenced in apache_beam/yaml/*.* // grep -oh 'sdk.*Jar' sdks/python/apache_beam/yaml/*.yaml | sort | uniq dependsOn ":sdks:java:extensions:schemaio-expansion-service:shadowJar" From bc1f8b592b5ae2204ca6152760915e275cd77bf0 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 6 May 2026 15:24:22 +0000 Subject: [PATCH 12/13] revert changes --- .github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml index 4ad25d72e991..7d17fd2140c9 100644 --- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml @@ -91,7 +91,7 @@ jobs: - name: run PreCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test,yaml,transformers + gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test,yaml - name: Archive Python Test Results uses: actions/upload-artifact@v7 if: failure() From a0334829995f3db26dedbc53a965aba4befe6e2f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 6 May 2026 15:25:06 +0000 Subject: [PATCH 13/13] try again without the try except --- .../apache_beam/ml/inference/huggingface_inference.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py index bc956b2035ac..a9893ea9290c 100644 --- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py @@ -32,11 +32,7 @@ import torch from transformers import AutoModel from transformers import Pipeline - -try: - from transformers import TFAutoModel -except ImportError: - TFAutoModel = Any +from transformers import TFAutoModel from transformers import pipeline from apache_beam.ml.inference import utils