Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from lib.context_utils import store_context_async, extract_otel_trace_context
from lib.logging_utils import init_logger
from lib.queue import VconQueue
from lib.vcon_redis import VconRedis
from lib.vcon_egress_compat import to_configured_legacy
import redis_mgr
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
Expand Down Expand Up @@ -485,6 +487,12 @@ async def sync_vcon_from_storage(vcon_uuid: UUID) -> Optional[dict]:
for storage_name in Configuration.get_storages():
vcon = Storage(storage_name=storage_name).get(str(vcon_uuid))
if vcon:
# Storage may hold a legacy / egress-converted format (written when the
# deployment-wide EGRESS_FORMAT_VERSION setting is enabled). Canonicalize to
# the current spec before caching or returning so Redis and API clients never
# see legacy field names or JSON-string bodies — mirrors the
# VconRedis.get_vcon storage-fallback path.
VconRedis._enforce_spec_on_write(vcon)
# Store the vCon back in Redis with expiration
await cache_vcon_in_redis(f"vcon:{str(vcon_uuid)}", vcon)
# Add to sorted set for timestamp-based retrieval
Expand Down Expand Up @@ -610,11 +618,13 @@ async def get_vcon(vcon_uuid: UUID) -> JSONResponse:
HTTPException: If vCon is not found (404)
"""
vcon = await ensure_vcon_in_redis(vcon_uuid)

if not vcon:
raise HTTPException(status_code=404, detail="vCon not found")

return JSONResponse(content=vcon)

# Redis/cache stays canonical; emit the configured legacy format (if any)
# only on the egress response.
return JSONResponse(content=to_configured_legacy(vcon))


@api_router.get(
Expand Down Expand Up @@ -649,7 +659,9 @@ async def get_vcons(
if not vcon:
# Only sync from storage if not found in Redis (avoids redundant Redis check)
vcon = await sync_vcon_from_storage(vcon_uuid)
results.append(vcon)
# Redis/cache stays canonical; emit the configured legacy format (if
# any) only on the egress response.
results.append(to_configured_legacy(vcon) if vcon else vcon)

return JSONResponse(content=results, status_code=200)

Expand Down
156 changes: 156 additions & 0 deletions common/lib/vcon_egress_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Spec→legacy vCon conversion for egress compatibility (CON-581).

This is the inverse of :func:`lib.vcon_compat.normalize_legacy_fields`.

The conserver normalizes every vCon *up* to the current spec (``vcon: "0.4.0"``)
on read and write. Downstream consumers built against an older schema (e.g.
``0.0.1``) break on the canonical shape — notably the ``type`` → ``purpose``
attachment rename and the write-path serialization of dict/list analysis and
attachment bodies into JSON strings (``encoding: "json"``). This module converts
an *outgoing* payload back to a legacy version — reversing the field renames and
re-inflating those JSON-string bodies to native objects with ``encoding: "none"``
— so those consumers keep working while a migration is planned.

It never mutates the canonical in-pipeline copy: callers pass ``vcon.to_dict()``
and receive a new, deep-copied, downgraded dict. Enable it deployment-wide via
the single ``EGRESS_FORMAT_VERSION`` setting, which every egress point (the
webhook link, the postgres / s3 / elasticsearch storage modules, and the API
read endpoints) consults through :func:`to_configured_legacy`. When the setting
is unset, :func:`to_configured_legacy` returns the payload as-is and behaviour
is unchanged.

The rename tables below mirror ``lib.vcon_compat`` (in the opposite direction).
``test_vcon_egress_compat`` round-trips ``normalize_legacy_fields(to_legacy(x))``
to guard against the two drifting apart.
"""

from __future__ import annotations

import copy
import json
from typing import Any, Dict

# Spec name → legacy name for keys on the vCon object itself.
# Inverse of lib.vcon_compat._TOP_LEVEL_RENAMES. ``to_legacy`` walks this
# table and moves each spec key to its legacy name on the copied payload.
_TOP_LEVEL_SPEC_TO_LEGACY = {
    "amended": "appended",
    "critical": "must_support",
}

