From 9ad785622387c7b9eb4d52af7f76c68ec4ee6b14 Mon Sep 17 00:00:00 2001 From: Aakash Baskaran Date: Sat, 18 Apr 2026 16:22:21 -0400 Subject: [PATCH 1/3] Use add_experiment() instead of experiments.append() in Python Replaces manual list manipulation pattern with DebugOptions.add_experiment() which handles null-init and deduplication internally. Resolves https://github.com/apache/beam/issues/19347 --- .../apache_beam/io/external/xlang_parquetio_test.py | 3 +-- sdks/python/apache_beam/io/iobase_test.py | 9 +-------- sdks/python/apache_beam/pipeline.py | 5 +---- .../apache_beam/runners/dataflow/internal/apiclient.py | 8 ++------ .../runners/portability/fn_api_runner/fn_runner.py | 6 +----- 5 files changed, 6 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py index b4074d156ce7..9b70db21571e 100644 --- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py +++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py @@ -51,8 +51,7 @@ def test_xlang_parquetio_write(self): address = 'localhost:%s' % port try: with TestPipeline() as p: - p.get_pipeline_options().view_as(DebugOptions).experiments.append( - 'jar_packages=' + expansion_jar) + p.get_pipeline_options().view_as(DebugOptions).add_experiment('jar_packages=' + expansion_jar) p.not_use_test_runner_api = True _ = p \ | beam.Create([ diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index eb9617cfae34..c3b0be8e781d 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -196,14 +196,7 @@ def test_try_split_with_any_exception(self): class UseSdfBoundedSourcesTests(unittest.TestCase): def _run_sdf_wrapper_pipeline(self, source, expected_values): with beam.Pipeline() as p: - experiments = (p._options.view_as(DebugOptions).experiments or []) - - # Setup experiment option to enable using SDFBoundedSourceWrapper - if 'beam_fn_api' not in experiments: - # Required so mocking below doesn't mock Create used in assert_that. - experiments.append('beam_fn_api') - - p._options.view_as(DebugOptions).experiments = experiments + p._options.view_as(DebugOptions).add_experiment('beam_fn_api') actual = p | beam.io.Read(source) assert_that(actual, equal_to(expected_values)) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 3cce2c5bb773..8146097ba0f9 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -233,10 +233,7 @@ def __init__( # set default experiments for portable runners # (needs to occur prior to pipeline construction) if runner.is_fnapi_compatible(): - experiments = (self._options.view_as(DebugOptions).experiments or []) - if not 'beam_fn_api' in experiments: - experiments.append('beam_fn_api') - self._options.view_as(DebugOptions).experiments = experiments + self._options.view_as(DebugOptions).add_experiment('beam_fn_api') self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp') diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 29cb36071488..a5890355c168 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -146,16 +146,12 @@ def __init__( ]) # TODO: Use enumerated type instead of strings for job types. if job_type.startswith('FNAPI_'): - self.debug_options.experiments = self.debug_options.experiments or [] - - debug_options_experiments = self.debug_options.experiments # Add use_multiple_sdk_containers flag if it's not already present. Do not # add the flag if 'no_use_multiple_sdk_containers' is present. # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK # till version 2.4. - if ('use_multiple_sdk_containers' not in debug_options_experiments and - 'no_use_multiple_sdk_containers' not in debug_options_experiments): - debug_options_experiments.append('use_multiple_sdk_containers') + if ('no_use_multiple_sdk_containers' not in (self.debug_options.experiments or [])): + self.debug_options.add_experiment('use_multiple_sdk_containers') # FlexRS if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED': self.proto.flexResourceSchedulingGoal = ( diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index fdf291cb6f12..3e1d9b360f23 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -146,11 +146,7 @@ def run_pipeline( RuntimeValueProvider.set_runtime_options({}) # Setup "beam_fn_api" experiment options if lacked. - experiments = ( - options.view_as(pipeline_options.DebugOptions).experiments or []) - if not 'beam_fn_api' in experiments: - experiments.append('beam_fn_api') - options.view_as(pipeline_options.DebugOptions).experiments = experiments + options.view_as(pipeline_options.DebugOptions).add_experiment('beam_fn_api') # This is sometimes needed if type checking is disabled # to enforce that the inputs (and outputs) of GroupByKey operations From 6e70264e30c6df297b9adb962ae9a1d73678e916 Mon Sep 17 00:00:00 2001 From: Aakash Baskaran Date: Tue, 21 Apr 2026 01:07:34 -0400 Subject: [PATCH 2/3] Use ExperimentalOptions.addExperiment() instead of get/set pattern in Java Replaces getExperiments()/modify/setExperiments() boilerplate with ExperimentalOptions.addExperiment() which handles null-init and deduplication. Resolves https://github.com/apache/beam/issues/19347 --- .../dataflow/DataflowPipelineTranslator.java | 16 +----- .../beam/runners/dataflow/DataflowRunner.java | 55 ++++--------------- .../client/grpc/GrpcWindmillServer.java | 11 ++-- 3 files changed, 20 insertions(+), 62 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index c57b5e3b1a0e..4c970c4b5f7c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -414,19 +415,8 @@ public Job translate(List packages) { // back end as well. If streaming engine is not enabled make sure the experiments are also // not enabled. if (options.isEnableStreamingEngine()) { - List experiments = options.getExperiments(); - if (experiments == null) { - experiments = new ArrayList(); - } else { - experiments = new ArrayList(experiments); - } - if (!experiments.contains(GcpOptions.STREAMING_ENGINE_EXPERIMENT)) { - experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT); - } - if (!experiments.contains(GcpOptions.WINDMILL_SERVICE_EXPERIMENT)) { - experiments.add(GcpOptions.WINDMILL_SERVICE_EXPERIMENT); - } - options.setExperiments(experiments); + ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT); + ExperimentalOptions.addExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT); } else { List experiments = options.getExperiments(); if (experiments != null) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b375de661885..9939876a431b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1244,13 +1244,11 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { - List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); - if (!experiments.contains("use_runner_v2")) { + if (!firstNonNull(options.getExperiments(), Collections.emptyList()).contains("use_runner_v2")) { LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); - options.setExperiments( - ImmutableList.builder().addAll(experiments).add("use_runner_v2").build()); + ExperimentalOptions.addExperiment(options, "use_runner_v2"); } } if (useUnifiedWorker(options)) { @@ -1260,21 +1258,10 @@ public DataflowPipelineJob run(Pipeline pipeline) { throw new IllegalArgumentException( "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); } - List experiments = - new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true - if (!experiments.contains("use_runner_v2")) { - experiments.add("use_runner_v2"); - } - if (!experiments.contains("use_unified_worker")) { - experiments.add("use_unified_worker"); - } - if (!experiments.contains("beam_fn_api")) { - experiments.add("beam_fn_api"); - } - if (!experiments.contains("use_portable_job_submission")) { - experiments.add("use_portable_job_submission"); - } - options.setExperiments(ImmutableList.copyOf(experiments)); + ExperimentalOptions.addExperiment(options, "use_runner_v2"); + ExperimentalOptions.addExperiment(options, "use_unified_worker"); + ExperimentalOptions.addExperiment(options, "beam_fn_api"); + ExperimentalOptions.addExperiment(options, "use_portable_job_submission"); // Ensure that logging via the FnApi is enabled options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true); } @@ -1301,14 +1288,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.setStreaming(true); { - List experiments = - options.getExperiments() == null - ? new ArrayList<>() - : new ArrayList<>(options.getExperiments()); // Experiment marking that the harness supports tag encoding v2 // Backend will enable tag encoding v2 only if the harness supports it. - experiments.add("streaming_engine_state_tag_encoding_v2_supported"); - options.setExperiments(ImmutableList.copyOf(experiments)); + ExperimentalOptions.addExperiment(options, "streaming_engine_state_tag_encoding_v2_supported"); } if (useUnifiedWorker(options)) { @@ -1413,15 +1395,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages); if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { - List experiments = - firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList()); - if (!experiments.contains("use_staged_dataflow_worker_jar")) { - dataflowOptions.setExperiments( - ImmutableList.builder() - .addAll(experiments) - .add("use_staged_dataflow_worker_jar") - .build()); - } + ExperimentalOptions.addExperiment(options, "use_staged_dataflow_worker_jar"); } Job newJob = jobSpecification.getJob(); @@ -1480,11 +1454,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { .collect(Collectors.toList()); if (minCpuFlags.isEmpty()) { - dataflowOptions.setExperiments( - ImmutableList.builder() - .addAll(experiments) - .add("min_cpu_platform=" + dataflowOptions.getMinCpuPlatform()) - .build()); + ExperimentalOptions.addExperiment(dataflowOptions, "min_cpu_platform=" + dataflowOptions.getMinCpuPlatform()); } else { LOG.warn( "Flag min_cpu_platform is defined in both top level PipelineOption, " @@ -1521,11 +1491,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8); int jobGraphByteSize = jobGraphBytes.length; if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES - && !hasExperiment(options, "upload_graph") && !useUnifiedWorker(options)) { - List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); - options.setExperiments( - ImmutableList.builder().addAll(experiments).add("upload_graph").build()); + ExperimentalOptions.addExperiment(options, "upload_graph"); LOG.info( "The job graph size ({} in bytes) is larger than {}. Automatically add " + "the upload_graph option to experiments.", @@ -1533,7 +1500,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { CREATE_JOB_REQUEST_LIMIT_BYTES); } - if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) { + if (useUnifiedWorker(options)) { ArrayList experiments = new ArrayList<>(options.getExperiments()); while (experiments.remove("upload_graph")) {} options.setExperiments(experiments); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 4d5acdc8071e..2a7823e69091 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -55,6 +55,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; @@ -113,13 +114,13 @@ private static DataflowWorkerHarnessOptions testOptions( options.setProject("project"); options.setJobId("job"); options.setWorkerId("worker"); - List experiments = - options.getExperiments() == null ? new ArrayList<>() : options.getExperiments(); + if (enableStreamingEngine) { - experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT); + ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT); + } + for (String experiment : additionalExperiments) { + ExperimentalOptions.addExperiment(options, experiment); } - experiments.addAll(additionalExperiments); - options.setExperiments(experiments); options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE); options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK); From e717dc40eb29f1921e361d0bdb11825e6b7a966b Mon Sep 17 00:00:00 2001 From: Aakash Baskaran Date: Tue, 21 Apr 2026 23:12:50 -0400 Subject: [PATCH 3/3] Fix upload_graph condition regression and formatting issues --- .../beam/runners/dataflow/DataflowRunner.java | 14 ++++++++------ .../io/external/xlang_parquetio_test.py | 3 ++- .../runners/dataflow/internal/apiclient.py | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9939876a431b..c82592ade2f2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1244,7 +1244,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { - if (!firstNonNull(options.getExperiments(), Collections.emptyList()).contains("use_runner_v2")) { + if (!firstNonNull(options.getExperiments(), Collections.emptyList()) + .contains("use_runner_v2")) { LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); @@ -1290,7 +1291,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { { // Experiment marking that the harness supports tag encoding v2 // Backend will enable tag encoding v2 only if the harness supports it. - ExperimentalOptions.addExperiment(options, "streaming_engine_state_tag_encoding_v2_supported"); + ExperimentalOptions.addExperiment( + options, "streaming_engine_state_tag_encoding_v2_supported"); } if (useUnifiedWorker(options)) { @@ -1454,7 +1456,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { .collect(Collectors.toList()); if (minCpuFlags.isEmpty()) { - ExperimentalOptions.addExperiment(dataflowOptions, "min_cpu_platform=" + dataflowOptions.getMinCpuPlatform()); + ExperimentalOptions.addExperiment( + dataflowOptions, "min_cpu_platform=" + dataflowOptions.getMinCpuPlatform()); } else { LOG.warn( "Flag min_cpu_platform is defined in both top level PipelineOption, " @@ -1490,8 +1493,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // enable upload_graph when the graph is too large byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8); int jobGraphByteSize = jobGraphBytes.length; - if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES - && !useUnifiedWorker(options)) { + if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES && !useUnifiedWorker(options)) { ExperimentalOptions.addExperiment(options, "upload_graph"); LOG.info( "The job graph size ({} in bytes) is larger than {}. Automatically add " @@ -1500,7 +1502,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { CREATE_JOB_REQUEST_LIMIT_BYTES); } - if (useUnifiedWorker(options)) { + if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) { ArrayList experiments = new ArrayList<>(options.getExperiments()); while (experiments.remove("upload_graph")) {} options.setExperiments(experiments); diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py index 9b70db21571e..9ae8f2e90f99 100644 --- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py +++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py @@ -51,7 +51,8 @@ def test_xlang_parquetio_write(self): address = 'localhost:%s' % port try: with TestPipeline() as p: - p.get_pipeline_options().view_as(DebugOptions).add_experiment('jar_packages=' + expansion_jar) + p.get_pipeline_options().view_as(DebugOptions).add_experiment( + 'jar_packages=' + expansion_jar) p.not_use_test_runner_api = True _ = p \ | beam.Create([ diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index a5890355c168..77f3911e6c1b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -150,7 +150,8 @@ def __init__( # add the flag if 'no_use_multiple_sdk_containers' is present. # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK # till version 2.4. - if ('no_use_multiple_sdk_containers' not in (self.debug_options.experiments or [])): + if ('no_use_multiple_sdk_containers' + not in (self.debug_options.experiments or [])): self.debug_options.add_experiment('use_multiple_sdk_containers') # FlexRS if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':