Skip to content

Commit 9292e1d

Browse files
authored
[Dataflow Streaming] Prevent commit threads from sharing commit streams (#37847)
Since the commit threads were sharing the WindmillStreamPool, the different commit streams can end up sharing commit streams. This change gives each commit thread its own WindmillStreamPool and avoids the commit stream sharing problem.
1 parent 85c1b88 commit 9292e1d

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,10 +464,13 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
464464
@SuppressWarnings("methodref.receiver.bound")
465465
WorkCommitter workCommitter =
466466
StreamingEngineWorkCommitter.builder()
467+
// Use a separate stream pool for each committer. This ensures the commit
468+
// threads are fully isolated.
467469
.setCommitWorkStreamFactory(
468-
WindmillStreamPool.create(
469-
numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
470-
::getCloseableStream)
470+
() ->
471+
WindmillStreamPool.create(
472+
1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
473+
.getCloseableStream())
471474
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
472475
.setNumCommitSenders(numCommitThreads)
473476
.setOnCommitComplete(this::onCompleteCommit)

0 commit comments

Comments
 (0)