Fix/3849: defer state publish in RunnableLoadJob until _on_completed marke…#3854
Fix/3849: defer state publish in RunnableLoadJob until _on_completed marke…#3854ugbotueferhire wants to merge 4 commits intodlt-hub:develfrom
Conversation
|
Hi @Travior , could you review these changes when you have some time? Happy to make any adjustments needed. Thanks for your time! |
There was a problem hiding this comment.
Hey, @ugbotueferhire thanks for opening a PR for this.
The main changes look very good, the comments are a bit verbose but not the end of the world.
There are two main concerns that I have:
- the changes look to correctly solve the race condition, but we should probably introduce a regression test that reproduces this ordering deterministically. I would look at
tests/load/test_jobs.py::test_on_completed_fires_before_semaphore_releaseandtests/common/configuration/test_container.py::test_container_thread_affinityon how to do tests around load_jobs and how to usethreading.Barrier - right now if
_on_completedraises (for example when the.dltfolder becomes unavailable) the job stays in a non-terminal state. We need to set a terminal state without swallowing the exception, otherwise execution will halt indefinitely. There is no test that checks this right now, which would be a great addition too
If there is anything that is unclear or you need further help feel free to ping me, thanks again for your work!
|
Hey @Travior, thanks for the feedback 🤙🏾 I've addressed both concerns:
All 27 tests in |
|
Thanks for coming back to this. I'll have a look at the tests tomorrow and will come back to you |
|
Hey @Travior, reminder incase you forgot, Please Whenever you have a moment, would love to get this over the line. Thanks! |
|
This looks quite good now. |
|
Thanks for flagging this. just updated _on_completed so we only store the callback error if the job hadn’t already failed, which means we now keep the original destination exception in the double-failure case. I also added a regression test for that path. |
|
Thanks a lot, I'll let CI run and check in on this tomorrow morning! |
[Fix] – Eliminate race condition between worker state publish and package finalization in
RunnableLoadJobDescription
Context:
A race condition existed in
RunnableLoadJob.run_managed()whereself._statewas published to a terminal value ("completed"or"failed") before the_on_completedcallback had finished persisting the pending-transition marker to disk. This created a window where the main loader thread could observe the terminal state viajob.state(), proceed to finalize the load package (renaming its directory), and collide with the worker thread still writing the transition marker inside the now-renamed directory. The result was intermittent filesystem errors or silently lost transition markers, potentially compromising data integrity on resume. See #3849.Approach:
The fix introduces a local
terminal_statevariable inrun_managed()that tracks the intended terminal state throughout execution and thefinallyblock. The key behavioral change is:self._stateis only assigned after_on_completed()has finished persisting the marker and_finished_athas been set. This ensures the main thread cannot observe a terminal state until all side effects are complete._done_event.release()call is now unconditionally placed afterself._stateis published, but still gated on non-retry terminal states. This preserves the existing wake-up semantics while guaranteeing correct ordering.Modified file:
dlt/common/destination/client.py—RunnableLoadJob.run_managed()methodExecution ordering (before → after):
Impact
Tests
Unit Tests (
tests/load/test_jobs.py) — 24/24 PASSED ✅test_on_completed_called_on_success_on_completedfires with("completed", None)on successtest_on_completed_called_on_failure_on_completedfires with("failed", traceback)on terminal exceptiontest_on_completed_not_called_on_retry_on_completedis never called on transient exceptionstest_on_completed_exception_propagates_on_completedare not swallowedtest_on_completed_fires_before_semaphore_release_on_completedexecutes before_finished_atis set and before the semaphore is released — directly validates the ordering guarantee this fix providestest_runnable_job_resultstest_set_final_state_*set_final_state()correctly forces terminal statesIntegration Tests (
tests/load/test_dummy_client.py) — 51/52 PASSED ✅All functional loader-loop tests pass, including:
test_completed_looptest_resume_with_pending_completed_transitiontest_resume_with_pending_failed_transitiontest_load_single_threadtest_load_multiple_packagestest_spool_job_retry_*VISUALS
Note
test_big_load_packages(1/52 failure) is a pre-existing performance benchmark that expects 1000 empty jobs to complete in under 15 seconds. It timed out at 37s due to local machine load — this is environment-specific and unrelated to this change.