Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ tests/test_data/rapl/*
credentials*
.codecarbon.config*
scripts/agent-vm.personal.config.sh

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield
1 change: 1 addition & 0 deletions codecarbon/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Optional integrations for frameworks and platforms."""
13 changes: 13 additions & 0 deletions codecarbon/integrations/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""FastAPI integration: middleware and lifespan helpers."""

from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan
from codecarbon.integrations.fastapi.middleware import (
CodeCarbonMiddleware,
add_codecarbon_middleware,
)

__all__ = [
"CodeCarbonMiddleware",
"add_codecarbon_middleware",
"create_codecarbon_lifespan",
]
119 changes: 119 additions & 0 deletions codecarbon/integrations/fastapi/_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Configurable response headers from emissions measurements."""

from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from typing import Union

from starlette.requests import Request
from starlette.responses import Response

from codecarbon.output_methods.emissions_data import EmissionsData

HeaderConfig = Union[bool, str, Sequence[str], Mapping[str, str], None]
HeaderFormatter = Callable[[EmissionsData, Request], Mapping[str, str]]

FIELD_UNITS: dict[str, str] = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be transformed in an Enum ? I have the feeling it could be leveraged elsewhere for labels and it could be maintained from the core package, not only the fastapi integration. Thoughts ?

"emissions": "kg",
"emissions_rate": "kg-per-s",
"duration": "s",
"energy_consumed": "kwh",
"cpu_energy": "kwh",
"gpu_energy": "kwh",
"ram_energy": "kwh",
"water_consumed": "l",
"cpu_power": "w",
"gpu_power": "w",
"ram_power": "w",
"cpu_utilization_percent": "percent",
"gpu_utilization_percent": "percent",
"ram_utilization_percent": "percent",
"ram_used_gb": "gb",
"pue": "ratio",
"wue": "l-per-kwh",
}

HEADER_PRESETS: dict[str, dict[str, str]] = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, a collection of Enums here would make sense no ?

"emissions": {"emissions": "X-CodeCarbon-Emissions-kg"},
"default": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"duration": "X-CodeCarbon-Duration-s",
"emissions_rate": "X-CodeCarbon-Emissions-Rate-kg-per-s",
},
"energy": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"cpu_energy": "X-CodeCarbon-Cpu-Energy-kwh",
"gpu_energy": "X-CodeCarbon-Gpu-Energy-kwh",
"ram_energy": "X-CodeCarbon-Ram-Energy-kwh",
"duration": "X-CodeCarbon-Duration-s",
},
"power": {
"emissions": "X-CodeCarbon-Emissions-kg",
"cpu_power": "X-CodeCarbon-Cpu-Power-w",
"gpu_power": "X-CodeCarbon-Gpu-Power-w",
"ram_power": "X-CodeCarbon-Ram-Power-w",
"duration": "X-CodeCarbon-Duration-s",
},
}

FULL_HEADER_FIELDS: tuple[str, ...] = tuple(FIELD_UNITS.keys())


def _auto_header_name(field: str) -> str:
unit = FIELD_UNITS.get(field, "")
title = "-".join(part.capitalize() for part in field.split("_"))
suffix = f"-{unit}" if unit else ""
return f"X-CodeCarbon-{title}{suffix}"


def resolve_header_mapping(config: HeaderConfig) -> dict[str, str]:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure this defensive method could be converted in a way more straightforward (and clear) one, if the unit tests demonstrate resilience to each corner case :)

"""Normalize ``response_headers`` settings to ``{field_name: header_name}``.

Args:
config: ``None`` or ``False`` for no headers; ``True`` for the emissions preset;
a preset name (``emissions``, ``default``, ``energy``, ``power``, ``full``);
a sequence of field names (auto header names); or an explicit mapping.

Returns:
Mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData`
attribute names to HTTP header names.

Raises:
ValueError: If ``config`` is a string that is not a known preset (other than
``full``).
"""
if config is None or config is False:
return {}
if config is True:
return dict(HEADER_PRESETS["emissions"])
if isinstance(config, str):
preset = HEADER_PRESETS.get(config)
if preset is None:
if config == "full":
return {field: _auto_header_name(field) for field in FULL_HEADER_FIELDS}
raise ValueError(f"Unknown response_headers preset: {config!r}")
return dict(preset)
if isinstance(config, Mapping):
return dict(config)
return {field: _auto_header_name(field) for field in config}


def apply_response_headers(
response: Response,
emissions_data: EmissionsData,
header_mapping: Mapping[str, str],
) -> None:
"""Write selected emission fields onto an HTTP response as headers.

Args:
response: Outgoing Starlette response (headers are updated in place).
emissions_data: Values read via ``getattr`` for each key in ``header_mapping``.
header_mapping: Field name to HTTP header name; unknown fields are skipped.
"""
for field, header_name in header_mapping.items():
if not hasattr(emissions_data, field):
continue
value = getattr(emissions_data, field)
response.headers[header_name] = str(value)
135 changes: 135 additions & 0 deletions codecarbon/integrations/fastapi/_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Route naming and endpoint filter helpers for FastAPI/Starlette."""

from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from starlette.requests import Request

DEFAULT_EXCLUDE: frozenset[str] = frozenset(
{
"/docs",
"/redoc",
"/openapi.json",
"/health",
"/healthz",
"/ready",
"/live",
}
)

HTTP_METHODS = frozenset(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, this could benefit from being an Enum

{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", "TRACE", "CONNECT"}
)


def get_endpoint_path(request: "Request") -> str:
"""Return the mounted route template or the raw URL path.

Args:
request: Current Starlette/FastAPI request.

Returns:
Route template such as ``/items/{item_id}``, or ``request.url.path``.
"""
route = request.scope.get("route")
if route is not None:
return route.path
return request.url.path


def build_endpoint_key(request: "Request") -> str:
"""Build a stable endpoint identifier such as ``GET /predict``.

Args:
request: Current Starlette/FastAPI request.

Returns:
HTTP method plus route template or URL path.
"""
return f"{request.method} {get_endpoint_path(request)}"


def is_method_pattern(pattern: str) -> bool:
"""Return True when ``pattern`` is ``METHOD /path``."""
method, _, path = pattern.partition(" ")
return method in HTTP_METHODS and path.startswith("/")


def matches_exclude(
pattern: str,
url_path: str,
endpoint_key: str,
endpoint_path: str,
) -> bool:
"""Return True when an exclude pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if not pattern.startswith("/"):
return endpoint_key == pattern
return (
url_path == pattern
or url_path.startswith(f"{pattern}/")
or endpoint_path == pattern
)


def matches_include(pattern: str, endpoint_key: str, endpoint_path: str) -> bool:
"""Return True when an include pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if pattern.startswith("/"):
return endpoint_path == pattern
return endpoint_key == pattern


def should_track_request(
request: "Request",
include: Iterable[str] | None,
exclude: Iterable[str],
) -> bool:
"""Return True when the request should be measured.

Patterns use one of two forms:

* ``METHOD /route/template`` — one HTTP method on one route (e.g. ``GET /predict``)
* ``/route/template`` — any method on that route, or a URL path prefix when excluding

Args:
request: Current Starlette/FastAPI request.
include: When set, only matching endpoints are tracked.
exclude: Endpoints or URL prefixes to skip.

Returns:
True when CodeCarbon should track this request.
"""
url_path = request.url.path
endpoint_key = build_endpoint_key(request)
endpoint_path = get_endpoint_path(request)
for pattern in exclude:
if matches_exclude(pattern, url_path, endpoint_key, endpoint_path):
return False
if include is None:
return True
return any(
matches_include(pattern, endpoint_key, endpoint_path) for pattern in include
)


def build_task_name(
request: "Request",
formatter: Callable[["Request"], str] | None = None,
) -> str:
"""Derive a stable label like ``GET /items/{item_id}`` for task-scoped tracking.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could lead to task override if resource is being accessed by 2 parallel or sequential requests. Maybe we could add some random here if the goal of this method is to provide a name for the codecarbon task concept


Args:
request: Current Starlette/FastAPI request.
formatter: Optional function that returns the task name instead of the default.

Returns:
Method plus route template when a route is mounted on the request scope,
otherwise method plus the raw URL path.
"""
if formatter is not None:
return formatter(request)
return build_endpoint_key(request)
38 changes: 38 additions & 0 deletions codecarbon/integrations/fastapi/lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Lifespan helpers for sharing one ``EmissionsTracker`` across requests."""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from codecarbon import EmissionsTracker


@asynccontextmanager
async def create_codecarbon_lifespan(
app: Any,
*,
project_name: str = "codecarbon-fastapi",
**tracker_kwargs: Any,
) -> AsyncIterator[None]:
"""Start a tracker for the app lifetime and expose it on ``app.state``.

Args:
app: Starlette/FastAPI application with ``state`` namespace.
project_name: ``project_name`` for :class:`~codecarbon.EmissionsTracker`.
**tracker_kwargs: Extra constructor kwargs for the tracker.

Yields:
``None`` while the app runs.
"""
merged = dict(tracker_kwargs)
merged.setdefault("allow_multiple_runs", True)
tracker = EmissionsTracker(project_name=project_name, **merged)
tracker.start()
app.state.codecarbon_tracker = tracker
try:
yield
finally:
tracker.stop()
app.state.codecarbon_tracker = None
Loading
Loading