AP-683 Add Retries with Backoffs to DAG#65
Conversation
There was a problem hiding this comment.
Pull request overview
Adds Airflow-level retry/backoff configuration to the gen_llm_image_descriptions DAG and ensures that transient TIND failures (HTTP 429 and 5xx) propagate as exceptions so Airflow will retry the task, while other HTTP errors continue to be recorded as per-record failures. Tests are added at the unit level (image fetcher) and at the DAG-task level.
Changes:
- Add
default_args(3 retries, exponential backoff up to 10 minutes) to the DAG decorator and re-raiserequests.HTTPErrorfor status 429 and ≥500 insidefetch_image_to_record_directory. - Add a unit test verifying that a 429 from the mocked TIND hook propagates out of the
ImageFetcher. - Add a DAG-level test that mocks the embedded partial/expand task callable and asserts a 429 raises through, plus relax two pylint rules to accommodate the DAG file.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| mokelumne/dags/gen_llm_image_descriptions.py | Adds retry default_args and selective re-raise of retryable HTTP errors (429/5xx); adds a too-many-locals disable on generate_summary. |
| test/unit/test_image_fetcher.py | Adds MockTindHookWith429 and a unit test asserting HTTPError propagation from the fetcher. |
| test/tests/test_gen_llm_image_descriptions.py | New DAG-level test asserting that 429 from TIND raises through the fetch task callable. |
| pyproject.toml | Disables additional pylint rules (pointless-statement, too-many-statements) project-wide. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| from airflow.dag_processing.dagbag import DagBag | ||
| from pathlib import Path | ||
|
|
||
| dag_dir = Path(__file__).resolve().parent.parent / "mokelumne/dags" |
There was a problem hiding this comment.
I'm pretty sure this is false since the tests passed in CI and locally when I ran it.
There was a problem hiding this comment.
This is weird!
awilcox [s002 Mon 18 12:40] mokelumne: cat -> test/tests/test.py
from pathlib import Path
print(Path(__file__))
print(Path(__file__).resolve())
print(Path(__file__).resolve().parent.parent)
print(Path(__file__).resolve().parent.parent / "mokelumne/dags")
awilcox [s002 Mon 18 12:40] mokelumne: python3 test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test
/Users/awilcox/Code/UCB/mokelumne/test/mokelumne/dags
I can confirm that you're right and it passed in CI. I can't explain that, unless perhaps the DagBag isn't even paying attention to the dag_folder parameter and is going based on airflow.cfg. What happens if you remove dag_folder?
anarchivist
left a comment
There was a problem hiding this comment.
r+; just want clarification on the linter config chagnes.
88d33a9 to
eec95e4
Compare
awilfox
left a comment
There was a problem hiding this comment.
This is a good design but we need to make a small change to the TIND client for it to work properly.
| from airflow.dag_processing.dagbag import DagBag | ||
| from pathlib import Path | ||
|
|
||
| dag_dir = Path(__file__).resolve().parent.parent / "mokelumne/dags" |
There was a problem hiding this comment.
This is weird!
awilcox [s002 Mon 18 12:40] mokelumne: cat -> test/tests/test.py
from pathlib import Path
print(Path(__file__))
print(Path(__file__).resolve())
print(Path(__file__).resolve().parent.parent)
print(Path(__file__).resolve().parent.parent / "mokelumne/dags")
awilcox [s002 Mon 18 12:40] mokelumne: python3 test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test/tests/test.py
/Users/awilcox/Code/UCB/mokelumne/test
/Users/awilcox/Code/UCB/mokelumne/test/mokelumne/dags
I can confirm that you're right and it passed in CI. I can't explain that, unless perhaps the DagBag isn't even paying attention to the dag_folder parameter and is going based on airflow.cfg. What happens if you remove dag_folder?
673cb07 to
ea512ef
Compare
awilfox
left a comment
There was a problem hiding this comment.
Good, but we have a custom exception class for HTTP 429, so it needs to be caught instead of requests.HTTPError (it isn't a subclass of that).
|
I've reworked the error catching and the testing to handle retries within expectations and the edge case when we've exceeded the expected retries. I've also included a pylint directive to ignore 'redefined-outer-name' in the test. I believe pylint doesn't agree with the way that pytest uses a module fixture that is then inserted into a test. Note: I found a neat way to mock the context for the test module. This may help us in other DAG tests. |
awilfox
left a comment
There was a problem hiding this comment.
r+wc. Looking pretty good, these are minor.
| ), | ||
| }, | ||
| default_args={ | ||
| "retries": 3, |
There was a problem hiding this comment.
This applies retry logic to every task, including ones whose likely failure modes are not amenable to retries (e.g. creating directories or writing summaries to local disks). Why this versus only applying the retry to logic to TIND- or Bedrock-related tasks?
There was a problem hiding this comment.
It's a general pattern that we can use to set a default number of replies for a DAG's tasks, and then override as needed. Should I add "retries": 0 to any specific tasks?
There was a problem hiding this comment.
I suppose this is more of a design question: do we want retry-on-failure to be the normal and expected thing, and fail-fast to be abnormal?
Personally, I lean towards fail-fast being better. One potential situation Dan's comment made me think of is: if we have a disk full situation, and we fail to create the run directory, then we are going to do two more retries… and write more Airflow logs to the same disk.
Retry-on-failure should then be an "opt-in" behaviour, on tasks that we believe could fail in a situation where a retry would make a difference. Those tasks would be TIND and Bedrock, since both of those are API servers that could be temporarily busy.
There was a problem hiding this comment.
Yes @awilfox that's precisely the scenario I was thinking. I think a blanket-retry policy is just likely to cover up or delay addressing issues that can't be handled by retrying. I also think it's conceptually muddled, a bit like overly broad typehinting or catch statements: is this function expected to need to be retried? Is there a blanket-retry policy? Did the previous developers just throw this in without thinking about it? Etc.
9273b3a to
9efe0b9
Compare
- added default retry args to DAG
- add unit tests
- added Dag-Task_Group-Task test to ensure task retries a
TooManyRequestsError from the python-tind-client
- some extra efforts are made to properly mock a task that is
embedded in an Airflow partial/expand operation
- bumps python-tind-client to v0.2.4
Co-authored-by: Anna Wilcox <AWilcox@Wilcox-Tech.com>
9efe0b9 to
288a1e7
Compare
awilfox
left a comment
There was a problem hiding this comment.
Mostly looks good, but I do agree with Dan's assessment that we probably don't want retry to be default for all tasks in this DAG.
| ), | ||
| }, | ||
| default_args={ | ||
| "retries": 3, |
There was a problem hiding this comment.
I suppose this is more of a design question: do we want retry-on-failure to be the normal and expected thing, and fail-fast to be abnormal?
Personally, I lean towards fail-fast being better. One potential situation Dan's comment made me think of is: if we have a disk full situation, and we fail to create the run directory, then we are going to do two more retries… and write more Airflow logs to the same disk.
Retry-on-failure should then be an "opt-in" behaviour, on tasks that we believe could fail in a situation where a retry would make a difference. Those tasks would be TIND and Bedrock, since both of those are API servers that could be temporarily busy.
| except TooManyRequestsError as ex: | ||
| ti = context["ti"] | ||
| if ti.try_number <= ti.max_tries: | ||
| logger.warning("TIND API returned 429; marking record for retry") |
There was a problem hiding this comment.
Does Airflow not log this automatically?
Purpose
During a DAG run, calls to TIND may result in HTTP status 429
too many requestsresponses. We should ensure that these responses are properly raised so that any DAG task that called them can properly retry. Also, we should put in some simple retry configuration for overall DAG operations.Changes