
[IcebergIO] Support hash distribution mode when writing rows #38061

Merged
ahmedabu98 merged 17 commits into apache:master from ahmedabu98:group-partitions
May 4, 2026

Conversation

@ahmedabu98
Contributor

Adding a new sink code path that groups rows by partition before writing, making partitioned writes a lot more efficient and scalable.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the IcebergIO sink by adding an optional feature to group rows by partition before writing them to the destination. This change is designed to optimize performance and reduce the creation of small files in partitioned Iceberg tables. The implementation introduces new transforms and utility classes to handle the grouping and writing logic, while also updating the existing API and test suites to support this new configuration.

Highlights

  • Partitioned Write Optimization: Introduced a new sink code path that groups rows by partition before writing, which significantly improves efficiency and scalability for partitioned tables by reducing the number of small files.
  • New Components: Added 'AssignDestinationsAndPartitions', 'WritePartitionedRowsToFiles', 'WriteToPartitions', and 'BeamRowWrapper' to support the new grouping logic.
  • API Updates: Updated 'IcebergIO.WriteRows' and 'IcebergWriteSchemaTransformProvider' to include a 'groupByPartitions' configuration option.
  • Test Coverage: Updated 'IcebergIOWriteTest' and 'IcebergWriteSchemaTransformProviderTest' to use parameterized tests, ensuring both grouped and non-grouped write paths are verified.
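To make the optimization in the highlights concrete, here is a minimal, self-contained Java sketch of the difference between the two write paths. It uses plain collections rather than Beam's API; `Row`, `filesWithoutGrouping`, and `filesWithGrouping` are illustrative names, not classes from this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch (not Beam code): why grouping rows by partition before
// writing produces fewer files. In the ungrouped path, every bundle opens a
// writer (and therefore a file) per distinct partition it happens to see; in
// the grouped path, a shuffle first routes each partition to one writer.
public class HashDistributionSketch {

  // A row reduced to (partitionKey, payload) for the sketch.
  record Row(String partitionKey, String payload) {}

  // Ungrouped path: split the input into `bundles` slices and count one file
  // per distinct partition per slice.
  static int filesWithoutGrouping(List<Row> rows, int bundles) {
    int files = 0;
    int per = Math.max(1, (int) Math.ceil(rows.size() / (double) bundles));
    for (int i = 0; i < rows.size(); i += per) {
      List<Row> bundle = rows.subList(i, Math.min(i + per, rows.size()));
      files += (int) bundle.stream().map(Row::partitionKey).distinct().count();
    }
    return files;
  }

  // Grouped (hash-distributed) path: group by partition key first, then write
  // one file per partition regardless of how many bundles the input had.
  static int filesWithGrouping(List<Row> rows) {
    Map<String, List<Row>> grouped =
        rows.stream().collect(Collectors.groupingBy(Row::partitionKey));
    return grouped.size();
  }
}
```

With 12 rows spread round-robin over 3 partitions and split into 4 bundles, the ungrouped path opens 12 files while the grouped path opens 3 — the small-files problem this PR targets.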



@ahmedabu98 marked this pull request as draft April 3, 2026 05:32
@ahmedabu98 marked this pull request as ready for review April 13, 2026 21:28
@ahmedabu98 changed the title from "[IcebergIO] Groups rows by partition before writing" to "[IcebergIO] Support hash distribution mode when writing rows" on Apr 13, 2026
@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @claudevdm for label python.
R: @chamikaramj for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@codecov

codecov Bot commented Apr 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.51%. Comparing base (48a6ceb) to head (fa5ccda).
⚠️ Report is 46 commits behind head on master.

Additional details and impacted files
@@              Coverage Diff              @@
##             master   #38061       +/-   ##
=============================================
+ Coverage     54.61%   58.51%    +3.89%     
- Complexity     1689    15428    +13739     
=============================================
  Files          1067     2851     +1784     
  Lines        168152   280076   +111924     
  Branches       1226    12332    +11106     
=============================================
+ Hits          91835   163873    +72038     
- Misses        74118   109777    +35659     
- Partials       2199     6426     +4227     
Flag Coverage Δ
java 64.58% <ø> (-2.76%) ⬇️

Flags with carried forward coverage won't be shown.


@github-actions
Contributor

Reminder, please take a look at this pr: @claudevdm @chamikaramj

@claudevdm
Collaborator

/gemini review

@gemini-code-assist Bot left a comment

Code Review

This pull request introduces a new distribution mode for Iceberg writes, allowing rows to be shuffled by partition key before writing to reduce the number of small files. It includes the implementation of AssignDestinationsAndPartitions and WriteToPartitions transforms, along with a BeamRowWrapper for Iceberg's StructLike interface. Feedback focuses on critical serialization issues in AssignDoFn, where non-serializable maps must be marked transient and initialized in @Setup. Additionally, improvements are suggested for resource management in WritePartitionedRowsToFiles using try-finally blocks and optimizing the table cache with double-checked locking.
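The serialization issue called out in this review is a common Beam pitfall. Below is a Beam-free sketch of the fix: `AssignDoFnSketch` is a hypothetical stand-in for the PR's `AssignDoFn` (the names and structure are illustrative, not the PR's code), showing why non-serializable state must be `transient` and rebuilt in a `@Setup`-style method after the runner deserializes the DoFn.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Beam DoFn (not code from this PR). A DoFn is
// Java-serialized at pipeline submission, so caches and clients must be
// `transient` and rebuilt on the worker in @Setup; otherwise they either fail
// to serialize or arrive as null.
public class AssignDoFnSketch implements Serializable {
  // transient: skipped by serialization, so it is null after deserialization.
  private transient Map<String, String> partitionKeys;

  // Plays the role of a @Setup method: runs on the worker before processing.
  public void setup() {
    partitionKeys = new HashMap<>();
  }

  public void process(String tableId) {
    partitionKeys.computeIfAbsent(tableId, t -> "partition-key-for-" + t);
  }

  public int cachedTables() {
    return partitionKeys == null ? -1 : partitionKeys.size();
  }

  // Round-trips an instance through Java serialization, as a runner would.
  public static AssignDoFnSketch roundTrip(AssignDoFnSketch fn) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(bos);
      oos.writeObject(fn);
      oos.flush();
      return (AssignDoFnSketch) new ObjectInputStream(
          new ByteArrayInputStream(bos.toByteArray())).readObject();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

After the round trip the transient map is gone; only after calling `setup()` again does the instance work, which is exactly the lifecycle `@Setup` provides.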

@chamikaramj left a comment

Thanks!

}

/**
* Groups incoming rows by partition before sending to writes, ensuring that a given bundle is

getDirectWriteByteLimit()));

switch (getDistributionMode()) {
case NONE:
Contributor

We don't support "RANGE" ?

Contributor Author

Not yet. We can add it in the future if there's demand


@SchemaFieldDescription(
"Defines distribution of write data. Supported distributions:"
+ "\n- none: don't shuffle rows (default)"
Contributor

Probably cleaner to make this setHashDistributionMode and make it a boolean ?

Contributor Author

Would rather keep it this way in case we add support for Range. More straightforward to deal with one config option. This also mirrors how Flink does it

T get(Row data, int pos);
}

private static @Nullable PositionalGetter<?> buildGetter(FieldType beamType, Type icebergType) {
Contributor

I think this also came up in a previous PR. Can you clarify why we would need the wrapper class here ? Also is there a way to avoid having to iterate through the set of types in multiple places ?

Contributor Author

Added some Javadoc

import org.apache.iceberg.util.UUIDUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

public class BeamRowWrapper implements StructLike {
Contributor

Please add documentation on why this class is needed and how to use it. A BeamRowWrapper sounds like a more general utility that might have to live outside Iceberg ?

Contributor Author

This class is specifically for Iceberg (helps Iceberg fetch data directly from a Beam Row, instead of having to copy the data over to an Iceberg Record)
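A minimal sketch of that adapter idea, with no Iceberg dependency: `StructLike` below is a cut-down stand-in for `org.apache.iceberg.StructLike`'s read side (the real interface also has a `set` method), and `FakeRow` stands in for a Beam `Row`. The point is that the wrapper answers positional, typed `get` calls directly from the underlying row rather than copying it into a `Record`.

```java
import java.util.List;

// Sketch only: a positional wrapper in the spirit of BeamRowWrapper. The
// interfaces and names are illustrative stand-ins, not Iceberg or Beam APIs.
public class RowWrapperSketch {

  // Cut-down stand-in for org.apache.iceberg.StructLike's read side.
  interface StructLike {
    int size();
    <T> T get(int pos, Class<T> javaClass);
  }

  // Stand-in for a Beam Row: an ordered list of field values.
  record FakeRow(List<Object> values) {}

  // Wraps the row without copying it; each get() reads through to the row.
  static StructLike wrap(FakeRow row) {
    return new StructLike() {
      @Override public int size() {
        return row.values().size();
      }
      @Override public <T> T get(int pos, Class<T> javaClass) {
        return javaClass.cast(row.values().get(pos)); // direct, copy-free access
      }
    };
  }
}
```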

} else {
try {
// see if table already exists with a spec
// TODO(ahmedabu98): improve this by periodically refreshing the table to fetch updated
Contributor

Add a Github issue to the TODO ?

Contributor Author

Done

IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
Table table = getOrCreateTable(destination, dataSchema);

// TODO(ahmedabu98): cache this
Contributor

Ditto.

Contributor Author

Just fixed it instead

supportsNamespaces.createNamespace(namespace);
LOG.info("Created new namespace '{}'.", namespace);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this namespace
Contributor

Let's at least log it.

Contributor Author

Done

import org.joda.time.Duration;

class WriteToPartitions extends PTransform<PCollection<KV<Row, Row>>, IcebergWriteResult> {
private static final long DEFAULT_BYTES_PER_FILE = (1L << 29); // 512mb
Contributor

This is known to be a good value for Iceberg ?

Contributor Author

Yes, the ideal Parquet file size is generally 512MB. It may help to expose a config option for users to provide their own, or to fetch the value from existing table properties. Will defer that to a future PR though
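For readers tracing the constant: `1L << 29` is 536,870,912 bytes, i.e. 512 MiB. The sketch below (illustrative, not the PR's writer) shows the rolling behavior such a limit drives: the writer starts a new file once adding the next record would push the current file past the threshold.

```java
import java.util.List;

// Toy model of byte-limited file rolling (not the PR's writer). A real writer
// tracks serialized bytes; here each record is just a size in bytes.
public class FileRollingSketch {
  static final long DEFAULT_BYTES_PER_FILE = 1L << 29; // 536,870,912 B = 512 MiB

  // Counts how many files a stream of record sizes produces under a limit.
  static int countFiles(List<Long> recordSizes, long bytesPerFile) {
    if (recordSizes.isEmpty()) {
      return 0;
    }
    int files = 1;
    long currentBytes = 0;
    for (long size : recordSizes) {
      if (currentBytes > 0 && currentBytes + size > bytesPerFile) {
        files++; // roll over to a fresh file
        currentBytes = 0;
      }
      currentBytes += size;
    }
    return files;
  }
}
```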

input.apply(Managed.write(ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

// Read back and check records are correct
Contributor

Is it possible to add a test to confirm that less files get created when distribution mode is set to HASH ?

Contributor Author

It's hard to test with the direct runner because its autosharding implementation is not smart; sometimes it creates more shards than necessary.

I added a test for grouping without autosharding, though, to validate that the number of files created equals the number of partitions.

@ahmedabu98 requested a review from chamikaramj May 1, 2026 13:20
@ahmedabu98
Contributor Author

Thanks @chamikaramj, I addressed your comments

@ahmedabu98
Contributor Author

/gemini review

@gemini-code-assist Bot left a comment

Code Review

This pull request introduces support for partitioned writes in IcebergIO, adding DistributionMode (NONE and HASH) and autosharding capabilities. Key components include AssignDestinationsAndPartitions for record routing, BeamRowWrapper for adapting Beam rows to Iceberg's StructLike interface, and WriteToPartitions for managing the shuffled write path. Feedback highlights several improvement opportunities: addressing potential staleness in cached partition metadata, reducing expensive catalog calls in the hot path via shared caching, and minimizing thread contention by using more granular locking instead of synchronizing on a static cache. Additionally, debug print statements should be removed from the production code.

// see if table already exists with a spec
// TODO(https://github.com/apache/beam/issues/38337): improve this by periodically
// refreshing the table to fetch updated specs
spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
Contributor

Severity: medium

Calling catalog().loadTable() inside processElement can be very expensive, especially in pipelines with many unique table identifiers or high bundle counts. While the results are cached within the DoFn instance's partitionKeys map, the first element for every unique table in every bundle will still trigger a catalog call. Consider using a static cache (similar to WritePartitionedRowsToFiles.LAST_REFRESHED_TABLE_CACHE) to share table metadata across bundles and workers.
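The suggested shape of such a cache, sketched without Iceberg types: a process-wide `ConcurrentHashMap` keyed by table identifier whose entries record when they were loaded, so bundles on the same worker JVM reuse one catalog call until a TTL expires. `loadSpec` stands in for the `catalog().loadTable(...).spec()` call; all names here are illustrative, not the PR's.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a JVM-wide, TTL-based metadata cache (not PR code).
public class SharedSpecCacheSketch {
  record Entry(String spec, long loadedAtMillis) {}

  // static: shared by every DoFn instance in the worker JVM.
  static final Map<String, Entry> CACHE = new ConcurrentHashMap<>();
  static int catalogCalls = 0; // counts expensive loads, for the demo

  static String getSpec(String tableId, long nowMillis, long ttlMillis) {
    Entry e = CACHE.get(tableId);
    if (e == null || nowMillis - e.loadedAtMillis() > ttlMillis) {
      e = new Entry(loadSpec(tableId), nowMillis); // refresh from the catalog
      CACHE.put(tableId, e);
    }
    return e.spec();
  }

  // Stand-in for the expensive catalog().loadTable(...).spec() call.
  private static String loadSpec(String tableId) {
    catalogCalls++;
    return "spec-of-" + tableId;
  }
}
```

A real implementation would also need concurrent-load deduplication (e.g. `computeIfAbsent`) and the periodic-refresh concern tracked in the TODO, but the TTL check above captures the staleness-versus-cost trade-off the review describes.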

@chamikaramj
Contributor

LGTM. Thanks.

@chamikaramj
Contributor

retest this please

@ahmedabu98 merged commit 268ae1a into apache:master May 4, 2026
170 of 176 checks passed
