Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from dlq_utils import get_ingress_list_dlq_name
from lib.context_utils import store_context_async, extract_otel_trace_context
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from lib.queue import VconQueue
from lib.vcon_redis import VconRedis
from lib.vcon_egress_compat import to_configured_legacy
Expand Down Expand Up @@ -800,6 +801,10 @@ async def post_vcon(
if context:
await store_context_async(redis_async, ingress_list, vcon_uuid_str, context)
await queue.enqueue_async(redis_async, ingress_list, vcon_uuid_str)
increment_counter(
"conserver.api.count_vcons_enqueued",
attributes={"ingress_list": ingress_list, "source": "new"},
)

try:
vcon_hook.on_vcon_created(str(inbound_vcon.uuid), dict_vcon, ingress_lists)
Expand Down Expand Up @@ -898,6 +903,10 @@ async def external_ingress_vcon(
if context:
await store_context_async(redis_async, ingress_list, vcon_uuid_str, context)
await queue.enqueue_async(redis_async, ingress_list, vcon_uuid_str)
increment_counter(
"conserver.api.count_vcons_enqueued",
attributes={"ingress_list": ingress_list, "source": "external"},
)

logger.info(
f"Successfully stored vCon {inbound_vcon.uuid} and added to ingress list {ingress_list}"
Expand Down Expand Up @@ -1032,6 +1041,11 @@ async def post_vcon_ingress(
for vcon_uuid_str in valid_vcon_uuids:
await store_context_async(redis_async, ingress_list, vcon_uuid_str, context)
await queue.enqueue_async(redis_async, ingress_list, *valid_vcon_uuids)
increment_counter(
"conserver.api.count_vcons_enqueued",
value=len(valid_vcon_uuids),
attributes={"ingress_list": ingress_list, "source": "reingress"},
)
logger.info(f"Added {len(valid_vcon_uuids)} vCon UUIDs to ingress list {ingress_list}")
else:
logger.warning(f"No valid vCons found to add to ingress list {ingress_list}")
Expand Down Expand Up @@ -1153,6 +1167,12 @@ async def post_dlq_reprocess(
break
await queue.enqueue_async(redis_async, ingress_list, item)
counter += 1
if counter:
increment_counter(
"conserver.api.count_vcons_enqueued",
value=counter,
attributes={"ingress_list": ingress_list, "source": "dlq_reprocess"},
)
return JSONResponse(content=counter)
except Exception as e:
logger.error(f"Error reprocessing DLQ: {str(e)}")
Expand Down
116 changes: 115 additions & 1 deletion common/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi.testclient import TestClient
from vcon_fixture import generate_mock_vcon
import pytest
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import api
from datetime import datetime
from settings import CONSERVER_API_TOKEN, CONSERVER_HEADER_NAME
Expand Down Expand Up @@ -140,3 +140,117 @@ def test_post_vcon_with_ingress_list():
vcon_ids = response.json()
assert test_vcon["uuid"] in vcon_ids
print("Ingress list contains vCon ID: {}".format(test_vcon["uuid"]))


@pytest.mark.anyio
def test_post_vcon_with_ingress_list_increments_enqueue_counter():
"""The API must count every vCon it pushes onto an ingress list — the
chain-stall alert uses this counter as its arrivals signal (CON-618)."""
test_vcon = generate_mock_vcon()
ingress_list_name = "test_ingress_list_counter"

with patch("api.increment_counter") as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client:
response = client.post(
"/vcon", json=test_vcon, params={"ingress_lists": [ingress_list_name]}
)
assert response.status_code == 201

mock_counter.assert_called_once_with(
"conserver.api.count_vcons_enqueued",
attributes={"ingress_list": ingress_list_name, "source": "new"},
)


@pytest.mark.anyio
def test_post_vcon_without_ingress_list_skips_enqueue_counter():
test_vcon = generate_mock_vcon()

with patch("api.increment_counter") as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client:
response = client.post("/vcon", json=test_vcon)
assert response.status_code == 201

mock_counter.assert_not_called()


@pytest.mark.anyio
def test_vcon_ingress_bulk_increments_enqueue_counter():
"""POST /vcon/ingress counts every UUID it pushes (source=reingress)."""
test_vcon = generate_mock_vcon()
post_vcon(test_vcon)
ingress_list_name = "test_ingress_bulk_counter"

with patch("api.increment_counter") as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client:
response = client.post(
"/vcon/ingress",
json=[test_vcon["uuid"]],
params={"ingress_list": ingress_list_name},
)
assert response.status_code == 204

mock_counter.assert_called_once_with(
"conserver.api.count_vcons_enqueued",
value=1,
attributes={"ingress_list": ingress_list_name, "source": "reingress"},
)


@pytest.mark.anyio
def test_external_ingress_increments_enqueue_counter():
"""POST /vcon/external-ingress counts the submitted vCon (source=external)."""
test_vcon = generate_mock_vcon()
ingress_list_name = "partner_list"

with patch.object(
api.Configuration, "get_ingress_auth", return_value={ingress_list_name: "partner-key"}
), patch("api.increment_counter") as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: "partner-key"}) as client:
response = client.post(
"/vcon/external-ingress",
json=test_vcon,
params={"ingress_list": ingress_list_name},
)
assert response.status_code == 204

mock_counter.assert_called_once_with(
"conserver.api.count_vcons_enqueued",
attributes={"ingress_list": ingress_list_name, "source": "external"},
)


@pytest.mark.anyio
def test_dlq_reprocess_increments_enqueue_counter_with_moved_count():
"""POST /dlq/reprocess counts how many items it moved back (source=dlq_reprocess)."""
ingress_list_name = "test_dlq_counter"

with patch.object(
api.queue, "dequeue_dlq_async", new=AsyncMock(side_effect=["uuid-1", "uuid-2", None])
), patch.object(api.queue, "enqueue_async", new=AsyncMock()), patch(
"api.increment_counter"
) as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client:
response = client.post("/dlq/reprocess", params={"ingress_list": ingress_list_name})
assert response.status_code == 200
assert response.json() == 2

mock_counter.assert_called_once_with(
"conserver.api.count_vcons_enqueued",
value=2,
attributes={"ingress_list": ingress_list_name, "source": "dlq_reprocess"},
)


@pytest.mark.anyio
def test_dlq_reprocess_empty_dlq_skips_enqueue_counter():
"""An empty DLQ moves nothing and must not emit a zero-count increment."""
with patch.object(
api.queue, "dequeue_dlq_async", new=AsyncMock(return_value=None)
), patch("api.increment_counter") as mock_counter:
with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client:
response = client.post("/dlq/reprocess", params={"ingress_list": "empty_dlq"})
assert response.status_code == 200
assert response.json() == 0

mock_counter.assert_not_called()