# Spec name → legacy name for dialog / analysis / attachment entries.
# Inverse of the renames applied by lib.vcon_compat._normalize_entry.
# The attachment-only ``purpose`` → ``type`` rename is deliberately NOT listed
# here; ``_attachment_to_legacy`` handles it separately because it must not
# overwrite an existing ``type`` value.
_ENTRY_SPEC_TO_LEGACY = {
    "schema": "schema_version",
    "mediatype": "mimetype",
    "critical": "must_support",
}

# Legacy versions this module knows how to emit. ``to_legacy`` raises
# ``ValueError`` for any target version that is not a member of this set.
SUPPORTED_VERSIONS = {"0.0.1"}


def _rename(d: Dict[str, Any], old: str, new: str) -> None:
    """Relocate the value stored under ``old`` to the key ``new``, in place.

    Follows the same collision policy as ``vcon_compat._rename``: when ``new``
    (the legacy name) is already populated its value is kept and the ``old``
    entry is simply discarded. A dict that has no ``old`` key is left alone.
    """
    if old in d:
        if new in d:
            # Destination already set: it wins, so only drop the source key.
            del d[old]
        else:
            d[new] = d.pop(old)


def _entry_to_legacy(entry: Dict[str, Any]) -> None:
    """Switch one dialog / analysis / attachment entry to legacy key names.

    Every spec → legacy pair in ``_ENTRY_SPEC_TO_LEGACY`` is applied to
    ``entry`` in place through :func:`_rename`. Anything that is not a dict is
    ignored.
    """
    if isinstance(entry, dict):
        for spec_key in _ENTRY_SPEC_TO_LEGACY:
            _rename(entry, spec_key, _ENTRY_SPEC_TO_LEGACY[spec_key])


def _body_to_legacy(entry: Dict[str, Any]) -> None:
    """Re-inflate a JSON-string ``body`` into a native object, in place.

    This is the inverse of ``VconRedis._stringify_json_body``: the spec
    write-path stores dict/list bodies as a JSON string tagged with
    ``encoding: "json"``, whereas the legacy 0.0.1 shape carries the native
    object/array tagged with ``encoding: "none"``. Intended for analysis and
    attachment entries only, since dialog bodies are not stringified on write.

    The entry is left exactly as it was when it is not a dict, when its
    encoding is not ``"json"``, when its body is not a string, or when the
    body does not parse as JSON.
    """
    if not isinstance(entry, dict) or entry.get("encoding") != "json":
        return
    raw_body = entry.get("body")
    if not isinstance(raw_body, str):
        return
    try:
        parsed = json.loads(raw_body)
    except (ValueError, TypeError):
        # Unparseable body: keep both the body and the "json" tag untouched.
        return
    entry["body"] = parsed
    entry["encoding"] = "none"


def _attachment_to_legacy(att: Dict[str, Any]) -> None:
    """Switch one attachment entry to legacy key names, in place.

    Runs the shared entry renames first, then restores the legacy ``type`` key
    from the spec ``purpose`` key. Anything that is not a dict is ignored.
    """
    if isinstance(att, dict):
        _entry_to_legacy(att)
        # The spec calls it ``purpose``; legacy called it ``type``. As in the
        # forward normalizer, an existing ``type`` is never overwritten,
        # because the ``lawful_basis`` extension legitimately uses ``type``
        # as its own value.
        type_is_free = "type" not in att
        if type_is_free and "purpose" in att:
            att["type"] = att.pop("purpose")


