diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index c57b5e3b1a0e..4c970c4b5f7c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -69,6 +69,7 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -414,19 +415,8 @@ public Job translate(List<DataflowPackage> packages) {
     // back end as well. If streaming engine is not enabled make sure the experiments are also
     // not enabled.
     if (options.isEnableStreamingEngine()) {
-      List<String> experiments = options.getExperiments();
-      if (experiments == null) {
-        experiments = new ArrayList<>();
-      } else {
-        experiments = new ArrayList<>(experiments);
-      }
-      if (!experiments.contains(GcpOptions.STREAMING_ENGINE_EXPERIMENT)) {
-        experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
-      }
-      if (!experiments.contains(GcpOptions.WINDMILL_SERVICE_EXPERIMENT)) {
-        experiments.add(GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
-      }
-      options.setExperiments(experiments);
+      ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+      ExperimentalOptions.addExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
     } else {
       List<String> experiments = options.getExperiments();
       if (experiments != null) {
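The hunk above collapses the recurring get-copy-check-add-set dance into one call per experiment. For readers unfamiliar with the helper, a minimal sketch of the contract ExperimentalOptions.addExperiment appears to provide follows. The real implementation lives in org.apache.beam.sdk.options.ExperimentalOptions; the body below is an illustration of the null-safe, idempotent add it replaces, mirroring the removed lines, not the actual Beam source.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.options.ExperimentalOptions;

    class AddExperimentSketch {
      // Copies the experiments list (the stored list may be unmodifiable),
      // appends the experiment only when absent, and writes the copy back.
      static void addExperiment(ExperimentalOptions options, String experiment) {
        List<String> experiments =
            options.getExperiments() == null
                ? new ArrayList<>()
                : new ArrayList<>(options.getExperiments());
        if (!experiments.contains(experiment)) {
          experiments.add(experiment);
          options.setExperiments(experiments);
        }
      }
    }
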
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index b375de661885..c82592ade2f2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1244,13 +1244,12 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
     // to Runner v2.
     if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
-      List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
-      if (!experiments.contains("use_runner_v2")) {
+      if (!firstNonNull(options.getExperiments(), Collections.emptyList())
+          .contains("use_runner_v2")) {
         LOG.info(
             "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language"
                 + " transforms or pipeline needed a transform upgrade.");
-        options.setExperiments(
-            ImmutableList.<String>builder().addAll(experiments).add("use_runner_v2").build());
+        ExperimentalOptions.addExperiment(options, "use_runner_v2");
       }
     }
     if (useUnifiedWorker(options)) {
@@ -1260,21 +1259,10 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         throw new IllegalArgumentException(
             "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
       }
-      List<String> experiments =
-          new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
-      if (!experiments.contains("use_runner_v2")) {
-        experiments.add("use_runner_v2");
-      }
-      if (!experiments.contains("use_unified_worker")) {
-        experiments.add("use_unified_worker");
-      }
-      if (!experiments.contains("beam_fn_api")) {
-        experiments.add("beam_fn_api");
-      }
-      if (!experiments.contains("use_portable_job_submission")) {
-        experiments.add("use_portable_job_submission");
-      }
-      options.setExperiments(ImmutableList.copyOf(experiments));
+      ExperimentalOptions.addExperiment(options, "use_runner_v2");
+      ExperimentalOptions.addExperiment(options, "use_unified_worker");
+      ExperimentalOptions.addExperiment(options, "beam_fn_api");
+      ExperimentalOptions.addExperiment(options, "use_portable_job_submission");
       // Ensure that logging via the FnApi is enabled
       options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
     }
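Because the helper checks membership before appending, the four unconditional addExperiment calls above are safe to repeat on every submission: duplicates cannot accumulate in --experiments. A small sketch demonstrating that property; PipelineOptionsFactory and as(...) are standard Beam APIs, and the expected output is an assumption based on the idempotence contract rather than a documented guarantee.

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class IdempotentAddDemo {
      public static void main(String[] args) {
        ExperimentalOptions options =
            PipelineOptionsFactory.create().as(ExperimentalOptions.class);
        // Calling twice should leave a single entry in the experiments list.
        ExperimentalOptions.addExperiment(options, "use_runner_v2");
        ExperimentalOptions.addExperiment(options, "use_runner_v2");
        System.out.println(options.getExperiments()); // expected: [use_runner_v2]
      }
    }
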
@@ -1301,14 +1289,10 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       options.setStreaming(true);

     {
-      List<String> experiments =
-          options.getExperiments() == null
-              ? new ArrayList<>()
-              : new ArrayList<>(options.getExperiments());
       // Experiment marking that the harness supports tag encoding v2
       // Backend will enable tag encoding v2 only if the harness supports it.
-      experiments.add("streaming_engine_state_tag_encoding_v2_supported");
-      options.setExperiments(ImmutableList.copyOf(experiments));
+      ExperimentalOptions.addExperiment(
+          options, "streaming_engine_state_tag_encoding_v2_supported");
     }

     if (useUnifiedWorker(options)) {
@@ -1413,15 +1397,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);

     if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
-      List<String> experiments =
-          firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList());
-      if (!experiments.contains("use_staged_dataflow_worker_jar")) {
-        dataflowOptions.setExperiments(
-            ImmutableList.<String>builder()
-                .addAll(experiments)
-                .add("use_staged_dataflow_worker_jar")
-                .build());
-      }
+      ExperimentalOptions.addExperiment(options, "use_staged_dataflow_worker_jar");
     }

     Job newJob = jobSpecification.getJob();
@@ -1480,11 +1456,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
             .collect(Collectors.toList());

     if (minCpuFlags.isEmpty()) {
-      dataflowOptions.setExperiments(
-          ImmutableList.<String>builder()
-              .addAll(experiments)
-              .add("min_cpu_platform=" + dataflowOptions.getMinCpuPlatform())
-              .build());
+      ExperimentalOptions.addExperiment(
+          dataflowOptions, "min_cpu_platform=" + dataflowOptions.getMinCpuPlatform());
     } else {
       LOG.warn(
           "Flag min_cpu_platform is defined in both top level PipelineOption, "
@@ -1520,12 +1493,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     // enable upload_graph when the graph is too large
     byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
     int jobGraphByteSize = jobGraphBytes.length;
-    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
-        && !hasExperiment(options, "upload_graph")
-        && !useUnifiedWorker(options)) {
-      List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
-      options.setExperiments(
-          ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
+    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES && !useUnifiedWorker(options)) {
+      ExperimentalOptions.addExperiment(options, "upload_graph");
       LOG.info(
           "The job graph size ({} in bytes) is larger than {}. Automatically add "
              + "the upload_graph option to experiments.",
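Two details in the hunks above are worth calling out. First, the upload_graph condition drops its explicit hasExperiment(options, "upload_graph") guard, which is subsumed by the conditional add inside the helper. Second, the use_staged_dataflow_worker_jar call passes options where the old code used dataflowOptions; both appear to be views of the same underlying options object, so the effect should be identical. Code that still needs to branch on an experiment's presence can use the companion static, sketched here assuming the hasExperiment(PipelineOptions, String) signature:

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UploadGraphGuard {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        ExperimentalOptions.addExperiment(
            options.as(ExperimentalOptions.class), "upload_graph");
        // hasExperiment reads the same experiments list addExperiment writes to.
        System.out.println(ExperimentalOptions.hasExperiment(options, "upload_graph"));
      }
    }
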
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 4d5acdc8071e..2a7823e69091 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -55,6 +55,7 @@
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -113,13 +114,13 @@ private static DataflowWorkerHarnessOptions testOptions(
     options.setProject("project");
     options.setJobId("job");
     options.setWorkerId("worker");
-    List<String> experiments =
-        options.getExperiments() == null ? new ArrayList<>() : options.getExperiments();
+
     if (enableStreamingEngine) {
-      experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+      ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    }
+    for (String experiment : additionalExperiments) {
+      ExperimentalOptions.addExperiment(options, experiment);
     }
-    experiments.addAll(additionalExperiments);
-    options.setExperiments(experiments);

     options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
     options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
index b4074d156ce7..9ae8f2e90f99 100644
--- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
@@ -51,7 +51,7 @@ def test_xlang_parquetio_write(self):
     address = 'localhost:%s' % port
     try:
       with TestPipeline() as p:
-        p.get_pipeline_options().view_as(DebugOptions).experiments.append(
+        p.get_pipeline_options().view_as(DebugOptions).add_experiment(
             'jar_packages=' + expansion_jar)
         p.not_use_test_runner_api = True
         _ = p \
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index eb9617cfae34..c3b0be8e781d 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -196,14 +196,7 @@ def test_try_split_with_any_exception(self):
 class UseSdfBoundedSourcesTests(unittest.TestCase):
   def _run_sdf_wrapper_pipeline(self, source, expected_values):
     with beam.Pipeline() as p:
-      experiments = (p._options.view_as(DebugOptions).experiments or [])
-
-      # Setup experiment option to enable using SDFBoundedSourceWrapper
-      if 'beam_fn_api' not in experiments:
-        # Required so mocking below doesn't mock Create used in assert_that.
-        experiments.append('beam_fn_api')
-
-      p._options.view_as(DebugOptions).experiments = experiments
+      p._options.view_as(DebugOptions).add_experiment('beam_fn_api')

       actual = p | beam.io.Read(source)
       assert_that(actual, equal_to(expected_values))
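On the Python side the same cleanup applies: DebugOptions.add_experiment replaces the read-modify-write of the experiments attribute. A short sketch of the behavior the tests above rely on; the real method is defined in apache_beam/options/pipeline_options.py, and the deduplication shown is an assumption mirroring the Java helper's contract.

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    debug = PipelineOptions([]).view_as(DebugOptions)
    debug.add_experiment('beam_fn_api')
    debug.add_experiment('beam_fn_api')  # second call should be a no-op
    assert debug.experiments == ['beam_fn_api']
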
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 3cce2c5bb773..8146097ba0f9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -233,10 +233,7 @@ def __init__(
     # set default experiments for portable runners
     # (needs to occur prior to pipeline construction)
     if runner.is_fnapi_compatible():
-      experiments = (self._options.view_as(DebugOptions).experiments or [])
-      if not 'beam_fn_api' in experiments:
-        experiments.append('beam_fn_api')
-      self._options.view_as(DebugOptions).experiments = experiments
+      self._options.view_as(DebugOptions).add_experiment('beam_fn_api')

     self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp')
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 29cb36071488..77f3911e6c1b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -146,16 +146,13 @@ def __init__(
     ])
     # TODO: Use enumerated type instead of strings for job types.
     if job_type.startswith('FNAPI_'):
-      self.debug_options.experiments = self.debug_options.experiments or []
-
-      debug_options_experiments = self.debug_options.experiments
       # Add use_multiple_sdk_containers flag if it's not already present. Do not
       # add the flag if 'no_use_multiple_sdk_containers' is present.
       # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
       # till version 2.4.
-      if ('use_multiple_sdk_containers' not in debug_options_experiments and
-          'no_use_multiple_sdk_containers' not in debug_options_experiments):
-        debug_options_experiments.append('use_multiple_sdk_containers')
+      if ('no_use_multiple_sdk_containers'
+          not in (self.debug_options.experiments or [])):
+        self.debug_options.add_experiment('use_multiple_sdk_containers')
       # FlexRS
       if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
         self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index fdf291cb6f12..3e1d9b360f23 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -146,11 +146,7 @@ def run_pipeline(
     RuntimeValueProvider.set_runtime_options({})

     # Setup "beam_fn_api" experiment options if lacked.
-    experiments = (
-        options.view_as(pipeline_options.DebugOptions).experiments or [])
-    if not 'beam_fn_api' in experiments:
-      experiments.append('beam_fn_api')
-    options.view_as(pipeline_options.DebugOptions).experiments = experiments
+    options.view_as(pipeline_options.DebugOptions).add_experiment('beam_fn_api')

     # This is sometimes needed if type checking is disabled
     # to enforce that the inputs (and outputs) of GroupByKey operations
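The apiclient.py hunk shows the one case where a membership check survives the refactor: add_experiment makes the "already present" test redundant, but the no_use_multiple_sdk_containers opt-out still has to be checked explicitly, with an `or []` guard because experiments may be None. A condensed sketch of that guard pattern, using the same DebugOptions API as the hunks above:

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    debug = PipelineOptions([]).view_as(DebugOptions)
    # Only the explicit opt-out needs a check; deduplication is the helper's job.
    if 'no_use_multiple_sdk_containers' not in (debug.experiments or []):
        debug.add_experiment('use_multiple_sdk_containers')
    print(debug.experiments)  # expected: ['use_multiple_sdk_containers']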