From 1e49f75a190b1a912def36b2e111478f8042b23b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Fri, 24 Apr 2026 13:09:05 +0200 Subject: [PATCH 01/10] Update comment in BeamModulePlugin.groovy around ErrorProne Intended suppressions with justifications --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 57a56e66b168..005a8b587804 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1548,6 +1548,7 @@ class BeamModulePlugin implements Plugin { "MockNotUsedInProduction", "NullableWildcard", "SuperCallToObjectMethod", + // Intended suppressions with justifications // for encoding efficiency and backward compatibility "EnumOrdinal", // widely used in non-public methods From 1364034979ba48be54ddea2841ccf2c3e5bbad60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Fri, 24 Apr 2026 13:10:10 +0200 Subject: [PATCH 02/10] Revert "Update comment in BeamModulePlugin.groovy around ErrorProne" --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 1 - 1 file changed, 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 005a8b587804..57a56e66b168 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1548,7 +1548,6 @@ class BeamModulePlugin implements Plugin { "MockNotUsedInProduction", "NullableWildcard", "SuperCallToObjectMethod", - // Intended suppressions with justifications // for encoding efficiency and backward compatibility "EnumOrdinal", // 
widely used in non-public methods From 1868a5c1ec0e96b67d27a8e0873647b0956ee586 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 17 Apr 2026 14:57:53 +0000 Subject: [PATCH 03/10] Refactor metadata propagation in ReduceFnRunner to support extensible PipelineMetadata --- .../beam/runners/core/CombinedMetadata.java | 94 ++++++++++++++++++ .../core/CombinedMetadataCombiner.java | 64 +++++++++++++ .../beam/runners/core/MetadataCombiner.java | 25 +++++ .../beam/runners/core/ReduceFnRunner.java | 49 ++++++++-- .../runners/core/CombinedMetadataTest.java | 78 +++++++++++++++ .../beam/runners/core/ReduceFnTester.java | 5 +- .../worker/StreamingDataflowWorkerTest.java | 24 ++++- .../beam/sdk/values/WindowedValues.java | 4 + .../transforms/MetadataPropagationTest.java | 95 +++++++++++++++++-- 9 files changed, 418 insertions(+), 20 deletions(-) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java create mode 100644 runners/core-java/src/test/java/org/apache/beam/runners/core/CombinedMetadataTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java new file mode 100644 index 000000000000..a2cbc04f4813 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.values.CausedByDrain; + +/** + * Encapsulates metadata that propagates with elements in the pipeline. + * + *

<p>This metadata is sent along with elements. It currently includes fields like {@link + * CausedByDrain}, and is designed to be extensible to support future metadata fields such as + * OpenTelemetry context or CDC (Change Data Capture) kind. + * + *

The purpose of this class is to group targeted metadata fields together. This makes it easier + * to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when + * multiple elements are merged or grouped, without having to extend method signatures or state + * handling for every new metadata field. + */ +@AutoValue +public abstract class CombinedMetadata { + public abstract CausedByDrain causedByDrain(); + + public static CombinedMetadata create(CausedByDrain causedByDrain) { + return new AutoValue_CombinedMetadata(causedByDrain); + } + + public static CombinedMetadata createDefault() { + return create(CausedByDrain.NORMAL); + } + + public static class Coder extends AtomicCoder { + private static final Coder INSTANCE = new Coder(); + + public static Coder of() { + return INSTANCE; + } + + @Override + public void encode(CombinedMetadata value, OutputStream outStream) throws IOException { + if (value == null) { + NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream); + return; + } + BeamFnApi.Elements.ElementMetadata.Builder builder = + BeamFnApi.Elements.ElementMetadata.newBuilder(); + builder.setDrain( + value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN + ? BeamFnApi.Elements.DrainMode.Enum.DRAINING + : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); + + NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream); + } + + @Override + public CombinedMetadata decode(InputStream inStream) throws IOException { + byte[] bytes = NullableCoder.of(ByteArrayCoder.of()).decode(inStream); + if (bytes == null) { + return CombinedMetadata.createDefault(); + } + BeamFnApi.Elements.ElementMetadata proto = + BeamFnApi.Elements.ElementMetadata.parseFrom(bytes); + + CausedByDrain causedByDrain = + proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? 
CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; + + return CombinedMetadata.create(causedByDrain); + } + } +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java new file mode 100644 index 000000000000..9bb280b62940 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.values.CausedByDrain; + +/** Combiner for CombinedMetadata. 
*/ +class CombinedMetadataCombiner implements MetadataCombiner { + private static final CombinedMetadataCombiner INSTANCE = new CombinedMetadataCombiner(); + + public static CombinedMetadataCombiner of() { + return INSTANCE; + } + + private final CausedByDrainCombiner causedByDrainCombiner = CausedByDrainCombiner.of(); + + @Override + public CombinedMetadata createAccumulator() { + return CombinedMetadata.create(causedByDrainCombiner.createAccumulator()); + } + + @Override + public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata input) { + return CombinedMetadata.create( + causedByDrainCombiner.addInput(accumulator.causedByDrain(), input.causedByDrain())); + } + + /** Combiner for CausedByDrain metadata. */ + static class CausedByDrainCombiner implements MetadataCombiner { + private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner(); + + public static CausedByDrainCombiner of() { + return INSTANCE; + } + + @Override + public CausedByDrain createAccumulator() { + return CausedByDrain.NORMAL; + } + + @Override + public CausedByDrain addInput(CausedByDrain current, CausedByDrain input) { + if (current == CausedByDrain.CAUSED_BY_DRAIN || input == CausedByDrain.CAUSED_BY_DRAIN) { + return CausedByDrain.CAUSED_BY_DRAIN; + } + return CausedByDrain.NORMAL; + } + } +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java new file mode 100644 index 000000000000..55884a8e43a8 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +/** Interface for combining pipeline metadata. */ +interface MetadataCombiner { + T createAccumulator(); + + T addInput(T accumulator, T input); +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 0721ddc4685e..59ea8fe59fb1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; @@ -107,6 +108,10 @@ public class ReduceFnRunner { *

  • It uses discarding or accumulation mode according to the {@link WindowingStrategy}. * */ + static final StateTag> METADATA_TAG = + StateTags.makeSystemTagInternal( + StateTags.value("combinedMetadata", CombinedMetadata.Coder.of())); + private final WindowingStrategy windowingStrategy; private final WindowedValueReceiver> outputter; @@ -376,7 +381,7 @@ public void processElements(Iterable> values) throws Excep emit( contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED), - CausedByDrain.NORMAL); + CombinedMetadata.createDefault()); } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -590,6 +595,17 @@ private void processElement(Map windowToMergeResult, WindowedValue value.getTimestamp(), StateStyle.DIRECT, value.causedByDrain()); + + ValueState metadataState = directContext.state().access(METADATA_TAG); + CombinedMetadata currentMetadata = metadataState.read(); + if (currentMetadata == null) { + currentMetadata = CombinedMetadata.createDefault(); + } + CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain()); + CombinedMetadata newMetadata = + CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata); + metadataState.write(newMetadata); + if (triggerRunner.isClosed(directContext.state())) { // This window has already been closed. 
droppedDueToClosedWindow.inc(); @@ -792,7 +808,7 @@ public void onTimers(Iterable timers) throws Exception { renamedContext, true /* isFinished */, windowActivation.isEndOfWindow, - windowActivation.causedByDrain); + CombinedMetadata.create(windowActivation.causedByDrain)); checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); } @@ -810,7 +826,10 @@ public void onTimers(Iterable timers) throws Exception { if (windowActivation.windowIsActiveAndOpen() && triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { - emit(directContext, renamedContext, windowActivation.causedByDrain); + emit( + directContext, + renamedContext, + CombinedMetadata.create(windowActivation.causedByDrain)); } if (windowActivation.isEndOfWindow) { @@ -874,6 +893,7 @@ private void clearAllState( triggerRunner.clearState( directContext.window(), directContext.timers(), directContext.state()); paneInfoTracker.clear(directContext.state()); + directContext.state().access(METADATA_TAG).clear(); } else { // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2). // For (1), if !activeWindows.isActive then the window must be merging and has been @@ -934,8 +954,9 @@ private void prefetchEmit( private void emit( ReduceFn.Context directContext, ReduceFn.Context renamedContext, - CausedByDrain causedByDrain) + CombinedMetadata metadata) throws Exception { + checkState( triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())); @@ -950,13 +971,14 @@ private void emit( // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. 
- onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, causedByDrain); + onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, metadata); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); // Cleanup buffered data if appropriate if (shouldDiscard) { + directContext.state().access(METADATA_TAG).clear(); // Cleanup flavor C: The user does not want any buffered data to persist between panes. reduceFn.clearState(renamedContext); } @@ -1009,8 +1031,19 @@ private void prefetchOnTrigger( ReduceFn.Context renamedContext, final boolean isFinished, boolean isEndOfWindow, - CausedByDrain causedByDrain) + CombinedMetadata metadata) throws Exception { + ValueState metadataState = directContext.state().access(METADATA_TAG); + CombinedMetadata aggregatedMetadata = metadataState.read(); + if (aggregatedMetadata == null) { + aggregatedMetadata = CombinedMetadata.createDefault(); + } + CombinedMetadata fullyAggregatedMetadata = + CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata); + final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain(); + if (isFinished) { + metadataState.clear(); + } // Extract the window hold, and as a side effect clear it. 
final WatermarkHold.OldAndNewHolds pair = watermarkHold.extractAndRelease(renamedContext, isFinished).read(); @@ -1081,12 +1114,12 @@ private void prefetchOnTrigger( .setValue(KV.of(key, toOutput)) .setTimestamp(outputTimestamp) .setWindows(windows) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(aggregatedCausedByDrain) .setPaneInfo(paneInfo) .setReceiver(outputter) .output(); }, - causedByDrain); + aggregatedCausedByDrain); reduceFn.onTrigger(renamedTriggerContext); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/CombinedMetadataTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/CombinedMetadataTest.java new file mode 100644 index 000000000000..ad96566d0272 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/CombinedMetadataTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.values.CausedByDrain; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link CombinedMetadata}. */ +@RunWith(JUnit4.class) +public class CombinedMetadataTest { + + @Test + public void testCoderEncodeDecode() throws Exception { + CombinedMetadata metadata = CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN); + CombinedMetadata.Coder coder = CombinedMetadata.Coder.of(); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + coder.encode(metadata, outStream); + + ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); + CombinedMetadata decoded = coder.decode(inStream); + + assertEquals(metadata, decoded); + } + + @Test + public void testCoderDecodeNull() throws Exception { + CombinedMetadata.Coder coder = CombinedMetadata.Coder.of(); + + // Encode null using NullableCoder on ByteArray + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream); + + ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); + CombinedMetadata decoded = coder.decode(inStream); + + assertEquals(CombinedMetadata.createDefault(), decoded); + } + + @Test + public void testCoderDecodeEmptyBytes() throws Exception { + CombinedMetadata.Coder coder = CombinedMetadata.Coder.of(); + + // Encode empty byte array using NullableCoder on ByteArray + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + NullableCoder.of(ByteArrayCoder.of()).encode(new byte[0], outStream); + + ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); + CombinedMetadata decoded = 
coder.decode(inStream); + + // ElementMetadata.parseFrom(empty bytes) should yield default proto with NOT_DRAINING + // which translates to CausedByDrain.NORMAL, which is the default! + assertEquals(CombinedMetadata.createDefault(), decoded); + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 43b6a3cb0cb0..2326a1c77d38 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -318,7 +318,7 @@ public boolean hasNoActiveWindows() { public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), - ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG)); + ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG, ReduceFnRunner.METADATA_TAG)); } @SafeVarargs @@ -331,7 +331,8 @@ public final void assertHasOnlyGlobalAndStateFor(W... 
expectedWindows) { PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner( objectStrategy.getTimestampCombiner()), - WatermarkHold.EXTRA_HOLD_TAG)); + WatermarkHold.EXTRA_HOLD_TAG, + ReduceFnRunner.METADATA_TAG)); } @SafeVarargs diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 3a25a671ca92..196763bb1f74 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1807,6 +1807,7 @@ public void testMergeWindows() throws Exception { String timerTagPrefix = "/s" + window + "+0"; ByteString bufferTag = ByteString.copyFromUtf8(window + "+ubuf"); ByteString paneInfoTag = ByteString.copyFromUtf8(window + "+upaneInfo"); + ByteString combinedMetadataTag = ByteString.copyFromUtf8(window + "+ucombinedMetadata"); String watermarkDataHoldTag = window + "+uhold"; String watermarkExtraHoldTag = window + "+uextra"; String stateFamily = "MergeWindows"; @@ -1946,11 +1947,20 @@ public void testMergeWindows() throws Exception { assertThat( "" + actualOutput.getValueUpdatesList(), actualOutput.getValueUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagValue.newBuilder() .setTag(paneInfoTag) .setStateFamily(stateFamily) + .setValue( + Windmill.Value.newBuilder() + .setTimestamp(Long.MAX_VALUE) + .setData(ByteString.EMPTY)) + .build()), + Matchers.equalTo( + Windmill.TagValue.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) .setValue( Windmill.Value.newBuilder() .setTimestamp(Long.MAX_VALUE) @@ -2097,6 +2107,7 @@ public void testMergeWindowsCaching() 
throws Exception { String timerTagPrefix = "/s" + window + "+0"; ByteString bufferTag = ByteString.copyFromUtf8(window + "+ubuf"); ByteString paneInfoTag = ByteString.copyFromUtf8(window + "+upaneInfo"); + ByteString combinedMetadataTag = ByteString.copyFromUtf8(window + "+ucombinedMetadata"); String watermarkDataHoldTag = window + "+uhold"; String watermarkExtraHoldTag = window + "+uextra"; String stateFamily = "MergeWindows"; @@ -2236,11 +2247,20 @@ public void testMergeWindowsCaching() throws Exception { assertThat( "" + actualOutput.getValueUpdatesList(), actualOutput.getValueUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagValue.newBuilder() .setTag(paneInfoTag) .setStateFamily(stateFamily) + .setValue( + Windmill.Value.newBuilder() + .setTimestamp(Long.MAX_VALUE) + .setData(ByteString.EMPTY)) + .build()), + Matchers.equalTo( + Windmill.TagValue.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) .setValue( Windmill.Value.newBuilder() .setTimestamp(Long.MAX_VALUE) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index ba2720f5e39b..70168024762e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -866,6 +866,10 @@ public static void setMetadataSupported() { metadataSupported = true; } + public static void setMetadataNotSupported() { + metadataSupported = false; + } + public static boolean isMetadataSupported() { return metadataSupported; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java index a2ff99905f6c..cba13ecb6bf2 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -17,13 +17,23 @@ */ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowedValues; -import org.junit.Ignore; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,10 +46,14 @@ public class MetadataPropagationTest { /** Tests for metadata propagation. 
*/ @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - static class CausedByDrainSettingDoFn extends DoFn { + static class CausedByDrainSettingDoFn extends DoFn { @ProcessElement - public void process(OutputReceiver r) { - r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); + public void process(@Element Boolean isDrain, OutputReceiver r) { + if (isDrain) { + r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); + } else { + r.builder("value").setCausedByDrain(CausedByDrain.NORMAL).output(); + } } } @@ -52,12 +66,11 @@ public void process(ProcessContext pc, OutputReceiver r) { @Test @Category(NeedsRunner.class) - @Ignore public void testMetadataPropagationAcrossShuffleParameter() { WindowedValues.WindowedValueCoder.setMetadataSupported(); PCollection results = pipeline - .apply(Create.of(1)) + .apply(Create.of(true)) .apply(ParDo.of(new CausedByDrainSettingDoFn())) .apply(Redistribute.arbitrarily()) .apply(ParDo.of(new CausedByDrainExtractingDoFn())); @@ -68,11 +81,11 @@ public void testMetadataPropagationAcrossShuffleParameter() { } @Test - @Category(NeedsRunner.class) + @Category({ValidatesRunner.class, NeedsRunner.class}) public void testMetadataPropagationParameter() { PCollection results = pipeline - .apply(Create.of(1)) + .apply(Create.of(true)) .apply(ParDo.of(new CausedByDrainSettingDoFn())) .apply(ParDo.of(new CausedByDrainExtractingDoFn())); @@ -80,4 +93,70 @@ public void testMetadataPropagationParameter() { pipeline.run(); } + + static class CausedByDrainExtracingFromGBKDoFn + extends DoFn>, String> { + @ProcessElement + public void process(ProcessContext pc, OutputReceiver r) { + r.output(pc.causedByDrain().toString()); + } + } + + /** + * Tests metadata propagation across GroupByKey. Note: This test works only with DirectRunner and + * runners that support metadata propagation (e.g. via a flag to enable metadata encoding in + * coders). 
It fails on portable runners like Prism because they do not have implementation for + * metadata propagation, leading to coder mismatches. + */ + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationAcrossGBK() { + WindowedValues.WindowedValueCoder.setMetadataSupported(); + Instant baseTime = new Instant(0); + TestStream stream = + TestStream.create(BooleanCoder.of()) + .advanceWatermarkTo(baseTime) + .addElements(TimestampedValue.of(false, baseTime.plus(Duration.standardSeconds(10)))) + .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(1))) + .addElements( + TimestampedValue.of(false, baseTime.plus(Duration.standardSeconds(71))), + TimestampedValue.of(true, baseTime.plus(Duration.standardSeconds(72)))) + .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(2))) + .addElements( + TimestampedValue.of(false, baseTime.plus(Duration.standardSeconds(130))), + TimestampedValue.of(true, baseTime.plus(Duration.standardSeconds(131))), // drain + TimestampedValue.of(false, baseTime.plus(Duration.standardSeconds(132)))) + .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3))) + .addElements( + TimestampedValue.of(false, baseTime.plus(Duration.standardSeconds(181)))) // normal + .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(4))) + .advanceWatermarkToInfinity(); + + Duration windowDuration = Duration.standardMinutes(1); + IntervalWindow window1 = new IntervalWindow(baseTime, windowDuration); + IntervalWindow window2 = new IntervalWindow(window1.end(), windowDuration); + IntervalWindow window3 = new IntervalWindow(window2.end(), windowDuration); + IntervalWindow window4 = new IntervalWindow(window3.end(), windowDuration); + + PCollection results = + pipeline + .apply(stream) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(WithKeys.of("1")) + .apply(Window.into(FixedWindows.of(windowDuration))) + .apply(GroupByKey.create()) + .apply(ParDo.of(new CausedByDrainExtracingFromGBKDoFn())); + + 
PAssert.that(results).inWindow(window1).containsInAnyOrder("NORMAL"); + PAssert.that(results).inWindow(window2).containsInAnyOrder("CAUSED_BY_DRAIN"); + PAssert.that(results).inWindow(window3).containsInAnyOrder("CAUSED_BY_DRAIN"); + PAssert.that(results).inWindow(window4).containsInAnyOrder("NORMAL"); + + pipeline.run(); + } + + @After + public void tearDown() { + WindowedValues.WindowedValueCoder.setMetadataNotSupported(); + } } From f4a309efd881172a1e9c5f88020eaab6b95ff123 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 17 Apr 2026 16:04:44 +0000 Subject: [PATCH 04/10] refactor gathering metadata --- .../beam/runners/core/ReduceFnRunner.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 59ea8fe59fb1..96fe88ac43ff 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -665,15 +665,15 @@ private class WindowActivation { // garbage collect the window. We'll consider any timer at or after the // end-of-window time to be a signal to garbage collect. 
public final boolean isGarbageCollection; - public final CausedByDrain causedByDrain; + public final CombinedMetadata combinedMetadata; WindowActivation( ReduceFn.Context directContext, ReduceFn.Context renamedContext, - CausedByDrain causedByDrain) { + CombinedMetadata combinedMetadata) { this.directContext = directContext; this.renamedContext = renamedContext; - this.causedByDrain = causedByDrain; + this.combinedMetadata = combinedMetadata; W window = directContext.window(); // The output watermark is before the end of the window if it is either unknown @@ -758,7 +758,8 @@ public void onTimers(Iterable timers) throws Exception { ReduceFn.Context renamedContext = contextFactory.base(window, StateStyle.RENAMED); WindowActivation windowActivation = - new WindowActivation(directContext, renamedContext, timer.causedByDrain()); + new WindowActivation( + directContext, renamedContext, CombinedMetadata.create(timer.causedByDrain())); windowActivations.put(window, windowActivation); // Perform prefetching of state to determine if the trigger should fire. 
@@ -794,7 +795,7 @@ public void onTimers(Iterable timers) throws Exception { directContext.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime(), - windowActivation.causedByDrain); + windowActivation.combinedMetadata.causedByDrain()); boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen(); if (windowIsActiveAndOpen) { @@ -808,7 +809,7 @@ public void onTimers(Iterable timers) throws Exception { renamedContext, true /* isFinished */, windowActivation.isEndOfWindow, - CombinedMetadata.create(windowActivation.causedByDrain)); + windowActivation.combinedMetadata); checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); } @@ -826,10 +827,7 @@ public void onTimers(Iterable timers) throws Exception { if (windowActivation.windowIsActiveAndOpen() && triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { - emit( - directContext, - renamedContext, - CombinedMetadata.create(windowActivation.causedByDrain)); + emit(directContext, renamedContext, windowActivation.combinedMetadata); } if (windowActivation.isEndOfWindow) { From 2e367ebb59a71e9b1949218f789a787734a25e8c Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 21 Apr 2026 10:38:31 +0000 Subject: [PATCH 05/10] Add missing dependency on model:fn-execution to runners:core-java --- runners/core-java/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index 9f24ce39b974..403cf4f2bc5a 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -42,6 +42,7 @@ dependencies { implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:job-management", configuration: "shadow") + implementation project(path: ":model:fn-execution", configuration: "shadow") implementation 
library.java.vendored_guava_32_1_2_jre implementation library.java.joda_time implementation library.java.vendored_grpc_1_69_0 From 075ac782ca38e29de0c3bb77f7470ecd58c16eeb Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 24 Apr 2026 11:56:42 +0000 Subject: [PATCH 06/10] Use CombiningState --- .../core/CombinedMetadataCombiner.java | 18 ++++++++++- .../beam/runners/core/ReduceFnRunner.java | 30 +++++++++---------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java index 9bb280b62940..557549fa58c1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java @@ -17,10 +17,12 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.values.CausedByDrain; /** Combiner for CombinedMetadata. */ -class CombinedMetadataCombiner implements MetadataCombiner { +class CombinedMetadataCombiner + extends CombineFn { private static final CombinedMetadataCombiner INSTANCE = new CombinedMetadataCombiner(); public static CombinedMetadataCombiner of() { @@ -40,6 +42,20 @@ public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata causedByDrainCombiner.addInput(accumulator.causedByDrain(), input.causedByDrain())); } + @Override + public CombinedMetadata mergeAccumulators(Iterable accumulators) { + CombinedMetadata result = createAccumulator(); + for (CombinedMetadata accum : accumulators) { + result = addInput(result, accum); + } + return result; + } + + @Override + public CombinedMetadata extractOutput(CombinedMetadata accumulator) { + return accumulator; + } + /** Combiner for CausedByDrain metadata. 
*/ static class CausedByDrainCombiner implements MetadataCombiner { private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 96fe88ac43ff..cf08cf02f4cc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -38,8 +38,8 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; @@ -108,9 +108,11 @@ public class ReduceFnRunner { *
  • It uses discarding or accumulation mode according to the {@link WindowingStrategy}. * */ - static final StateTag> METADATA_TAG = - StateTags.makeSystemTagInternal( - StateTags.value("combinedMetadata", CombinedMetadata.Coder.of())); + static final StateTag> + METADATA_TAG = + StateTags.makeSystemTagInternal( + StateTags.combiningValue( + "combinedMetadata", CombinedMetadata.Coder.of(), CombinedMetadataCombiner.of())); private final WindowingStrategy windowingStrategy; @@ -596,16 +598,6 @@ private void processElement(Map windowToMergeResult, WindowedValue StateStyle.DIRECT, value.causedByDrain()); - ValueState metadataState = directContext.state().access(METADATA_TAG); - CombinedMetadata currentMetadata = metadataState.read(); - if (currentMetadata == null) { - currentMetadata = CombinedMetadata.createDefault(); - } - CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain()); - CombinedMetadata newMetadata = - CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata); - metadataState.write(newMetadata); - if (triggerRunner.isClosed(directContext.state())) { // This window has already been closed. 
droppedDueToClosedWindow.inc(); @@ -620,6 +612,11 @@ private void processElement(Map windowToMergeResult, WindowedValue continue; } + directContext + .state() + .access(METADATA_TAG) + .add(CombinedMetadata.create(value.causedByDrain())); + activeWindows.ensureWindowIsActive(window); ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue( @@ -1031,7 +1028,8 @@ private void prefetchOnTrigger( boolean isEndOfWindow, CombinedMetadata metadata) throws Exception { - ValueState metadataState = directContext.state().access(METADATA_TAG); + CombiningState metadataState = + directContext.state().access(METADATA_TAG); CombinedMetadata aggregatedMetadata = metadataState.read(); if (aggregatedMetadata == null) { aggregatedMetadata = CombinedMetadata.createDefault(); @@ -1041,6 +1039,8 @@ private void prefetchOnTrigger( final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain(); if (isFinished) { metadataState.clear(); + } else { + metadataState.add(metadata); } // Extract the window hold, and as a side effect clear it. 
final WatermarkHold.OldAndNewHolds pair = From d37f97253c76c7f4217df95debc300d00a5cfe6b Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 24 Apr 2026 14:11:31 +0000 Subject: [PATCH 07/10] fix matcher --- .../dataflow/worker/StreamingDataflowWorkerTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 196763bb1f74..30905ccc050a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2140,12 +2140,19 @@ public void testMergeWindowsCaching() throws Exception { assertThat( actualOutput.getBagUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagBag.newBuilder() .setTag(bufferTag) .setStateFamily(stateFamily) .addValues(bufferData) + .build()), + Matchers.equalTo( + Windmill.TagBag.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) + .setDeleteAll(true) + .addValues(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x08, 0x01})) .build()))); verifyHolds(actualOutput, buildHold(watermarkDataHoldTag, 0, false)); From 6e2fd02a53861af962a7be166ea423d65cb2459b Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 24 Apr 2026 14:20:09 +0000 Subject: [PATCH 08/10] fix asserts due to new state added --- .../wrappers/streaming/WindowDoFnOperatorTest.java | 14 ++++++++++++-- .../worker/StreamingDataflowWorkerTest.java | 8 +++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java index 3bee828f23dd..3145c14442d4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java @@ -156,7 +156,12 @@ public void testTimerCleanupOfPendingTimerList() throws Exception { // Note that the following is 1 because the state is key-partitioned assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(1)); - assertThat(testHarness.numKeyedStateEntries(), is(6)); + // Expected 8 state entries: + // - 2 entries for user buffer state ("buf") - one per key + // - 2 entries for watermark hold state ("hold") - one per key + // - 2 entries for non-empty panes count state ("count") - one per key + // - 2 entries for "combinedMetadata" state (added in this PR) - one per key + assertThat(testHarness.numKeyedStateEntries(), is(8)); // close bundle testHarness.setProcessingTime( testHarness.getProcessingTime() @@ -169,7 +174,12 @@ public void testTimerCleanupOfPendingTimerList() throws Exception { // Note that the following is zero because we only the first key is active assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0)); - assertThat(testHarness.numKeyedStateEntries(), is(3)); + // Expected 4 state entries remaining for the second key (which is still active): + // - 1 entry for user buffer state ("buf") + // - 1 entry for watermark hold state ("hold") + // - 1 entry for non-empty panes count state ("count") + // - 1 entry for "combinedMetadata" state + assertThat(testHarness.numKeyedStateEntries(), is(4)); // close bundle testHarness.setProcessingTime( diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 30905ccc050a..333384aebe0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1840,12 +1840,18 @@ public void testMergeWindows() throws Exception { assertThat( actualOutput.getBagUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagBag.newBuilder() .setTag(bufferTag) .setStateFamily(stateFamily) .addValues(bufferData) + .build()), + Matchers.equalTo( + Windmill.TagBag.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) + .addValues(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x08, 0x01})) .build()))); verifyHolds(actualOutput, buildHold(watermarkDataHoldTag, 0, false)); From 805282450927be0ea3400c253804bb7f744f9988 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 24 Apr 2026 16:11:48 +0000 Subject: [PATCH 09/10] fix tests related to new state added, fix spotbugs --- .../core/CombinedMetadataCombiner.java | 6 +-- .../worker/StreamingDataflowWorkerTest.java | 38 ++++++++----------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java index 557549fa58c1..a2f3f26520ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java @@ -29,17 +29,15 @@ public static 
CombinedMetadataCombiner of() { return INSTANCE; } - private final CausedByDrainCombiner causedByDrainCombiner = CausedByDrainCombiner.of(); - @Override public CombinedMetadata createAccumulator() { - return CombinedMetadata.create(causedByDrainCombiner.createAccumulator()); + return CombinedMetadata.create(CausedByDrainCombiner.of().createAccumulator()); } @Override public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata input) { return CombinedMetadata.create( - causedByDrainCombiner.addInput(accumulator.causedByDrain(), input.causedByDrain())); + CausedByDrainCombiner.of().addInput(accumulator.causedByDrain(), input.causedByDrain())); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 333384aebe0b..c4600175434f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1953,20 +1953,11 @@ public void testMergeWindows() throws Exception { assertThat( "" + actualOutput.getValueUpdatesList(), actualOutput.getValueUpdatesList(), - Matchers.containsInAnyOrder( + Matchers.contains( Matchers.equalTo( Windmill.TagValue.newBuilder() .setTag(paneInfoTag) .setStateFamily(stateFamily) - .setValue( - Windmill.Value.newBuilder() - .setTimestamp(Long.MAX_VALUE) - .setData(ByteString.EMPTY)) - .build()), - Matchers.equalTo( - Windmill.TagValue.newBuilder() - .setTag(combinedMetadataTag) - .setStateFamily(stateFamily) .setValue( Windmill.Value.newBuilder() .setTimestamp(Long.MAX_VALUE) @@ -1976,12 +1967,18 @@ public void testMergeWindows() throws Exception { assertThat( "" + 
actualOutput.getBagUpdatesList(), actualOutput.getBagUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagBag.newBuilder() .setTag(bufferTag) .setStateFamily(stateFamily) .setDeleteAll(true) + .build()), + Matchers.equalTo( + Windmill.TagBag.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) + .setDeleteAll(true) .build()))); verifyHolds( @@ -2260,20 +2257,11 @@ public void testMergeWindowsCaching() throws Exception { assertThat( "" + actualOutput.getValueUpdatesList(), actualOutput.getValueUpdatesList(), - Matchers.containsInAnyOrder( + Matchers.contains( Matchers.equalTo( Windmill.TagValue.newBuilder() .setTag(paneInfoTag) .setStateFamily(stateFamily) - .setValue( - Windmill.Value.newBuilder() - .setTimestamp(Long.MAX_VALUE) - .setData(ByteString.EMPTY)) - .build()), - Matchers.equalTo( - Windmill.TagValue.newBuilder() - .setTag(combinedMetadataTag) - .setStateFamily(stateFamily) .setValue( Windmill.Value.newBuilder() .setTimestamp(Long.MAX_VALUE) @@ -2283,12 +2271,18 @@ public void testMergeWindowsCaching() throws Exception { assertThat( "" + actualOutput.getBagUpdatesList(), actualOutput.getBagUpdatesList(), - Matchers.contains( + Matchers.containsInAnyOrder( Matchers.equalTo( Windmill.TagBag.newBuilder() .setTag(bufferTag) .setStateFamily(stateFamily) .setDeleteAll(true) + .build()), + Matchers.equalTo( + Windmill.TagBag.newBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) + .setDeleteAll(true) .build()))); verifyHolds( From fe489f8ab8e40b77f58d77c5b7691818d64b746e Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 24 Apr 2026 21:44:57 +0000 Subject: [PATCH 10/10] fix merge windows tests --- .../dataflow/worker/StreamingDataflowWorkerTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index c4600175434f..779ed50c2ffd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1922,6 +1922,11 @@ public void testMergeWindows() throws Exception { .getValueBuilder() .setTimestamp(0) .setData(ByteString.EMPTY); + dataBuilder + .addBagsBuilder() + .setTag(combinedMetadataTag) + .setStateFamily(stateFamily) + .addValues(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x08, 0x01})); server.whenGetDataCalled().thenReturn(dataResponse.build()); expectedBytesRead += dataBuilder.build().getSerializedSize(); @@ -2308,7 +2313,7 @@ public void testMergeWindowsCaching() throws Exception { CacheStats stats = worker.getStateCacheStats(); LOG.info("cache stats {}", stats); assertEquals(1, stats.hitCount()); - assertEquals(4, stats.missCount()); + assertEquals(5, stats.missCount()); worker.stop(); }