diff --git a/.gitignore b/.gitignore index 86958c842..6c1a554c7 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,12 @@ tests/test_data/rapl/* credentials* .codecarbon.config* scripts/agent-vm.personal.config.sh + +# Added by ggshield +.cache_ggshield + +# Added by ggshield +.cache_ggshield + +# Added by ggshield +.cache_ggshield diff --git a/codecarbon/core/api_client.py b/codecarbon/core/api_client.py index 34067c71c..6e1af2146 100644 --- a/codecarbon/core/api_client.py +++ b/codecarbon/core/api_client.py @@ -209,15 +209,17 @@ def add_emission(self, carbon_emission: dict): "ApiClient.add_emission still no run_id, aborting for this time !" ) return False - if carbon_emission["duration"] < 1: + duration = float(carbon_emission["duration"]) + if duration <= 0: logger.warning( - "ApiClient : emissions not sent because of a duration smaller than 1." + "ApiClient : emissions not sent because duration is zero or negative." ) return False + duration_for_api = max(1, int(round(duration))) emission = EmissionCreate( timestamp=get_datetime_with_timezone(), run_id=self.run_id, - duration=int(carbon_emission["duration"]), + duration=duration_for_api, emissions_sum=carbon_emission["emissions"], emissions_rate=carbon_emission["emissions_rate"], cpu_power=carbon_emission["cpu_power"], diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index ad712c9f0..7abc297f2 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -7,6 +7,7 @@ import os import platform import re +import threading import time import uuid from abc import ABC, abstractmethod @@ -60,6 +61,21 @@ _sentinel = object() +@dataclasses.dataclass(frozen=True) +class HttpRequestBaseline: + """Per-request totals snapshot for FastAPI middleware (lifespan tracker).""" + + task_name: str + started_at: float + duration_at_start: float + emissions: float + cpu_energy: float + gpu_energy: float + ram_energy: float + energy_consumed: float + water_consumed: float + + class BaseEmissionsTracker(ABC): """ Primary abstraction with Emissions Tracking functionality. @@ -425,6 +441,7 @@ def __init__( self._tasks: Dict[str, Task] = {} self._active_task: Optional[str] = None self._active_task_emissions_at_start: Optional[EmissionsData] = None + self._http_measure_lock = threading.Lock() # Tracking mode detection self._hardware = [] resource_tracker = ResourceTracker(self) @@ -710,8 +727,112 @@ def stop_task(self, task_name: str = None) -> EmissionsData: self._active_task = None self._active_task_emissions_at_start = None # Clear task-specific start data + if self._scheduler is not None and self._scheduler._stopped: + if self._start_time is not None: + self._scheduler.start() + return task_emission_data + def _resolve_http_task_name(self, task_name: str) -> str: + """Return a unique task name for HTTP request tracking.""" + if not task_name: + task_name = uuid.uuid4().__str__() + if task_name in self._tasks: + task_name += "_" + uuid.uuid4().__str__() + return task_name + + def mark_http_request_start(self, task_name: str) -> HttpRequestBaseline: + """Snapshot cumulative totals at request start (FastAPI lifespan path). + + Use with :meth:`finish_http_request` while the main tracker scheduler keeps + running. Avoids per-request scheduler and hardware restarts from + :meth:`start_task`. + + Args: + task_name: Logical name for this HTTP request (e.g. route key). + + Returns: + Baseline to pass to :meth:`finish_http_request`. + + Raises: + RuntimeError: If the tracker has not been started with :meth:`start`. + """ + if self._start_time is None: + raise RuntimeError("EmissionsTracker.start() must run before HTTP requests") + with self._http_measure_lock: + resolved = self._resolve_http_task_name(task_name) + self._tasks[resolved] = Task(task_name=resolved) + duration_at_start = time.perf_counter() - self._start_time + return HttpRequestBaseline( + task_name=resolved, + started_at=time.perf_counter(), + duration_at_start=duration_at_start, + emissions=self._total_emissions, + cpu_energy=self._total_cpu_energy.kWh, + gpu_energy=self._total_gpu_energy.kWh, + ram_energy=self._total_ram_energy.kWh, + energy_consumed=self._total_energy.kWh, + water_consumed=self._total_water.litres, + ) + + def finish_http_request( + self, baseline: HttpRequestBaseline + ) -> Optional[EmissionsData]: + """Compute per-request emissions from a :meth:`mark_http_request_start` baseline. + + Args: + baseline: Value returned by :meth:`mark_http_request_start`. + + Returns: + Request-scoped :class:`~codecarbon.output.EmissionsData`, or ``None`` if + the task record is missing. + """ + with self._http_measure_lock: + task = self._tasks.get(baseline.task_name) + if task is None: + logger.warning( + "finish_http_request: unknown task %s", baseline.task_name + ) + return None + self._measure_power_and_energy() + emissions_at_stop = self._prepare_emissions_data() + previous = dataclasses.replace(emissions_at_stop) + previous.emissions = baseline.emissions + previous.cpu_energy = baseline.cpu_energy + previous.gpu_energy = baseline.gpu_energy + previous.ram_energy = baseline.ram_energy + previous.energy_consumed = baseline.energy_consumed + previous.water_consumed = baseline.water_consumed + previous.duration = baseline.duration_at_start + + task_emission_data = dataclasses.replace(emissions_at_stop) + request_duration = time.perf_counter() - baseline.started_at + task_emission_data.duration = Time.from_seconds(request_duration).seconds + task_emission_data.compute_delta_emission(previous) + + task.emissions_data = task_emission_data + task.is_active = False + return task_emission_data + + def persist_completed_task(self, task_name: str) -> None: + """Push a finished task's emissions to API handlers (e.g. after ``stop_task``). + + Args: + task_name: Name of the task that was stopped with :meth:`stop_task`. + """ + if not self._save_to_api: + return + task = self._tasks.get(task_name) + if task is None or task.is_active or task.emissions_data is None: + return + if task.uploaded_to_api: + return + task_payload = [task.out()] + for handler in self._output_handlers: + if isinstance(handler, CodeCarbonAPIOutput): + handler.task_out(task_payload, self._experiment_name) + task.uploaded_to_api = True + @suppress(Exception) def flush(self) -> Optional[float]: """ @@ -808,13 +929,24 @@ def _persist_data( experiment_name=None, ): task_emissions_data = [] + api_task_emissions_data = [] for task in self._tasks: - task_emissions_data.append(self._tasks[task].out()) + task_entry = self._tasks[task].out() + task_emissions_data.append(task_entry) + if not self._tasks[task].uploaded_to_api: + api_task_emissions_data.append(task_entry) for handler in self._output_handlers: handler.out(total_emissions, delta_emissions) if len(task_emissions_data) > 0: - handler.task_out(task_emissions_data, experiment_name) + if isinstance(handler, CodeCarbonAPIOutput): + if api_task_emissions_data: + handler.task_out(api_task_emissions_data, experiment_name) + for task_obj in self._tasks.values(): + if not task_obj.is_active and task_obj.emissions_data: + task_obj.uploaded_to_api = True + else: + handler.task_out(task_emissions_data, experiment_name) def _update_emissions(self) -> None: """ diff --git a/codecarbon/external/task.py b/codecarbon/external/task.py index e3d7ecbae..705cf6160 100644 --- a/codecarbon/external/task.py +++ b/codecarbon/external/task.py @@ -17,6 +17,7 @@ def __init__(self, task_name): # , task_measure self.task_name: str = task_name self.start_time = time.perf_counter() self.is_active = True + self.uploaded_to_api = False def out(self): return TaskEmissionsData( diff --git a/codecarbon/integrations/__init__.py b/codecarbon/integrations/__init__.py new file mode 100644 index 000000000..9c5777a96 --- /dev/null +++ b/codecarbon/integrations/__init__.py @@ -0,0 +1 @@ +"""Optional integrations for frameworks and platforms.""" diff --git a/codecarbon/integrations/fastapi/__init__.py b/codecarbon/integrations/fastapi/__init__.py new file mode 100644 index 000000000..f4c86edb7 --- /dev/null +++ b/codecarbon/integrations/fastapi/__init__.py @@ -0,0 +1,17 @@ +"""FastAPI integration: middleware and lifespan helpers.""" + +from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan +from codecarbon.integrations.fastapi.middleware import ( + CodeCarbonMiddleware, + add_codecarbon_middleware, + log_request_complete, + shutdown_codecarbon_middleware, +) + +__all__ = [ + "CodeCarbonMiddleware", + "add_codecarbon_middleware", + "create_codecarbon_lifespan", + "log_request_complete", + "shutdown_codecarbon_middleware", +] diff --git a/codecarbon/integrations/fastapi/_headers.py b/codecarbon/integrations/fastapi/_headers.py new file mode 100644 index 000000000..9926b747c --- /dev/null +++ b/codecarbon/integrations/fastapi/_headers.py @@ -0,0 +1,161 @@ +"""Configurable response headers from emissions measurements.""" + +from __future__ import annotations + +from collections.abc import Callable, Mapping, Sequence +from typing import Union + +from starlette.requests import Request +from starlette.responses import Response + +from codecarbon.output_methods.emissions_data import EmissionsData + +HeaderConfig = Union[bool, str, Sequence[str], Mapping[str, str], None] +HeaderFormatter = Callable[[EmissionsData, Request], Mapping[str, str]] + +FIELD_UNITS: dict[str, str] = { + "emissions": "kg", + "emissions_rate": "kg-per-s", + "duration": "s", + "energy_consumed": "kwh", + "cpu_energy": "kwh", + "gpu_energy": "kwh", + "ram_energy": "kwh", + "water_consumed": "l", + "cpu_power": "w", + "gpu_power": "w", + "ram_power": "w", + "cpu_utilization_percent": "percent", + "gpu_utilization_percent": "percent", + "ram_utilization_percent": "percent", + "ram_used_gb": "gb", + "pue": "ratio", + "wue": "l-per-kwh", +} + +HEADER_PRESETS: dict[str, dict[str, str]] = { + "emissions": {"emissions": "X-CodeCarbon-Emissions-kg"}, + "default": { + "emissions": "X-CodeCarbon-Emissions-kg", + "energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh", + "duration": "X-CodeCarbon-Duration-s", + "emissions_rate": "X-CodeCarbon-Emissions-Rate-kg-per-s", + }, + "energy": { + "emissions": "X-CodeCarbon-Emissions-kg", + "energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh", + "cpu_energy": "X-CodeCarbon-Cpu-Energy-kwh", + "gpu_energy": "X-CodeCarbon-Gpu-Energy-kwh", + "ram_energy": "X-CodeCarbon-Ram-Energy-kwh", + "duration": "X-CodeCarbon-Duration-s", + }, + "power": { + "emissions": "X-CodeCarbon-Emissions-kg", + "cpu_power": "X-CodeCarbon-Cpu-Power-w", + "gpu_power": "X-CodeCarbon-Gpu-Power-w", + "ram_power": "X-CodeCarbon-Ram-Power-w", + "duration": "X-CodeCarbon-Duration-s", + }, +} + +FULL_HEADER_FIELDS: tuple[str, ...] = tuple(FIELD_UNITS.keys()) + + +def _auto_header_name(field: str) -> str: + unit = FIELD_UNITS.get(field, "") + title = "-".join(part.capitalize() for part in field.split("_")) + suffix = f"-{unit}" if unit else "" + return f"X-CodeCarbon-{title}{suffix}" + + +def resolve_header_mapping(config: HeaderConfig) -> dict[str, str]: + """Normalize ``response_headers`` settings to ``{field_name: header_name}``. + + Args: + config: ``None`` or ``False`` for no headers; ``True`` for the emissions preset; + a preset name (``emissions``, ``default``, ``energy``, ``power``, ``full``); + a sequence of field names (auto header names); or an explicit mapping. + + Returns: + Mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData` + attribute names to HTTP header names. + + Raises: + ValueError: If ``config`` is a string that is not a known preset (other than + ``full``). + """ + if config is None or config is False: + return {} + if config is True: + return dict(HEADER_PRESETS["emissions"]) + if isinstance(config, str): + preset = HEADER_PRESETS.get(config) + if preset is None: + if config == "full": + return {field: _auto_header_name(field) for field in FULL_HEADER_FIELDS} + raise ValueError(f"Unknown response_headers preset: {config!r}") + return dict(preset) + if isinstance(config, Mapping): + return dict(config) + return {field: _auto_header_name(field) for field in config} + + +def header_name_value_pairs( + emissions_data: EmissionsData, + header_mapping: Mapping[str, str], + request: Request | None = None, + header_formatter: HeaderFormatter | None = None, +) -> Mapping[str, str]: + """Resolve emission fields to HTTP header names and string values.""" + if header_formatter is not None: + if request is None: + raise ValueError("request is required when header_formatter is set") + return header_formatter(emissions_data, request) + return { + header_name: str(getattr(emissions_data, field)) + for field, header_name in header_mapping.items() + if hasattr(emissions_data, field) + } + + +def emissions_header_items( + emissions_data: EmissionsData, + header_mapping: Mapping[str, str], + request: Request, + header_formatter: HeaderFormatter | None = None, +) -> list[tuple[bytes, bytes]]: + """Build ASGI header pairs for emission fields. + + Args: + emissions_data: Measured values for this request. + header_mapping: Field name to HTTP header name. + request: Current HTTP request (for custom formatters). + header_formatter: Optional override for header name/value pairs. + + Returns: + List of ``(name, value)`` byte tuples for ASGI ``response.start`` messages. + """ + pairs = header_name_value_pairs( + emissions_data, header_mapping, request, header_formatter + ) + return [ + (name.encode("latin-1"), value.encode("latin-1")) for name, value in pairs.items() + ] + + +def apply_response_headers( + response: Response, + emissions_data: EmissionsData, + header_mapping: Mapping[str, str], +) -> None: + """Write selected emission fields onto an HTTP response as headers. + + Args: + response: Outgoing Starlette response (headers are updated in place). + emissions_data: Values read via ``getattr`` for each key in ``header_mapping``. + header_mapping: Field name to HTTP header name; unknown fields are skipped. + """ + for name, value in header_name_value_pairs( + emissions_data, header_mapping + ).items(): + response.headers[name] = value diff --git a/codecarbon/integrations/fastapi/_routing.py b/codecarbon/integrations/fastapi/_routing.py new file mode 100644 index 000000000..2bf385d00 --- /dev/null +++ b/codecarbon/integrations/fastapi/_routing.py @@ -0,0 +1,131 @@ +"""Route naming and endpoint filter helpers for FastAPI/Starlette.""" + +from collections.abc import Iterable +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from starlette.requests import Request + +DEFAULT_EXCLUDE: frozenset[str] = frozenset( + { + "/docs", + "/redoc", + "/openapi.json", + "/health", + "/healthz", + "/ready", + "/live", + } +) + +HTTP_METHODS = frozenset( + {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", "TRACE", "CONNECT"} +) + + +def get_endpoint_path(request: "Request") -> str: + """Return the mounted route template or the raw URL path. + + Args: + request: Current Starlette/FastAPI request. + + Returns: + Route template such as ``/items/{item_id}``, or ``request.url.path``. + """ + route = request.scope.get("route") + if route is not None: + return route.path + return request.url.path + + +def build_endpoint_key(request: "Request") -> str: + """Build a stable endpoint identifier such as ``GET /predict``. + + Args: + request: Current Starlette/FastAPI request. + + Returns: + HTTP method plus route template or URL path. + """ + return f"{request.method} {get_endpoint_path(request)}" + + +def is_method_pattern(pattern: str) -> bool: + """Return True when ``pattern`` is ``METHOD /path``.""" + method, _, path = pattern.partition(" ") + return method in HTTP_METHODS and path.startswith("/") + + +def matches_filter_pattern( + pattern: str, + endpoint_key: str, + endpoint_path: str, + url_path: str, + *, + exclude: bool, +) -> bool: + """Return True when an include or exclude pattern matches the request.""" + if is_method_pattern(pattern): + return endpoint_key == pattern + if not pattern.startswith("/"): + return endpoint_key == pattern + if exclude: + return ( + url_path == pattern + or url_path.startswith(f"{pattern}/") + or endpoint_path == pattern + ) + return endpoint_path == pattern + + +def should_track_request( + request: "Request", + include: Iterable[str] | None, + exclude: Iterable[str], +) -> bool: + """Return True when the request should be measured. + + Patterns use one of two forms: + + * ``METHOD /route/template`` — one HTTP method on one route (e.g. ``GET /predict``) + * ``/route/template`` — any method on that route, or a URL path prefix when excluding + + Args: + request: Current Starlette/FastAPI request. + include: When set, only matching endpoints are tracked. + exclude: Endpoints or URL prefixes to skip. + + Returns: + True when CodeCarbon should track this request. + """ + url_path = request.url.path + if include is None: + needs_full_match = any(is_method_pattern(pattern) for pattern in exclude) + if not needs_full_match: + for pattern in exclude: + if url_path == pattern or url_path.startswith(f"{pattern}/"): + return False + return True + endpoint_key = build_endpoint_key(request) + endpoint_path = get_endpoint_path(request) + for pattern in exclude: + if matches_filter_pattern( + pattern, + endpoint_key, + endpoint_path, + url_path, + exclude=True, + ): + return False + if include is None: + return True + return any( + matches_filter_pattern( + pattern, + endpoint_key, + endpoint_path, + url_path, + exclude=False, + ) + for pattern in include + ) diff --git a/codecarbon/integrations/fastapi/lifespan.py b/codecarbon/integrations/fastapi/lifespan.py new file mode 100644 index 000000000..5544d8882 --- /dev/null +++ b/codecarbon/integrations/fastapi/lifespan.py @@ -0,0 +1,40 @@ +"""Lifespan helpers for sharing one ``EmissionsTracker`` across requests.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any + +from codecarbon import EmissionsTracker +from codecarbon.integrations.fastapi.middleware import shutdown_codecarbon_middleware + + +@asynccontextmanager +async def create_codecarbon_lifespan( + app: Any, + *, + project_name: str = "codecarbon-fastapi", + **tracker_kwargs: Any, +) -> AsyncIterator[None]: + """Start a tracker for the app lifetime and expose it on ``app.state``. + + Args: + app: Starlette/FastAPI application with ``state`` namespace. + project_name: ``project_name`` for :class:`~codecarbon.EmissionsTracker`. + **tracker_kwargs: Extra constructor kwargs for the tracker. + + Yields: + ``None`` while the app runs. + """ + merged = dict(tracker_kwargs) + merged.setdefault("allow_multiple_runs", True) + tracker = EmissionsTracker(project_name=project_name, **merged) + tracker.start() + app.state.codecarbon_tracker = tracker + try: + yield + finally: + tracker.stop() + app.state.codecarbon_tracker = None + shutdown_codecarbon_middleware(app) diff --git a/codecarbon/integrations/fastapi/middleware.py b/codecarbon/integrations/fastapi/middleware.py new file mode 100644 index 000000000..520db8552 --- /dev/null +++ b/codecarbon/integrations/fastapi/middleware.py @@ -0,0 +1,401 @@ +"""FastAPI/Starlette middleware for per-request emissions tracking.""" + +from __future__ import annotations + +import asyncio +import collections +import threading +from collections.abc import Awaitable, Callable, Iterable +from concurrent import futures +from typing import Any + +from codecarbon.external.logger import logger + +try: + from starlette.requests import Request + from starlette.responses import Response + from starlette.types import ASGIApp, Message, Receive, Scope, Send +except ImportError as exc: + raise ImportError( + "CodeCarbon FastAPI integration requires Starlette (installed with FastAPI). " + "Install optional dependencies with: pip install 'codecarbon[fastapi]'" + ) from exc + +from codecarbon import EmissionsTracker +from codecarbon.emissions_tracker import HttpRequestBaseline +from codecarbon.integrations.fastapi._routing import ( + DEFAULT_EXCLUDE, + build_endpoint_key, + should_track_request, +) +from codecarbon.output_methods.emissions_data import EmissionsData + +DEFAULT_TRACKER_KWARGS: dict[str, Any] = { + "save_to_file": False, + "save_to_api": False, + "save_to_logger": False, +} + +_Job = tuple[Callable[..., Any], tuple[Any, ...], futures.Future[Any]] + + +class _TrackerRunner: + """Single tracker thread: request-path jobs first, then pending finalization.""" + + REQUEST = 0 + FINALIZE = 1 + + def __init__(self, thread_name: str = "codecarbon-tracker") -> None: + self._request_jobs: collections.deque[_Job] = collections.deque() + self._finalize_jobs: collections.deque[_Job] = collections.deque() + self._cond = threading.Condition() + self._closed = False + self._thread = threading.Thread(target=self._worker, name=thread_name, daemon=True) + self._thread.start() + + def _run_job(self, job: _Job) -> None: + func, args, future = job + if future.cancelled(): + return + try: + future.set_result(func(*args)) + except Exception as exc: + future.set_exception(exc) + + def _worker(self) -> None: + while True: + with self._cond: + while ( + not self._closed + and not self._request_jobs + and not self._finalize_jobs + ): + self._cond.wait() + if ( + self._closed + and not self._request_jobs + and not self._finalize_jobs + ): + return + if self._request_jobs: + job = self._request_jobs.popleft() + lane = self.REQUEST + else: + job = self._finalize_jobs.popleft() + lane = self.FINALIZE + self._run_job(job) + if lane == self.REQUEST: + while True: + with self._cond: + if self._request_jobs: + break + if not self._finalize_jobs: + break + finalize_job = self._finalize_jobs.popleft() + self._run_job(finalize_job) + + def submit( + self, lane: int, func: Callable[..., Any], *args: Any + ) -> futures.Future[Any]: + if self._closed: + raise RuntimeError("cannot schedule tracker work after shutdown") + future: futures.Future[Any] = futures.Future() + job = (func, args, future) + with self._cond: + if lane == self.REQUEST: + self._request_jobs.append(job) + else: + self._finalize_jobs.append(job) + self._cond.notify() + return future + + def submit_request( + self, func: Callable[..., Any], *args: Any + ) -> futures.Future[Any]: + return self.submit(self.REQUEST, func, *args) + + async def run_async( + self, lane: int, func: Callable[..., Any], *args: Any + ) -> Any: + return await asyncio.wrap_future(self.submit(lane, func, *args)) + + def shutdown(self, *, wait: bool = True) -> None: + if self._closed: + return + with self._cond: + self._closed = True + self._cond.notify_all() + if wait: + self._thread.join() + + +def log_request_complete( + request: Request, + response: Response, + emissions_data: EmissionsData | None, + task_name: str, +) -> None: + """Default ``on_request_complete`` handler; logs via the ``codecarbon`` logger.""" + emissions = getattr(emissions_data, "emissions", None) if emissions_data else None + logger.info( + "CodeCarbon %s: emissions=%s kg CO2 status=%s", + task_name, + emissions, + response.status_code, + ) + + +class CodeCarbonMiddleware: + """ASGI middleware using a shared tracker and deferred per-request measurement.""" + + def __init__( + self, + app: ASGIApp, + *, + project_name: str = "codecarbon-fastapi", + include: Iterable[str] | None = None, + exclude: Iterable[str] | None = None, + task_name_formatter: Callable[[Request], str] | None = None, + on_request_complete: Callable[..., Any] | None = log_request_complete, + tracker_kwargs: dict[str, Any] | None = None, + **emissions_tracker_kwargs: Any, + ) -> None: + """Configure middleware. + + Args: + app: Inner ASGI application. + project_name: ``project_name`` passed to :class:`~codecarbon.EmissionsTracker`. + include: When set, only matching endpoints are tracked (e.g. ``GET /predict``). + exclude: Endpoints or URL prefixes to skip. Defaults to common docs and health routes. + task_name_formatter: Overrides default route-based task naming. + on_request_complete: Callback ``(request, response, emissions_data | None, task_name)``. + Defaults to :func:`log_request_complete`; pass ``None`` to disable logging. + tracker_kwargs: Baseline kwargs merged into the tracker constructor. + **emissions_tracker_kwargs: Additional :class:`~codecarbon.EmissionsTracker` kwargs. + """ + self.app = app + self.project_name = project_name + self.include = set(include) if include is not None else None + self.exclude = set(exclude if exclude is not None else DEFAULT_EXCLUDE) + self.task_name_formatter = task_name_formatter + self.on_request_complete = on_request_complete + merged: dict[str, Any] = dict(DEFAULT_TRACKER_KWARGS) + merged.update(tracker_kwargs or {}) + merged.update(emissions_tracker_kwargs) + merged.setdefault("allow_multiple_runs", True) + self.tracker_kwargs = merged + self._app_tracker: EmissionsTracker | None = None + self._tracker_init_lock = threading.Lock() + self._tracker_runner = _TrackerRunner() + + def shutdown_tracker_executor(self, *, wait: bool = True) -> None: + """Shut down the tracker background thread (idempotent). + + Args: + wait: When ``True``, block until queued tracker work finishes. + """ + self._tracker_runner.shutdown(wait=wait) + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """ASGI entrypoint.""" + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive) + if not should_track_request(request, self.include, self.exclude): + await self.app(scope, receive, send) + return + + task_name = self._task_name(request) + tracker, baseline = await asyncio.to_thread( + self._begin_request, request, task_name + ) + await self._handle_tracked( + scope, receive, send, request, tracker, task_name, baseline + ) + + def _task_name(self, request: Request) -> str: + if self.task_name_formatter is not None: + return self.task_name_formatter(request) + return build_endpoint_key(request) + + async def _run_request_tracker( + self, func: Callable[..., Any], *args: Any + ) -> Any: + return await self._tracker_runner.run_async(_TrackerRunner.REQUEST, func, *args) + + async def _run_finalize_tracker( + self, func: Callable[..., Any], *args: Any + ) -> Any: + return await self._tracker_runner.run_async(_TrackerRunner.FINALIZE, func, *args) + + def _create_and_start_tracker(self) -> EmissionsTracker: + tracker = EmissionsTracker( + project_name=self.project_name, **self.tracker_kwargs + ) + tracker.start() + return tracker + + def _lifespan_tracker(self, request: Request) -> EmissionsTracker | None: + return getattr(request.app.state, "codecarbon_tracker", None) + + def _tracker_running(self, tracker: EmissionsTracker) -> bool: + return getattr(tracker, "_start_time", None) is not None + + def _begin_request( + self, request: Request, task_name: str + ) -> tuple[EmissionsTracker, HttpRequestBaseline | None]: + tracker = self._lifespan_tracker(request) + if tracker is None: + with self._tracker_init_lock: + if self._app_tracker is None: + self._app_tracker = self._create_and_start_tracker() + tracker = self._app_tracker + if ( + self._lifespan_tracker(request) is not None + and self._tracker_running(tracker) + ): + baseline = tracker.mark_http_request_start(task_name) + return tracker, baseline + tracker.start_task(task_name) + return tracker, None + + def _finalize_on_worker( + self, + tracker: EmissionsTracker, + task_name: str, + request: Request, + response: Response, + run_callback: bool, + baseline: HttpRequestBaseline | None, + ) -> None: + if baseline is not None: + emissions_data = tracker.finish_http_request(baseline) + resolved_task = baseline.task_name + else: + active_task = getattr(tracker, "_active_task", None) + resolved_task = ( + active_task if isinstance(active_task, str) else task_name + ) + emissions_data = tracker.stop_task(resolved_task) + tracker.persist_completed_task(resolved_task) + if run_callback: + self._run_request_complete( + request, response, emissions_data, resolved_task + ) + + def _run_request_complete( + self, + request: Request, + response: Response | None, + emissions_data: EmissionsData | None, + task_name: str, + ) -> None: + if self.on_request_complete is None or response is None: + return + self.on_request_complete(request, response, emissions_data, task_name) + + def _schedule_finalize(self, coro: Awaitable[None]) -> None: + async def _run() -> None: + try: + await coro + except Exception: + logger.exception("CodeCarbon deferred measurement failed") + + asyncio.create_task(_run()) + + async def _finalize_after_response( + self, + tracker: EmissionsTracker, + task_name: str, + request: Request, + response: Response, + baseline: HttpRequestBaseline | None, + *, + run_callback: bool, + ) -> None: + await self._run_finalize_tracker( + self._finalize_on_worker, + tracker, + task_name, + request, + response, + run_callback, + baseline, + ) + + async def _handle_tracked( + self, + scope: Scope, + receive: Receive, + send: Send, + request: Request, + tracker: EmissionsTracker, + task_name: str, + baseline: HttpRequestBaseline | None, + ) -> None: + status_code = 500 + + async def send_wrapper(message: Message) -> None: + nonlocal status_code + if message["type"] == "http.response.start": + status_code = message["status"] + await send(message) + + error: BaseException | None = None + try: + await self.app(scope, receive, send_wrapper) + except BaseException as exc: + error = exc + finally: + response = Response(status_code=status_code) + self._schedule_finalize( + self._finalize_after_response( + tracker, + task_name, + request, + response, + baseline, + run_callback=error is None, + ) + ) + if error is not None: + raise error + + +def shutdown_codecarbon_middleware(app: Any, *, wait: bool = True) -> None: + """Shut down the middleware tracker background thread registered on ``app``. + + Args: + app: Application that called :func:`add_codecarbon_middleware`. + wait: Passed to :meth:`CodeCarbonMiddleware.shutdown_tracker_executor`. + """ + middleware = getattr(app.state, "codecarbon_middleware", None) + if middleware is not None: + middleware.shutdown_tracker_executor(wait=wait) + + +def add_codecarbon_middleware(app: Any, **kwargs: Any) -> None: + """Register :class:`CodeCarbonMiddleware` on a FastAPI or Starlette app. + + Registers the instance on ``app.state.codecarbon_middleware`` so + :func:`create_codecarbon_lifespan` or :func:`shutdown_codecarbon_middleware` + can shut down the tracker background thread on teardown. + + Args: + app: Application instance with ``add_middleware``. + **kwargs: Forwarded to :class:`CodeCarbonMiddleware`. + """ + registered: list[CodeCarbonMiddleware] = [] + + class _RegisteredCodeCarbonMiddleware(CodeCarbonMiddleware): + def __init__(self, asgi_app: ASGIApp, **kw: Any) -> None: + super().__init__(asgi_app, **kw) + registered.clear() + registered.append(self) + + app.add_middleware(_RegisteredCodeCarbonMiddleware, **kwargs) + app.build_middleware_stack() + if registered: + app.state.codecarbon_middleware = registered[0] diff --git a/codecarbon/output_methods/http.py b/codecarbon/output_methods/http.py index 936d6a926..ac5de4e0b 100644 --- a/codecarbon/output_methods/http.py +++ b/codecarbon/output_methods/http.py @@ -6,7 +6,7 @@ from codecarbon.core.api_client import ApiClient from codecarbon.external.logger import logger from codecarbon.output_methods.base_output import BaseOutput -from codecarbon.output_methods.emissions_data import EmissionsData +from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData class HTTPOutput(BaseOutput): @@ -69,3 +69,11 @@ def out(self, _, delta: EmissionsData): self.api.add_emission(dataclasses.asdict(delta)) except Exception as e: logger.error(e, exc_info=True) + + def task_out(self, data: list[TaskEmissionsData], experiment_name: str) -> None: + del experiment_name + for task_data in data: + try: + self.api.add_emission(dataclasses.asdict(task_data)) + except Exception as e: + logger.error(e, exc_info=True) diff --git a/docs/how-to/fastapi.md b/docs/how-to/fastapi.md new file mode 100644 index 000000000..d12bd7860 --- /dev/null +++ b/docs/how-to/fastapi.md @@ -0,0 +1,202 @@ +# FastAPI middleware + +Track HTTP request carbon emissions for a [FastAPI](https://fastapi.tiangolo.com/) (or Starlette) app. Install the optional integration extra, register the middleware, and each route is measured without per-handler boilerplate. + +## Install + +```console +pip install "codecarbon[fastapi]" +``` + +With uv: + +```console +uv add "codecarbon[fastapi]" +``` + +## Basic usage + +```python +from fastapi import FastAPI +from codecarbon.integrations.fastapi import add_codecarbon_middleware + +app = FastAPI() +add_codecarbon_middleware(app, project_name="my-api") +``` + +Measurement runs **after** the HTTP response is sent (deferred `stop_task`), so clients are not blocked on hardware sampling. By default, emissions are logged on the **`codecarbon`** logger via `log_request_complete`. Pass `on_request_complete=None` to disable logging, or supply your own callback. + +A minimal runnable app lives at [`examples/fastapi_middleware.py`](https://github.com/mlco2/codecarbon/blob/master/examples/fastapi_middleware.py). Run it with: + +```console +uv run --extra fastapi uvicorn examples.fastapi_middleware:app --reload +``` + +Then open or `curl` `http://127.0.0.1:8000/predict` and check application logs for per-request emissions. + +## Lifespan (recommended) + +Start one shared `EmissionsTracker` at boot and flush on shutdown: + +```python +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from codecarbon.integrations.fastapi import add_codecarbon_middleware, create_codecarbon_lifespan + + +@asynccontextmanager +async def lifespan(app: FastAPI): + async with create_codecarbon_lifespan(app, project_name="my-api"): + yield + + +app = FastAPI(lifespan=lifespan) +add_codecarbon_middleware(app) +``` + +`create_codecarbon_lifespan` stores the tracker on `app.state.codecarbon_tracker` for the middleware to reuse, and shuts down the middleware’s tracker background thread on exit. Without lifespan, call `shutdown_codecarbon_middleware(app)` before the process exits. + +## Cloud API + +Use **global config only** (`~/.codecarbon.config`). Do not add a repo-local `./.codecarbon.config`, or it will override these values when you run from the project directory. + +```ini +[codecarbon] +api_endpoint = https://api.codecarbon.io +project_id = 833d292f-4460-43bd-a2f5-497bcff6dc95 +experiment_id = aa69b440-014a-4562-ac06-ba7eecb023f9 +``` + +Run `codecarbon login` to store your `api_key` in the same file. + +To upload emissions to the dashboard, enable `save_to_api` (IDs are read from global config unless overridden in code): + +```python +add_codecarbon_middleware( + app, + tracker_kwargs={"save_to_api": True}, +) +``` + +One **run** is created per app process when the shared tracker starts; each measured request uploads one emission after the response. See [Use the Cloud API & Dashboard](cloud-api.md). + +Verify logging, CSV, and API locally: + +```console +CODECARBON_ALLOW_MULTIPLE_RUNS=True uv run --extra fastapi \ + python scripts/verify_fastapi_middleware_outputs.py --save-to-api +``` + +## Performance + +Per-request tracking uses one shared `EmissionsTracker` with `start_task` / `stop_task` on a single background thread. Request-path work is scheduled ahead of deferred `stop_task` so new requests are not queued behind post-response measurement. + +| Option | Effect | +|--------|--------| +| Default (deferred + `log_request_complete`) | Shared tracker; log after each request | +| `on_request_complete=None` | Same timing, no post-request logging | +| `create_codecarbon_lifespan` | Starts hardware monitoring once at boot (recommended) | + +### Benchmarks (HF embedder workload) + +Live `EmissionsTracker`, uvicorn HTTP, [`paraphrase-MiniLM-L3-v2`](https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L3-v2), 50 timed requests, concurrency 4, `save_to_api=False`. With **`create_codecarbon_lifespan`**, the middleware uses a lightweight per-request snapshot (`mark_http_request_start` / `finish_http_request`) instead of stopping and restarting the tracker scheduler on every request. Measurement still runs **after** the response. + +| Configuration | Mean (ms) | vs baseline | +|---|---:|---:| +| No middleware (baseline) | ~26 | — | +| Deferred, no logging | ~30 | ~+6–17% | +| Deferred + logging (default) | ~27 | ~+6% | + +Absolute overhead is about **+1–4 ms** per request on a fast embedder baseline when the lifespan tracker is used. Older tables near **~15%** used `start_task` / `stop_task` per request (scheduler stop/start on every call). A global lock that held the whole request until `stop_task` finished inflated overhead to **~40%** — that lock is removed. + +With **`save_to_api=True`**, each request also waits on a real HTTPS `add_emission`; mean latency becomes seconds under concurrency (network + serialization), not milliseconds. + +Run-to-run variance is high on a single machine; treat as indicative, not a SLA. Reproduce: + +```console +# Live EmissionsTracker + real HF embedder + uvicorn HTTP (recommended): +uv run --extra fastapi --with uvicorn --with sentence-transformers --with torch \ + python scripts/benchmark_fastapi_middleware.py --realistic + +# Same, explicit flags: +uv run --extra fastapi --with uvicorn --with sentence-transformers --with torch \ + python scripts/benchmark_fastapi_middleware.py --workload hf-embedder --network --real-tracker + +# Mocked tracker for fast CI (~10s; high % overhead with concurrency 8 + noop workload): +uv run --extra fastapi --with uvicorn python scripts/benchmark_fastapi_middleware.py --quick + +# Include save_to_api (mocked upload latency, api_call_interval=1): +uv run --extra fastapi --with uvicorn --with sentence-transformers --with torch \ + python scripts/benchmark_fastapi_middleware.py --workload hf-embedder --with-save-to-api +``` + +The script preloads the ML model once when using `hf-embedder` or `hf-classifier`, so each scenario reuses the same weights instead of reloading. + +With ``save_to_api=True`` and ``create_codecarbon_lifespan``, each finalized +request uploads one emission via ``persist_completed_task`` (after ``stop_task``). +Sub-second requests are sent with API duration rounded up to 1 second. A final +``tracker.stop()`` still flushes run-level totals and any tasks not yet uploaded. + +Requires a valid ``api_key`` and ``experiment_id`` in ``~/.codecarbon.config`` +(``codecarbon login``). The repo ``.codecarbon.config`` must not override those +with empty values. + +Custom logging callback (replaces the default): + +```python +from codecarbon.integrations.fastapi import add_codecarbon_middleware + +add_codecarbon_middleware( + app, + on_request_complete=lambda request, response, data, task_name: logger.info( + "%s emissions=%s", task_name, getattr(data, "emissions", None) + ), +) +``` + +## `include` and `exclude` + +Two filters control which requests are measured. Both accept the same pattern forms: + +| Pattern | Meaning | +|---------|---------| +| `GET /predict` | One HTTP method on one route | +| `/predict` | Any method on that route (`include`), or skip that route/URL prefix (`exclude`) | + +- **`exclude`** — skip matching requests. Defaults to docs and health paths (`/docs`, `/health`, …). Pass your own list to replace the default. +- **`include`** — when set, only matching endpoints are tracked (allowlist). + +```python +add_codecarbon_middleware( + app, + include=["GET /predict", "POST /train"], + exclude=["GET /admin", "/internal"], +) +``` + +## `task_name_formatter`, `on_request_complete` + +- **`task_name_formatter`** — optional `(Request) -> str` override; default is `METHOD /route/template`. +- **`on_request_complete`** — optional `(request, response, emissions_data | None, task_name) -> None`; default logs via `log_request_complete`; `None` disables the callback. + +## Middleware order + +Per [FastAPI middleware order](https://fastapi.tiangolo.com/tutorial/middleware/), the **last** middleware added is **outermost** on the request path (runs first on the way in). Add CodeCarbon **after** other middleware so it wraps inner layers and includes work done by inner middleware and route handlers: + +```python +from starlette.middleware.cors import CORSMiddleware + +app.add_middleware(CORSMiddleware, ...) +add_codecarbon_middleware(app) # outermost on request → measures the full stack below +``` + +## Limitations (v1) + +- **WebSockets** are not instrumented by this middleware. +- **Background tasks** (`BackgroundTasks` and similar) run **after** the middleware has finished the request path; their CPU/GPU use may **not** be fully attributed to that request’s measurement window. +- **Response headers** for emissions are not supported by this middleware (measurement is deferred after the response). Use logging, a custom `on_request_complete` handler, or the [`@track_emissions` decorator](../reference/api.md#track_emissions-decorator) for per-route control. + +## Per-endpoint tracking + +For a single route or fine-grained control without global middleware, use the [`@track_emissions` decorator](../reference/api.md#track_emissions-decorator) (same parameters as `EmissionsTracker`). diff --git a/examples/fastapi_middleware.py b/examples/fastapi_middleware.py new file mode 100644 index 000000000..3f0c6511f --- /dev/null +++ b/examples/fastapi_middleware.py @@ -0,0 +1,57 @@ +"""Minimal FastAPI app with CodeCarbon middleware.""" + +from contextlib import asynccontextmanager +from pathlib import Path + +from fastapi import FastAPI + +from codecarbon.integrations.fastapi import ( + add_codecarbon_middleware, + create_codecarbon_lifespan, +) + +_OUTPUT_DIR = Path(__file__).resolve().parent / "output" +_OUTPUT_DIR.mkdir(exist_ok=True) + +# api_key, experiment_id, project_id: read from ~/.codecarbon.config (not repo .codecarbon.config). +_tracker_kwargs = { + "save_to_file": True, + "save_to_api": True, + "save_to_logger": False, + "log_level": "info", + "output_dir": str(_OUTPUT_DIR), + "allow_multiple_runs": True, +} + + +@asynccontextmanager +async def lifespan(app: FastAPI): + async with create_codecarbon_lifespan( + app, + project_name="fastapi-demo", + **_tracker_kwargs, + ): + yield + + +app = FastAPI(title="CodeCarbon FastAPI demo", lifespan=lifespan) +add_codecarbon_middleware( + app, + project_name="fastapi-demo", + tracker_kwargs=_tracker_kwargs, +) + + +@app.get("/predict") +def predict(text: str = "hello"): + return {"text": text, "label": "demo"} + + +# Per-request: codecarbon logger (INFO) after each response. +# CSV: examples/output/emissions.csv on shutdown; per-task CSV on stop. +# API: one emission per request on dashboard experiment from ~/.codecarbon.config. +# Run: +# CODECARBON_ALLOW_MULTIPLE_RUNS=True uv run --extra fastapi --with uvicorn \ +# uvicorn examples.fastapi_middleware:app --reload +# curl 'http://127.0.0.1:8000/predict?text=hello' +# Stop the server (Ctrl+C) so lifespan flushes the run-level CSV. diff --git a/mkdocs.yml b/mkdocs.yml index 11a0bed93..62e71cc05 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -148,6 +148,7 @@ nav: - LLMs and Agents: how-to/agents.md - How-to Guides: - Configure CodeCarbon: how-to/configuration.md + - FastAPI middleware: how-to/fastapi.md - Compare Model Efficiency: tutorials/comparing-model-efficiency.md - Dashboard & Visualization: - Use the Cloud API & Dashboard: how-to/cloud-api.md diff --git a/pyproject.toml b/pyproject.toml index 59d610015..8b04f0bc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,6 +95,8 @@ dev = [ "jsonschema", # For BoAmps schema validation tests "mktestdocs", # For testing documentation code blocks "scikit-learn", # For documentation examples and tests + "fastapi>=0.100", + "httpx", ] doc = [ "requests", @@ -123,6 +125,10 @@ viz-legacy = [ amdsmi = [ "amdsmi>=6.0.0" ] +fastapi = [ + "fastapi>=0.100", + "httpx", +] [project.scripts] carbonboard = "codecarbon.viz.carbonboard:main" diff --git a/scripts/benchmark_fastapi_middleware.py b/scripts/benchmark_fastapi_middleware.py new file mode 100644 index 000000000..d67d6f4bc --- /dev/null +++ b/scripts/benchmark_fastapi_middleware.py @@ -0,0 +1,1169 @@ +"""Benchmark FastAPI middleware overhead with a realistic ML inference workload. + +Run from repo root: + + uv run --extra fastapi --with uvicorn --with sentence-transformers --with torch \\ + python scripts/benchmark_fastapi_middleware.py + +Uses async HTTP clients (``httpx.AsyncClient``). Reports 95% bootstrap CIs on mean +latency. Verifies default middleware emits one ``codecarbon`` log line per request. + +Optional ``--with-save-to-api`` adds a scenario with ``save_to_api=True`` and +``api_call_interval=1`` (API ``live_out`` after each task measurement). Mocked runs +add ``--api-delay-ms`` sleep on ``stop_task``; ``--real-tracker`` patches +``ApiClient`` instead of calling the network. + +Use ``--quick`` for in-process ASGI (no uvicorn per scenario), noop workload, and +normal-approx CIs. ML workloads are preloaded once across scenarios when using HF. +""" + +from __future__ import annotations + +import os + +os.environ.setdefault("CODECARBON_LOG_LEVEL", "ERROR") + +import argparse +import asyncio +import logging +import platform +import random +import statistics +import sys +import threading +import time +from contextlib import asynccontextmanager +from dataclasses import dataclass +from typing import Any +from unittest.mock import MagicMock, patch + +import httpx +from fastapi import FastAPI + +import codecarbon.integrations.fastapi.middleware as cc_fastapi_middleware +from codecarbon.external.logger import logger as codecarbon_logger +from codecarbon.integrations.fastapi import ( + add_codecarbon_middleware, + shutdown_codecarbon_middleware, +) + +DEFAULT_MEASUREMENT_DELAY_S = 0.02 +WARMUP_REQUESTS = 50 +BENCHMARK_REQUESTS = 300 +QUICK_WARMUP_REQUESTS = 5 +QUICK_BENCHMARK_REQUESTS = 50 +QUICK_SECONDARY_WARMUP = 2 +SMOKE_WARMUP_REQUESTS = 2 +SMOKE_BENCHMARK_REQUESTS = 20 +SMOKE_INFERENCE_DELAY_MS = 15.0 +QUICK_LOGGING_SAMPLE = 10 +CONCURRENCY = 8 +BOOTSTRAP_SAMPLES = 2000 +QUICK_BOOTSTRAP_SAMPLES = 200 +FINALIZE_DRAIN_MULTIPLIER = 4 +QUICK_INFERENCE_DELAY_MS = 25.0 +CONFIDENCE_LEVEL = 0.95 +FASTAPI_BENCHMARK_PROJECT_ID = "25bf2346-49de-4658-911e-4c9003000e13" +FASTAPI_BENCHMARK_EXPERIMENT_ID = "d2d69403-1373-42b4-a2c1-09589aed4801" +REALISTIC_BENCHMARK_REQUESTS = 50 +REALISTIC_WARMUP_REQUESTS = 5 +REALISTIC_CONCURRENCY = 4 +TRACKER_KWARGS = {"save_to_file": False, "save_to_api": False} +TRACKER_KWARGS_SAVE_TO_API = { + "save_to_file": False, + "save_to_api": True, + "save_to_logger": False, + "api_call_interval": 1, + "experiment_id": FASTAPI_BENCHMARK_EXPERIMENT_ID, +} +DEFAULT_EMBEDDER_MODEL = "sentence-transformers/paraphrase-MiniLM-L3-v2" +DEFAULT_CLASSIFIER_MODEL = "distilbert-base-uncased-finetuned-sst-2-english" +SAMPLE_TEXT = "CodeCarbon measures the carbon footprint of machine learning workloads." + + +@dataclass(frozen=True) +class BenchmarkResult: + """Aggregated HTTP benchmark metrics for one configuration.""" + + name: str + requests: int + concurrency: int + mean_ms: float + ci_low_ms: float + ci_high_ms: float + median_ms: float + p95_ms: float + requests_per_sec: float + overhead_pct: float | None + codecarbon_log_lines: int | None = None + + +def _mock_emissions_data(measurement_delay_s: float) -> MagicMock: + return MagicMock( + emissions=0.001, + duration=measurement_delay_s, + energy_consumed=0.002, + emissions_rate=0.002, + ) + + +def _install_tracker_patch( + measurement_delay_s: float, + *, + api_delay_state: dict[str, float] | None = None, + api_delay_s: float = 0.0, +) -> Any: + delays = api_delay_state if api_delay_state is not None else {"api": api_delay_s} + + def _stop() -> float: + time.sleep(measurement_delay_s) + return 0.001 + + def _stop_task(_name: str) -> MagicMock: + time.sleep(measurement_delay_s) + if delays.get("api", 0.0) > 0: + time.sleep(delays["api"]) + return _mock_emissions_data(measurement_delay_s) + + tracker = MagicMock() + tracker.start.return_value = None + tracker.stop.side_effect = _stop + tracker.start_task.return_value = None + tracker.stop_task.side_effect = _stop_task + tracker.persist_completed_task.return_value = None + tracker.final_emissions_data = _mock_emissions_data(measurement_delay_s) + return patch.object(cc_fastapi_middleware, "EmissionsTracker", return_value=tracker) + + +def _config_ids() -> tuple[str, str]: + """Read project_id and experiment_id from hierarchical config when present.""" + from codecarbon.core.config import get_hierarchical_config + + section = get_hierarchical_config() + project_id = section.get("project_id") or FASTAPI_BENCHMARK_PROJECT_ID + experiment_id = section.get("experiment_id") or FASTAPI_BENCHMARK_EXPERIMENT_ID + return project_id, experiment_id + + +def _install_api_client_patch(api_delay_s: float) -> Any: + """Avoid network I/O while exercising ``save_to_api`` output handlers.""" + + import uuid + + from codecarbon.core import api_client as api_client_module + + def _create_run(self: Any, experiment_id: str) -> None: + self.run_id = str(uuid.uuid4()) + + def _add_emission(self: Any, carbon_emission: dict) -> bool: + time.sleep(api_delay_s) + return True + + return patch.multiple( + api_client_module.ApiClient, + _create_run=_create_run, + add_emission=_add_emission, + ) + + +_Z_95 = 1.96 + + +def bootstrap_mean_ci( + latencies_ms: list[float], + *, + samples: int = BOOTSTRAP_SAMPLES, + confidence: float = CONFIDENCE_LEVEL, +) -> tuple[float, float, float]: + """Return mean and two-sided bootstrap CI bounds for mean latency.""" + if not latencies_ms: + return 0.0, 0.0, 0.0 + n = len(latencies_ms) + boot_means = [ + statistics.mean(random.choices(latencies_ms, k=n)) for _ in range(samples) + ] + boot_means.sort() + alpha = (1.0 - confidence) / 2.0 + low_index = max(0, int(alpha * samples) - 1) + high_index = min(samples - 1, int((1.0 - alpha) * samples)) + return ( + statistics.mean(latencies_ms), + boot_means[low_index], + boot_means[high_index], + ) + + +def normal_mean_ci(latencies_ms: list[float]) -> tuple[float, float, float]: + """Approximate 95% CI for the mean (faster than bootstrap for --quick).""" + if not latencies_ms: + return 0.0, 0.0, 0.0 + n = len(latencies_ms) + mean = statistics.mean(latencies_ms) + if n < 2: + return mean, mean, mean + margin = _Z_95 * statistics.stdev(latencies_ms) / (n**0.5) + return mean, mean - margin, mean + margin + + +def summarize_latencies( + latencies_ms: list[float], + *, + bootstrap_samples: int, + use_normal_ci: bool, +) -> tuple[float, float, float, float, float]: + """Return mean, CI low/high, median, and p95.""" + if use_normal_ci: + mean_ms, ci_low_ms, ci_high_ms = normal_mean_ci(latencies_ms) + else: + mean_ms, ci_low_ms, ci_high_ms = bootstrap_mean_ci( + latencies_ms, samples=bootstrap_samples + ) + return ( + mean_ms, + ci_low_ms, + ci_high_ms, + statistics.median(latencies_ms), + _percentile(latencies_ms, 0.95), + ) + + +class InferenceWorkload: + """Runs a small Hugging Face model once per request.""" + + def __init__( + self, + workload: str, + model_id: str, + *, + inference_delay_s: float = 0.0, + ) -> None: + self.workload = workload + self.model_id = model_id + self.inference_delay_s = inference_delay_s + self._embedder: Any = None + self._classifier: Any = None + self._loaded = False + + def ensure_loaded(self) -> None: + """Load the model at most once (shared across benchmark scenarios).""" + if self._loaded: + return + self.load() + self._loaded = True + + def load(self) -> None: + """Load the model into memory.""" + if self.workload == "noop": + self._loaded = True + return + if self.workload == "hf-embedder": + from sentence_transformers import SentenceTransformer + + self._embedder = SentenceTransformer(self.model_id) + self._loaded = True + return + if self.workload == "hf-classifier": + from transformers import pipeline + + self._classifier = pipeline( + "sentiment-analysis", + model=self.model_id, + device=-1, + ) + self._loaded = True + return + raise ValueError(f"Unknown workload: {self.workload}") + + def run(self, text: str = SAMPLE_TEXT) -> dict[str, Any]: + """Execute one inference and return a small JSON-serializable payload.""" + if self.inference_delay_s > 0: + time.sleep(self.inference_delay_s) + if self.workload == "noop": + return {"ok": True} + if self.workload == "hf-embedder": + vector = self._embedder.encode(text) + return {"dimensions": int(vector.shape[0])} + if self.workload == "hf-classifier": + result = self._classifier(text[:512])[0] + return {"label": result["label"], "score": float(result["score"])} + raise ValueError(f"Unknown workload: {self.workload}") + + +def build_app( + mode: str, + workload: InferenceWorkload, + *, + project_name: str = FASTAPI_BENCHMARK_PROJECT_ID, + experiment_id: str = FASTAPI_BENCHMARK_EXPERIMENT_ID, + real_tracker: bool = False, +) -> FastAPI: + """Build a FastAPI app for the given benchmark mode.""" + if real_tracker and mode != "baseline": + from codecarbon.integrations.fastapi import create_codecarbon_lifespan + + tracker_kwargs = ( + TRACKER_KWARGS_SAVE_TO_API + if mode == "deferred_save_to_api" + else TRACKER_KWARGS + ) + if mode == "deferred_save_to_api": + tracker_kwargs = {**tracker_kwargs, "experiment_id": experiment_id} + + @asynccontextmanager + async def lifespan(_app: FastAPI): + workload.ensure_loaded() + async with create_codecarbon_lifespan( + _app, + project_name=project_name, + allow_multiple_runs=True, + **tracker_kwargs, + ): + yield + + else: + + @asynccontextmanager + async def lifespan(_app: FastAPI): + workload.ensure_loaded() + yield + + application = FastAPI(lifespan=lifespan) + + @application.get("/predict") + def predict(text: str = SAMPLE_TEXT) -> dict[str, Any]: + return workload.run(text) + + if mode == "baseline": + return application + + kwargs: dict[str, Any] = { + "tracker_kwargs": TRACKER_KWARGS, + "exclude": [], + } + if mode == "deferred_no_logging": + kwargs["on_request_complete"] = None + elif mode == "deferred_logging": + pass + elif mode == "deferred_save_to_api": + kwargs["tracker_kwargs"] = { + **TRACKER_KWARGS_SAVE_TO_API, + "experiment_id": experiment_id, + } + kwargs["on_request_complete"] = None + else: + raise ValueError(f"Unknown mode: {mode}") + + add_codecarbon_middleware(application, project_name=project_name, **kwargs) + return application + + +def _percentile(values: list[float], pct: float) -> float: + ordered = sorted(values) + index = max(0, min(len(ordered) - 1, int(len(ordered) * pct) - 1)) + return ordered[index] + + +class _CodeCarbonLogCounter(logging.Handler): + """Count ``codecarbon`` INFO lines emitted during a benchmark scenario.""" + + def __init__(self) -> None: + super().__init__(level=logging.INFO) + self.emissions_lines = 0 + + def emit(self, record: logging.LogRecord) -> None: + if record.name != codecarbon_logger.name: + return + if record.levelno < logging.INFO: + return + message = record.getMessage() + if message.startswith("CodeCarbon ") and "emissions=" in message: + self.emissions_lines += 1 + + +async def _run_load_async( + client: httpx.AsyncClient, + url: str, + requests: int, + concurrency: int, +) -> list[float]: + """Issue concurrent async GET requests and return client-side latencies (ms).""" + semaphore = asyncio.Semaphore(concurrency) + + async def _get() -> float: + async with semaphore: + start = time.perf_counter() + response = await client.get(url, timeout=120.0) + response.raise_for_status() + return (time.perf_counter() - start) * 1000 + + return list(await asyncio.gather(*(_get() for _ in range(requests)))) + + +async def _wait_for_deferred_finalize( + measurement_delay_s: float, + *, + requests: int, + concurrency: int, +) -> None: + """Yield until deferred finalize tasks are likely submitted.""" + waves = max(1, (requests + concurrency - 1) // concurrency) + estimate_s = measurement_delay_s * min(waves, 4) + await asyncio.sleep(min(0.06, max(0.01, estimate_s))) + + +def _drain_middleware(app: FastAPI) -> None: + """Wait for deferred tracker work before tearing down an in-process app.""" + shutdown_codecarbon_middleware(app, wait=True) + + +def _summarize( + name: str, + latencies_ms: list[float], + concurrency: int, + baseline_mean_ms: float | None, + *, + bootstrap_samples: int, + use_normal_ci: bool, + codecarbon_log_lines: int | None = None, +) -> BenchmarkResult: + total_s = sum(latencies_ms) / 1000 + mean_ms, ci_low_ms, ci_high_ms, median_ms, p95_ms = summarize_latencies( + latencies_ms, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + ) + overhead = None + if baseline_mean_ms and baseline_mean_ms > 0: + overhead = ((mean_ms - baseline_mean_ms) / baseline_mean_ms) * 100 + return BenchmarkResult( + name=name, + requests=len(latencies_ms), + concurrency=concurrency, + mean_ms=mean_ms, + ci_low_ms=ci_low_ms, + ci_high_ms=ci_high_ms, + median_ms=median_ms, + p95_ms=p95_ms, + requests_per_sec=len(latencies_ms) / total_s if total_s else 0.0, + overhead_pct=overhead, + codecarbon_log_lines=codecarbon_log_lines, + ) + + +async def _wait_for_server_async( + client: httpx.AsyncClient, url: str, timeout_s: float = 120.0 +) -> None: + deadline = time.perf_counter() + timeout_s + while time.perf_counter() < deadline: + try: + response = await client.get(url, timeout=30.0) + response.raise_for_status() + return + except (httpx.HTTPError, OSError): + await asyncio.sleep(0.02) + raise RuntimeError(f"Server at {url} did not become ready") + + +async def _run_scenario_in_process( + mode: str, + display_name: str, + requests: int, + warmup: int, + concurrency: int, + workload: InferenceWorkload, + measurement_delay_s: float, + *, + real_tracker: bool, + bootstrap_samples: int, + use_normal_ci: bool, + verify_logging: bool, + logging_sample: int | None, + experiment_id: str, + project_name: str, +) -> BenchmarkResult: + """Benchmark one configuration in-process via ASGI transport.""" + app = build_app( + mode, + workload, + project_name=project_name, + experiment_id=experiment_id, + real_tracker=real_tracker, + ) + workload.ensure_loaded() + log_counter: _CodeCarbonLogCounter | None = None + logging_level_restore: int | None = None + predict_url = "http://benchmark/predict" + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, timeout=120.0) as client: + if warmup > 0: + await _run_load_async(client, predict_url, warmup, concurrency) + if verify_logging and mode == "deferred_logging": + log_counter = _CodeCarbonLogCounter() + logging_level_restore = codecarbon_logger.level + codecarbon_logger.setLevel(logging.INFO) + codecarbon_logger.addHandler(log_counter) + latencies = await _run_load_async( + client, predict_url, requests, concurrency + ) + if mode != "baseline": + drain_s = 0.5 if real_tracker else measurement_delay_s + await _wait_for_deferred_finalize( + drain_s, requests=requests, concurrency=concurrency + ) + if log_counter is not None: + expected_logs = logging_sample or requests + deadline = time.perf_counter() + min( + 2.0, + measurement_delay_s * (requests / max(concurrency, 1) + 2) + 0.25, + ) + while ( + log_counter.emissions_lines < expected_logs + and time.perf_counter() < deadline + ): + await asyncio.sleep(0.005) + log_lines = log_counter.emissions_lines if log_counter is not None else None + if mode != "baseline": + _drain_middleware(app) + if log_counter is not None: + codecarbon_logger.removeHandler(log_counter) + if logging_level_restore is not None: + codecarbon_logger.setLevel(logging_level_restore) + return _summarize( + display_name, + latencies, + concurrency, + None, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + codecarbon_log_lines=log_lines, + ) + + +async def _run_scenario_network( + mode: str, + display_name: str, + port: int, + requests: int, + warmup: int, + concurrency: int, + measurement_delay_s: float, + workload: InferenceWorkload, + real_tracker: bool, + *, + bootstrap_samples: int, + use_normal_ci: bool, + verify_logging: bool, + api_delay_s: float = 0.0, + experiment_id: str = FASTAPI_BENCHMARK_EXPERIMENT_ID, + project_name: str = FASTAPI_BENCHMARK_PROJECT_ID, +) -> BenchmarkResult: + import uvicorn + + app = build_app( + mode, + workload, + project_name=project_name, + experiment_id=experiment_id, + real_tracker=real_tracker, + ) + api_patcher = None + uses_save_to_api = mode == "deferred_save_to_api" + if uses_save_to_api and not real_tracker: + api_patcher = _install_api_client_patch(api_delay_s) + api_patcher.start() + + config = uvicorn.Config( + app, host="127.0.0.1", port=port, log_level="error", access_log=False + ) + server = uvicorn.Server(config) + + def _serve() -> None: + server.run() + + thread = threading.Thread(target=_serve, daemon=True) + thread.start() + predict_url = f"http://127.0.0.1:{port}/predict" + log_counter: _CodeCarbonLogCounter | None = None + logging_level_restore: int | None = None + try: + async with httpx.AsyncClient() as client: + await _wait_for_server_async(client, predict_url) + if warmup > 0: + await _run_load_async(client, predict_url, warmup, concurrency) + if mode != "baseline": + finalize_drain_s = ( + 3.0 + if real_tracker + else measurement_delay_s * FINALIZE_DRAIN_MULTIPLIER + ) + time.sleep(finalize_drain_s) + if verify_logging and mode == "deferred_logging": + log_counter = _CodeCarbonLogCounter() + logging_level_restore = codecarbon_logger.level + codecarbon_logger.setLevel(logging.INFO) + codecarbon_logger.addHandler(log_counter) + latencies = await _run_load_async( + client, predict_url, requests, concurrency + ) + if mode != "baseline": + time.sleep( + 3.0 + if real_tracker + else measurement_delay_s * FINALIZE_DRAIN_MULTIPLIER + ) + log_lines = log_counter.emissions_lines if log_counter is not None else None + if log_counter is not None: + codecarbon_logger.removeHandler(log_counter) + if logging_level_restore is not None: + codecarbon_logger.setLevel(logging_level_restore) + return _summarize( + display_name, + latencies, + concurrency, + None, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + codecarbon_log_lines=log_lines, + ) + finally: + server.should_exit = True + thread.join(timeout=3.0) + if api_patcher is not None: + api_patcher.stop() + + +def _format_results( + results: list[BenchmarkResult], + *, + workload: str, + model_id: str, + real_tracker: bool, + measurement_delay_ms: float | None, + api_delay_ms: float | None, + with_save_to_api: bool, + experiment_id: str, + project_id: str, + bootstrap_samples: int, + use_normal_ci: bool, + in_process: bool, + logging_verified: bool | None, +) -> str: + confidence_pct = int(CONFIDENCE_LEVEL * 100) + ci_method = ( + f"{confidence_pct}% normal approx" + if use_normal_ci + else f"{confidence_pct}% bootstrap ({bootstrap_samples} resamples)" + ) + transport = "in-process ASGI" if in_process else "HTTP (uvicorn)" + lines = [ + f"Platform: {platform.system()} {platform.release()} ({platform.machine()})", + f"Python: {sys.version.split()[0]}", + f"Workload: {workload} ({model_id})", + f"Transport: {transport}", + f"HTTP client: async (httpx.AsyncClient)", + f"EmissionsTracker: {'live' if real_tracker else f'mocked ({measurement_delay_ms:.0f} ms stop delay)'}", + f"save_to_api scenario: {'yes (api_call_interval=1)' if with_save_to_api else 'no'}", + f"project_id: {project_id}", + ( + f"experiment_id (save_to_api): {experiment_id}" + if with_save_to_api + else "experiment_id (save_to_api): n/a" + ), + ( + f"Mocked API upload delay: {api_delay_ms:.0f} ms" + if with_save_to_api and api_delay_ms is not None + else "Mocked API upload delay: n/a" + ), + f"Middleware: default deferred measurement", + f"Logger namespace: {codecarbon_logger.name}", + f"Requests per scenario: {results[0].requests} (warmup excluded), " + f"concurrency: {results[0].concurrency}", + f"Mean CI: {ci_method}", + "", + f"| Configuration | Mean (ms) | {confidence_pct}% CI (ms) | Median (ms) | " + f"p95 (ms) | req/s | vs baseline |", + "|---|---:|---|---:|---:|---:|---:|---:|", + ] + for result in results: + ci_cell = f"[{result.ci_low_ms:.1f}, {result.ci_high_ms:.1f}]" + overhead = result.overhead_pct + if overhead is None: + overhead_str = "—" + elif overhead >= 0: + overhead_str = f"+{overhead:.1f}%" + else: + overhead_str = f"{overhead:.1f}%" + lines.append( + f"| {result.name} | {result.mean_ms:.2f} | {ci_cell} | " + f"{result.median_ms:.2f} | {result.p95_ms:.2f} | " + f"{result.requests_per_sec:.1f} | {overhead_str} |" + ) + if logging_verified is not None: + status = "yes" if logging_verified else "no" + lines.append("") + lines.append( + f"CodeCarbon per-request log lines (default middleware): verified={status}" + ) + return "\n".join(lines) + + +SCENARIO_KEYS = { + "no_logging": ("deferred_no_logging", "Deferred, no logging"), + "logging": ("deferred_logging", "Deferred + logging (default)"), + "save_to_api": ("deferred_save_to_api", "Deferred + save_to_api (no logging)"), +} + + +async def _run_benchmarks_async( + *, + requests: int, + warmup: int, + secondary_warmup: int, + concurrency: int, + measurement_delay_s: float, + workload_name: str, + model_id: str, + real_tracker: bool, + bootstrap_samples: int, + use_normal_ci: bool, + verify_logging: bool, + logging_sample: int | None, + with_save_to_api: bool, + scenario_keys: list[str] | None, + api_delay_s: float, + experiment_id: str, + project_id: str, + inference_delay_s: float, + in_process: bool, +) -> tuple[list[BenchmarkResult], bool | None]: + """Run baseline and middleware scenarios.""" + workload = InferenceWorkload( + workload_name, model_id, inference_delay_s=inference_delay_s + ) + if workload_name != "noop": + print(f"Preloading workload {workload_name} ({model_id})...", flush=True) + workload.ensure_loaded() + + api_delay_state = {"api": 0.0} + tracker_patcher: Any | None = None + api_patcher: Any | None = None + + async def _run_one( + mode: str, + label: str, + *, + port: int | None, + scenario_warmup: int, + ) -> BenchmarkResult: + if in_process: + return await _run_scenario_in_process( + mode, + label, + requests, + scenario_warmup, + concurrency, + workload, + measurement_delay_s, + real_tracker=real_tracker, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + verify_logging=verify_logging, + logging_sample=logging_sample, + experiment_id=experiment_id, + project_name=project_id, + ) + assert port is not None + return await _run_scenario_network( + mode, + label, + port, + requests, + scenario_warmup, + concurrency, + measurement_delay_s, + workload, + real_tracker, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + verify_logging=verify_logging, + api_delay_s=api_delay_state["api"], + experiment_id=experiment_id, + project_name=project_id, + ) + + baseline = await _run_one( + "baseline", + "No middleware (baseline)", + port=8765 if not in_process else None, + scenario_warmup=warmup, + ) + + scenarios: list[tuple[str, str]] = [] + selected = scenario_keys or ["no_logging", "logging"] + if with_save_to_api and "save_to_api" not in selected: + selected = [*selected, "save_to_api"] + for key in selected: + if key not in SCENARIO_KEYS: + raise ValueError( + f"Unknown scenario {key!r}; choose from {sorted(SCENARIO_KEYS)}" + ) + scenarios.append(SCENARIO_KEYS[key]) + + if not real_tracker: + tracker_patcher = _install_tracker_patch( + measurement_delay_s, api_delay_state=api_delay_state + ) + tracker_patcher.start() + + results: list[BenchmarkResult] = [baseline] + logging_result: BenchmarkResult | None = None + try: + for index, (mode, label) in enumerate(scenarios): + api_delay_state["api"] = ( + api_delay_s if mode == "deferred_save_to_api" else 0.0 + ) + middleware_warmup = ( + secondary_warmup + if secondary_warmup > 0 + else min(10, warmup) + if in_process + else warmup + ) + result = await _run_one( + mode, + label, + port=None if in_process else 8766 + index, + scenario_warmup=middleware_warmup if in_process else warmup, + ) + if mode == "deferred_logging": + logging_result = result + results.append(result) + finally: + if api_patcher is not None: + api_patcher.stop() + if tracker_patcher is not None: + tracker_patcher.stop() + + baseline_mean = baseline.mean_ms + enriched: list[BenchmarkResult] = [ + BenchmarkResult( + name=baseline.name, + requests=baseline.requests, + concurrency=baseline.concurrency, + mean_ms=baseline.mean_ms, + ci_low_ms=baseline.ci_low_ms, + ci_high_ms=baseline.ci_high_ms, + median_ms=baseline.median_ms, + p95_ms=baseline.p95_ms, + requests_per_sec=baseline.requests_per_sec, + overhead_pct=None, + ) + ] + for result in results[1:]: + enriched.append( + BenchmarkResult( + name=result.name, + requests=result.requests, + concurrency=result.concurrency, + mean_ms=result.mean_ms, + ci_low_ms=result.ci_low_ms, + ci_high_ms=result.ci_high_ms, + median_ms=result.median_ms, + p95_ms=result.p95_ms, + requests_per_sec=result.requests_per_sec, + overhead_pct=((result.mean_ms - baseline_mean) / baseline_mean * 100), + codecarbon_log_lines=result.codecarbon_log_lines, + ) + ) + + logging_verified: bool | None = None + if logging_result is not None and logging_result.codecarbon_log_lines is not None: + expected_logs = logging_sample or logging_result.requests + logging_verified = logging_result.codecarbon_log_lines >= expected_logs + return enriched, logging_verified + + +def run_benchmarks( + *, + requests: int = BENCHMARK_REQUESTS, + warmup: int = WARMUP_REQUESTS, + secondary_warmup: int = 0, + concurrency: int = CONCURRENCY, + measurement_delay_s: float = DEFAULT_MEASUREMENT_DELAY_S, + workload_name: str, + model_id: str, + real_tracker: bool, + bootstrap_samples: int, + use_normal_ci: bool, + verify_logging: bool, + logging_sample: int | None, + with_save_to_api: bool, + scenario_keys: list[str] | None, + api_delay_s: float, + experiment_id: str, + project_id: str, + inference_delay_s: float, + in_process: bool, +) -> tuple[list[BenchmarkResult], bool | None]: + """Run all scenarios under one asyncio event loop.""" + return asyncio.run( + _run_benchmarks_async( + requests=requests, + warmup=warmup, + secondary_warmup=secondary_warmup, + concurrency=concurrency, + measurement_delay_s=measurement_delay_s, + workload_name=workload_name, + model_id=model_id, + real_tracker=real_tracker, + bootstrap_samples=bootstrap_samples, + use_normal_ci=use_normal_ci, + verify_logging=verify_logging, + logging_sample=logging_sample, + with_save_to_api=with_save_to_api, + scenario_keys=scenario_keys, + api_delay_s=api_delay_s, + experiment_id=experiment_id, + project_id=project_id, + inference_delay_s=inference_delay_s, + in_process=in_process, + ) + ) + + +def _resolve_model_id(workload: str, model_id: str | None) -> str: + if model_id: + return model_id + if workload == "hf-embedder": + return DEFAULT_EMBEDDER_MODEL + if workload == "hf-classifier": + return DEFAULT_CLASSIFIER_MODEL + return "n/a" + + +def main() -> None: + """CLI entrypoint.""" + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--requests", type=int, default=BENCHMARK_REQUESTS) + parser.add_argument("--warmup", type=int, default=WARMUP_REQUESTS) + parser.add_argument("--concurrency", type=int, default=CONCURRENCY) + parser.add_argument( + "--bootstrap-samples", + type=int, + default=BOOTSTRAP_SAMPLES, + help="Bootstrap resamples for mean latency CI", + ) + parser.add_argument( + "--workload", + choices=("noop", "hf-embedder", "hf-classifier"), + default="hf-embedder", + ) + parser.add_argument("--model", default=None, help="Hugging Face model id override") + parser.add_argument( + "--real-tracker", + action="store_true", + help="Use a live EmissionsTracker instead of a mocked stop() delay", + ) + parser.add_argument( + "--realistic", + action="store_true", + help=( + "Live tracker + hf-embedder + uvicorn HTTP: " + f"{REALISTIC_BENCHMARK_REQUESTS} requests, concurrency {REALISTIC_CONCURRENCY}" + ), + ) + parser.add_argument( + "--no-verify-logging", + action="store_true", + help="Skip counting codecarbon logger lines after the default scenario", + ) + parser.add_argument( + "--measurement-delay-ms", + type=float, + default=DEFAULT_MEASUREMENT_DELAY_S * 1000, + help="Mocked tracker stop() duration when --real-tracker is not set", + ) + parser.add_argument( + "--with-save-to-api", + action="store_true", + help="Add a scenario with save_to_api=True and api_call_interval=1", + ) + parser.add_argument( + "--project-id", + default=FASTAPI_BENCHMARK_PROJECT_ID, + help="CodeCarbon project UUID (middleware project_name for tracked scenarios)", + ) + parser.add_argument( + "--experiment-id", + default=FASTAPI_BENCHMARK_EXPERIMENT_ID, + help="CodeCarbon experiment UUID for the save_to_api scenario", + ) + parser.add_argument( + "--api-delay-ms", + type=float, + default=None, + help="Simulated API upload latency (defaults to --measurement-delay-ms)", + ) + parser.add_argument( + "--smoke", + action="store_true", + help=( + "Fastest run: in-process ASGI, 20 requests, skips log verify, " + "no_logging+logging only" + ), + ) + parser.add_argument( + "--quick", + action="store_true", + help=( + "Fast run: in-process ASGI, noop + 25 ms simulated inference, " + "50 timed requests, normal-approx CI" + ), + ) + parser.add_argument( + "--in-process", + action="store_true", + help="Benchmark via httpx ASGI transport (no uvicorn TCP per scenario)", + ) + parser.add_argument( + "--network", + action="store_true", + help="Force uvicorn HTTP even when --quick is set", + ) + parser.add_argument( + "--inference-delay-ms", + type=float, + default=0.0, + help="Optional sleep per /predict request (useful with --workload noop)", + ) + parser.add_argument( + "--logging-sample", + type=int, + default=None, + help="Verify at least N log lines (default: all requests; quick uses 10)", + ) + parser.add_argument( + "--scenarios", + default=None, + help="Comma-separated middleware scenarios: no_logging, logging, save_to_api", + ) + args = parser.parse_args() + if args.realistic: + args.real_tracker = True + args.network = True + args.quick = False + args.workload = "hf-embedder" + if args.requests == BENCHMARK_REQUESTS: + args.requests = REALISTIC_BENCHMARK_REQUESTS + if args.warmup == WARMUP_REQUESTS: + args.warmup = REALISTIC_WARMUP_REQUESTS + if args.concurrency == CONCURRENCY: + args.concurrency = REALISTIC_CONCURRENCY + config_project, config_experiment = _config_ids() + if args.project_id == FASTAPI_BENCHMARK_PROJECT_ID: + args.project_id = config_project + if args.experiment_id == FASTAPI_BENCHMARK_EXPERIMENT_ID: + args.experiment_id = config_experiment + os.environ.setdefault("CODECARBON_ALLOW_MULTIPLE_RUNS", "True") + scenario_keys = ( + [part.strip() for part in args.scenarios.split(",") if part.strip()] + if args.scenarios + else None + ) + use_normal_ci = False + secondary_warmup = 0 + logging_sample = args.logging_sample + if args.smoke: + args.quick = True + if args.requests == BENCHMARK_REQUESTS: + args.requests = SMOKE_BENCHMARK_REQUESTS + if args.warmup == WARMUP_REQUESTS: + args.warmup = SMOKE_WARMUP_REQUESTS + if args.inference_delay_ms == 0.0: + args.inference_delay_ms = SMOKE_INFERENCE_DELAY_MS + args.no_verify_logging = True + if scenario_keys is None: + scenario_keys = ["no_logging", "logging"] + if args.quick: + if args.workload == "hf-embedder": + args.workload = "noop" + if args.requests == BENCHMARK_REQUESTS: + args.requests = QUICK_BENCHMARK_REQUESTS + if args.warmup == WARMUP_REQUESTS: + args.warmup = QUICK_WARMUP_REQUESTS + if args.bootstrap_samples == BOOTSTRAP_SAMPLES: + args.bootstrap_samples = QUICK_BOOTSTRAP_SAMPLES + if args.inference_delay_ms == 0.0: + args.inference_delay_ms = QUICK_INFERENCE_DELAY_MS + use_normal_ci = True + secondary_warmup = QUICK_SECONDARY_WARMUP + if logging_sample is None and not args.no_verify_logging: + logging_sample = QUICK_LOGGING_SAMPLE + in_process = (args.in_process or args.quick) and not args.network + if in_process and not args.quick and args.bootstrap_samples == BOOTSTRAP_SAMPLES: + use_normal_ci = False + model_id = _resolve_model_id(args.workload, args.model) + measurement_delay_s = args.measurement_delay_ms / 1000 + api_delay_ms = ( + args.api_delay_ms + if args.api_delay_ms is not None + else args.measurement_delay_ms + ) + api_delay_s = api_delay_ms / 1000 + inference_delay_s = args.inference_delay_ms / 1000 + + previous_log_level = codecarbon_logger.level + codecarbon_logger.setLevel(logging.WARNING) + + results, logging_verified = run_benchmarks( + requests=args.requests, + warmup=args.warmup, + secondary_warmup=secondary_warmup, + concurrency=args.concurrency, + measurement_delay_s=measurement_delay_s, + workload_name=args.workload, + model_id=model_id, + real_tracker=args.real_tracker, + bootstrap_samples=args.bootstrap_samples, + use_normal_ci=use_normal_ci, + verify_logging=not args.no_verify_logging, + logging_sample=logging_sample, + with_save_to_api=args.with_save_to_api, + scenario_keys=scenario_keys, + api_delay_s=api_delay_s, + experiment_id=args.experiment_id, + project_id=args.project_id, + inference_delay_s=inference_delay_s, + in_process=in_process, + ) + codecarbon_logger.setLevel(previous_log_level) + delay_label = None if args.real_tracker else args.measurement_delay_ms + print( + _format_results( + results, + workload=args.workload, + model_id=model_id, + real_tracker=args.real_tracker, + measurement_delay_ms=delay_label or 0.0, + api_delay_ms=api_delay_ms if args.with_save_to_api else None, + with_save_to_api=args.with_save_to_api, + experiment_id=args.experiment_id, + project_id=args.project_id, + bootstrap_samples=args.bootstrap_samples, + use_normal_ci=use_normal_ci, + in_process=in_process, + logging_verified=logging_verified, + ) + ) + if logging_verified is False: + logging_result = results[-1] + print( + f"\nWARNING: expected at least {logging_sample or logging_result.requests} " + f"CodeCarbon log lines, got {logging_result.codecarbon_log_lines}", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/scripts/verify_fastapi_middleware_outputs.py b/scripts/verify_fastapi_middleware_outputs.py new file mode 100644 index 000000000..acc31ac9c --- /dev/null +++ b/scripts/verify_fastapi_middleware_outputs.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +"""Verify FastAPI middleware logging, CSV, and optional API upload. + +Per-request emissions appear in logs via ``on_request_complete`` (default). +CSV rows and API ``add_emission`` calls are written when the shared tracker +stops (use ``create_codecarbon_lifespan``), not after each ``stop_task``. + +Examples: + uv run --extra fastapi python scripts/verify_fastapi_middleware_outputs.py + uv run --extra fastapi python scripts/verify_fastapi_middleware_outputs.py --save-to-api +""" + +from __future__ import annotations + +import argparse +import logging +import sys +import tempfile +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +import codecarbon.integrations.fastapi.middleware as cc_fastapi_middleware +import requests + +from codecarbon.core.api_client import ApiClient +from codecarbon.core.config import get_hierarchical_config +from codecarbon.integrations.fastapi import ( + add_codecarbon_middleware, + create_codecarbon_lifespan, +) +from codecarbon.integrations.fastapi.middleware import log_request_complete + + +class _LogCounter(logging.Handler): + def __init__(self) -> None: + super().__init__(level=logging.INFO) + self.request_log_lines = 0 + + def emit(self, record: logging.LogRecord) -> None: + if record.name != "codecarbon": + return + message = record.getMessage() + if message.startswith("CodeCarbon ") and "emissions=" in message: + self.request_log_lines += 1 + + +def _build_app( + *, + output_dir: Path, + save_to_api: bool, + project_name: str, +) -> FastAPI: + tracker_kwargs: dict[str, Any] = { + "save_to_file": True, + "save_to_api": save_to_api, + "save_to_logger": False, + "output_dir": str(output_dir), + "measure_power_secs": 2, + "api_call_interval": 1, + "allow_multiple_runs": True, + } + + @asynccontextmanager + async def lifespan(application: FastAPI): + async with create_codecarbon_lifespan( + application, + project_name=project_name, + **tracker_kwargs, + ): + yield + + application = FastAPI(lifespan=lifespan) + add_codecarbon_middleware( + application, + project_name=project_name, + tracker_kwargs=tracker_kwargs, + on_request_complete=log_request_complete, + ) + + @application.get("/predict") + def predict(text: str = "hello") -> dict[str, str]: + return {"text": text, "label": "demo"} + + return application + + +def _count_run_emissions(api: ApiClient, run_id: str) -> int: + url = f"{api.url}/runs/{run_id}/emissions" + response = requests.get(url, headers=api._get_headers(), timeout=15) + if response.status_code != 200: + return 0 + payload = response.json() + items = payload.get("items") or payload.get("data") or [] + if isinstance(items, list): + return len(items) + return 0 + + +def _get_api_client_from_config() -> ApiClient | None: + conf = get_hierarchical_config() + section = conf.get("codecarbon", conf) + api_key = section.get("api_key") or section.get("api_token") + experiment_id = section.get("experiment_id") + endpoint = section.get("api_endpoint") or "https://api.codecarbon.io" + if not api_key or not experiment_id: + return None + return ApiClient( + endpoint_url=endpoint, + experiment_id=experiment_id, + api_key=api_key, + conf=conf, + create_run_automatically=False, + ) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--save-to-api", + action="store_true", + help="Enable save_to_api using ~/.codecarbon.config (requires api_key).", + ) + parser.add_argument( + "--requests", + type=int, + default=3, + help="Number of GET /predict calls (default: 3).", + ) + args = parser.parse_args(argv) + + save_to_api = args.save_to_api + if save_to_api: + api_probe = _get_api_client_from_config() + if api_probe is None: + print( + "ERROR: --save-to-api needs api_key and experiment_id in " + "~/.codecarbon.config", + file=sys.stderr, + ) + return 1 + if api_probe.check_auth() is None: + print( + "WARN: /auth/check failed; continuing (upload probe uses run emissions)." + ) + + log_counter = _LogCounter() + cc_fastapi_middleware.logger.addHandler(log_counter) + + failures: list[str] = [] + try: + with tempfile.TemporaryDirectory(prefix="cc-fastapi-verify-") as tmp: + output_dir = Path(tmp) + app = _build_app( + output_dir=output_dir, + save_to_api=save_to_api, + project_name="fastapi-verify", + ) + run_id: str | None = None + with TestClient(app) as client: + for _ in range(args.requests): + response = client.get("/predict", params={"text": "verify"}) + if response.status_code != 200: + failures.append( + f"predict returned status {response.status_code}" + ) + break + tracker = getattr(app.state, "codecarbon_tracker", None) + if tracker is not None: + for handler in tracker._output_handlers: + handler_run_id = getattr(handler, "run_id", None) + if handler_run_id: + run_id = handler_run_id + break + + if log_counter.request_log_lines < args.requests: + failures.append( + f"expected {args.requests} per-request log lines, got " + f"{log_counter.request_log_lines}" + ) + else: + print( + f"OK: {log_counter.request_log_lines} per-request log line(s) " + "(on_request_complete)" + ) + + emissions_csv = output_dir / "emissions.csv" + if not emissions_csv.is_file() or emissions_csv.stat().st_size == 0: + failures.append( + f"missing or empty CSV at {emissions_csv} (written on tracker.stop)" + ) + else: + line_count = len(emissions_csv.read_text().splitlines()) + print(f"OK: CSV {emissions_csv} ({line_count} line(s) including header)") + + task_csvs = list(output_dir.glob("emissions_*.csv")) + if task_csvs: + print(f"OK: task CSV(s): {', '.join(p.name for p in task_csvs)}") + else: + print( + "NOTE: no per-task CSV (emissions__.csv); " + "run-level emissions.csv is the main artifact on stop" + ) + + if save_to_api: + api = _get_api_client_from_config() + if api is None or run_id is None: + failures.append("could not resolve API client or run_id after stop") + else: + count = _count_run_emissions(api, run_id) + if count < 1: + failures.append( + f"no emissions listed for run {run_id} at " + f"{api.url}/runs/.../emissions" + ) + else: + print( + f"OK: API run {run_id} has {count} emission record(s)" + ) + finally: + cc_fastapi_middleware.logger.removeHandler(log_counter) + + if failures: + for msg in failures: + print(f"FAIL: {msg}", file=sys.stderr) + return 1 + + print("All checks passed.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/integrations/test_fastapi_headers.py b/tests/integrations/test_fastapi_headers.py new file mode 100644 index 000000000..8f43d12fe --- /dev/null +++ b/tests/integrations/test_fastapi_headers.py @@ -0,0 +1,118 @@ +"""Tests for response header mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData`.""" + +import pytest +from starlette.responses import Response + +from codecarbon.integrations.fastapi._headers import ( + HEADER_PRESETS, + apply_response_headers, + resolve_header_mapping, +) +from codecarbon.output_methods.emissions_data import EmissionsData + + +@pytest.fixture +def emissions_data() -> EmissionsData: + return EmissionsData( + timestamp="2026-05-19T12:00:00", + project_name="test", + run_id="run-1", + experiment_id="exp-1", + duration=1.5, + emissions=0.00042, + emissions_rate=0.00028, + cpu_power=12.0, + gpu_power=0.0, + ram_power=5.0, + cpu_energy=0.003, + gpu_energy=0.0, + ram_energy=0.001, + energy_consumed=0.004, + water_consumed=0.0, + country_name="France", + country_iso_code="FRA", + region="", + cloud_provider="", + cloud_region="", + os="Darwin", + python_version="3.12", + codecarbon_version="3.2.6", + cpu_count=8, + cpu_model="Apple M1", + gpu_count=0, + gpu_model="", + longitude=2.35, + latitude=48.85, + ram_total_size=16.0, + tracking_mode="machine", + ) + + +def test_resolve_header_mapping_preset_emissions() -> None: + mapping = resolve_header_mapping("emissions") + assert mapping == {"emissions": "X-CodeCarbon-Emissions-kg"} + + +def test_resolve_header_mapping_field_list() -> None: + mapping = resolve_header_mapping(["emissions", "duration"]) + assert mapping["emissions"] == "X-CodeCarbon-Emissions-kg" + assert mapping["duration"] == "X-CodeCarbon-Duration-s" + + +def test_resolve_header_mapping_custom_dict() -> None: + custom = {"emissions": "X-App-CO2", "duration": "X-App-Time"} + assert resolve_header_mapping(custom) == custom + + +def test_resolve_header_mapping_bool_true_aliases_emissions() -> None: + assert resolve_header_mapping(True) == HEADER_PRESETS["emissions"] + + +def test_resolve_header_mapping_none_or_false_returns_empty() -> None: + assert resolve_header_mapping(None) == {} + assert resolve_header_mapping(False) == {} + + +def test_resolve_header_mapping_full_preset() -> None: + mapping = resolve_header_mapping("full") + assert mapping["emissions"] == "X-CodeCarbon-Emissions-kg" + assert ( + mapping["cpu_utilization_percent"] + == "X-CodeCarbon-Cpu-Utilization-Percent-percent" + ) + + +def test_resolve_header_mapping_unknown_preset_raises() -> None: + with pytest.raises(ValueError, match="Unknown response_headers preset"): + resolve_header_mapping("not-a-preset") + + +def test_apply_response_headers_sets_values(emissions_data: EmissionsData) -> None: + response = Response(content=b"ok") + apply_response_headers( + response, + emissions_data, + { + "emissions": "X-CodeCarbon-Emissions-kg", + "duration": "X-CodeCarbon-Duration-s", + }, + ) + assert response.headers["X-CodeCarbon-Emissions-kg"] == "0.00042" + assert response.headers["X-CodeCarbon-Duration-s"] == "1.5" + + +def test_apply_response_headers_ignores_unknown_fields( + emissions_data: EmissionsData, +) -> None: + response = Response(content=b"ok") + apply_response_headers(response, emissions_data, {"not_a_field": "X-Bad"}) + assert "X-Bad" not in response.headers + + +def test_apply_response_headers_noop_when_mapping_empty( + emissions_data: EmissionsData, +) -> None: + response = Response(content=b"ok") + before = dict(response.headers) + apply_response_headers(response, emissions_data, {}) + assert dict(response.headers) == before diff --git a/tests/integrations/test_fastapi_import.py b/tests/integrations/test_fastapi_import.py new file mode 100644 index 000000000..5289310ec --- /dev/null +++ b/tests/integrations/test_fastapi_import.py @@ -0,0 +1,51 @@ +"""Import surface for the optional FastAPI integration package.""" + +import builtins +import importlib +import sys + +import pytest + + +def test_fastapi_integration_importable() -> None: + """Public helpers are importable without instantiating middleware.""" + from codecarbon.integrations.fastapi import ( + CodeCarbonMiddleware, + add_codecarbon_middleware, + create_codecarbon_lifespan, + log_request_complete, + shutdown_codecarbon_middleware, + ) + + assert CodeCarbonMiddleware is not None + assert callable(add_codecarbon_middleware) + assert callable(create_codecarbon_lifespan) + assert callable(log_request_complete) + assert callable(shutdown_codecarbon_middleware) + + +def test_missing_starlette_shows_helpful_error(monkeypatch: pytest.MonkeyPatch) -> None: + """Middleware import surfaces an actionable hint without Starlette/FastAPI.""" + for key in list(sys.modules): + if key.startswith("starlette") or key.startswith( + "codecarbon.integrations.fastapi" + ): + del sys.modules[key] + + real_import = builtins.__import__ + + def mock_import( + name: str, + globals: dict | None = None, + locals: dict | None = None, + fromlist: tuple[str, ...] = (), + level: int = 0, + ): + root = name.split(".", 1)[0] + if root in ("starlette", "fastapi"): + raise ImportError("no starlette") + return real_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr(builtins, "__import__", mock_import) + with pytest.raises(ImportError, match=r"pip install .*codecarbon\[fastapi\]"): + importlib.import_module("codecarbon.integrations.fastapi.middleware") diff --git a/tests/integrations/test_fastapi_lifespan.py b/tests/integrations/test_fastapi_lifespan.py new file mode 100644 index 000000000..eae624d28 --- /dev/null +++ b/tests/integrations/test_fastapi_lifespan.py @@ -0,0 +1,29 @@ +import asyncio +from unittest.mock import MagicMock, patch + +import pytest +from fastapi import FastAPI + +import codecarbon.integrations.fastapi.lifespan as cc_fastapi_lifespan +from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan + + +@pytest.fixture +def app(): + return FastAPI() + + +@patch.object(cc_fastapi_lifespan, "EmissionsTracker") +def test_lifespan_stops_tracker_on_shutdown(MockTracker, app): + tracker = MagicMock() + MockTracker.return_value = tracker + + async def run(): + async with create_codecarbon_lifespan(app, project_name="api"): + assert app.state.codecarbon_tracker is tracker + tracker.start.assert_called_once() + + asyncio.run(run()) + + tracker.stop.assert_called_once() + assert app.state.codecarbon_tracker is None diff --git a/tests/integrations/test_fastapi_middleware.py b/tests/integrations/test_fastapi_middleware.py new file mode 100644 index 000000000..27d1d8f8f --- /dev/null +++ b/tests/integrations/test_fastapi_middleware.py @@ -0,0 +1,416 @@ +import asyncio +import logging +from concurrent import futures +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +import codecarbon.integrations.fastapi.lifespan as cc_fastapi_lifespan +import codecarbon.integrations.fastapi.middleware as cc_fastapi_middleware +from codecarbon.integrations.fastapi import ( + add_codecarbon_middleware, + create_codecarbon_lifespan, + shutdown_codecarbon_middleware, +) +from codecarbon.external.logger import logger as codecarbon_logger +from codecarbon.integrations.fastapi.middleware import log_request_complete + + +def _run_finalize_immediately(coro: Any) -> None: + def run_in_thread() -> None: + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(coro) + finally: + loop.close() + + futures.ThreadPoolExecutor(max_workers=1).submit(run_in_thread).result() + + +@pytest.fixture(autouse=True) +def finalize_deferred_immediately(): + with patch.object( + cc_fastapi_middleware.CodeCarbonMiddleware, + "_schedule_finalize", + side_effect=_run_finalize_immediately, + ): + yield + + +@pytest.fixture +def app(): + application = FastAPI() + + @application.get("/items/{item_id}") + def get_item(item_id: int): + return {"item_id": item_id} + + @application.get("/health") + def health(): + return {"ok": True} + + add_codecarbon_middleware(application, project_name="test-api") + return application + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_tracks_routed_request(MockTracker, app) -> None: + tracker_instance = MockTracker.return_value + tracker_instance.stop_task.return_value = MagicMock(emissions=0.001) + + response = TestClient(app).get("/items/7") + + assert response.status_code == 200 + MockTracker.assert_called_once() + tracker_instance.start.assert_called_once() + tracker_instance.start_task.assert_called_once() + tracker_instance.stop_task.assert_called_once() + tracker_instance.persist_completed_task.assert_called_once_with("GET /items/7") + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_skips_excluded_paths(MockTracker, app) -> None: + response = TestClient(app).get("/health") + assert response.status_code == 200 + MockTracker.assert_not_called() + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_on_request_complete_callback(MockTracker) -> None: + application = FastAPI() + completed = [] + + @application.get("/predict") + def predict(): + return {"ok": True} + + add_codecarbon_middleware( + application, + on_request_complete=lambda request, response, data, task_name: completed.append( + (request.url.path, response.status_code, data, task_name) + ), + ) + tracker_instance = MockTracker.return_value + emissions = MagicMock(emissions=0.001) + tracker_instance.stop_task.return_value = emissions + + response = TestClient(application).get("/predict") + assert response.status_code == 200 + assert completed == [("/predict", 200, emissions, "GET /predict")] + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_uses_lifespan_tracker(MockTracker) -> None: + application = FastAPI() + tracker_instance = MagicMock() + tracker_instance._start_time = 1.0 + baseline = MagicMock(task_name="GET /predict") + emissions = MagicMock(emissions=0.003) + tracker_instance.mark_http_request_start.return_value = baseline + tracker_instance.finish_http_request.return_value = emissions + application.state.codecarbon_tracker = tracker_instance + completed = [] + + @application.get("/predict") + def predict(): + return {"ok": True} + + add_codecarbon_middleware( + application, + on_request_complete=lambda request, response, data, task_name: completed.append( + (request.url.path, data, task_name) + ), + ) + + response = TestClient(application).get("/predict") + assert response.status_code == 200 + MockTracker.assert_not_called() + tracker_instance.mark_http_request_start.assert_called_once_with("GET /predict") + tracker_instance.finish_http_request.assert_called_once_with(baseline) + tracker_instance.persist_completed_task.assert_called_once_with("GET /predict") + assert completed == [("/predict", emissions, "GET /predict")] + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_skips_callback_when_handler_raises(MockTracker) -> None: + application = FastAPI() + tracker_instance = MagicMock() + tracker_instance.stop_task.return_value = MagicMock(emissions=0.001) + application.state.codecarbon_tracker = tracker_instance + completed = [] + + @application.get("/fail") + def fail(): + raise RuntimeError("boom") + + add_codecarbon_middleware( + application, + on_request_complete=lambda *args: completed.append(args), + ) + + with pytest.raises(RuntimeError, match="boom"): + TestClient(application, raise_server_exceptions=True).get("/fail") + + assert completed == [] + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_lazy_tracker(MockTracker) -> None: + application = FastAPI() + tracker_instance = MagicMock() + tracker_instance.stop_task.return_value = MagicMock(emissions=0.005) + MockTracker.return_value = tracker_instance + + @application.get("/run") + def run(): + return {"ok": True} + + add_codecarbon_middleware(application) + + response = TestClient(application).get("/run") + assert response.status_code == 200 + MockTracker.assert_called_once() + tracker_instance.start.assert_called_once() + tracker_instance.start_task.assert_called_once_with("GET /run") + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_no_logging_when_callback_disabled(MockTracker) -> None: + application = FastAPI() + + @application.get("/predict") + def predict(): + return {"ok": True} + + add_codecarbon_middleware(application, on_request_complete=None) + MockTracker.return_value.stop_task.return_value = MagicMock(emissions=0.001) + + with patch.object(cc_fastapi_middleware.logger, "info") as mock_info: + response = TestClient(application).get("/predict") + + assert response.status_code == 200 + mock_info.assert_not_called() + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_include_endpoints_allowlist(MockTracker) -> None: + application = FastAPI() + + @application.get("/predict") + def predict(): + return {"ok": True} + + @application.get("/metrics") + def metrics(): + return {"ok": True} + + add_codecarbon_middleware(application, include=["GET /predict"]) + MockTracker.return_value.stop_task.return_value = MagicMock(emissions=0.001) + + client = TestClient(application) + assert client.get("/predict").status_code == 200 + assert client.get("/metrics").status_code == 200 + MockTracker.assert_called_once() + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +def test_middleware_exclude_endpoints(MockTracker) -> None: + application = FastAPI() + + @application.get("/predict") + def predict(): + return {"tracked": True} + + @application.get("/admin") + def admin(): + return {"admin": True} + + add_codecarbon_middleware(application, exclude=["GET /admin"]) + MockTracker.return_value.stop_task.return_value = MagicMock(emissions=0.001) + + client = TestClient(application) + client.get("/predict") + client.get("/admin") + MockTracker.assert_called_once() + + +def test_log_request_complete_uses_codecarbon_logger() -> None: + request = MagicMock(url=MagicMock(path="/predict")) + response = MagicMock(status_code=200) + emissions = MagicMock(emissions=0.0012) + counter = _CodeCarbonLogCapture() + + cc_fastapi_middleware.logger.addHandler(counter) + try: + log_request_complete(request, response, emissions, "GET /predict") + finally: + cc_fastapi_middleware.logger.removeHandler(counter) + + assert codecarbon_logger.name == "codecarbon" + assert counter.emissions_lines == 1 + + +class _CodeCarbonLogCapture(logging.Handler): + def __init__(self) -> None: + super().__init__(level=logging.INFO) + self.emissions_lines = 0 + + def emit(self, record: logging.LogRecord) -> None: + if record.name != "codecarbon": + return + message = record.getMessage() + if message.startswith("CodeCarbon ") and "emissions=" in message: + self.emissions_lines += 1 + + +@patch.object(cc_fastapi_middleware, "EmissionsTracker") +@patch.object(cc_fastapi_middleware.logger, "info") +def test_middleware_default_logs_after_request(mock_logger_info, MockTracker) -> None: + application = FastAPI() + MockTracker.return_value.stop_task.return_value = MagicMock(emissions=0.001) + + @application.get("/predict") + def predict(): + return {"ok": True} + + add_codecarbon_middleware(application, project_name="test-api") + response = TestClient(application).get("/predict") + + assert response.status_code == 200 + mock_logger_info.assert_called_once() + + +def test_add_codecarbon_middleware_registers_instance_on_app_state() -> None: + application = FastAPI() + add_codecarbon_middleware(application, project_name="shutdown-test") + middleware = application.state.codecarbon_middleware + middleware.shutdown_tracker_executor() + with pytest.raises(RuntimeError, match="shutdown"): + middleware._tracker_runner.submit_request(lambda: None) + + +def test_shutdown_codecarbon_middleware_helper() -> None: + application = FastAPI() + add_codecarbon_middleware(application, project_name="shutdown-test") + shutdown_codecarbon_middleware(application) + middleware = application.state.codecarbon_middleware + with pytest.raises(RuntimeError, match="shutdown"): + middleware._tracker_runner.submit_request(lambda: None) + + +@patch.object(cc_fastapi_lifespan, "EmissionsTracker") +def test_create_codecarbon_lifespan_shuts_down_middleware_executor( + MockTracker: MagicMock, +) -> None: + MockTracker.return_value = MagicMock() + + @asynccontextmanager + async def lifespan(application: FastAPI): + async with create_codecarbon_lifespan(application, project_name="lifespan-test"): + yield + + application = FastAPI(lifespan=lifespan) + add_codecarbon_middleware(application, project_name="lifespan-test") + + with TestClient(application): + pass + + middleware = application.state.codecarbon_middleware + with pytest.raises(RuntimeError, match="shutdown"): + middleware._tracker_runner.submit_request(lambda: None) + + +def test_middleware_real_tracker_logs_and_csv_on_lifespan_stop(tmp_path: Path) -> None: + tracker_kwargs = { + "save_to_file": True, + "save_to_api": False, + "save_to_logger": False, + "output_dir": str(tmp_path), + "measure_power_secs": 10, + "allow_multiple_runs": True, + } + + @asynccontextmanager + async def lifespan(application: FastAPI): + async with create_codecarbon_lifespan( + application, + project_name="outputs-test", + **tracker_kwargs, + ): + yield + + application = FastAPI(lifespan=lifespan) + + @application.get("/predict") + def predict() -> dict[str, bool]: + return {"ok": True} + + add_codecarbon_middleware( + application, + project_name="outputs-test", + tracker_kwargs=tracker_kwargs, + ) + log_counter = _CodeCarbonLogCapture() + cc_fastapi_middleware.logger.addHandler(log_counter) + try: + with TestClient(application) as client: + assert client.get("/predict").status_code == 200 + assert client.get("/predict").status_code == 200 + finally: + cc_fastapi_middleware.logger.removeHandler(log_counter) + + assert log_counter.emissions_lines == 2 + emissions_csv = tmp_path / "emissions.csv" + assert emissions_csv.is_file() + assert emissions_csv.stat().st_size > 0 + + +@patch("codecarbon.output_methods.http.ApiClient") +def test_middleware_real_tracker_calls_api_per_request( + MockApiClient, tmp_path: Path +) -> None: + mock_api = MockApiClient.return_value + mock_api.run_id = "test-run-id" + mock_api.add_emission.return_value = True + tracker_kwargs = { + "save_to_file": False, + "save_to_api": True, + "save_to_logger": False, + "output_dir": str(tmp_path), + "experiment_id": "00000000-0000-0000-0000-000000000001", + "api_key": "test-key", + "measure_power_secs": 10, + "allow_multiple_runs": True, + } + + @asynccontextmanager + async def lifespan(application: FastAPI): + async with create_codecarbon_lifespan( + application, + project_name="api-outputs-test", + **tracker_kwargs, + ): + yield + + application = FastAPI(lifespan=lifespan) + + @application.get("/predict") + def predict() -> dict[str, bool]: + return {"ok": True} + + add_codecarbon_middleware( + application, + project_name="api-outputs-test", + tracker_kwargs=tracker_kwargs, + on_request_complete=None, + ) + with TestClient(application) as client: + assert client.get("/predict").status_code == 200 + assert client.get("/predict").status_code == 200 + + assert mock_api.add_emission.call_count >= 2 diff --git a/tests/integrations/test_fastapi_routing.py b/tests/integrations/test_fastapi_routing.py new file mode 100644 index 000000000..d9c98f538 --- /dev/null +++ b/tests/integrations/test_fastapi_routing.py @@ -0,0 +1,55 @@ +"""Tests for route naming and endpoint filter helpers.""" + +from unittest.mock import MagicMock + +from codecarbon.integrations.fastapi._routing import build_endpoint_key, should_track_request + + +def _mock_request(method: str, route_path: str | None, url_path: str) -> MagicMock: + request = MagicMock() + request.method = method + request.url.path = url_path + if route_path is None: + request.scope = {} + else: + route = MagicMock() + route.path = route_path + request.scope = {"route": route} + return request + + +def test_build_endpoint_key_uses_route_template() -> None: + request = _mock_request("GET", "/predict", "/predict") + assert build_endpoint_key(request) == "GET /predict" + + +def test_should_track_request_exclude_path_prefix() -> None: + request = _mock_request("GET", "/docs", "/docs/oauth2-redirect") + assert should_track_request(request, None, ["/docs"]) is False + + +def test_should_track_request_exclude_by_method_and_path() -> None: + request = _mock_request("GET", "/predict", "/predict") + assert should_track_request(request, None, ["GET /predict"]) is False + assert should_track_request(request, None, ["POST /predict"]) is True + + +def test_should_track_request_exclude_path_only() -> None: + request = _mock_request("POST", "/predict", "/predict") + assert should_track_request(request, None, ["/predict"]) is False + + +def test_should_track_request_include_allowlist() -> None: + request = _mock_request("GET", "/predict", "/predict") + other = _mock_request("GET", "/health", "/health") + include = ["GET /predict"] + assert should_track_request(request, include, []) is True + assert should_track_request(other, include, []) is False + + +def test_should_track_request_include_path_only() -> None: + get_request = _mock_request("GET", "/predict", "/predict") + post_request = _mock_request("POST", "/predict", "/predict") + include = ["/predict"] + assert should_track_request(get_request, include, []) is True + assert should_track_request(post_request, include, []) is True diff --git a/tests/output_methods/test_http.py b/tests/output_methods/test_http.py index 790055c0a..7095dae26 100644 --- a/tests/output_methods/test_http.py +++ b/tests/output_methods/test_http.py @@ -170,6 +170,52 @@ def test_codecarbon_api_out(self): api_output.out(None, self.emissions_data) self.mock_add_emission.assert_called_once() + def test_codecarbon_api_task_out(self): + from codecarbon.output_methods.emissions_data import TaskEmissionsData + + api_output = CodeCarbonAPIOutput( + endpoint_url=self.url, + experiment_id=self.experiment_id, + api_key=self.api_key, + conf=None, + ) + task_data = TaskEmissionsData( + task_name="GET /predict", + timestamp=self.emissions_data.timestamp, + project_name=self.emissions_data.project_name, + run_id=self.emissions_data.run_id, + duration=2.0, + emissions=self.emissions_data.emissions, + emissions_rate=self.emissions_data.emissions_rate, + cpu_power=self.emissions_data.cpu_power, + gpu_power=self.emissions_data.gpu_power, + ram_power=self.emissions_data.ram_power, + cpu_energy=self.emissions_data.cpu_energy, + gpu_energy=self.emissions_data.gpu_energy, + ram_energy=self.emissions_data.ram_energy, + energy_consumed=self.emissions_data.energy_consumed, + water_consumed=self.emissions_data.water_consumed, + country_name=self.emissions_data.country_name, + country_iso_code=self.emissions_data.country_iso_code, + region=self.emissions_data.region, + cloud_provider=self.emissions_data.cloud_provider, + cloud_region=self.emissions_data.cloud_region, + os=self.emissions_data.os, + python_version=self.emissions_data.python_version, + codecarbon_version=self.emissions_data.codecarbon_version, + cpu_count=self.emissions_data.cpu_count, + cpu_model=self.emissions_data.cpu_model, + gpu_count=self.emissions_data.gpu_count, + gpu_model=self.emissions_data.gpu_model, + longitude=self.emissions_data.longitude, + latitude=self.emissions_data.latitude, + ram_total_size=self.emissions_data.ram_total_size, + tracking_mode=self.emissions_data.tracking_mode, + on_cloud=self.emissions_data.on_cloud, + ) + api_output.task_out([task_data], "test_experiment") + self.mock_add_emission.assert_called_once() + @patch("codecarbon.output_methods.http.logger.error") def test_codecarbon_out_api_call_failure(self, mock_logger): self.mock_add_emission.side_effect = Exception("Test exception") diff --git a/tests/test_api_call.py b/tests/test_api_call.py index a4bb4cd7f..c3d018ba7 100644 --- a/tests/test_api_call.py +++ b/tests/test_api_call.py @@ -190,31 +190,34 @@ def test_add_emission_returns_false_when_run_creation_fails(self): ) ) - def test_add_emission_skips_short_duration(self): - api = ApiClient( - endpoint_url="http://test.com", - experiment_id="exp-1", - conf=conf, - create_run_automatically=False, - ) - api.run_id = "run-1" + def test_add_emission_rounds_subsecond_duration_to_one_second(self): + with requests_mock.Mocker() as m: + m.post("http://test.com/emissions", json={"id": "em-1"}, status_code=201) + api = ApiClient( + endpoint_url="http://test.com", + experiment_id="exp-1", + conf=conf, + create_run_automatically=False, + ) + api.run_id = "run-1" - self.assertFalse( - api.add_emission( - { - "duration": 0.5, - "emissions": 1.0, - "emissions_rate": 1.0, - "cpu_power": 1.0, - "gpu_power": 0.0, - "ram_power": 0.5, - "cpu_energy": 0.1, - "gpu_energy": 0.0, - "ram_energy": 0.1, - "energy_consumed": 0.2, - } + self.assertTrue( + api.add_emission( + { + "duration": 0.5, + "emissions": 1.0, + "emissions_rate": 1.0, + "cpu_power": 1.0, + "gpu_power": 0.0, + "ram_power": 0.5, + "cpu_energy": 0.1, + "gpu_energy": 0.0, + "ram_energy": 0.1, + "energy_consumed": 0.2, + } + ) ) - ) + self.assertEqual(m.last_request.json()["duration"], 1) def test_add_emission_returns_false_on_unsuccessful_post(self): with requests_mock.Mocker() as m: diff --git a/uv.lock b/uv.lock index c0ceab00d..b177b9414 100644 --- a/uv.lock +++ b/uv.lock @@ -41,6 +41,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" }, ] +[[package]] +name = "anyio" +version = "4.13.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "exceptiongroup", marker = "python_full_version < '3.11'" }, + { name = "idna" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/19/14/2c5dd9f512b66549ae92767a9c7b330ae88e1932ca57876909410251fe13/anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc", size = 231622, upload-time = "2026-03-24T12:59:09.671Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/42/e921fccf5015463e32a3cf6ee7f980a6ed0f395ceeaa45060b61d86486c2/anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708", size = 114353, upload-time = "2026-03-24T12:59:08.246Z" }, +] + [[package]] name = "arrow" version = "1.4.0" @@ -443,6 +457,10 @@ carbonboard = [ { name = "dash-bootstrap-components" }, { name = "fire" }, ] +fastapi = [ + { name = "fastapi" }, + { name = "httpx" }, +] viz-legacy = [ { name = "dash" }, { name = "dash-bootstrap-components" }, @@ -453,6 +471,8 @@ viz-legacy = [ dev = [ { name = "black" }, { name = "bumpver" }, + { name = "fastapi" }, + { name = "httpx" }, { name = "jsonschema" }, { name = "logfire" }, { name = "mktestdocs" }, @@ -489,8 +509,10 @@ requires-dist = [ { name = "dash", marker = "extra == 'viz-legacy'" }, { name = "dash-bootstrap-components", marker = "extra == 'carbonboard'", specifier = ">1.0.0" }, { name = "dash-bootstrap-components", marker = "extra == 'viz-legacy'", specifier = ">1.0.0" }, + { name = "fastapi", marker = "extra == 'fastapi'", specifier = ">=0.100" }, { name = "fire", marker = "extra == 'carbonboard'" }, { name = "fire", marker = "extra == 'viz-legacy'" }, + { name = "httpx", marker = "extra == 'fastapi'" }, { name = "nvidia-ml-py" }, { name = "pandas", marker = "python_full_version < '3.14'" }, { name = "pandas", marker = "python_full_version >= '3.14'", specifier = ">=2.3.3" }, @@ -505,12 +527,14 @@ requires-dist = [ { name = "rich" }, { name = "typer" }, ] -provides-extras = ["carbonboard", "viz-legacy", "amdsmi"] +provides-extras = ["carbonboard", "viz-legacy", "amdsmi", "fastapi"] [package.metadata.requires-dev] dev = [ { name = "black" }, { name = "bumpver" }, + { name = "fastapi", specifier = ">=0.100" }, + { name = "httpx" }, { name = "jsonschema" }, { name = "logfire", specifier = ">=1.0.1" }, { name = "mktestdocs" }, @@ -794,6 +818,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c1/ea/53f2148663b321f21b5a606bd5f191517cf40b7072c0497d3c92c4a13b1e/executing-2.2.1-py2.py3-none-any.whl", hash = "sha256:760643d3452b4d777d295bb167ccc74c64a81df23fb5e08eff250c425a4b2017", size = 28317, upload-time = "2025-09-01T09:48:08.5Z" }, ] +[[package]] +name = "fastapi" +version = "0.136.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "annotated-doc" }, + { name = "pydantic" }, + { name = "starlette" }, + { name = "typing-extensions" }, + { name = "typing-inspection" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5d/45/c130091c2dfa061bbfe3150f2a5091ef1adf149f2a8d2ae769ecaf6e99a2/fastapi-0.136.1.tar.gz", hash = "sha256:7af665ad7acfa0a3baf8983d393b6b471b9da10ede59c60045f49fbc89a0fa7f", size = 397448, upload-time = "2026-04-23T16:49:44.046Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/ff/2e4eca3ade2c22fe1dea7043b8ee9dabe47753349eb1b56a202de8af6349/fastapi-0.136.1-py3-none-any.whl", hash = "sha256:a6e9d7eeada96c93a4d69cb03836b44fa34e2854accb7244a1ece36cd4781c3f", size = 117683, upload-time = "2026-04-23T16:49:42.437Z" }, +] + [[package]] name = "filelock" version = "3.29.0" @@ -865,6 +905,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/11/8c/c9138d881c79aa0ea9ed83cbd58d5ca75624378b38cee225dcf5c42cc91f/griffelib-2.0.2-py3-none-any.whl", hash = "sha256:925c857658fb1ba40c0772c37acbc2ab650bd794d9c1b9726922e36ea4117ea1", size = 142357, upload-time = "2026-03-27T11:34:46.275Z" }, ] +[[package]] +name = "h11" +version = "0.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + [[package]] name = "identify" version = "2.6.19" @@ -2958,6 +3035,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/46/2c/1462b1d0a634697ae9e55b3cecdcb64788e8b7d63f54d923fcd0bb140aed/soupsieve-2.8.3-py3-none-any.whl", hash = "sha256:ed64f2ba4eebeab06cc4962affce381647455978ffc1e36bb79a545b91f45a95", size = 37016, upload-time = "2026-01-20T04:27:01.012Z" }, ] +[[package]] +name = "starlette" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" }, +] + [[package]] name = "taskipy" version = "1.14.1"