From 23634b410d7021cdc04e232db3b7120f45fb6250 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:01:54 +0100 Subject: [PATCH 01/15] Add opa dependency function to create OpaUserClient --- src/blueapi/service/authorization.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 372a6220d..bbd4bd796 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,13 +1,15 @@ import logging from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext -from typing import Any, Self +from typing import Any, Self, cast import aiohttp from aiohttp import ClientSession +from fastapi import Depends, HTTPException, Request +from starlette import status from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount -from blueapi.service.authentication import TiledAuth +from blueapi.service.authentication import TiledAuth, unchecked_bearer_token LOGGER = logging.getLogger(__name__) @@ -74,3 +76,14 @@ async def validate_tiled_config( tiled.token_url = oidc.token_endpoint auth = TiledAuth(tiled) await opa.require_tiled_service_account(auth.get_access_token()) + + +async def opa( + request: Request, token: str | None = Depends(unchecked_bearer_token) +) -> OpaUserClient | None: + + if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): + if not token: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + return opa.for_token(token) + return None From 0f685a2982a4ff88b2ffebfb67220b7a796d90b1 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:24:54 +0100 Subject: [PATCH 02/15] test opa dependency function --- src/blueapi/service/authorization.py | 2 +- .../unit_tests/service/test_authorization.py | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index bbd4bd796..b8c3b9eb4 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -85,5 +85,5 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) - return opa.for_token(token) + return OpaUserClient(opa, token) return None diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 249198580..ff3949174 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -2,11 +2,13 @@ from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest +from fastapi import HTTPException from pydantic import HttpUrl from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + opa, validate_tiled_config, ) @@ -149,3 +151,28 @@ async def test_validate_tiled_config_with_missing_config( assert await validate_tiled_config(tiled_auth, oidc, opa_client) is None if opa_client is not None: opa_client.require_tiled_service_account.assert_not_called() + + +async def test_opa_dependency_method(): + request = MagicMock() + + user_client = await opa(request, "foo_bar") + + assert user_client is not None + assert user_client.client == request.app.state.authz + assert user_client.token == "foo_bar" + + +async def test_opa_dependency_without_token(): + request = MagicMock() + + with pytest.raises(HTTPException, match="401"): + await opa(request, None) + + +@pytest.mark.parametrize("token", ["foo_bar", None]) +async def test_opa_dependency_without_authz(token): + request = MagicMock() + del request.app.state.authz + user_client = await opa(request, token) + assert user_client is None From 6302a760672748a67183fff3ed34812e4667b72d Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:17:35 +0100 Subject: [PATCH 03/15] Add can_submit_task auth check method and config --- helm/blueapi/config_schema.json | 7 ++++- helm/blueapi/values.schema.json | 7 ++++- src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 30 +++++++++++++++++++ .../unit_tests/service/test_authorization.py | 1 + tests/unit_tests/test_config.py | 1 + 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index 68fc79539..d5203cd56 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -344,10 +344,15 @@ "tiled_service_account_check": { "title": "Tiled Service Account Check", "type": "string" + }, + "submit_task_check": { + "title": "Submit Task Check", + "type": "string" } }, "required": [ - "tiled_service_account_check" + "tiled_service_account_check", + "submit_task_check" ], "title": "OpaConfig", "type": "object", diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 7472c37a2..4fa6f4f28 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -756,7 +756,8 @@ "title": "OpaConfig", "type": "object", "required": [ - "tiled_service_account_check" + "tiled_service_account_check", + "submit_task_check" ], "properties": { "root": { @@ -767,6 +768,10 @@ "maxLength": 2083, "minLength": 1 }, + "submit_task_check": { + "title": "Submit Task Check", + "type": "string" + }, "tiled_service_account_check": { "title": "Tiled Service Account Check", "type": "string" diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 06c149955..49cfa113b 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -299,6 +299,7 @@ class Tag(StrEnum): class OpaConfig(BlueapiBaseModel): root: HttpUrl = HttpUrl("http://localhost:8181") tiled_service_account_check: str + submit_task_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index b8c3b9eb4..d5645f109 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,4 +1,5 @@ import logging +import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext from typing import Any, Self, cast @@ -10,8 +11,10 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token +from blueapi.service.model import TaskRequest LOGGER = logging.getLogger(__name__) +INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") class OpaClient: @@ -60,6 +63,33 @@ async def require_tiled_service_account(self, token: str): f"Tiled service account is not valid for '{self._instrument}'" ) + async def require_submit_task(self, instrument_session: str, token: str): + if not (match := INSTRUMENT_SESSION_RE.match(instrument_session)): + raise ValueError("Invalid instrument session") + + if not await self._call_opa( + self._conf.submit_task_check, + { + "token": token, + "proposal": int(match["proposal"]), + "visit": int(match["visit"]), + }, + ): + raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + + +class OpaUserClient: + client: OpaClient + token: str + + def __init__(self, client: OpaClient, token: str): + self.client = client + self.token = token + + async def can_submit_task(self, task: TaskRequest): + LOGGER.info("Checking permissions to run task") + await self.client.require_submit_task(task.instrument_session, self.token) + async def validate_tiled_config( tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index ff3949174..37c1d7e3f 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -24,6 +24,7 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), + submit_task_check="/auth/submit", tiled_service_account_check="/auth/tiled", ) diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index b6e52c393..0d161b061 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -340,6 +340,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "opa": { "root": "http://opa.example.com/", "tiled_service_account_check": "v1/tiled_service_account", + "submit_task_check": "v1/submit_task", }, }, { From 7140d58e11cca9f29a03718fb7bfd0ac9e2e23cd Mon Sep 17 00:00:00 2001 From: root Date: Fri, 15 May 2026 08:27:14 +0000 Subject: [PATCH 04/15] feat: add authz dependency injection --- src/blueapi/service/main.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 3114fa73f..7a8f46e42 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,7 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, validate_tiled_config +from .authorization import OpaClient, OpaUserClient, opa, validate_tiled_config from .model import ( DeviceModel, DeviceResponse, @@ -258,6 +258,13 @@ def get_device_by_name( ) +async def submission_check( + opa: Annotated[OpaUserClient, Depends(opa)], + task_request: TaskRequest, +): + await opa.can_submit_task(task_request) + + @secure_router_v1.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @secure_router.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @start_as_current_span( @@ -271,6 +278,7 @@ def submit_task( request: Request, response: Response, task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], + authz_check: Annotated[None, Depends(submission_check)], runner: Annotated[WorkerDispatcher, Depends(_runner)], user: Fedid, ) -> TaskResponse: From 1dffe0669d8118aa6d00e6b86ebb8bfadbc7eb95 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 18 May 2026 15:01:34 +0000 Subject: [PATCH 05/15] feat: add auth check dependency injections to task endpoints --- src/blueapi/service/authorization.py | 8 +++++++- src/blueapi/service/main.py | 16 +++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index d5645f109..17a3b696b 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -2,7 +2,7 @@ import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext -from typing import Any, Self, cast +from typing import Annotated, Any, Self, cast import aiohttp from aiohttp import ClientSession @@ -117,3 +117,9 @@ async def opa( raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) return OpaUserClient(opa, token) return None + +async def submit_permission( + opa: Annotated[OpaUserClient, Depends(opa)], + task_request: TaskRequest, +): + await opa.can_submit_task(task_request) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 7a8f46e42..9cd0694c2 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,7 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, OpaUserClient, opa, validate_tiled_config +from .authorization import OpaClient, submit_permission, validate_tiled_config from .model import ( DeviceModel, DeviceResponse, @@ -258,13 +258,6 @@ def get_device_by_name( ) -async def submission_check( - opa: Annotated[OpaUserClient, Depends(opa)], - task_request: TaskRequest, -): - await opa.can_submit_task(task_request) - - @secure_router_v1.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @secure_router.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @start_as_current_span( @@ -278,7 +271,7 @@ def submit_task( request: Request, response: Response, task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], - authz_check: Annotated[None, Depends(submission_check)], + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], user: Fedid, ) -> TaskResponse: @@ -317,6 +310,7 @@ def submit_task( @start_as_current_span(TRACER, "task_id") def delete_submitted_task( task_id: str, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TaskResponse: return TaskResponse(task_id=runner.run(interface.clear_task, task_id)) @@ -335,6 +329,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @start_as_current_span(TRACER) def get_tasks( runner: Annotated[WorkerDispatcher, Depends(_runner)], + _: Annotated[None, Depends(submit_permission)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -371,6 +366,7 @@ def get_tasks( def set_active_task( request: Request, task: WorkerTask, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerTask: """Set a task to active status, the worker should begin it as soon as possible. @@ -401,6 +397,7 @@ def get_passthrough_headers(request: Request) -> dict[str, str]: @start_as_current_span(TRACER, "task_id") def get_task( task_id: str, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TrackableTask: """Retrieve a task""" @@ -478,6 +475,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 6eae8f2621442c507c9d2264b0ec5407a2159ab3 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 08:13:42 +0000 Subject: [PATCH 06/15] feat: create new access task permission fns and add as dependencies --- src/blueapi/service/main.py | 57 +++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 9cd0694c2..6b999cb6e 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -146,6 +146,41 @@ def get_app(config: ApplicationConfig): return app +def access_task_permission( + request: Request, + task_id: str, + runner: Annotated[WorkerDispatcher, Depends(_runner)], +): + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + try: + task = runner.run(interface.get_task_by_id, task_id) + except KeyError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None + + if ( + access_token + and task + and access_token.get("fedid") != task.task.metadata.get("user") + ): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + + +# start_task_permission is used when there is WorkerTask +def start_task_permission( + request: Request, + task: WorkerTask, + runner: Annotated[WorkerDispatcher, Depends(_runner)], +): + if not task.task_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="No task id provided", + ) + access_task_permission(request, task.task_id, runner) + + async def on_key_error_404(_: Request, __: Exception): return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, @@ -310,7 +345,7 @@ def submit_task( @start_as_current_span(TRACER, "task_id") def delete_submitted_task( task_id: str, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TaskResponse: return TaskResponse(task_id=runner.run(interface.clear_task, task_id)) @@ -328,8 +363,8 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) def get_tasks( + request: Request, runner: Annotated[WorkerDispatcher, Depends(_runner)], - _: Annotated[None, Depends(submit_permission)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -349,6 +384,15 @@ def get_tasks( tasks = runner.run(interface.get_tasks_by_status, desired_status) else: tasks = runner.run(interface.get_tasks) + + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + user = access_token.get("fedid") if access_token else None + + if user: + tasks = [t for t in tasks if t.task.metadata.get("user") == user] + return TasksListResponse(tasks=tasks) @@ -366,7 +410,7 @@ def get_tasks( def set_active_task( request: Request, task: WorkerTask, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(start_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerTask: """Set a task to active status, the worker should begin it as soon as possible. @@ -397,7 +441,7 @@ def get_passthrough_headers(request: Request) -> dict[str, str]: @start_as_current_span(TRACER, "task_id") def get_task( task_id: str, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TrackableTask: """Retrieve a task""" @@ -475,7 +519,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ @@ -507,6 +551,9 @@ def set_state( elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: + # active = runner.run(interface.get_active_task) + # if active.task.metadata.get("user"): + try: runner.run( interface.cancel_active_task, From 291338df90c86128c6a645718b890f09e85bf75b Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 08:30:53 +0000 Subject: [PATCH 07/15] refactor: update rest api version --- src/blueapi/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 49cfa113b..8b25b7d63 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -309,7 +309,7 @@ class ApplicationConfig(BlueapiBaseModel): """ #: API version to publish in OpenAPI schema - REST_API_VERSION: ClassVar[str] = "1.3.0" + REST_API_VERSION: ClassVar[str] = "1.3.1" LICENSE_INFO: ClassVar[dict[str, str]] = { "name": "Apache 2.0", From afe5ea0b20e5027d6886b3ce562b1b9a57b5f7cf Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 10:13:46 +0000 Subject: [PATCH 08/15] comment out dependency addition in set_state --- src/blueapi/config.py | 2 +- src/blueapi/service/main.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 8b25b7d63..49cfa113b 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -309,7 +309,7 @@ class ApplicationConfig(BlueapiBaseModel): """ #: API version to publish in OpenAPI schema - REST_API_VERSION: ClassVar[str] = "1.3.1" + REST_API_VERSION: ClassVar[str] = "1.3.0" LICENSE_INFO: ClassVar[dict[str, str]] = { "name": "Apache 2.0", diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 6b999cb6e..0ac4d94a4 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -519,7 +519,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, - _: Annotated[None, Depends(access_task_permission)], + # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 16c1d585eb2521ed62b0dacc3914a5f3a1d801d9 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 14:16:24 +0000 Subject: [PATCH 09/15] refactor: add admin check and check to set state function --- src/blueapi/service/authentication.py | 3 +++ src/blueapi/service/main.py | 30 ++++++++++++++--------- tests/unit_tests/service/test_rest_api.py | 2 +- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index 64dfc3004..156540bb0 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -286,6 +286,9 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() + def admin(self): + return False + UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 0ac4d94a4..4d6e1708a 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -147,6 +147,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -154,21 +155,19 @@ def access_task_permission( access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) - try: - task = runner.run(interface.get_task_by_id, task_id) - except KeyError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None + task = runner.run(interface.get_task_by_id, task_id) - if ( + if not opa.admin() and ( access_token and task and access_token.get("fedid") != task.task.metadata.get("user") ): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -178,7 +177,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(request, task.task_id, runner) + access_task_permission(opa, request, task.task_id, runner) async def on_key_error_404(_: Request, __: Exception): @@ -390,8 +389,7 @@ def get_tasks( ) user = access_token.get("fedid") if access_token else None - if user: - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == user] return TasksListResponse(tasks=tasks) @@ -517,8 +515,10 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( + request: Request, state_change_request: StateChangeRequest, response: Response, + opa: Annotated[OPAClient, Depends(get_opa_client)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: @@ -546,14 +546,20 @@ def set_state( current_state in _ALLOWED_TRANSITIONS and new_state in _ALLOWED_TRANSITIONS[current_state] ): + active = runner.run(interface.get_active_task) + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + user = access_token.get("fedid") if access_token else None + + if not opa.admin() and active and active.task.metadata.get("user") != user: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + if new_state == WorkerState.PAUSED: runner.run(interface.pause_worker, state_change_request.defer) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: - # active = runner.run(interface.get_active_task) - # if active.task.metadata.get("user"): - try: runner.run( interface.cancel_active_task, diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index c1d3b6a95..bf0a6a997 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -251,7 +251,7 @@ def test_create_task(mock_runner: Mock, client: TestClient) -> None: response = client.post("/tasks", json=task.model_dump()) - mock_runner.run.assert_called_with(submit_task, task, {"user": "Unknown"}) + mock_runner.run.assert_called_with(submit_task, task, {"user": None}) assert response.json() == {"task_id": task_id} From 0e1c02bb6c955f22bf4d3a4099f8857c340da87f Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:34:41 +0100 Subject: [PATCH 10/15] Update dependency names --- src/blueapi/service/main.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 4d6e1708a..22401afe3 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,13 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, submit_permission, validate_tiled_config +from .authorization import ( + OpaClient, + OpaUserClient, + opa, + submit_permission, + validate_tiled_config, +) from .model import ( DeviceModel, DeviceResponse, @@ -147,7 +153,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -167,7 +173,7 @@ def access_task_permission( # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -518,7 +524,7 @@ def set_state( request: Request, state_change_request: StateChangeRequest, response: Response, - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: From 46bc00635808f7b56f14103936c9b53bca76fc51 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:53:37 +0100 Subject: [PATCH 11/15] Add missing admin check --- helm/blueapi/config_schema.json | 7 ++++++- helm/blueapi/values.schema.json | 7 ++++++- src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 7 +++++++ tests/unit_tests/test_config.py | 1 + 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index d5203cd56..603474d8d 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -348,11 +348,16 @@ "submit_task_check": { "title": "Submit Task Check", "type": "string" + }, + "admin_check": { + "title": "Admin Check", + "type": "string" } }, "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "title": "OpaConfig", "type": "object", diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 4fa6f4f28..825ad21d0 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -757,9 +757,14 @@ "type": "object", "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "properties": { + "admin_check": { + "title": "Admin Check", + "type": "string" + }, "root": { "title": "Root", "default": "http://localhost:8181/", diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 49cfa113b..d0eb4cf55 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -300,6 +300,7 @@ class OpaConfig(BlueapiBaseModel): root: HttpUrl = HttpUrl("http://localhost:8181") tiled_service_account_check: str submit_task_check: str + admin_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 17a3b696b..97f44940f 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -77,6 +77,9 @@ async def require_submit_task(self, instrument_session: str, token: str): ): raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + async def is_admin(self, token: str) -> bool: + return await self._call_opa(self._conf.admin_check, {"token": token}) + class OpaUserClient: client: OpaClient @@ -90,6 +93,9 @@ async def can_submit_task(self, task: TaskRequest): LOGGER.info("Checking permissions to run task") await self.client.require_submit_task(task.instrument_session, self.token) + async def admin(self) -> bool: + return await self.client.is_admin(self.token) + async def validate_tiled_config( tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None @@ -118,6 +124,7 @@ async def opa( return OpaUserClient(opa, token) return None + async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 0d161b061..abc2851e6 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -341,6 +341,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "root": "http://opa.example.com/", "tiled_service_account_check": "v1/tiled_service_account", "submit_task_check": "v1/submit_task", + "admin_check": "v1/admin_check", }, }, { From 33f7cecc96f53ee51e335ffcb88fc9edf316d7c4 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:45:58 +0100 Subject: [PATCH 12/15] Handle missing opa and fix tests --- src/blueapi/service/authorization.py | 3 +- src/blueapi/service/main.py | 12 ++++++-- tests/unit_tests/service/test_rest_api.py | 35 +++++++++++++++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 97f44940f..b3a46b611 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -129,4 +129,5 @@ async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, ): - await opa.can_submit_task(task_request) + if opa: + await opa.can_submit_task(task_request) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 22401afe3..6e2bd2f88 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -153,11 +153,14 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): + if not opa: + return + access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) @@ -558,7 +561,12 @@ def set_state( ) user = access_token.get("fedid") if access_token else None - if not opa.admin() and active and active.task.metadata.get("user") != user: + if ( + opa + and not opa.admin() + and active + and active.task.metadata.get("user") != user + ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if new_state == WorkerState.PAUSED: diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index bf0a6a997..a2248e798 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -574,7 +574,12 @@ def test_get_state(mock_runner: Mock, client: TestClient): def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.PAUSED - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -588,7 +593,12 @@ def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): current_state = WorkerState.PAUSED final_state = WorkerState.RUNNING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -602,7 +612,12 @@ def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): def test_set_state_running_to_aborting(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.ABORTING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -619,7 +634,12 @@ def test_set_state_running_to_stopping_including_reason( current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING reason = "blueapi is being stopped" - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", @@ -635,7 +655,12 @@ def test_set_state_transition_error(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING - mock_runner.run.side_effect = [current_state, TransitionError(), final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + TransitionError(), + final_state, + ] response = client.put( "/worker/state", From d4ca1b8139d460e77fe5b5ca91d8b9fe064fd1e7 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:46:43 +0100 Subject: [PATCH 13/15] Remove old admin method --- src/blueapi/service/authentication.py | 3 -- src/blueapi/service/main.py | 42 +++++++-------------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index 156540bb0..64dfc3004 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -286,9 +286,6 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() - def admin(self): - return False - UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 6e2bd2f88..06c8765c6 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -154,31 +154,21 @@ def get_app(config: ApplicationConfig): def access_task_permission( opa: Annotated[OpaUserClient | None, Depends(opa)], - request: Request, task_id: str, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): - if not opa: - return - - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) task = runner.run(interface.get_task_by_id, task_id) - if not opa.admin() and ( - access_token - and task - and access_token.get("fedid") != task.task.metadata.get("user") - ): + if opa and not opa.admin() and (task and fedid != task.task.metadata.get("user")): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], - request: Request, task: WorkerTask, + opa: Annotated[OpaUserClient, Depends(opa)], + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): if not task.task_id: @@ -186,7 +176,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(opa, request, task.task_id, runner) + access_task_permission(opa, task.task_id, fedid, runner) async def on_key_error_404(_: Request, __: Exception): @@ -316,12 +306,11 @@ def submit_task( task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], - user: Fedid, + fedid: Fedid, ) -> TaskResponse: """Submit a task to the worker.""" try: - user = user or "Unknown" - task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + task_id: str = runner.run(interface.submit_task, task_request, {"user": fedid}) response.headers["Location"] = f"{request.url}/{task_id}" return TaskResponse(task_id=task_id) except ValidationError as e: @@ -371,7 +360,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) def get_tasks( - request: Request, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: @@ -393,12 +382,7 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None - - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) @@ -524,9 +508,9 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( - request: Request, state_change_request: StateChangeRequest, response: Response, + fedid: Fedid, opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -556,16 +540,12 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): active = runner.run(interface.get_active_task) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None if ( opa and not opa.admin() and active - and active.task.metadata.get("user") != user + and active.task.metadata.get("user") != fedid ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) From a5a0f48c77febadb52d07d2346fff0781c175f86 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:17:51 +0100 Subject: [PATCH 14/15] Use starlette statuses directly --- src/blueapi/service/authorization.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index b3a46b611..cb4dd2722 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -7,7 +7,7 @@ import aiohttp from aiohttp import ClientSession from fastapi import Depends, HTTPException, Request -from starlette import status +from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token @@ -75,7 +75,7 @@ async def require_submit_task(self, instrument_session: str, token: str): "visit": int(match["visit"]), }, ): - raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + raise HTTPException(status_code=HTTP_403_FORBIDDEN) async def is_admin(self, token: str) -> bool: return await self._call_opa(self._conf.admin_check, {"token": token}) @@ -120,13 +120,13 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED) return OpaUserClient(opa, token) return None async def submit_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], task_request: TaskRequest, ): if opa: From 8112e4b9200d44958c11b4f47e184d31fc384640 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:27:47 +0100 Subject: [PATCH 15/15] test task submission authz --- .../unit_tests/service/test_authorization.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 37c1d7e3f..65f5c44a6 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -8,9 +8,12 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + OpaUserClient, opa, + submit_permission, validate_tiled_config, ) +from blueapi.service.model import TaskRequest # Reusable client patch decorator patch_client_session = patch( @@ -25,6 +28,7 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), submit_task_check="/auth/submit", + admin_check="/auth/admin", tiled_service_account_check="/auth/tiled", ) @@ -108,6 +112,105 @@ async def test_opa_adds_input_fields(session: MagicMock, opa_config: OpaConfig): ) +@pytest.mark.parametrize( + "result,context", + [(True, nullcontext()), (False, pytest.raises(HTTPException, match="403"))], +) +@patch_client_session +async def test_require_submit_task( + session: MagicMock, + opa_config: OpaConfig, + result: bool, + context: AbstractContextManager, +): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + + client = OpaClient(instrument="p99", config=opa_config) + + session.assert_called_once_with(base_url="http://auth.example.com/") + with context: + await client.require_submit_task( + instrument_session="cm12345-1", token="foo_bar" + ) + + session().post.assert_called_once_with( + "/auth/submit", + json={ + "input": { + "token": "foo_bar", + "beamline": "p99", + "audience": "account", + "visit": 1, + "proposal": 12345, + } + }, + ) + + +@patch_client_session +async def test_opa_require_submit_task_invalid_session( + session: MagicMock, opa_config: OpaConfig +): + client = OpaClient(instrument="p45", config=opa_config) + + with pytest.raises(ValueError): + await client.require_submit_task( + instrument_session="not a session", token="foo_bar" + ) + + +@pytest.mark.parametrize("result", [True, False]) +@patch_client_session +async def test_opa_is_admin(session: MagicMock, opa_config: OpaConfig, result: bool): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + client = OpaClient(instrument="p45", config=opa_config) + + admin = await client.is_admin("foo_bar") + + assert admin == result + + session().post.assert_called_once_with( + "/auth/admin", + json={"input": {"token": "foo_bar", "beamline": "p45", "audience": "account"}}, + ) + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_user_client_can_submit_task(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.require_submit_task = AsyncMock(side_effect=result) + + user_client = OpaUserClient(opa, "foo_bar") + + with context: + await user_client.can_submit_task( + TaskRequest(name="foo", params={}, instrument_session="cm12345-1") + ) + opa.require_submit_task.assert_called_once_with("cm12345-1", "foo_bar") + + +@pytest.mark.parametrize("result", [True, False]) +async def test_user_client_admin(result: bool): + opa = MagicMock(spec=OpaUserClient) + opa.is_admin = AsyncMock(return_value=result) + + user_client = OpaUserClient(opa, "foo_bar") + + admin = await user_client.admin() + + assert admin == result + + async def test_validate_tiled_config(): opa = MagicMock(spec=OpaClient) tiled = ServiceAccount() @@ -177,3 +280,21 @@ async def test_opa_dependency_without_authz(token): del request.app.state.authz user_client = await opa(request, token) assert user_client is None + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_submit_permission_dependency(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.can_submit_task.side_effect = result + with context: + await submit_permission(opa, Mock()) + + +async def test_submit_permission_dependency_without_opa(): + assert await submit_permission(None, Mock()) is None