Skip to content

Commit 87c50c6

Browse files
Mathdeeshunping
andauthored
[Python] Support large pipeline options via file (#37379)
* Support large pipeline options in Python SDK #37370 * reformatted code to match the project's standards to pass checks * Fixed PythonFormatterPreCommitscript error" * Fix import order: move google.protobuf above apache_beam * Removed blank line between import and google import * Addressed the review comments: cleaned up comments, align error message with JAVA and GO SDKs * Fixed Formatting Error * Fixed line lenght that causes lint error * rerun tests * Rerun tests * Fix comment formatting in sdk_worker_main.py * Fix: Update exception handling after bot review * Set bootstrap log level to INFO in create_harness * Fix formatting in sdk_worker_main.py * Documents: Updated CHANGES.md for file-based pipeline options * Fix CHANGES.md formatting issues * Fix CHANGES.md, correct issue link * Fix CHANGES.md, correct issue link * Updated issue number in brackets to [#37370] in CHANGES.md * Updated issue number in brackets to [#37370] in CHANGES.md --------- Co-authored-by: Shunping Huang <shunping@google.com>
1 parent 59922a3 commit 87c50c6

3 files changed

Lines changed: 32 additions & 5 deletions

File tree

CHANGES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@
6868

6969
## New Features / Improvements
7070

71-
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
71+
* Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)).
7272

7373
## Breaking Changes
7474

75-
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
75+
* The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)).
7676

7777
## Deprecations
7878

sdks/python/apache_beam/runners/worker/sdk_worker_main.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ def _import_beam_plugins(plugins):
7373
def create_harness(environment, dry_run=False):
7474
"""Creates SDK Fn Harness."""
7575

76+
# Bootstrap log level to capture startup events until pipeline options are
77+
# parsed and the actual log level is set.
78+
logging.getLogger().setLevel(logging.INFO)
79+
7680
deferred_exception = None
7781
if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
7882
try:
@@ -93,8 +97,24 @@ def create_harness(environment, dry_run=False):
9397
else:
9498
fn_log_handler = None
9599

96-
pipeline_options_dict = _load_pipeline_options(
97-
environment.get('PIPELINE_OPTIONS'))
100+
options_json = environment.get('PIPELINE_OPTIONS')
101+
102+
# We check if options are stored in the file.
103+
if 'PIPELINE_OPTIONS_FILE' in environment:
104+
options_file = environment['PIPELINE_OPTIONS_FILE']
105+
try:
106+
with open(options_file, 'r') as f:
107+
options_json = f.read()
108+
_LOGGER.info('Load pipeline options from file: %s', options_file)
109+
except Exception:
110+
_LOGGER.error(
111+
'Failed to load pipeline options from file: %s',
112+
options_file,
113+
exc_info=True)
114+
raise
115+
116+
pipeline_options_dict = _load_pipeline_options(options_json)
117+
98118
default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
99119
logging.getLogger().setLevel(default_log_level)
100120
_set_log_level_overrides(pipeline_options_dict)
@@ -239,6 +259,7 @@ def terminate_sdk_harness():
239259

240260

241261
def _load_pipeline_options(options_json):
262+
"""Deserialize the pipeline options from a JSON string into a dictionary."""
242263
if options_json is None:
243264
return {}
244265
options = json.loads(options_json)
@@ -256,6 +277,8 @@ def _load_pipeline_options(options_json):
256277

257278

258279
def _parse_pipeline_options(options_json):
280+
"""Parses the pipeline options from a JSON string into a PipelineOptions
281+
object."""
259282
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
260283

261284

sdks/python/container/boot.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,11 @@ func launchSDKProcess() error {
259259

260260
// (3) Invoke python
261261

262-
os.Setenv("PIPELINE_OPTIONS", options)
262+
// Write the JSON string of pipeline options into a file to prevent "argument list too long" error.
263+
if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
264+
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
265+
}
266+
263267
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
264268
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
265269
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())

0 commit comments

Comments
 (0)