AP-683 Add Retries with Backoffs to DAG #65
Open
jason-raitz wants to merge 1 commit into main from AP-683_retry-fetching-with-backoff
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| from pathlib import Path | ||
| from shutil import copyfile | ||
| from typing import List | ||
| import requests | ||
| from airflow.exceptions import AirflowFailException | ||
| from airflow.providers.smtp.operators.smtp import EmailOperator | ||
| from airflow.sdk import dag, task, task_group, Param, get_current_context | ||
|
|
@@ -27,6 +28,7 @@ | |
| from mokelumne.plugins.static_files.helpers import static_files_run_dir | ||
| from mokelumne.util.storage import run_dir | ||
| from mokelumne.util.tind_csv_writer import TindCsvWriter, is_single_image_record | ||
| from tind_client.errors import TooManyRequestsError | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -110,6 +112,12 @@ | |
| description_md="The maximum height for the fetched image. Must be less than 8000px." | ||
| ), | ||
| }, | ||
| default_args={ | ||
| "retries": 3, | ||
| "retry_delay": 3, | ||
| "retry_exponential_backoff": True, | ||
| "max_retry_delay": 600, # 10 minutes | ||
| }, | ||
| tags=["batch-image", "csv", "generate-descriptions", "llm", "process",], | ||
| ) | ||
| def gen_llm_image_descriptions(): | ||
|
|
@@ -217,6 +225,13 @@ def fetch_image_to_record_directory(run_id: str, fetcher: ImageFetcher, | |
| ) | ||
|
|
||
| path = str(fetcher.fetch_one_image_for_record(tind_id, run_id)) | ||
| except TooManyRequestsError as ex: | ||
| ti = context["ti"] | ||
| if ti.try_number <= ti.max_tries: | ||
| logger.warning("TIND API returned 429; marking record for retry") | ||
| Review comment (Member): Does Airflow not log this automatically? | ||
| raise | ||
| logger.warning("TIND API returned 429 on final attempt; marking record as failed") | ||
| return RunStatus(tind_id=tind_id, status="failed", description="TIND API too busy, try again later", path="") | ||
| except Exception as ex: # pylint: disable=broad-exception-caught | ||
| logger.warning("Fetcher encountered exception", exc_info=ex) | ||
| return RunStatus(tind_id=tind_id, status="failed", description=str(ex), path="") | ||
|
|
@@ -383,7 +398,7 @@ def collate_csvs(output_dir_str: str) -> str: | |
| return timestamp | ||
|
|
||
| @task | ||
| - def generate_summary(output_dir_str: str, timestamp: str) -> str: | ||
| + def generate_summary(output_dir_str: str, timestamp: str) -> str: # pylint: disable=too-many-locals | ||
| """Generate a summary of the files in the collated path.""" | ||
| def count_success_fail_of_csv(csv_file: Path, success: str) -> tuple[int, int, int]: | ||
| """Count the success and failure rows for the given CSV.""" | ||
|
|
||
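To get a feel for what the `default_args` in this diff imply, here is a simplified sketch of an exponential backoff schedule. It assumes the bare number `3` is interpreted as seconds and that the delay simply doubles on each retry, capped at `max_retry_delay`. Airflow's own computation also adds jitter, so real delays will differ. The helper name `backoff_schedule` is hypothetical.

```python
def backoff_schedule(retries: int, retry_delay: float, max_retry_delay: float) -> list[float]:
    """Return the approximate wait (in seconds) before each retry.

    Simplified model (an assumption, not Airflow's exact formula): the delay
    doubles on every retry and is capped at max_retry_delay. No jitter.
    """
    return [min(retry_delay * 2 ** attempt, max_retry_delay) for attempt in range(retries)]


# Values taken from the default_args in this PR.
schedule = backoff_schedule(retries=3, retry_delay=3, max_retry_delay=600)
print(schedule)  # [3, 6, 12]
```

Under this model the three retries wait roughly 3, 6 and 12 seconds, so the 600-second cap would not come into play with these settings. Whether about 21 seconds in total is long enough for a rate-limited API to recover is a separate question.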
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| """Tests for gen_llm_image_descriptions DAG.""" | ||
| # pylint: disable=redefined-outer-name | ||
|
|
||
| from unittest.mock import patch, MagicMock | ||
| import pytest | ||
|
|
||
| from airflow.dag_processing.dagbag import DagBag | ||
| from pathlib import Path | ||
| from tind_client.errors import TooManyRequestsError | ||
|
|
||
| dag_dir = Path(__file__).resolve().parent.parent.parent / "mokelumne/dags" | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def fetch_fn(): | ||
| """Fixture to get the fetch function from the DAG.""" | ||
| dagbag = DagBag(dag_folder=dag_dir.resolve(), include_examples=False) | ||
| dag = dagbag.get_dag("gen_llm_image_descriptions") | ||
| return dag.get_task("fetch_images.fetch_image_to_record_directory").python_callable | ||
|
|
||
|
|
||
| def _mock_context(try_number: int, max_tries: int) -> dict: | ||
| mock_ti = MagicMock() | ||
| mock_ti.try_number = try_number | ||
| mock_ti.max_tries = max_tries | ||
| return {"params": {"max_width": 8000, "max_height": 8000}, "run_id": "test", "ti": mock_ti} | ||
|
|
||
|
|
||
| def test_429_causes_task_retry(fetch_fn): | ||
| """If retries remain, a TindClient's TooManyRequestsError (429) triggers a retry.""" | ||
| mock_fetcher = MagicMock() | ||
| mock_fetcher.get_metadata_for_record.side_effect = TooManyRequestsError() | ||
|
|
||
| with patch( | ||
| f"{fetch_fn.__module__}.get_current_context", | ||
| return_value=_mock_context(try_number=1, max_tries=3), | ||
| ): | ||
| with pytest.raises(TooManyRequestsError): | ||
| fetch_fn("test_run", mock_fetcher, "12345") | ||
|
|
||
|
|
||
| def test_429_on_final_attempt_returns_failed_status(fetch_fn): | ||
| """If last retry gets a TooManyRequestsError, the task returns a failed status.""" | ||
| mock_fetcher = MagicMock() | ||
| mock_fetcher.get_metadata_for_record.side_effect = TooManyRequestsError() | ||
|
|
||
| with patch( | ||
| f"{fetch_fn.__module__}.get_current_context", | ||
| return_value=_mock_context(try_number=4, max_tries=3), | ||
| ): | ||
| result = fetch_fn("test_run", mock_fetcher, "12345") | ||
| assert result.tind_id == "12345" | ||
| assert result.status == "failed" |
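The boundary these two tests exercise can be sketched in isolation. The snippet below is a minimal model, assuming (as the tests do) that `try_number` is 1-based and that `max_tries` equals the configured number of retries. `TooManyRequestsError` and `RunStatus` here are local stand-ins for the real classes, and `handle_429` is a hypothetical helper, not a function from the PR.

```python
from dataclasses import dataclass


class TooManyRequestsError(Exception):
    """Local stand-in for tind_client.errors.TooManyRequestsError."""


@dataclass
class RunStatus:
    """Minimal stand-in for the DAG's RunStatus; fields mirror the diff."""
    tind_id: str
    status: str
    description: str
    path: str


def handle_429(tind_id: str, try_number: int, max_tries: int) -> RunStatus:
    """Raise while retries remain; otherwise record the record as failed."""
    if try_number <= max_tries:
        # Retries remain: propagate the error so the task is retried.
        raise TooManyRequestsError("TIND API returned 429; retry requested")
    # Final attempt: swallow the error and report a failed record instead.
    return RunStatus(tind_id=tind_id, status="failed",
                     description="TIND API too busy, try again later", path="")


# With retries=3 there are four attempts in total.
for attempt in range(1, 5):
    try:
        result = handle_429("12345", attempt, max_tries=3)
        print(attempt, result.status)
    except TooManyRequestsError:
        print(attempt, "retry")
# prints "1 retry", "2 retry", "3 retry", "4 failed" (one per line)
```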
This applies retry logic to every task, including ones whose likely failure modes are not amenable to retries (e.g. creating directories or writing summaries to local disks). Why this versus only applying the retry logic to TIND- or Bedrock-related tasks?
It's a general pattern that we can use to set a default number of retries for a DAG's tasks, and then override as needed. Should I add `"retries": 0` to any specific tasks?
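For context on the override being proposed: in Airflow, arguments passed directly to a task take precedence over the DAG's `default_args`, so something like `@task(retries=0)` would opt a single task out. The snippet below only models that precedence with plain dictionaries. `effective_args` is a hypothetical helper and nothing here calls Airflow.

```python
# DAG-wide defaults, copied from the diff in this PR.
DAG_DEFAULT_ARGS = {
    "retries": 3,
    "retry_delay": 3,
    "retry_exponential_backoff": True,
    "max_retry_delay": 600,
}


def effective_args(default_args: dict, **task_overrides) -> dict:
    """Model of the precedence rule: explicit task arguments win over DAG defaults."""
    return {**default_args, **task_overrides}


# A task that talks to a remote API keeps the DAG-wide defaults.
fetch_args = effective_args(DAG_DEFAULT_ARGS)
# A local-disk task opts out of retries explicitly.
summary_args = effective_args(DAG_DEFAULT_ARGS, retries=0)

print(fetch_args["retries"], summary_args["retries"])  # 3 0
```

The cost of this direction is that every task which should fail fast has to remember to opt out.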
I suppose this is more of a design question: do we want retry-on-failure to be the normal and expected thing, and fail-fast to be abnormal?
Personally, I lean towards fail-fast being better. One potential situation Dan's comment made me think of is: if we have a disk full situation, and we fail to create the run directory, then we are going to do two more retries… and write more Airflow logs to the same disk.
Retry-on-failure should then be an "opt-in" behaviour, on tasks that we believe could fail in a situation where a retry would make a difference. Those tasks would be TIND and Bedrock, since both of those are API servers that could be temporarily busy.
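The opt-in idea can be illustrated outside Airflow with a plain Python decorator that retries only on named exception types and lets everything else fail immediately. This is a generic sketch, not the DAG's code: `retry_on`, `call_api` and `make_run_dir` are hypothetical, and `TooManyRequestsError` is a local stand-in. In the DAG itself the equivalent would presumably be setting `retries` only on the TIND and Bedrock tasks.

```python
import functools
import time


class TooManyRequestsError(Exception):
    """Local stand-in for a transient, retryable API error."""


def retry_on(*retryable, retries=3, base_delay=3.0, max_delay=600.0, sleep=time.sleep):
    """Retry the wrapped call only for the listed exception types."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable:
                    if attempt == retries:
                        raise  # out of retries: surface the error
                    # Doubling delay, capped; no jitter in this sketch.
                    sleep(min(base_delay * 2 ** attempt, max_delay))
        return wrapper
    return decorator


calls = {"api": 0, "disk": 0}


@retry_on(TooManyRequestsError, retries=3, sleep=lambda seconds: None)
def call_api():
    """Fails twice with a transient error, then succeeds."""
    calls["api"] += 1
    if calls["api"] < 3:
        raise TooManyRequestsError("busy")
    return "ok"


@retry_on(TooManyRequestsError, retries=3, sleep=lambda seconds: None)
def make_run_dir():
    """Fails with an error that a retry would not fix."""
    calls["disk"] += 1
    raise OSError("No space left on device")


print(call_api(), calls["api"])  # ok 3
try:
    make_run_dir()
except OSError:
    print("failed fast after", calls["disk"], "call")  # failed fast after 1 call
```

The disk-full case is attempted exactly once, which avoids the repeated log writes described above.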
Yes @awilfox, that's precisely the scenario I was thinking of. I think a blanket-retry policy is just likely to cover up or delay addressing issues that can't be handled by retrying. I also think it's conceptually muddled, a bit like overly broad type hinting or catch statements: is this function expected to need to be retried? Is there a blanket-retry policy? Did the previous developers just throw this in without thinking about it? Etc.