Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ tests/test_data/rapl/*
credentials*
.codecarbon.config*
scripts/agent-vm.personal.config.sh

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield
8 changes: 5 additions & 3 deletions codecarbon/core/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,17 @@ def add_emission(self, carbon_emission: dict):
"ApiClient.add_emission still no run_id, aborting for this time !"
)
return False
if carbon_emission["duration"] < 1:
duration = float(carbon_emission["duration"])
if duration <= 0:
logger.warning(
"ApiClient : emissions not sent because of a duration smaller than 1."
"ApiClient : emissions not sent because duration is zero or negative."
)
return False
duration_for_api = max(1, int(round(duration)))
emission = EmissionCreate(
timestamp=get_datetime_with_timezone(),
run_id=self.run_id,
duration=int(carbon_emission["duration"]),
duration=duration_for_api,
emissions_sum=carbon_emission["emissions"],
emissions_rate=carbon_emission["emissions_rate"],
cpu_power=carbon_emission["cpu_power"],
Expand Down
136 changes: 134 additions & 2 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import platform
import re
import threading
import time
import uuid
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -60,6 +61,21 @@
_sentinel = object()


@dataclasses.dataclass(frozen=True)
class HttpRequestBaseline:
"""Per-request totals snapshot for FastAPI middleware (lifespan tracker)."""

task_name: str
started_at: float
duration_at_start: float
emissions: float
cpu_energy: float
gpu_energy: float
ram_energy: float
energy_consumed: float
water_consumed: float


class BaseEmissionsTracker(ABC):
"""
Primary abstraction with Emissions Tracking functionality.
Expand Down Expand Up @@ -425,6 +441,7 @@ def __init__(
self._tasks: Dict[str, Task] = {}
self._active_task: Optional[str] = None
self._active_task_emissions_at_start: Optional[EmissionsData] = None
self._http_measure_lock = threading.Lock()
# Tracking mode detection
self._hardware = []
resource_tracker = ResourceTracker(self)
Expand Down Expand Up @@ -710,8 +727,112 @@ def stop_task(self, task_name: str = None) -> EmissionsData:
self._active_task = None
self._active_task_emissions_at_start = None # Clear task-specific start data

if self._scheduler is not None and self._scheduler._stopped:
if self._start_time is not None:
self._scheduler.start()

return task_emission_data

def _resolve_http_task_name(self, task_name: str) -> str:
"""Return a unique task name for HTTP request tracking."""
if not task_name:
task_name = uuid.uuid4().__str__()
if task_name in self._tasks:
task_name += "_" + uuid.uuid4().__str__()
return task_name

def mark_http_request_start(self, task_name: str) -> HttpRequestBaseline:
"""Snapshot cumulative totals at request start (FastAPI lifespan path).

Use with :meth:`finish_http_request` while the main tracker scheduler keeps
running. Avoids per-request scheduler and hardware restarts from
:meth:`start_task`.

Args:
task_name: Logical name for this HTTP request (e.g. route key).

Returns:
Baseline to pass to :meth:`finish_http_request`.

Raises:
RuntimeError: If the tracker has not been started with :meth:`start`.
"""
if self._start_time is None:
raise RuntimeError("EmissionsTracker.start() must run before HTTP requests")
with self._http_measure_lock:
resolved = self._resolve_http_task_name(task_name)
self._tasks[resolved] = Task(task_name=resolved)
duration_at_start = time.perf_counter() - self._start_time
return HttpRequestBaseline(
task_name=resolved,
started_at=time.perf_counter(),
duration_at_start=duration_at_start,
emissions=self._total_emissions,
cpu_energy=self._total_cpu_energy.kWh,
gpu_energy=self._total_gpu_energy.kWh,
ram_energy=self._total_ram_energy.kWh,
energy_consumed=self._total_energy.kWh,
water_consumed=self._total_water.litres,
)

def finish_http_request(
self, baseline: HttpRequestBaseline
) -> Optional[EmissionsData]:
"""Compute per-request emissions from a :meth:`mark_http_request_start` baseline.

Args:
baseline: Value returned by :meth:`mark_http_request_start`.

Returns:
Request-scoped :class:`~codecarbon.output.EmissionsData`, or ``None`` if
the task record is missing.
"""
with self._http_measure_lock:
task = self._tasks.get(baseline.task_name)
if task is None:
logger.warning(
"finish_http_request: unknown task %s", baseline.task_name
)
return None
self._measure_power_and_energy()
emissions_at_stop = self._prepare_emissions_data()
previous = dataclasses.replace(emissions_at_stop)
previous.emissions = baseline.emissions
previous.cpu_energy = baseline.cpu_energy
previous.gpu_energy = baseline.gpu_energy
previous.ram_energy = baseline.ram_energy
previous.energy_consumed = baseline.energy_consumed
previous.water_consumed = baseline.water_consumed
previous.duration = baseline.duration_at_start

task_emission_data = dataclasses.replace(emissions_at_stop)
request_duration = time.perf_counter() - baseline.started_at
task_emission_data.duration = Time.from_seconds(request_duration).seconds
task_emission_data.compute_delta_emission(previous)

task.emissions_data = task_emission_data
task.is_active = False
return task_emission_data

def persist_completed_task(self, task_name: str) -> None:
"""Push a finished task's emissions to API handlers (e.g. after ``stop_task``).

Args:
task_name: Name of the task that was stopped with :meth:`stop_task`.
"""
if not self._save_to_api:
return
task = self._tasks.get(task_name)
if task is None or task.is_active or task.emissions_data is None:
return
if task.uploaded_to_api:
return
task_payload = [task.out()]
for handler in self._output_handlers:
if isinstance(handler, CodeCarbonAPIOutput):
handler.task_out(task_payload, self._experiment_name)
task.uploaded_to_api = True

@suppress(Exception)
def flush(self) -> Optional[float]:
"""
Expand Down Expand Up @@ -808,13 +929,24 @@ def _persist_data(
experiment_name=None,
):
task_emissions_data = []
api_task_emissions_data = []
for task in self._tasks:
task_emissions_data.append(self._tasks[task].out())
task_entry = self._tasks[task].out()
task_emissions_data.append(task_entry)
if not self._tasks[task].uploaded_to_api:
api_task_emissions_data.append(task_entry)

for handler in self._output_handlers:
handler.out(total_emissions, delta_emissions)
if len(task_emissions_data) > 0:
handler.task_out(task_emissions_data, experiment_name)
if isinstance(handler, CodeCarbonAPIOutput):
if api_task_emissions_data:
handler.task_out(api_task_emissions_data, experiment_name)
for task_obj in self._tasks.values():
if not task_obj.is_active and task_obj.emissions_data:
task_obj.uploaded_to_api = True
else:
handler.task_out(task_emissions_data, experiment_name)

def _update_emissions(self) -> None:
"""
Expand Down
1 change: 1 addition & 0 deletions codecarbon/external/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, task_name): # , task_measure
self.task_name: str = task_name
self.start_time = time.perf_counter()
self.is_active = True
self.uploaded_to_api = False

def out(self):
return TaskEmissionsData(
Expand Down
1 change: 1 addition & 0 deletions codecarbon/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Optional integrations for frameworks and platforms."""
17 changes: 17 additions & 0 deletions codecarbon/integrations/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""FastAPI integration: middleware and lifespan helpers."""

from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan
from codecarbon.integrations.fastapi.middleware import (
CodeCarbonMiddleware,
add_codecarbon_middleware,
log_request_complete,
shutdown_codecarbon_middleware,
)

__all__ = [
"CodeCarbonMiddleware",
"add_codecarbon_middleware",
"create_codecarbon_lifespan",
"log_request_complete",
"shutdown_codecarbon_middleware",
]
Loading