diff --git a/.github/workflows/helm-install-test.yml b/.github/workflows/helm-install-test.yml index f50c0e0..ee5b717 100644 --- a/.github/workflows/helm-install-test.yml +++ b/.github/workflows/helm-install-test.yml @@ -154,6 +154,7 @@ jobs: --set externalRedis.url="redis://redis:6379/0" \ --set externalRedis.existingSecret=redis-secret \ --set externalRedis.passwordKey=redis-password \ + --set admin.enabled=true \ --set replicaCount=3 \ --set resources.limits.cpu=100m \ --set resources.requests.cpu=50m \ @@ -230,6 +231,67 @@ jobs: echo "✓ All smoke tests passed!" ' + - name: Verify admin dashboard + run: | + set -euo pipefail + ENDPOINT="http://s3proxy-python:4433" + BUCKET="admin-ci" + KEY="admin-ci.txt" + + echo "=== Upload an object through the proxy (so the dashboard has traffic) ===" + kubectl run admin-ci-upload -n s3proxy --rm -i --restart=Never \ + --image=amazon/aws-cli:latest \ + --env="AWS_ACCESS_KEY_ID=minioadmin" \ + --env="AWS_SECRET_ACCESS_KEY=minioadmin" \ + --env="AWS_DEFAULT_REGION=us-east-1" \ + --command -- /bin/sh -c " + set -e + echo 'admin dashboard ci object' > /tmp/o.txt + aws --endpoint-url $ENDPOINT s3 mb s3://$BUCKET 2>/dev/null || true + aws --endpoint-url $ENDPOINT s3 cp /tmp/o.txt s3://$BUCKET/$KEY + aws --endpoint-url $ENDPOINT s3 cp s3://$BUCKET/$KEY /tmp/d.txt + test \"\$(cat /tmp/d.txt)\" = 'admin dashboard ci object' + echo UPLOAD_OK + " + + echo "=== Port-forward the admin dashboard ===" + kubectl port-forward svc/s3proxy-python 4433:4433 -n s3proxy & + PF_PID=$! + trap "kill $PF_PID 2>/dev/null || true" EXIT + for i in $(seq 1 15); do curl -sf -o /dev/null http://localhost:4433/admin/login && break; sleep 1; done + + echo "=== 1. Login page is served (unauthenticated) ===" + curl -sf http://localhost:4433/admin/login | grep -q "Sign in to the admin dashboard" + echo "✓ Login page served" + + echo "=== 2. Auth is enforced ===" + CODE=$(curl -s -o /dev/null -w '%{http_code}' http://localhost:4433/admin/api/status) + [ "$CODE" = "401" ] || { echo "✗ status API without auth returned $CODE (expected 401)"; exit 1; } + # The AWS secret key must NOT work as a dashboard password (fallback removed). + CODE=$(curl -s -o /dev/null -w '%{http_code}' -u minioadmin:minioadmin http://localhost:4433/admin/api/status) + [ "$CODE" = "401" ] || { echo "✗ AWS creds accepted as admin login ($CODE) — fallback not removed"; exit 1; } + echo "✓ Unauthenticated + AWS-cred requests rejected (401)" + + echo "=== 3. Dashboard + status API work with admin credentials (admin/admin) ===" + CODE=$(curl -s -o /dev/null -w '%{http_code}' -u admin:admin http://localhost:4433/admin/) + [ "$CODE" = "200" ] || { echo "✗ dashboard returned $CODE (expected 200)"; exit 1; } + STATUS=$(curl -sf -u admin:admin http://localhost:4433/admin/api/status) + echo "$STATUS" | grep -q '"status":"Running"' || { echo "✗ status not Running: $STATUS"; exit 1; } + echo "✓ Dashboard + status API healthy" + + echo "=== 4. Bucket listing API shows the uploaded object ===" + BUCKETS=$(curl -sf -u admin:admin "http://localhost:4433/admin/api/buckets/$BUCKET") + echo "$BUCKETS" | grep -q "\"$KEY\"" || { echo "✗ '$KEY' not in bucket listing: $BUCKETS"; exit 1; } + echo "✓ Bucket listing shows '$KEY'" + + echo "=== 5. Object detail API reports encryption ===" + OBJ=$(curl -sf -u admin:admin "http://localhost:4433/admin/api/objects/$BUCKET/$KEY") + echo "$OBJ" | grep -q '"encrypted":true' || { echo "✗ object detail not encrypted: $OBJ"; exit 1; } + echo "✓ Object detail reports encrypted" + + echo "" + echo "✓ Admin dashboard verified end-to-end" + - name: Show logs on failure if: failure() run: | diff --git a/chart/README.md b/chart/README.md index a274e5b..2a2ac14 100644 --- a/chart/README.md +++ b/chart/README.md @@ -31,6 +31,18 @@ helm install s3proxy oci://ghcr.io/serversidehannes/s3proxy-python/charts/s3prox | `secrets.existingSecrets.keys.encryptKey` | `S3PROXY_ENCRYPT_KEY` | Key name in existing secret | | `secrets.existingSecrets.keys.awsAccessKeyId` | `AWS_ACCESS_KEY_ID` | Key name in existing secret | | `secrets.existingSecrets.keys.awsSecretAccessKey` | `AWS_SECRET_ACCESS_KEY` | Key name in existing secret | +| `admin.enabled` | `false` | Enable the admin dashboard | +| `admin.path` | `/admin` | URL path prefix for the dashboard | +| `admin.username` | `admin` | Dashboard username (stored in the Secret; override in production) | +| `admin.password` | `admin` | Dashboard password (stored in the Secret; override in production) | +| `admin.existingSecret.name` | `""` | Pre-created secret holding admin credentials | +| `admin.existingSecret.usernameKey` | `S3PROXY_ADMIN_USERNAME` | Username key in the existing secret | +| `admin.existingSecret.passwordKey` | `S3PROXY_ADMIN_PASSWORD` | Password key in the existing secret | +| `admin.ingress.enabled` | `false` | Dedicated Ingress for the dashboard (keep off unless intentionally exposing it) | +| `admin.ingress.className` | `nginx` | Ingress class for the admin Ingress | +| `admin.ingress.host` | `""` | Hostname for the dashboard (required when enabled) | +| `admin.ingress.annotations` | `{}` | Annotations (e.g. IP allowlist) for the admin Ingress | +| `admin.ingress.tls` | `[]` | TLS config for the admin Ingress | | `redis-ha.enabled` | `true` | Deploy embedded Redis HA | | `redis-ha.replicas` | `1` | Redis replicas | | `redis-ha.auth` | `false` | Enable Redis auth | diff --git a/chart/templates/admin-ingress.yaml b/chart/templates/admin-ingress.yaml new file mode 100644 index 0000000..cdd575b --- /dev/null +++ b/chart/templates/admin-ingress.yaml @@ -0,0 +1,38 @@ +{{- if and .Values.admin.enabled .Values.admin.ingress.enabled -}} +{{- if not .Values.admin.ingress.host }} +{{- fail "admin.ingress.enabled is true but admin.ingress.host is empty — set a hostname for the admin dashboard" }} +{{- end }} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ .Chart.Name }}-admin + labels: + {{- include "s3proxy.labels" . | nindent 4 }} + {{- with .Values.admin.ingress.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + ingressClassName: {{ .Values.admin.ingress.className }} + {{- with .Values.admin.ingress.tls }} + tls: + {{- range . }} + - hosts: + {{- range .hosts }} + - {{ . | quote }} + {{- end }} + secretName: {{ .secretName }} + {{- end }} + {{- end }} + rules: + - host: {{ .Values.admin.ingress.host | quote }} + http: + paths: + - path: {{ .Values.admin.path }} + pathType: Prefix + backend: + service: + name: {{ .Chart.Name }} + port: + number: {{ .Values.service.port }} +{{- end }} diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index cfe5086..7d1d7a9 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -18,3 +18,7 @@ data: {{- end }} S3PROXY_REDIS_UPLOAD_TTL_HOURS: {{ .Values.externalRedis.uploadTtlHours | quote }} S3PROXY_LOG_LEVEL: {{ .Values.logLevel | quote }} + {{- if .Values.admin.enabled }} + S3PROXY_ADMIN_UI: "true" + S3PROXY_ADMIN_PATH: {{ .Values.admin.path | quote }} + {{- end }} diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 08ea61c..c229a8f 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -29,8 +29,13 @@ spec: - secretRef: name: {{ printf "%s-secrets" .Chart.Name }} {{- end }} + {{- /* Admin creds need an explicit env block only when they are NOT already + pulled in via the chart-secret envFrom above (i.e. an admin existingSecret + is set, or the app uses an existing secret so envFrom never fires). */ -}} + {{- $adminFromChartSecret := and .Values.admin.enabled (not .Values.admin.existingSecret.name) (not .Values.secrets.existingSecrets.enabled) }} + {{- $adminNeedsEnv := and .Values.admin.enabled (not $adminFromChartSecret) }} {{- /* Determine if we need env section */ -}} - {{- $needsEnv := or .Values.secrets.existingSecrets.enabled (and (index .Values "redis-ha" "enabled") (index .Values "redis-ha" "auth")) (and (not (index .Values "redis-ha" "enabled")) .Values.externalRedis.existingSecret) }} + {{- $needsEnv := or .Values.secrets.existingSecrets.enabled $adminNeedsEnv (and (index .Values "redis-ha" "enabled") (index .Values "redis-ha" "auth")) (and (not (index .Values "redis-ha" "enabled")) .Values.externalRedis.existingSecret) }} {{- if $needsEnv }} env: {{- /* App secrets from existing secret */ -}} @@ -51,6 +56,20 @@ spec: name: {{ .Values.secrets.existingSecrets.name }} key: {{ .Values.secrets.existingSecrets.keys.awsSecretAccessKey }} {{- end }} + {{- /* Admin dashboard credentials (when not pulled via the chart-secret envFrom) */ -}} + {{- if $adminNeedsEnv }} + {{- $adminSecret := .Values.admin.existingSecret.name | default (printf "%s-secrets" .Chart.Name) }} + - name: S3PROXY_ADMIN_USERNAME + valueFrom: + secretKeyRef: + name: {{ $adminSecret }} + key: {{ .Values.admin.existingSecret.usernameKey }} + - name: S3PROXY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ $adminSecret }} + key: {{ .Values.admin.existingSecret.passwordKey }} + {{- end }} {{- /* Redis password from redis-ha secret */ -}} {{- if and (index .Values "redis-ha" "enabled") (index .Values "redis-ha" "auth") }} - name: S3PROXY_REDIS_PASSWORD diff --git a/chart/templates/secret.yaml b/chart/templates/secret.yaml index 63ef99b..6ee7822 100644 --- a/chart/templates/secret.yaml +++ b/chart/templates/secret.yaml @@ -1,6 +1,9 @@ -{{- if not .Values.secrets.existingSecrets.enabled }} +{{- $createApp := not .Values.secrets.existingSecrets.enabled -}} +{{- $createAdmin := and .Values.admin.enabled (not .Values.admin.existingSecret.name) -}} +{{- if or $createApp $createAdmin }} # Creating static secrets (provide via helm --set or secure values file, not hardcoded) -# Secret keys created: S3PROXY_ENCRYPT_KEY, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY +# Secret keys created: S3PROXY_ENCRYPT_KEY, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, +# and (when the admin dashboard is enabled) S3PROXY_ADMIN_USERNAME / S3PROXY_ADMIN_PASSWORD apiVersion: v1 kind: Secret metadata: @@ -9,7 +12,13 @@ metadata: {{- include "s3proxy.labels" . | nindent 4 }} type: Opaque stringData: + {{- if $createApp }} S3PROXY_ENCRYPT_KEY: {{ .Values.secrets.encryptKey | quote }} AWS_ACCESS_KEY_ID: {{ .Values.secrets.awsAccessKeyId | quote }} AWS_SECRET_ACCESS_KEY: {{ .Values.secrets.awsSecretAccessKey | quote }} + {{- end }} + {{- if $createAdmin }} + S3PROXY_ADMIN_USERNAME: {{ .Values.admin.username | quote }} + S3PROXY_ADMIN_PASSWORD: {{ .Values.admin.password | quote }} + {{- end }} {{- end }} diff --git a/chart/values.yaml b/chart/values.yaml index 584dd80..167ddbf 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -97,6 +97,30 @@ secrets: logLevel: "DEBUG" +# Admin dashboard — served on the main service port at `admin.path`. +# Credentials are stored in a Secret. Override username/password in production, +# or point existingSecret at your own Secret holding the credential keys below. +admin: + enabled: false + path: "/admin" + username: "admin" + password: "admin" + existingSecret: + name: "" + usernameKey: "S3PROXY_ADMIN_USERNAME" + passwordKey: "S3PROXY_ADMIN_PASSWORD" + + # Optional dedicated Ingress for the dashboard, separate from the S3 data-plane + # ingress above. OFF by default — the admin UI should not be internet-facing + # unless you explicitly expose it (ideally behind an internal ingress class, + # an IP allowlist, or VPN). Routes `admin.path` to the proxy service. + ingress: + enabled: false + className: "nginx" + host: "" + annotations: {} + tls: [] # e.g. [{ secretName: admin-tls, hosts: [admin.example.com] }] + resources: requests: cpu: "100m" diff --git a/e2e/docker-compose.yml b/e2e/docker-compose.yml index c86c6d5..d34906d 100644 --- a/e2e/docker-compose.yml +++ b/e2e/docker-compose.yml @@ -59,9 +59,10 @@ services: BUILD_PID=$$! # Build esrally image with dependencies pre-installed (parallel) + # Pinned to 3.12 because yappi (esrally dep) has no 3.14 wheel yet ( printf '%s\n' \ - 'FROM python:3.14-slim' \ + 'FROM python:3.12-slim' \ 'RUN apt-get update && apt-get install -y --no-install-recommends gcc python3-dev git curl && rm -rf /var/lib/apt/lists/*' \ 'RUN pip install --no-cache-dir esrally' \ | docker build -t esrally:latest -f - /tmp @@ -419,6 +420,7 @@ services: --set performance.memoryLimitMb=64 \ --set gateway.enabled=true \ --set ingress.enabled=true \ + --set admin.enabled=true \ --set 'ingress.annotations.nginx\.ingress\.kubernetes\.io/proxy-body-size=256m' \ --set redis-ha.enabled=true \ --set redis-ha.persistentVolume.enabled=true \ diff --git a/pyproject.toml b/pyproject.toml index 382a267..aaf3cd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,10 @@ target-version = "py314" [tool.ruff.lint] select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"] +[tool.ruff.lint.per-file-ignores] +# Inline HTML/CSS/JS template has unavoidably long lines +"s3proxy/admin/templates.py" = ["E501"] + [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] diff --git a/s3proxy/admin/__init__.py b/s3proxy/admin/__init__.py new file mode 100644 index 0000000..1534196 --- /dev/null +++ b/s3proxy/admin/__init__.py @@ -0,0 +1,6 @@ +"""Admin dashboard for S3Proxy.""" + +from .collectors import record_request +from .router import create_admin_router + +__all__ = ["create_admin_router", "record_request"] diff --git a/s3proxy/admin/auth.py b/s3proxy/admin/auth.py new file mode 100644 index 0000000..0b2bb55 --- /dev/null +++ b/s3proxy/admin/auth.py @@ -0,0 +1,160 @@ +"""Auth for admin dashboard — session cookies + Basic Auth fallback.""" + +from __future__ import annotations + +import base64 +import binascii +import hashlib +import hmac +import json +import secrets +import time +from typing import TYPE_CHECKING + +from fastapi import Depends, HTTPException, Request, status +from fastapi.responses import RedirectResponse +from fastapi.security import HTTPBasic, HTTPBasicCredentials + +if TYPE_CHECKING: + from ..config import Settings + +SESSION_COOKIE = "s3proxy_session" +SESSION_TTL_SECONDS = 24 * 3600 +_BASIC_REALM = "S3Proxy Admin" + + +class AdminCredentials: + """Resolved admin credentials with session-signing key.""" + + def __init__(self, settings: Settings, credentials_store: dict[str, str]): + if not (settings.admin_username and settings.admin_password): + raise RuntimeError( + "Admin dashboard requires S3PROXY_ADMIN_USERNAME and S3PROXY_ADMIN_PASSWORD" + ) + self.username = settings.admin_username + self.password = settings.admin_password + # Derive a session-signing secret from the KEK so cookies survive pod restarts. + self.session_secret = hashlib.sha256(b"s3proxy-admin-session|" + settings.kek).digest() + + def valid(self, username: str, password: str) -> bool: + return secrets.compare_digest(username.encode(), self.username.encode()) and ( + secrets.compare_digest(password.encode(), self.password.encode()) + ) + + +# --------------------------------------------------------------------------- +# Session cookie — base64(json({u, exp})) "." hex(hmac-sha256) +# --------------------------------------------------------------------------- + + +def _b64url_encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + + +def _b64url_decode(s: str) -> bytes: + padding = "=" * ((4 - len(s) % 4) % 4) + return base64.urlsafe_b64decode(s + padding) + + +def issue_session(username: str, secret: bytes, ttl: int = SESSION_TTL_SECONDS) -> str: + payload = {"u": username, "e": int(time.time()) + ttl} + body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode()) + mac = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest() + return f"{body}.{mac}" + + +def verify_session(token: str, secret: bytes) -> str | None: + if not token or "." not in token: + return None + body, _, mac = token.rpartition(".") + try: + expected = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest() + if not hmac.compare_digest(mac, expected): + return None + payload = json.loads(_b64url_decode(body)) + except ValueError, binascii.Error, json.JSONDecodeError: + return None + if int(payload.get("e", 0)) < int(time.time()): + return None + return str(payload.get("u", "")) + + +# --------------------------------------------------------------------------- +# Dependencies +# --------------------------------------------------------------------------- + + +_basic_security = HTTPBasic(realm=_BASIC_REALM, auto_error=False) +_basic_dep = Depends(_basic_security) + + +def _check_basic(creds: HTTPBasicCredentials | None, admin: AdminCredentials) -> str | None: + if creds is None: + return None + return admin.username if admin.valid(creds.username, creds.password) else None + + +def _check_cookie(request: Request, admin: AdminCredentials) -> str | None: + token = request.cookies.get(SESSION_COOKIE) + if not token: + return None + return verify_session(token, admin.session_secret) + + +def make_verify_html(admin: AdminCredentials, login_url: str): + """Auth dep for HTML routes — redirects to login_url if not logged in.""" + + async def verify( + request: Request, + creds: HTTPBasicCredentials | None = _basic_dep, + ) -> str: + user = _check_cookie(request, admin) or _check_basic(creds, admin) + if user: + return user + raise HTTPException( + status_code=status.HTTP_303_SEE_OTHER, + headers={"Location": login_url}, + ) + + return verify + + +def make_verify_api(admin: AdminCredentials): + """Auth dep for JSON API routes — returns 401 if not logged in.""" + + async def verify( + request: Request, + creds: HTTPBasicCredentials | None = _basic_dep, + ) -> str: + user = _check_cookie(request, admin) or _check_basic(creds, admin) + if user: + return user + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Not authenticated", + headers={"WWW-Authenticate": f'Basic realm="{_BASIC_REALM}"'}, + ) + + return verify + + +def set_session_cookie(response: RedirectResponse, token: str, secure: bool) -> None: + response.set_cookie( + SESSION_COOKIE, + token, + max_age=SESSION_TTL_SECONDS, + httponly=True, + samesite="strict", + secure=secure, + path="/", + ) + + +def clear_session_cookie(response: RedirectResponse) -> None: + response.delete_cookie(SESSION_COOKIE, path="/") + + +# Backwards-compat helper for existing tests. +def create_auth_dependency(settings: Settings, credentials_store: dict[str, str]): + admin = AdminCredentials(settings, credentials_store) + return make_verify_api(admin) diff --git a/s3proxy/admin/collectors.py b/s3proxy/admin/collectors.py new file mode 100644 index 0000000..d497a5a --- /dev/null +++ b/s3proxy/admin/collectors.py @@ -0,0 +1,656 @@ +"""Data collectors for the admin dashboard.""" + +from __future__ import annotations + +import hashlib +import os +import time +from collections import defaultdict, deque +from dataclasses import asdict, dataclass +from typing import TYPE_CHECKING + +from .. import metrics + +if TYPE_CHECKING: + from ..config import Settings + + +# --------------------------------------------------------------------------- +# Sliding-window rate tracker over Prometheus counters +# --------------------------------------------------------------------------- + + +class RateTracker: + """Sample counter values on a schedule, then compute deltas over the window.""" + + def __init__(self, window_seconds: int = 3600, max_samples: int = 180): + self._window = window_seconds + self._max_samples = max_samples + self._snapshots: deque[tuple[float, dict[str, float]]] = deque(maxlen=max_samples) + + def record(self, counters: dict[str, float]) -> None: + now = time.monotonic() + self._snapshots.append((now, dict(counters))) + cutoff = now - self._window + while len(self._snapshots) > 2 and self._snapshots[0][0] < cutoff: + self._snapshots.popleft() + + def rate_per_second(self, key: str) -> float: + if len(self._snapshots) < 2: + return 0.0 + t0, v0 = self._snapshots[0] + t1, v1 = self._snapshots[-1] + elapsed = t1 - t0 + if elapsed < 0.5: + return 0.0 + delta = v1.get(key, 0.0) - v0.get(key, 0.0) + return max(0.0, delta / elapsed) + + def total(self, key: str) -> float: + if not self._snapshots: + return 0.0 + _, v0 = self._snapshots[0] + _, v1 = self._snapshots[-1] + return max(0.0, v1.get(key, 0.0) - v0.get(key, 0.0)) + + def sparkline(self, key: str, points: int = 30) -> list[float]: + """Return per-bucket deltas suitable for a sparkline.""" + _, values = self.sparkline_series(key, points) + return values + + def sparkline_series(self, key: str, points: int = 30) -> tuple[list[float], list[float]]: + """Return (wall_clock_timestamps, per-bucket deltas) in parallel lists.""" + if len(self._snapshots) < 2: + return [], [] + snaps = list(self._snapshots) + # Map monotonic samples → wall-clock by using the current offset. + mono_now = time.monotonic() + wall_now = time.time() + offset = wall_now - mono_now + pairs: list[tuple[float, float]] = [] + for prev, curr in zip(snaps, snaps[1:], strict=False): + elapsed = curr[0] - prev[0] + if elapsed <= 0: + continue + pairs.append( + (curr[0] + offset, max(0.0, curr[1].get(key, 0.0) - prev[1].get(key, 0.0))) + ) + if len(pairs) > points: + step = len(pairs) / points + pairs = [pairs[int(i * step)] for i in range(points)] + times = [round(p[0], 3) for p in pairs] + values = [round(p[1], 2) for p in pairs] + return times, values + + def earliest_value(self, key: str) -> float | None: + if not self._snapshots: + return None + return self._snapshots[0][1].get(key, 0.0) + + +_rate_tracker = RateTracker() + + +# --------------------------------------------------------------------------- +# Request log — ring buffer for the activity feed +# --------------------------------------------------------------------------- + + +@dataclass(slots=True, frozen=True) +class RequestEntry: + timestamp: float + method: str + operation: str + bucket: str + key: str + status: int + duration_ms: float + size: int + client_ip: str + + +class RequestLog: + def __init__(self, maxlen: int = 200): + self._entries: deque[RequestEntry] = deque(maxlen=maxlen) + + def record(self, entry: RequestEntry) -> None: + self._entries.append(entry) + + def recent(self, limit: int = 10) -> list[dict]: + entries = list(self._entries) + entries.reverse() + return [asdict(e) for e in entries[:limit]] + + def all(self) -> list[RequestEntry]: + return list(self._entries) + + +_request_log = RequestLog(maxlen=200) + + +def record_request( + method: str, + path: str, + operation: str, + status: int, + duration: float, + size: int, + client_ip: str = "", +) -> None: + """Append a completed request to the ring buffer.""" + bucket, key = _split_bucket_key(path) + _request_log.record( + RequestEntry( + timestamp=time.time(), + method=method, + operation=operation, + bucket=bucket, + key=key, + status=status, + duration_ms=round(duration * 1000, 1), + size=size, + client_ip=client_ip, + ) + ) + + +def _split_bucket_key(path: str) -> tuple[str, str]: + stripped = path.lstrip("/") + if not stripped: + return "", "" + if "/" not in stripped: + return stripped, "" + bucket, _, key = stripped.partition("/") + return bucket, key + + +# --------------------------------------------------------------------------- +# Prometheus helpers +# --------------------------------------------------------------------------- + + +def _read_counter(counter) -> float: + return float(counter._value.get()) + + +def _read_labeled_counter_sum(counter) -> float: + total = 0.0 + for sample in counter.collect()[0].samples: + if sample.name.endswith("_total"): + total += sample.value + return total + + +def _read_errors_total() -> float: + errs = 0.0 + for sample in metrics.REQUEST_COUNT.collect()[0].samples: + if not sample.name.endswith("_total"): + continue + status = str(sample.labels.get("status", "")) + if status.startswith(("4", "5")): + errs += sample.value + return errs + + +def _read_error_breakdown() -> dict[str, float]: + """Break errors down by status class (4xx / 5xx / 503).""" + out = {"4xx": 0.0, "5xx": 0.0, "503": 0.0} + for sample in metrics.REQUEST_COUNT.collect()[0].samples: + if not sample.name.endswith("_total"): + continue + status = str(sample.labels.get("status", "")) + if status == "503": + out["503"] += sample.value + out["5xx"] += sample.value + elif status.startswith("5"): + out["5xx"] += sample.value + elif status.startswith("4"): + out["4xx"] += sample.value + return out + + +def _read_method_breakdown() -> dict[str, float]: + out: dict[str, float] = {} + for sample in metrics.REQUEST_COUNT.collect()[0].samples: + if not sample.name.endswith("_total"): + continue + method = str(sample.labels.get("method", "?")) + out[method] = out.get(method, 0.0) + sample.value + return out + + +def _latency_percentiles() -> dict[str, float]: + """Approximate p50/p95/p99 by walking the histogram cumulative buckets.""" + buckets: list[tuple[float, float]] = [] + total = 0.0 + for sample in metrics.REQUEST_DURATION.collect()[0].samples: + if sample.name.endswith("_bucket"): + le = sample.labels.get("le", "") + if le == "+Inf": + total = sample.value + else: + try: + buckets.append((float(le), sample.value)) + except ValueError: + continue + if total < 1 or not buckets: + return {"p50_ms": 0.0, "p95_ms": 0.0, "p99_ms": 0.0, "count": 0} + buckets.sort(key=lambda b: b[0]) + + def _pct(p: float) -> float: + threshold = total * p + for upper, count in buckets: + if count >= threshold: + return round(upper * 1000, 1) + return round(buckets[-1][0] * 1000, 1) + + return { + "p50_ms": _pct(0.5), + "p95_ms": _pct(0.95), + "p99_ms": _pct(0.99), + "count": int(total), + } + + +# --------------------------------------------------------------------------- +# Formatters +# --------------------------------------------------------------------------- + + +def _format_bytes(n: float) -> tuple[str, str]: + """Return (number, unit) pair so the UI can render them distinctly.""" + value = float(n) + for unit in ("B", "KB", "MB", "GB", "TB", "PB"): + if abs(value) < 1024 or unit == "PB": + if unit == "B": + return f"{int(value)}", unit + return f"{value:.1f}" if value < 100 else f"{value:.0f}", unit + value /= 1024 + return f"{value:.0f}", "PB" + + +def _format_uptime(seconds: float) -> str: + s = int(seconds) + days, rem = divmod(s, 86400) + hours, rem = divmod(rem, 3600) + minutes = rem // 60 + parts: list[str] = [] + if days: + parts.append(f"{days}d") + if hours or days: + parts.append(f"{hours}h") + parts.append(f"{minutes}m") + return " ".join(parts) + + +def _format_relative(ts: float, now: float | None = None) -> str: + delta = max(0.0, (now or time.time()) - ts) + if delta < 60: + return f"{int(delta)}s ago" + if delta < 3600: + return f"{int(delta // 60)}m ago" + if delta < 86400: + return f"{int(delta // 3600)}h ago" + return f"{int(delta // 86400)}d ago" + + +def _format_size(n: int) -> str: + if n <= 0: + return "—" + num, unit = _format_bytes(n) + return f"{num} {unit}" + + +# --------------------------------------------------------------------------- +# Derived aggregations from the request log +# --------------------------------------------------------------------------- + + +def _derive_buckets(entries: list[RequestEntry]) -> list[dict]: + by_bucket: dict[str, dict] = defaultdict( + lambda: {"objects": set(), "bytes": 0, "last_seen": 0.0} + ) + for e in entries: + if not e.bucket: + continue + info = by_bucket[e.bucket] + if e.key: + info["objects"].add(e.key) + if e.size > 0: + info["bytes"] += e.size + if e.timestamp > info["last_seen"]: + info["last_seen"] = e.timestamp + + out: list[dict] = [] + for name, info in by_bucket.items(): + num, unit = _format_bytes(info["bytes"]) + out.append( + { + "name": name, + "encrypted": True, + "objects": len(info["objects"]), + "size": f"{num} {unit}" if info["bytes"] > 0 else "—", + "last_seen": info["last_seen"], + } + ) + out.sort(key=lambda b: b["last_seen"], reverse=True) + return out + + +def _derive_keys(settings: Settings) -> list[dict]: + fp = hashlib.sha256(settings.kek).hexdigest()[:8] + return [ + { + "id": f"key-{fp}", + "type": "Local (KEK)", + "status": "Active", + "created": "—", + } + ] + + +# --------------------------------------------------------------------------- +# Aggregate collector +# --------------------------------------------------------------------------- + + +def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") -> dict: + """Gather everything the dashboard renders in a single JSON blob.""" + now = time.time() + uptime_s = max(0.0, time.monotonic() - start_time) + + total_requests = _read_labeled_counter_sum(metrics.REQUEST_COUNT) + bytes_encrypted = _read_counter(metrics.BYTES_ENCRYPTED) + bytes_decrypted = _read_counter(metrics.BYTES_DECRYPTED) + errors_total = _read_errors_total() + + counters = { + "requests": total_requests, + "bytes_crypto": bytes_encrypted + bytes_decrypted, + "errors": errors_total, + } + _rate_tracker.record(counters) + + req_rate = _rate_tracker.rate_per_second("requests") + crypto_rate = _rate_tracker.rate_per_second("bytes_crypto") + + num_enc, unit_enc = _format_bytes(bytes_encrypted) + num_thr, unit_thr = _format_bytes(crypto_rate) + + req_times, req_values = _rate_tracker.sparkline_series("requests") + crypto_times, crypto_values = _rate_tracker.sparkline_series("bytes_crypto") + err_times, err_values = _rate_tracker.sparkline_series("errors") + + entries = _request_log.all() + buckets = _derive_buckets(entries) + last_error_ts = next((e.timestamp for e in reversed(entries) if e.status >= 400), None) + + return { + "header": { + "title": "S3 Encryption Proxy", + "status": "Running", + "uptime": _format_uptime(uptime_s), + "pod": os.environ.get("HOSTNAME", "local"), + "version": version, + }, + "cards": { + "requests": { + "label": "Requests", + "value": f"{int(total_requests):,}", + "unit": "", + "spark": req_values, + "spark_times": req_times, + "y_label": "req / sample", + "breakdown": [ + {"label": m, "value": f"{int(v):,}", "weight": float(v)} + for m, v in sorted(_read_method_breakdown().items(), key=lambda kv: -kv[1]) + if v > 0 + ], + }, + "data_encrypted": { + "label": "Data Encrypted", + "value": num_enc, + "unit": unit_enc, + "spark": crypto_values, + "spark_times": crypto_times, + "y_label": "bytes / sample", + "breakdown": [ + { + "label": "Encrypted (PUT)", + "value": " ".join(_format_bytes(bytes_encrypted)), + "weight": float(bytes_encrypted), + }, + { + "label": "Decrypted (GET)", + "value": " ".join(_format_bytes(bytes_decrypted)), + "weight": float(bytes_decrypted), + }, + ], + }, + "errors": { + "label": "Errors", + "value": f"{int(errors_total):,}", + "unit": "", + "spark": err_values, + "spark_times": err_times, + "y_label": "errors / sample", + "breakdown": [ + {"label": k, "value": f"{int(v):,}", "weight": float(v)} + for k, v in _read_error_breakdown().items() + ], + }, + "active_buckets": { + "label": "Active Buckets", + "value": str(len(buckets)), + "unit": "", + "detail": f"seen in last {len(entries)} reqs", + "breakdown": [ + { + "label": b["name"], + "value": f"{b['objects']} obj · {b['size']}", + "weight": float(b["objects"]), + } + for b in buckets[:8] + ], + }, + }, + "latency": _latency_percentiles(), + "activity": [ + { + "time": _format_relative(e["timestamp"], now), + "operation": _operation_display(e["method"], e["operation"]), + "bucket": e["bucket"] or "—", + "object": e["key"] or "—", + "status": "Success" if e["status"] < 400 else "Error", + "status_code": e["status"], + "size": _format_size(e["size"]), + "client_ip": e["client_ip"] or "—", + "latency": f"{e['duration_ms']:.0f} ms", + } + for e in _request_log.recent(10) + ], + "buckets": [ + { + "name": b["name"], + "encrypted": b["encrypted"], + "objects": f"{b['objects']:,}", + "size": b["size"], + } + for b in buckets[:8] + ], + "keys": _derive_keys(settings), + "footer": { + "version": version, + "req_per_s": f"{req_rate:.0f}", + "throughput": f"{num_thr} {unit_thr}/s" if crypto_rate > 0 else f"0 {unit_thr}/s", + "last_error": _format_relative(last_error_ts, now) if last_error_ts else "never", + }, + } + + +def _operation_display(method: str, operation: str) -> str: + """Shorten operation names for the feed (GET, PUT, DELETE, etc.).""" + return method or operation + + +# --------------------------------------------------------------------------- +# S3-backed drill-down helpers (bucket list, object head) +# --------------------------------------------------------------------------- + + +async def list_bucket_objects( + settings: Settings, + credentials_store: dict[str, str], + bucket: str, + prefix: str = "", + delimiter: str | None = "/", + max_keys: int = 500, +) -> dict: + """List a "directory" in a bucket using ListObjectsV2 with a delimiter. + + When delimiter is "/" (default), the response is split into sub-prefixes + (folders) and objects at the current level — the standard S3-console + file-explorer shape. + """ + from ..client import S3Client, S3Credentials + + if not credentials_store: + raise RuntimeError("No S3 credentials available") + access = next(iter(credentials_store)) + creds = S3Credentials(access, credentials_store[access], settings.region) + + async with S3Client(settings, creds) as client: + result = await client.list_objects_v2( + bucket=bucket, + prefix=prefix or None, + delimiter=delimiter, + max_keys=max_keys, + ) + + prefix_len = len(prefix) + folders: list[dict] = [] + for cp in result.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + name = full[prefix_len:].rstrip("/") + folders.append({"prefix": full, "name": name}) + + objects: list[dict] = [] + for o in result.get("Contents", []) or []: + full = o.get("Key", "") + # Skip the "directory marker" object that some tools create at the prefix itself + if full == prefix: + continue + size = int(o.get("Size", 0)) + lm = o.get("LastModified") + objects.append( + { + "key": full, + "name": full[prefix_len:], + "size": size, + "size_h": _format_size(size), + "last_modified": lm.isoformat() if lm else "", + "etag": (o.get("ETag") or "").strip('"'), + } + ) + return { + "bucket": bucket, + "prefix": prefix, + "delimiter": delimiter or "", + "folders": folders, + "objects": objects, + "count": len(folders) + len(objects), + "is_truncated": bool(result.get("IsTruncated", False)), + } + + +def list_logs( + limit: int = 200, + query: str = "", + operation: str = "", + status: str = "", +) -> dict: + """Return filtered request-log entries for the /logs view.""" + now = time.time() + q = (query or "").strip().lower() + op_filter = (operation or "").strip().upper() + status_filter = (status or "").strip().lower() + + entries = list(_request_log.all()) + entries.reverse() + out: list[dict] = [] + for e in entries: + if op_filter and (e.method or e.operation).upper() != op_filter: + continue + if status_filter: + is_err = e.status >= 400 + if status_filter == "success" and is_err: + continue + if status_filter == "error" and not is_err: + continue + if q: + blob = f"{e.bucket} {e.key} {e.client_ip} {e.method} {e.operation} {e.status}".lower() + if q not in blob: + continue + out.append( + { + "time": _format_relative(e.timestamp, now), + "timestamp": e.timestamp, + "operation": _operation_display(e.method, e.operation), + "bucket": e.bucket or "", + "object": e.key or "", + "status": "Success" if e.status < 400 else "Error", + "status_code": e.status, + "size": _format_size(e.size), + "client_ip": e.client_ip or "", + "latency": f"{e.duration_ms:.0f} ms", + } + ) + if len(out) >= limit: + break + + all_ops = sorted( + {(e.method or e.operation) for e in _request_log.all() if e.method or e.operation} + ) + return { + "count": len(out), + "total": len(_request_log.all()), + "entries": out, + "operations": all_ops, + } + + +async def head_object_detail( + settings: Settings, + credentials_store: dict[str, str], + bucket: str, + key: str, +) -> dict: + """HEAD an object and return user-facing metadata.""" + from ..client import S3Client, S3Credentials + + if not credentials_store: + raise RuntimeError("No S3 credentials available") + access = next(iter(credentials_store)) + creds = S3Credentials(access, credentials_store[access], settings.region) + + async with S3Client(settings, creds) as client: + md = await client.head_object(bucket, key) + + user_metadata = dict(md.get("Metadata") or {}) + # Redact the binary envelope (encrypted DEK) — it's opaque to humans. + isec = user_metadata.pop(settings.dektag_name, None) + if isec is not None: + user_metadata["_encrypted_dek"] = f"<{len(isec)} bytes>" + + lm = md.get("LastModified") + return { + "bucket": bucket, + "key": key, + "content_length": int(md.get("ContentLength", 0)), + "size_h": _format_size(int(md.get("ContentLength", 0))), + "content_type": md.get("ContentType", ""), + "etag": (md.get("ETag") or "").strip('"'), + "last_modified": lm.isoformat() if lm else "", + "metadata": user_metadata, + "encrypted": isec is not None, + } diff --git a/s3proxy/admin/router.py b/s3proxy/admin/router.py new file mode 100644 index 0000000..f8f00e0 --- /dev/null +++ b/s3proxy/admin/router.py @@ -0,0 +1,176 @@ +"""Admin dashboard router.""" + +from __future__ import annotations + +import asyncio +import json as _json +import time +from typing import TYPE_CHECKING + +from botocore.exceptions import ClientError +from fastapi import APIRouter, Depends, Form, HTTPException, Request, status +from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse + +from .auth import ( + AdminCredentials, + clear_session_cookie, + issue_session, + make_verify_api, + make_verify_html, + set_session_cookie, +) +from .collectors import collect_all, head_object_detail, list_bucket_objects, list_logs +from .templates import render_dashboard, render_login + +if TYPE_CHECKING: + from ..config import Settings + + +def create_admin_router( + settings: Settings, + credentials_store: dict[str, str], + version: str = "1.0.0", +) -> APIRouter: + """Build the admin dashboard router with session cookie + Basic Auth.""" + admin = AdminCredentials(settings, credentials_store) + prefix = settings.admin_path.rstrip("/") + login_url = f"{prefix}/login" + verify_html = make_verify_html(admin, login_url) + verify_api = make_verify_api(admin) + cookie_secure = not settings.no_tls + + router = APIRouter() + + # ---- Login / logout (unauthenticated) ------------------------------------ + + @router.get("/login", response_class=HTMLResponse) + async def login_page(request: Request) -> HTMLResponse: + error = request.query_params.get("error") + return HTMLResponse(render_login(admin_path=settings.admin_path, error=error)) + + @router.post("/login") + async def login_submit( + request: Request, + username: str = Form(...), + password: str = Form(...), + ) -> RedirectResponse: + if not admin.valid(username, password): + dest = f"{settings.admin_path.rstrip('/')}/login?error=1" + return RedirectResponse(dest, status_code=status.HTTP_303_SEE_OTHER) + token = issue_session(username, admin.session_secret) + response = RedirectResponse( + settings.admin_path.rstrip("/") + "/", + status_code=status.HTTP_303_SEE_OTHER, + ) + set_session_cookie(response, token, secure=cookie_secure) + return response + + @router.get("/logout") + async def logout() -> RedirectResponse: + response = RedirectResponse( + settings.admin_path.rstrip("/") + "/login", + status_code=status.HTTP_303_SEE_OTHER, + ) + clear_session_cookie(response) + return response + + # ---- Authenticated routes ------------------------------------------------ + + @router.get("/", response_class=HTMLResponse, dependencies=[Depends(verify_html)]) + async def dashboard() -> HTMLResponse: + return HTMLResponse(render_dashboard(admin_path=settings.admin_path)) + + @router.get("/api/status", dependencies=[Depends(verify_api)]) + async def status_api(request: Request) -> JSONResponse: + data = collect_all( + request.app.state.settings, + request.app.state.start_time, + version=version, + ) + return JSONResponse(data) + + @router.get("/api/stream", dependencies=[Depends(verify_api)]) + async def status_stream(request: Request) -> StreamingResponse: + """Push status updates via SSE — only emits when the payload changes.""" + + async def event_gen(): + last_payload: str | None = None + last_heartbeat = time.monotonic() + try: + while True: + if await request.is_disconnected(): + return + data = collect_all( + request.app.state.settings, + request.app.state.start_time, + version=version, + ) + payload = _json.dumps(data) + now = time.monotonic() + if payload != last_payload: + last_payload = payload + last_heartbeat = now + yield f"event: status\ndata: {payload}\n\n" + elif now - last_heartbeat > 15: + last_heartbeat = now + yield ": hb\n\n" + await asyncio.sleep(1) + except asyncio.CancelledError: + return + + return StreamingResponse( + event_gen(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + }, + ) + + @router.get("/api/logs", dependencies=[Depends(verify_api)]) + async def logs_api( + q: str = "", + operation: str = "", + status: str = "", + limit: int = 200, + ) -> JSONResponse: + data = list_logs( + limit=min(max(limit, 1), 500), + query=q, + operation=operation, + status=status, + ) + return JSONResponse(data) + + @router.get("/api/buckets/{bucket}", dependencies=[Depends(verify_api)]) + async def list_bucket( + bucket: str, + prefix: str = "", + delimiter: str = "/", + limit: int = 500, + ) -> JSONResponse: + try: + data = await list_bucket_objects( + settings, + credentials_store, + bucket, + prefix=prefix, + delimiter=delimiter or None, + max_keys=min(limit, 1000), + ) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "Error") + raise HTTPException(status_code=404, detail=f"{code}: {bucket}") from exc + return JSONResponse(data) + + @router.get("/api/objects/{bucket}/{key:path}", dependencies=[Depends(verify_api)]) + async def head_object_api(bucket: str, key: str) -> JSONResponse: + try: + data = await head_object_detail(settings, credentials_store, bucket, key) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "Error") + raise HTTPException(status_code=404, detail=f"{code}: {bucket}/{key}") from exc + return JSONResponse(data) + + return router diff --git a/s3proxy/admin/templates.py b/s3proxy/admin/templates.py new file mode 100644 index 0000000..0fcd28b --- /dev/null +++ b/s3proxy/admin/templates.py @@ -0,0 +1,1734 @@ +"""HTML templates for the admin dashboard — login + dashboard.""" + +from __future__ import annotations + +from html import escape as _esc + +_SHARED_CSS = """ + :root { + --bg: #fafafa; + --surface: #ffffff; + --border: #e5e7eb; + --border-strong: #d1d5db; + --text: #111827; + --text-muted: #6b7280; + --text-subtle: #9ca3af; + --ok: #10b981; + --ok-bg: #ecfdf5; + --err: #ef4444; + --err-bg: #fef2f2; + --dark: #111827; + --icon-bg: #f3f4f6; + --accent: #2563eb; + --accent-soft: rgba(37,99,235,0.12); + --accent-softer: rgba(37,99,235,0.04); + } + * { box-sizing: border-box; } + html, body { + margin: 0; + background: var(--bg); + color: var(--text); + font-family: -apple-system, BlinkMacSystemFont, "Inter", "Segoe UI", + Helvetica, Arial, sans-serif; + font-size: 14px; + line-height: 1.4; + -webkit-font-smoothing: antialiased; + } + a { color: inherit; } + .brand-mark { + width: 40px; height: 40px; + background: var(--dark); + border-radius: 10px; + display: flex; align-items: center; justify-content: center; + flex-shrink: 0; + } + .brand-mark svg { color: #fff; } + .brand-name { font-size: 20px; font-weight: 600; letter-spacing: -0.01em; } + .btn-dark { + background: #111827; color: #fff; + border: none; border-radius: 8px; + padding: 8px 14px; font-size: 13px; font-weight: 500; + cursor: pointer; + } + .btn-dark:hover { background: #1f2937; } + .btn-dark:disabled { opacity: .6; cursor: not-allowed; } + .btn-ghost { + background: transparent; color: var(--text); + border: 1px solid var(--border-strong); + border-radius: 8px; padding: 5px 10px; + font-size: 12px; cursor: pointer; + display: inline-flex; align-items: center; gap: 4px; + } + .btn-ghost:hover { background: var(--icon-bg); } +""" + + +_LOGIN_HTML = """ + + + + +Sign in · S3 Encryption Proxy + + + +
+
+ + + __ERROR_BLOCK__ + +
+ + +
+
+ + +
+ + +
+
+ + +""" + + +_DASHBOARD_HTML = """ + + + + +S3 Encryption Proxy + + + +
+ +
+
+ + S3 Encryption Proxy +
+
+ Running + Uptime: + Signed in · Logout +
+
+ + +
+
+ __CARD_REQUESTS__ + __CARD_DATA__ + __CARD_ERRORS__ + __CARD_BUCKETS__ +
+ +
+
+
Recent Activity
+
+ Live + View all logs → +
+
+ + + + + + + + + + +
TimeOperationBucketObjectStatusSizeClient IPLatency
No requests yet — traffic will appear here.
+
+ +
+
+
+
Buckets
+
+ + + + + + + +
NameEncryptionObjectsSize
No buckets observed yet.
+
+ +
+
+
Keys
+
+ + + + + +
Key IDTypeStatusCreated
+
+
+
+ + + + + + + + + + + + + +
+ + + + + + +""" + + +def _card_html(num: str, label: str, icon_svg: str, key: str) -> str: + return f""" + +
+
+ {icon_svg} + {label} +
+ + + +
+
+
 
+ +
+ """ + + +_ICON_REQUESTS = ( + '' + '' +) +_ICON_LOCK = ( + '' + '' +) +_ICON_ERR = ( + '' + '' + '' +) +_ICON_BUCKET = ( + '' + '' + '' +) + + +def render_dashboard(admin_path: str = "/admin") -> str: + """Render the dashboard HTML with API URLs + logout link substituted.""" + prefix = admin_path.rstrip("/") + html = _DASHBOARD_HTML + html = html.replace("__SHARED_CSS__", _SHARED_CSS) + html = html.replace("__STATUS_URL__", f"{prefix}/api/status") + html = html.replace("__STREAM_URL__", f"{prefix}/api/stream") + html = html.replace("__BUCKET_URL__", f"{prefix}/api/buckets") + html = html.replace("__OBJECT_URL__", f"{prefix}/api/objects") + html = html.replace("__LOGS_URL__", f"{prefix}/api/logs") + html = html.replace("__LOGIN_URL__", f"{prefix}/login") + html = html.replace("__LOGOUT_URL__", f"{prefix}/logout") + html = html.replace( + "__CARD_REQUESTS__", _card_html("1", "Requests", _ICON_REQUESTS, "requests") + ) + html = html.replace( + "__CARD_DATA__", _card_html("2", "Data Encrypted", _ICON_LOCK, "data_encrypted") + ) + html = html.replace("__CARD_ERRORS__", _card_html("3", "Errors", _ICON_ERR, "errors")) + html = html.replace( + "__CARD_BUCKETS__", _card_html("4", "Active Buckets", _ICON_BUCKET, "active_buckets") + ) + return html + + +def render_login(admin_path: str = "/admin", error: str | None = None) -> str: + """Render the sign-in page.""" + prefix = admin_path.rstrip("/") + error_block = ( + f'
{_esc(_login_error_text(error))}
' if error else "" + ) + html = _LOGIN_HTML + html = html.replace("__SHARED_CSS__", _SHARED_CSS) + html = html.replace("__LOGIN_ACTION__", f"{prefix}/login") + html = html.replace("__ERROR_BLOCK__", error_block) + return html + + +def _login_error_text(err: str) -> str: + mapping = {"1": "Invalid username or password."} + return mapping.get(err, "Sign in failed. Please try again.") diff --git a/s3proxy/app.py b/s3proxy/app.py index ec755e6..7ee19bf 100644 --- a/s3proxy/app.py +++ b/s3proxy/app.py @@ -5,6 +5,7 @@ import logging import os import sys +import time import uuid from collections.abc import AsyncIterator from contextlib import asynccontextmanager @@ -78,8 +79,10 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: handler = S3ProxyHandler(settings, credentials_store, multipart_manager) # Store in app.state for route access + app.state.settings = settings app.state.handler = handler app.state.verifier = verifier + app.state.start_time = time.monotonic() yield @@ -106,6 +109,15 @@ def create_app(settings: Settings | None = None) -> FastAPI: app = FastAPI(title="S3Proxy", lifespan=lifespan, docs_url=None, redoc_url=None) _register_exception_handlers(app) + + if settings.admin_ui: + from .admin import create_admin_router + + app.include_router( + create_admin_router(settings, credentials_store), + prefix=settings.admin_path, + ) + _register_routes(app) return app @@ -116,7 +128,20 @@ def _register_exception_handlers(app: FastAPI) -> None: @app.exception_handler(HTTPException) async def s3_exception_handler(request: Request, exc: HTTPException): - """Return S3-compatible error response with request ID.""" + """Return S3-compatible error response with request ID. + + Non-S3 exceptions that carry their own headers (e.g. admin auth 401 with + WWW-Authenticate) are passed through so browsers can prompt for credentials. + """ + if not isinstance(exc, S3Error) and getattr(exc, "headers", None): + from fastapi.responses import JSONResponse + + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail}, + headers=exc.headers, + ) + request_id = str(uuid.uuid4()).replace("-", "").upper()[:16] if isinstance(exc, S3Error): @@ -151,6 +176,12 @@ def _register_routes(app: FastAPI) -> None: async def health(): return PlainTextResponse("ok") + @app.get("/favicon.ico") + async def favicon() -> Response: + # Silence browser favicon probes so they don't fall through to the + # S3 catch-all and pollute the admin activity feed as a "bucket". + return Response(status_code=204) + @app.get("/metrics") async def metrics(): return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST) diff --git a/s3proxy/config.py b/s3proxy/config.py index 5d6b5d1..a679567 100644 --- a/s3proxy/config.py +++ b/s3proxy/config.py @@ -49,6 +49,12 @@ class Settings(BaseSettings): # Logging log_level: str = Field(default="INFO", description="Log level (DEBUG, INFO, WARNING, ERROR)") + # Admin dashboard + admin_ui: bool = Field(default=False, description="Enable the admin dashboard at admin_path") + admin_path: str = Field(default="/admin", description="URL path prefix for the admin UI") + admin_username: str = Field(default="", description="Admin dashboard username") + admin_password: str = Field(default="", description="Admin dashboard password") + # Cached KEK derived from encrypt_key (computed once in model_post_init) _kek: bytes = PrivateAttr() diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index 763bc53..4e322c6 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -13,6 +13,7 @@ from structlog.stdlib import BoundLogger from . import concurrency, crypto +from .admin import record_request from .errors import S3Error, raise_for_client_error, raise_for_exception from .handlers import S3ProxyHandler from .metrics import ( @@ -134,6 +135,13 @@ async def handle_proxy_request( REQUEST_COUNT.labels(method=method, operation=operation, status=status_code).inc() REQUEST_DURATION.labels(method=method, operation=operation).observe(duration) + try: + size = int(request.headers.get("content-length", "0")) + except ValueError: + size = 0 + client_ip = request.client.host if request.client else "" + record_request(method, path, operation, status_code, duration, size, client_ip) + if reserved_memory > 0: await concurrency.release_memory(reserved_memory) logger.info( diff --git a/tests/unit/test_admin.py b/tests/unit/test_admin.py new file mode 100644 index 0000000..2f2e832 --- /dev/null +++ b/tests/unit/test_admin.py @@ -0,0 +1,202 @@ +"""Tests for the admin dashboard.""" + +from __future__ import annotations + +import time + +import pytest +from fastapi.testclient import TestClient + +from s3proxy import metrics +from s3proxy.admin import collectors, record_request +from s3proxy.admin.auth import AdminCredentials, create_auth_dependency +from s3proxy.admin.router import create_admin_router +from s3proxy.admin.templates import render_dashboard +from s3proxy.config import Settings + + +def _reset_collector_state() -> None: + collectors._request_log._entries.clear() + collectors._rate_tracker._snapshots.clear() + + +@pytest.fixture(autouse=True) +def _clean_state(): + _reset_collector_state() + yield + _reset_collector_state() + + +@pytest.fixture +def admin_settings(): + return Settings( + host="http://localhost:9000", + encrypt_key="test-kek-32bytes!!!!!!!!!!!!!!!!", + admin_ui=True, + admin_username="admin", + admin_password="secret", + ) + + +def test_record_request_splits_bucket_and_key() -> None: + record_request("GET", "/my-bucket/path/to/file.txt", "GetObject", 200, 0.042, 1024, "10.0.0.1") + entries = collectors._request_log.all() + assert len(entries) == 1 + e = entries[0] + assert e.bucket == "my-bucket" + assert e.key == "path/to/file.txt" + assert e.status == 200 + assert e.duration_ms == pytest.approx(42.0) + assert e.client_ip == "10.0.0.1" + + +def test_collect_all_builds_expected_sections(admin_settings) -> None: + record_request("PUT", "/customer-data/invoice.pdf", "PutObject", 200, 0.05, 2048, "10.0.0.1") + record_request("GET", "/archives/log.gz", "GetObject", 500, 0.1, 0, "10.0.0.2") + + start = time.monotonic() - 120 # 2 minutes + data = collectors.collect_all(admin_settings, start_time=start, version="9.9.9") + + assert data["header"]["title"] == "S3 Encryption Proxy" + assert data["header"]["status"] == "Running" + assert "m" in data["header"]["uptime"] + + assert set(data["cards"].keys()) == {"requests", "data_encrypted", "errors", "active_buckets"} + assert data["cards"]["active_buckets"]["value"] == "2" + + ops = [row["operation"] for row in data["activity"]] + assert ops == ["GET", "PUT"] # newest first + assert data["activity"][0]["status"] == "Error" + assert data["activity"][1]["status"] == "Success" + assert data["activity"][1]["bucket"] == "customer-data" + assert data["activity"][1]["size"] == "2.0 KB" + + bucket_names = {b["name"] for b in data["buckets"]} + assert bucket_names == {"customer-data", "archives"} + + assert data["keys"][0]["status"] == "Active" + assert data["footer"]["version"] == "9.9.9" + + +def test_render_dashboard_injects_api_url() -> None: + html = render_dashboard(admin_path="/ops") + assert '"/ops/api/status"' in html + assert "__API_URL__" not in html + + +def _make_app(settings: Settings): + from fastapi import FastAPI + + app = FastAPI() + router = create_admin_router(settings, credentials_store={}, version="1.2.3") + app.include_router(router, prefix=settings.admin_path) + app.state.settings = settings + app.state.start_time = time.monotonic() + return app + + +def test_dashboard_redirects_to_login_without_auth(admin_settings) -> None: + client = TestClient(_make_app(admin_settings), follow_redirects=False) + r = client.get("/admin/") + assert r.status_code == 303 + assert r.headers["location"].endswith("/admin/login") + + +def test_dashboard_html_served_with_basic_auth(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/", auth=("admin", "secret")) + assert r.status_code == 200 + assert "S3 Encryption Proxy" in r.text + assert "Recent Activity" in r.text + + +def test_login_page_renders(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/login") + assert r.status_code == 200 + assert "Sign in to the admin dashboard" in r.text + + +def test_login_post_sets_cookie_and_redirects(admin_settings) -> None: + client = TestClient(_make_app(admin_settings), follow_redirects=False) + r = client.post("/admin/login", data={"username": "admin", "password": "secret"}) + assert r.status_code == 303 + assert r.headers["location"].endswith("/admin/") + assert "s3proxy_session=" in r.headers.get("set-cookie", "") + + +def test_login_post_rejects_bad_credentials(admin_settings) -> None: + client = TestClient(_make_app(admin_settings), follow_redirects=False) + r = client.post("/admin/login", data={"username": "admin", "password": "wrong"}) + assert r.status_code == 303 + assert "error=1" in r.headers["location"] + + +def test_session_cookie_authenticates_dashboard(admin_settings) -> None: + client = TestClient(_make_app(admin_settings), follow_redirects=False) + r = client.post("/admin/login", data={"username": "admin", "password": "secret"}) + cookie = r.headers["set-cookie"].split(";")[0] + r2 = client.get("/admin/", headers={"Cookie": cookie}) + assert r2.status_code == 200 + + +def test_logout_clears_cookie(admin_settings) -> None: + client = TestClient(_make_app(admin_settings), follow_redirects=False) + r = client.get("/admin/logout") + assert r.status_code == 303 + assert r.headers["location"].endswith("/admin/login") + assert "s3proxy_session=" in r.headers.get("set-cookie", "") + + +def test_status_api_returns_expected_shape(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/api/status", auth=("admin", "secret")) + assert r.status_code == 200 + payload = r.json() + assert payload["header"]["status"] == "Running" + assert payload["footer"]["version"] == "1.2.3" + for key in ("requests", "data_encrypted", "errors", "active_buckets"): + assert key in payload["cards"] + assert "breakdown" in payload["cards"][key] + + +def test_status_api_401_without_auth(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/api/status") + assert r.status_code == 401 + + +def test_auth_uses_explicit_credentials_not_aws() -> None: + settings = Settings( + host="http://localhost:9000", + encrypt_key="test-kek", + admin_ui=True, + admin_username="admin", + admin_password="admin", + ) + admin = AdminCredentials(settings, {"AKIAEXAMPLE": "secret-key"}) + assert admin.valid("admin", "admin") + # The AWS access key / secret must NOT work as dashboard credentials. + assert not admin.valid("AKIAEXAMPLE", "secret-key") + + +def test_auth_raises_when_credentials_blank() -> None: + settings = Settings( + host="http://localhost:9000", + encrypt_key="test-kek", + admin_ui=True, + admin_username="", + admin_password="", + ) + with pytest.raises(RuntimeError): + create_auth_dependency(settings, {}) + + +def test_collector_does_not_crash_on_empty_metrics(admin_settings) -> None: + """collect_all must work even before any request has been recorded.""" + # Ensure we don't blow up on cold start + data = collectors.collect_all(admin_settings, start_time=time.monotonic(), version="x") + expected = f"{int(collectors._read_labeled_counter_sum(metrics.REQUEST_COUNT)):,}" + assert data["cards"]["requests"]["value"] == expected + assert data["activity"] == [] + assert data["buckets"] == []