
[Python] Add PCollection.with_side_outputs for chainable side outputs #38268

Draft
hjtran wants to merge 4 commits into apache:master from hjtran:pr/pcollection-side-outputs-impl

Conversation

Contributor

@hjtran hjtran commented Apr 22, 2026

Summary

Adds PCollection.with_side_outputs(**kwargs) to the Python SDK so transforms can return a single, chainable PCollection that also carries named side-output PCollections, accessible via result.side_outputs.<tag>.

class MyFilter(beam.PTransform):
    def expand(self, pcoll):
        results = pcoll | beam.ParDo(FilterDoFn()).with_outputs('dropped', main='main')
        return results['main'].with_side_outputs(dropped=results['dropped'])

# Most users — just chain
inputs | MyFilter() | NextTransform()

# When you need the side output
result = inputs | MyFilter()
result.side_outputs.dropped

Today, transform authors face a tradeoff between returning a PCollection (chainable, but loses side outputs) or a dict/PCollectionTuple (preserves side outputs, but breaks chaining). This change lets them have both.
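The tradeoff above can be illustrated with toy stand-ins (not the real Beam classes — just minimal classes with a `|` operator): a dict return preserves the named outputs but cannot be piped into the next transform.

```python
# Toy stand-ins (not the real Beam API) illustrating the tradeoff:
# a dict return preserves tags but breaks `|` chaining.
class PCollection:
    def __or__(self, transform):
        return transform.expand(self)

class AddOne:
    def expand(self, pcoll):
        return PCollection()

class SplitAsDict:
    # Returns a dict so callers can reach the 'dropped' output...
    def expand(self, pcoll):
        return {'main': PCollection(), 'dropped': PCollection()}

pc = PCollection()
chained = pc | AddOne() | AddOne()   # plain PCollection: chainable

result = pc | SplitAsDict()
main = result['main']                # tags preserved...
try:
    result | AddOne()                # ...but the dict itself can't chain
except TypeError:
    pass                             # dict | transform raises TypeError
```

with_side_outputs removes the need to choose: the main output stays a PCollection (so `|` keeps working) while the tags ride along on .side_outputs.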

Design

  • New _SideOutputsContainer exposes attribute (container.dropped) and item (container["dropped"]) access plus iteration / len / in.
  • PCollection.with_side_outputs(**kwargs) returns a copy.copy(self) with the side-output mapping attached. The original is unchanged. Validation: each value must be a PCollection (else TypeError) and belong to the same pipeline (else ValueError).
  • Pipeline._apply_internal and Pipeline._replace both call a shared helper that registers the side outputs on the wrapping AppliedPTransform, only for the top-level returned PCollection. This makes side outputs first-class in the pipeline graph (visible to runners, the proto, visualization, and YAML's Transform.tag resolution).
  • The helper enforces two apply-time invariants:
    • Provenance: each side output's producer must be the wrapping transform itself or one of its descendants. Foreign PCollections are rejected with ValueError.
    • Tag collision: if a side-output tag already exists in the wrapping transform's outputs, the value must be the same PCollection object (idempotent); otherwise ValueError.
  • __copy__ is added to PCollection so copy.copy(self) doesn't dispatch to the existing __reduce_ex__ anti-pickling hook.
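A minimal sketch of the container and method described above — names mirror the PR, but this is a simplified stand-in, not the Beam source; the real implementation also hooks into Pipeline._apply_internal:

```python
import copy

# Simplified sketch; the pipeline attribute stands in for the real
# PCollection's pipeline reference.
class _SideOutputsContainer:
    def __init__(self, outputs):
        self._outputs = dict(outputs)

    def __getattr__(self, tag):
        try:
            return self._outputs[tag]
        except KeyError:
            raise AttributeError(f'No side output named {tag!r}')

    def __getitem__(self, tag):
        return self._outputs[tag]

    def __iter__(self):
        return iter(self._outputs)

    def __len__(self):
        return len(self._outputs)

    def __contains__(self, tag):
        return tag in self._outputs


class PCollection:
    def __init__(self, pipeline):
        self.pipeline = pipeline
        self._side_outputs = None

    def __copy__(self):
        # Explicit __copy__ so copy.copy() bypasses pickling hooks.
        cloned = object.__new__(type(self))
        cloned.__dict__.update(self.__dict__)
        return cloned

    def with_side_outputs(self, **kwargs):
        for tag, value in kwargs.items():
            if not isinstance(value, PCollection):
                raise TypeError(f'{tag!r} is not a PCollection')
            if value.pipeline is not self.pipeline:
                raise ValueError(f'{tag!r} belongs to a different pipeline')
        result = copy.copy(self)  # the original is left unchanged
        result._side_outputs = _SideOutputsContainer(kwargs)
        return result

    @property
    def side_outputs(self):
        return self._side_outputs
```

Returning a copy rather than mutating self is what keeps the original PCollection reusable and makes a second with_side_outputs call on the same object well-defined.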

Chaining behavior

pc | A | B is unchanged. B receives A's main output as a normal PCollection. A's side outputs are not carried forward — capture the intermediate result if you need them. This matches the behavior described in the proposal and requires no special code.
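With toy stand-ins (again, not the real Beam classes), the "capture the intermediate" pattern looks like this:

```python
# Toy stand-ins illustrating why intermediates must be captured:
# side outputs are not propagated through the chain.
class PCollection:
    def __init__(self, side_outputs=None):
        self.side_outputs = side_outputs

    def __or__(self, transform):
        return transform.expand(self)

class WithDropped:
    # Returns a main output that also carries a 'dropped' side output.
    def expand(self, pcoll):
        return PCollection(side_outputs={'dropped': PCollection()})

class Identity:
    # Downstream transforms see only a plain main-output PCollection.
    def expand(self, pcoll):
        return PCollection()

pc = PCollection()
chained = pc | WithDropped() | Identity()
# A's side outputs are gone after B:
assert chained.side_outputs is None

# Capture the intermediate when the side output is needed:
intermediate = pc | WithDropped()
assert 'dropped' in intermediate.side_outputs
```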

Caveats

  • **kwargs syntax restricts tags to valid Python identifiers. Tags with hyphens/spaces are not supported here; users with arbitrary string tags should keep using the existing dict / DoOutputsTuple patterns.
  • .side_outputs is a construction-time convenience, not a durable graph property. After from_runner_api, the reconstructed PCollection won't have _side_outputs populated, but the side outputs themselves still exist as named outputs on the producer's AppliedPTransform.
  • Side outputs do not participate in composite-boundary type checking (matches the existing behavior of side outputs accessed via DoOutputsTuple).

Follow-ups (not in this PR)

  • ParDo.with_side_outputs(...) convenience method.
  • Beam YAML dot-modifier integration (the existing get_pcollection already resolves Transform.tag by walking the producer's outputs dict, so this should mostly "just work" once verified).

Tests

pvalue_test.py covers: copy semantics, attribute / index access, missing-tag errors, type/pipeline validation, empty container, double-call replace.

pipeline_test.py covers: end-to-end materialization with assert_that, the canonical MyFilter-style composite wrapping with_outputs, foreign-pcollection rejection, tag-collision rejection, idempotent same-object collision, replacement-path registration via Pipeline.replace_all, runner API round-trip, and the nested-return non-flattening case.

Test plan

  • cd sdks/python && python -m pytest apache_beam/pvalue_test.py apache_beam/pipeline_test.py — 84 passed, 1 skipped
  • ruff check --config=sdks/python/ruff.toml on the four changed files — clean
  • yapf --diff on the four changed files — clean

Joey Tran and others added 3 commits April 22, 2026 14:23
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Apache Beam Python SDK by enabling PCollections to carry named side outputs while remaining chainable. This solves a long-standing usability issue where transform authors had to choose between returning a chainable PCollection or a container that preserves side outputs. The changes introduce a new API for attaching side outputs and ensure these are correctly registered within the pipeline graph, maintaining compatibility with existing runner APIs and composite transform structures.

Highlights

  • New PCollection.with_side_outputs method: Introduced a new method to PCollection that allows transforms to return a chainable PCollection while also carrying named side-output PCollections.
  • Side output access: Added a _SideOutputsContainer to provide ergonomic attribute and item access to side outputs, such as result.side_outputs.tag.
  • Pipeline graph integration: Updated Pipeline._apply_internal and _replace to automatically register these side outputs on the wrapping AppliedPTransform, ensuring they are visible to runners and pipeline visualization tools.
  • Validation and Safety: Implemented strict validation to ensure side outputs belong to the same pipeline and are produced by the correct transform, preventing common graph construction errors.



Rename _FilterWithSideOutputs/_SplitOddDoFn helpers in pipeline_test.py
to RemoveEvens/_RemoveEvensDoFn for clarity, and flip the DoFn semantics
so the name matches the behavior (evens go to the 'dropped' side output).

Drop two tests that were not pulling their weight:
  - test_pcollection_side_outputs_wraps_with_outputs: fully subsumed by
    test_pcollection_side_outputs_end_to_end, which already asserts the
    applied transform's outputs contain both tags.
  - test_pcollection_side_outputs_allows_idempotent_tag_collision: only
    reachable by going past the public API to set _side_outputs
    manually; the defensive impl branch is still worth keeping, but the
    test is not.

Merge the two pvalue_test.py validation tests into a single
test_with_side_outputs_validation.
