Skip to content

Commit 5a1f6ed

Browse files
authored
Keep DualStatsManager as a try/except import as it is soon going to be removed (#65676)
1 parent 052d27f commit 5a1f6ed

3 files changed

Lines changed: 18 additions & 15 deletions

File tree

providers/edge3/src/airflow/providers/edge3/models/edge_worker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@
2828
from airflow.providers.common.compat.sdk import AirflowException, timezone
2929
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
3030
from airflow.providers.edge3.models.edge_base import Base
31-
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
3231
from airflow.utils.log.logging_mixin import LoggingMixin
3332
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
3433
from airflow.utils.session import NEW_SESSION, provide_session
3534
from airflow.utils.sqlalchemy import UtcDateTime
3635

36+
try:
37+
from airflow.sdk.observability.stats import DualStatsManager
38+
except ImportError:
39+
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2.1 compat
40+
3741
if TYPE_CHECKING:
3842
from collections.abc import Sequence
3943

@@ -180,9 +184,7 @@ def set_metrics(
180184
"free_concurrency",
181185
}
182186

183-
if AIRFLOW_V_3_2_PLUS:
184-
from airflow.sdk.observability.stats import DualStatsManager
185-
187+
if DualStatsManager is not None:
186188
try:
187189
DualStatsManager.gauge(
188190
"edge_worker.status",

providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
from fastapi import Body, Depends, status
2323
from sqlalchemy import select, update
2424

25+
try:
26+
from airflow.sdk.observability.stats import DualStatsManager
27+
except ImportError:
28+
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
2529
from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
2630
from airflow.api_fastapi.common.router import AirflowRouter
2731
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
2832
from airflow.executors.workloads import ExecuteTask
2933
from airflow.providers.common.compat.sdk import timezone
3034
from airflow.providers.edge3.models.edge_job import EdgeJobModel
31-
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
3235
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
3336
from airflow.providers.edge3.worker_api.datamodels import (
3437
EdgeJobFetched,
@@ -88,9 +91,7 @@ def fetch(
8891
session.commit()
8992
# Edge worker does not backport emitted Airflow metrics, so export some metrics
9093
tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
91-
if AIRFLOW_V_3_2_PLUS:
92-
from airflow.sdk.observability.stats import DualStatsManager
93-
94+
if DualStatsManager is not None:
9495
DualStatsManager.incr("edge_worker.ti.start", tags=tags)
9596
else:
9697
from airflow.providers.common.compat.sdk import Stats
@@ -149,9 +150,7 @@ def state(
149150
"queue": job.queue,
150151
"state": str(state),
151152
}
152-
if AIRFLOW_V_3_2_PLUS:
153-
from airflow.sdk.observability.stats import DualStatsManager
154-
153+
if DualStatsManager is not None:
155154
DualStatsManager.incr(
156155
"edge_worker.ti.finish",
157156
tags=tags,

providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
2929
from airflow.providers.common.compat.sdk import timezone
3030
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
31-
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
3231
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
3332
from airflow.providers.edge3.worker_api.datamodels import (
3433
WorkerQueueUpdateBody,
@@ -37,6 +36,11 @@
3736
WorkerStateBody,
3837
)
3938

39+
try:
40+
from airflow.sdk.observability.stats import DualStatsManager
41+
except ImportError:
42+
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
43+
4044
worker_router = AirflowRouter(
4145
tags=["Worker"],
4246
prefix="/worker",
@@ -217,9 +221,7 @@ def set_state(
217221
worker.sysinfo = body.sysinfo
218222
worker.last_update = timezone.utcnow()
219223
session.commit()
220-
if AIRFLOW_V_3_2_PLUS:
221-
from airflow.sdk.observability.stats import DualStatsManager
222-
224+
if DualStatsManager is not None:
223225
DualStatsManager.incr(
224226
"edge_worker.heartbeat_count",
225227
1,

0 commit comments

Comments
 (0)