def to_legacy(vcon_dict: Dict[str, Any], target_version: str) -> Dict[str, Any]:
    """Build a legacy-format copy of a spec-current vCon dict.

    :param vcon_dict: a spec-current (0.4.0) vCon dict, e.g. ``vcon.to_dict()``.
        A non-dict value is handed back as-is once the version check passed.
    :param target_version: legacy version to emit; must be a member of
        :data:`SUPPORTED_VERSIONS`.
    :returns: a deep copy of ``vcon_dict`` with legacy key names, re-inflated
        analysis / attachment bodies, the always-present legacy top-level keys
        and ``vcon`` set to ``target_version``.
    :raises ValueError: if ``target_version`` is not supported.

    ``vcon_dict`` itself is never modified.
    """
    if target_version not in SUPPORTED_VERSIONS:
        raise ValueError(
            f"Unsupported egress_format_version {target_version!r}; "
            f"supported: {sorted(SUPPORTED_VERSIONS)}"
        )
    if not isinstance(vcon_dict, dict):
        return vcon_dict

    # All edits below happen on a private copy so the caller's dict stays
    # canonical.
    legacy = copy.deepcopy(vcon_dict)

    for spec_key, legacy_key in _TOP_LEVEL_SPEC_TO_LEGACY.items():
        _rename(legacy, spec_key, legacy_key)

    # A missing or ``None`` section is treated like an empty one.
    for dialog in legacy.get("dialog") or ():
        _entry_to_legacy(dialog)
    for analysis in legacy.get("analysis") or ():
        _entry_to_legacy(analysis)
        _body_to_legacy(analysis)
    for attachment in legacy.get("attachments") or ():
        _attachment_to_legacy(attachment)
        _body_to_legacy(attachment)

    # Legacy 0.0.1 payloads always carry these top-level keys, while the 0.4.0
    # library drops empty group/redacted; fill in whichever ones are absent.
    for key, fallback in (("group", []), ("redacted", {}), ("appended", None)):
        legacy.setdefault(key, fallback)

    legacy["vcon"] = target_version
    return legacy


def to_configured_legacy(vcon_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Downgrade ``vcon_dict`` only when a legacy egress version is configured.

    Consults the single, deployment-wide ``EGRESS_FORMAT_VERSION`` setting.
    Egress points (the webhook link, the storage backends and the API read
    endpoints) are meant to route through this one function so the behaviour is
    configured once rather than per module.

    The setting is imported inside the function, i.e. looked up on every call,
    so that tests (and runtime config reloads) take effect.

    :returns: the result of :func:`to_legacy` when the setting is truthy,
        otherwise the very same ``vcon_dict`` object that was passed in.
    """
    from settings import EGRESS_FORMAT_VERSION

    if not EGRESS_FORMAT_VERSION:
        return vcon_dict
    return to_legacy(vcon_dict, EGRESS_FORMAT_VERSION)
10 changes: 7 additions & 3 deletions common/lib/vcon_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ def _enforce_spec_on_write(cls, vcon_dict: dict) -> dict:
# Rename legacy field names before the rest of the enforcement
# so subsequent loops operate on spec-named entries.
normalize_legacy_fields(vcon_dict)
# draft-ietf-vcon-vcon-core-02 §4.1.1 — syntax param.
if not vcon_dict.get("vcon"):
vcon_dict["vcon"] = "0.4.0"
# draft-ietf-vcon-vcon-core-02 §4.1.1 — syntax param. The renames above
# bring field names up to the current spec, so stamp the matching
# version unconditionally. A missing value, or a stale legacy value
# (e.g. "0.0.1" from a legacy producer or an egress-converted storage
# payload loaded back on a Redis miss), would otherwise misdescribe the
# now-canonical data.
vcon_dict["vcon"] = "0.4.0"
# speckit: ``group`` is reserved and must not be emitted empty.
if vcon_dict.get("group") == []:
vcon_dict.pop("group", None)
Expand Down
8 changes: 8 additions & 0 deletions common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
# Enable parallel storage writes using ThreadPoolExecutor (default True)
CONSERVER_PARALLEL_STORAGE = os.getenv("CONSERVER_PARALLEL_STORAGE", "true").lower() in ("true", "1", "yes")

# Egress format compatibility (deployment-wide).
# When set to a legacy vCon version string (e.g. "0.0.1"), every egress point
# emits that older format instead of the current spec: the webhook link, the
# storage backends, and the API read endpoints. Leave unset to emit the current
# spec everywhere. The canonical in-pipeline representation (Redis cache, link
# processing) is always kept on the current spec regardless of this setting.
EGRESS_FORMAT_VERSION = os.getenv("EGRESS_FORMAT_VERSION") or None

# Per-worker in-flight vCon concurrency (default 1 = strict serial, current behaviour).
# When > 1, each worker process dispatches up to N vCons to a ThreadPoolExecutor,
# back-pressuring before BLPOP so at most N chains run in parallel per worker.
Expand Down
15 changes: 12 additions & 3 deletions common/storage/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from lib.logging_utils import init_logger
from lib.vcon_redis import VconRedis
from lib.vcon_egress_compat import to_configured_legacy
import logging
import elasticsearch
import json
Expand Down Expand Up @@ -65,6 +66,12 @@ def save(
vcon = vcon_redis.get_vcon(vcon_uuid)
vcon_dict = vcon.to_dict()

# If EGRESS_FORMAT_VERSION is set, downgrade the indexed payload to that
# legacy format for downstream consumers built against an older schema.
# The canonical vCon in Redis is untouched. This also restores the
# legacy attachment ``type`` key the index names below are built from.
vcon_dict = to_configured_legacy(vcon_dict)

if not vcon_dict["dialog"]:
return

Expand Down Expand Up @@ -99,9 +106,11 @@ def save(

# Index the attachments, separated by 'type' - id=f"{vcon_uuid}_{attachment_index}"
for ind, attachment in enumerate(vcon_dict["attachments"]):
attachment_type = attachment.get(
"type"
).lower() # TODO this might be "purpose" in some of the attachments!!
# Spec 0.4.0 renamed attachment ``type`` -> ``purpose``; accept
# either so the index name resolves regardless of payload version.
attachment_type = (
attachment.get("type") or attachment.get("purpose") or "unknown"
).lower()
encoding = attachment.get("encoding", "none")
if encoding == "json" and isinstance(attachment["body"], str): # Only parse if it's a string
attachment["body"] = json.loads(attachment["body"])
Expand Down
17 changes: 12 additions & 5 deletions common/storage/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Optional, Dict, Any, Type
from lib.logging_utils import init_logger
from lib.vcon_redis import VconRedis
from lib.vcon_egress_compat import to_configured_legacy
from playhouse.postgres_ext import PostgresqlExtDatabase, BinaryJSONField
from peewee import (
Model,
Expand Down Expand Up @@ -117,26 +118,32 @@ def save(
try:
vcon_redis = VconRedis()
vcon = vcon_redis.get_vcon(vcon_uuid)


# If EGRESS_FORMAT_VERSION is set, downgrade the persisted payload to
# that legacy format for downstream consumers built against an older
# schema. The canonical vCon in Redis is untouched; the stored copy
# normalizes back up on read.
vcon_json = to_configured_legacy(vcon.to_dict())

# Connect to Postgres
db = get_db_connection(opts)
table_name = opts.get("table_name", "vcons")

# Create dynamic model for this database and table
VconsModel = create_vcons_model(db, table_name)

# Ensure table exists
db.create_tables([VconsModel], safe=True)

# Prepare vCon data
vcon_data = {
"id": vcon.uuid,
"uuid": vcon.uuid,
"vcon": vcon.vcon,
"vcon": vcon_json.get("vcon", vcon.vcon),
"created_at": vcon.created_at,
"updated_at": datetime.now(),
"subject": vcon.subject,
"vcon_json": vcon.to_dict(),
"vcon_json": vcon_json,
}

# Insert or update the vCon
Expand Down
9 changes: 8 additions & 1 deletion common/storage/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional
from lib.logging_utils import init_logger
from lib.vcon_redis import VconRedis
from lib.vcon_egress_compat import to_configured_legacy
import boto3

logger = init_logger(__name__)
Expand Down Expand Up @@ -85,10 +86,16 @@ def save(
vcon = vcon_redis.get_vcon(vcon_uuid)
s3 = _create_s3_client(opts)

# If EGRESS_FORMAT_VERSION is set, downgrade the stored payload to that
# legacy format for downstream consumers built against an older schema.
# The canonical vCon in Redis is untouched.
vcon_dict = to_configured_legacy(vcon.to_dict())
body = json.dumps(vcon_dict)

date_path = _date_prefix(vcon.created_at)
destination_directory = _build_s3_key(vcon_uuid, date_path, opts.get("s3_path"))
s3.put_object(
Bucket=opts["aws_bucket"], Key=destination_directory, Body=vcon.dumps()
Bucket=opts["aws_bucket"], Key=destination_directory, Body=body
)

lookup_key = _build_lookup_key(vcon_uuid, opts.get("s3_path"))
Expand Down
Loading
Loading