diff --git a/docs/design/client_api_execution_modes.md b/docs/design/client_api_execution_modes.md new file mode 100644 index 0000000000..6f3c69d7e0 --- /dev/null +++ b/docs/design/client_api_execution_modes.md @@ -0,0 +1,1001 @@ +# Client API Execution Modes Design + +## Status + +Proposal / design note. Epic: **FLARE-2698** (Client API and 3rd party integration refactoring, fix version 2.9.0). + +Revision 2 (2026-07-01): incorporates review feedback — universal session setup, owner-death and receive-side contracts, forward-path payload lifecycle, cleanup policy definition, receiver-confirmed terminal outcome, per-receiver transfer budgets, configuration surface, attach auth hardening, and a re-sequenced migration plan. + +## Background + +NVFlare supports the Client API so training scripts can interact with FLARE through a small surface: + +```python +import nvflare.client as flare + +flare.init() +model = flare.receive() +result = train(model) +flare.send(result) +flare.shutdown() +``` + +This works well for simple in-process scripts. It becomes harder to reason about when training runs in another process, under torchrun, under a scheduler, or inside an externally managed AV/third-party system. + +Several integration paths have grown up around this — an in-process executor, the subprocess Pipe/LauncherExecutor stack, AV-style IPCAgent, third-party FlareAgent, and an older multi-process executor. They are catalogued in Appendix A: Current Integration Paths. + +The new design keeps the Client API as the user-facing contract, but replaces the recommended Pipe/LauncherExecutor integration story with clearer execution modes. + +## Problem + +The current subprocess and third-party integration paths mix four concerns — how training starts, how task/result/log control messages move, how large payloads move, and how Client API semantics are exposed to user code — into one stack: + +``` +ClientAPILauncherExecutor + -> LauncherExecutor + -> TaskExchanger + -> PipeHandler + -> SubprocessLauncher + -> CellPipe/FilePipe + -> FlareAgent +``` + +This produces two distinct problems. + +1. **Lifecycle ambiguity.** Control-message acceptance is conflated with payload completion: a Pipe ACK or Cell send_request() reply means "the peer accepted the control message," but the tensor payload may still be downloading from another process. As a result producers get released, subprocesses killed, and retries fired before the bytes are actually out — the root of the races around pass-through, tensor disk offload, CCWF, retries, and subprocess exit, and the dominant failure mode for large (multi-GB) models. + +2. **Integration complexity.** The user-facing model is too large. Users may be asked to understand FlareAgent, IPCAgent, CellPipe, FilePipe, TaskExchanger, SubprocessLauncher, and LauncherExecutor, even though the desired training-script interface is only Client API. + +## Goals + +- Keep `flare.init()`, `flare.receive()`, `flare.send()`, and `flare.log()` as the trainer-facing API. +- Define execution modes by lifecycle ownership. +- Use DataBus for same-process training. +- Use Cell for all out-of-process training. +- Build on existing CellNet/F3 rather than redesigning it in this work. +- Preserve tensor streaming, lazy download, and tensor disk offload. +- Make payload lifecycle explicit enough to remove ACK ambiguity — in both directions: task delivery and result upload. +- Simplify the CCWF (swarm/cyclic/CSE) peer-to-peer transfer path by reusing the same payload-lifecycle contract instead of app-layer pass-through machinery, and make tensor disk offload safe to re-enable there. +- Cover external_process execution, torchrun, Deepspeed/wrappers, AV attach, SLURM/HPC, Docker/K8s job runtime combinations, and experiment tracking metrics. +- Keep migration compatibility for existing ScriptRunner usage. + +## Non-Goals + +- Redesigning JobLauncherSpec. +- Rewriting CellNet/CoreCell/F3 transport. +- Building a filesystem-backed CellNet/F3 transport driver in this proposal. +- Removing tensor streaming or tensor disk offload. +- Making FilePipe/NFS a recommended control-plane transport. +- Supporting multiple NVFlare Cell connections from all distributed ranks in V1. +- Exposing IPCAgent, FlareAgent, CellPipe, or FilePipe as the normal Client API integration surface. + +## Proposal + +### What We Propose + +In one paragraph: the trainer-facing Client API (flare.init/receive/send/log) stays exactly as it is; everything changing is behind it. Concretely: + +1. Replace the Pipe/launcher integration stack with one public **ClientAPIExecutor** configured by execution mode. The executor delegates internally to mode-specific backends for `in_process`, `external_process`, and `attach`. Retire ClientAPILauncherExecutor, LauncherExecutor, the generic Launcher/SubprocessLauncher, and the Pipe/TaskExchanger/FlareAgent chain as the recommended surface. The old path stays available, marked legacy, during migration. + +2. Formalize one **Client API control protocol over Cell**. All out-of-process communication (session setup, task/result/log/abort/heartbeat) goes through Cell — never a Pipe. In-process is the only exception and uses DataBus. Users never touch IPCAgent/CellPipe/FilePipe/FlareAgent. + +3. Make **payload terminal-state explicit**, consuming existing F3 transfer signals (see Payload Lifecycle and Dependencies) so "control message accepted" is never confused with "payload transferred" — for task delivery as well as result upload. + +The rest of this section details each. + +### Overview + +Introduce one public ClientAPIExecutor with three execution modes: + +| Mode | Backend | Lifecycle Owner | Primary Uses | +|---|---|---|---| +| in_process | DataBus | CJ process | local dev, simulator, simple scripts, scheduler wrappers | +| external_process | Cell | NVFlare owns external trainer process tree | python, torchrun, deepspeed, local wrappers, local side of multinode launches | +| attach | Cell | external system owns trainer | AV, manual attach, resident trainer, SLURM/K8s live attach | + +Where the trainer runs relative to the Client Job (CJ) process in each mode: + +``` +in_process: [ CJ process: executor + trainer ] DataBus +external_process: [ CJ: executor ] --Cell--> [ launched process tree: trainer ] (rank 0 = control) +attach: [ CJ: executor ] <--Cell-- [ external trainer started elsewhere ] +``` + +The architectural split is: + +- **Job runtime launch:** JobLauncherSpec +- **Training execution:** Client API execution modes +- **Control plane:** DataBus or Cell +- **Data plane:** inline, streaming, lazy download, disk offload, file/NFS, object reference + +For out-of-process modes, the control stack is always: + +``` +Client API protocol + -> Cell + -> F3 transport driver +``` + +The in-process mode is the exception because it stays inside the CJ process: + +``` +Client API protocol + -> DataBus +``` + +### Execution Modes + +#### in_process + +Runs user training code inside the Client Job process. This is the clean version of today's in-process ScriptRunner path. + +It does not create a Cell connection, launch a child process, or manage an external trainer lifecycle. + +What actually changes relative to today's InProcessClientAPIExecutor: nothing user-visible. The executor is consolidated under ClientAPIExecutor as the in_process backend, and LOG-to-analytics conversion (today the executor callback that fires `fed.analytix_log_stats`) moves with it. The payload lifecycle below is trivially satisfied in this mode — there is no cross-process transfer between trainer and CJ. When the CJ itself is the producer toward the server (lazy refs forwarded upstream), the CJ-side transfer is governed by the same shared payload-lifecycle contract; that obligation exists today and is unchanged by this design. + +#### external_process + +Starts and owns an external trainer process tree. + +**Why this mode exists.** The primary driver is multi-GPU / multi-process training — +torchrun, Deepspeed, Horovod, `mpirun`. NVFlare deliberately does not reimplement +inter-rank communication or the elastic-launch machinery those tools already provide; +instead it shells out to them (`subprocess.Popen("torchrun train.py ...")`) and talks to +**only rank 0** over Cell, which shares the model with the other ranks through the training +framework's own collectives (see the Rank Contract). Two design consequences follow directly +from this: NVFlare launches and owns the local process tree (so a bare launch-scoped token +over the localhost connection it created is sufficient authentication — full challenge-response +is an attach-mode concern, see Appendix B), and per-task trainer launch stays out of +JobLauncherSpec (see Alternatives Considered) because the launcher NVFlare invokes *is* the +distributed launcher. + +Responsibilities: + +- prepare Client API bootstrap config (including a launch-scoped session token; see Session Setup) +- start the configured command +- own process group lifecycle +- accept the trainer's HELLO and bind the trainer Cell connection to the current job/task stream +- exchange task/result/log control messages +- keep the trainer process alive until payload transfer completes +- terminate/cancel the process tree on abort/end + +This replaces the recommended public story of ClientAPILauncherExecutor + LauncherExecutor + SubprocessLauncher + Pipe. SubprocessLauncher may remain as an internal helper during migration, but it should not be the user-facing abstraction. + +external_process mode should be configured through `ClientAPIExecutor(..., command=...)` or the equivalent FedJob/ScriptRunner field. It can still use an internal process runner to spawn, monitor, and terminate the external trainer process tree. + +**Process-tree termination.** The process runner starts the trainer in its own process group/session (as SubprocessLauncher does today with `start_new_session=True` on POSIX). Orderly stop is SIGTERM to the group, a bounded grace period, then SIGKILL to the group; on Windows, terminate the process tree via the job-object/taskkill equivalent. The CJ records the trainer process-group id in the job workspace so the client CP can reap an orphaned trainer tree as defense-in-depth (see CJ Failure below). + +external_process mode does not mean single-node-only. It means the trainer runs outside the CJ process, while NVFlare still owns the local command it starts. That command may be torchrun, Deepspeed, or another distributed launcher that joins a multinode training job, provided rendezvous configuration and the other nodes are arranged by the user, scheduler, or site platform. NVFlare owns only the process tree it started; remote ranks outside that tree are governed by the distributed launcher/scheduler contract. + +#### attach + +Passive mode: it waits for an externally started trainer (AV, scheduler, or a user) to attach. Responsibilities: + +- create attach id/token/config +- authenticate and authorize the attaching trainer +- map attach session to peer FQCN +- bind trainer to job/site/task stream +- exchange task/result/log control messages +- maintain heartbeat/session lease for the attached trainer +- send abort/shutdown control messages + +NVFlare does not start the trainer in this mode; it only enforces the Cell session, heartbeat, token, and payload lifecycle. Because it does not own the process, the external trainer/platform must honor the contract that it does not exit or cleanup producer-owned streaming resources before terminal payload state. (A narrow optional hook for NVFlare to also start/poll/cancel the trainer — e.g. for SLURM/K8s — is a possible later addition; see Future Enhancements.) + +### Client API Backends + +The public trainer API remains: + +```python +flare.init() +flare.receive() +flare.send() +flare.log(...) +flare.shutdown() +``` + +(`flare.get_task_name()` is listed in the rank contract below; it exists in `nvflare/client/api.py` today but is not exported from the package `__init__` — exporting it is part of this work.) + +Internally, `flare.init()` binds to: + +- **In-process backend:** DataBus control/data path +- **Cell backend:** Cell control path for external_process and attach modes + +A Client API **session** is the runtime state for one trainer connection: identity, auth, current task, heartbeat, payload policy, and shutdown state. It is not a separate user-visible concept. + +**Session invariant (V1).** A session owns at most one active *task* at a time; it may additionally have open *transfers* from earlier tasks draining in the background (the fan-out case below). For the default single-receiver result, `flare.send()` blocks until the terminal transfer outcome (see Payload Lifecycle), so the previous task is fully terminal before the next TASK_READY — matching the blocking behavior of today's subprocess send path. For a workflow-declared multi-receiver result (CCWF fan-out), `flare.send()` returns at RESULT_ACCEPTED with the payload registered for download; the transfer keeps draining in the background while the session accepts its next task, and the producer-liveness obligation attaches to the session/process rather than the send call — `flare.shutdown()` and executor process stop block until all open transfers reach terminal state. Without this, staged fan-out would deadlock: CSE/broadcast receivers pull a producer's result during their own later tasks, which could never be dispatched if every producer sat blocked in send. Per-task control state (task id) resets when the task completes; each result's (result_id, transfer_id) state persists until its transfer reaches a terminal state; session identity, auth, and heartbeat state persist across rounds. + +Note the trainer side is not a thin shim: the Cell backend of the Client API replaces FlareAgent's protocol engine (task queueing, heartbeat, teardown handling, payload waits) inside the trainer process. This is real scope, accounted for in Migration Plan step 3. + +### JobLauncher Boundary + +JobLauncherSpec launches the NVFlare job runtime itself, not the user training code inside that runtime. + +Examples: ProcessJobLauncher, DockerJobLauncher, K8sJobLauncher. + +This proposal does not require changes to JobLauncherSpec. + +The axes can combine: + +| Job runtime launch | Training execution | Example | +|---|---|---| +| ProcessJobLauncher | in_process | local simple Client API job | +| ProcessJobLauncher | external_process | local torchrun from CJ | +| ProcessJobLauncher | in_process or external_process | wrapper submits SLURM trainer and reads artifacts | +| ProcessJobLauncher | attach | AV or scheduler starts trainer; CJ waits for it to attach over Cell | +| DockerJobLauncher | any Client API mode | CJ container chooses training execution mode | +| K8sJobLauncher | any Client API mode | CJ pod chooses training execution mode | + +Avoid hiding placement in the wrong axis. For example, prefer DockerJobLauncher over `external_process(command="docker run ...")` for Docker placement. + +### Control Protocol + +Two vocabularies appear below and should not be conflated: **control messages** (this section) are wire messages over Cell; **payload-transfer states** (next section, e.g. PAYLOAD_ACQUIRED, TRANSFER_COMPLETE) are logical states observed from the lower transfer layer and need not be separate wire messages. Payload streaming reuses shared Cell/FOBS/F3 transfer behavior rather than a Client-API-specific loop. + +The logical control protocol, grouped by when each message is used: + +**Session setup (all out-of-process modes):** + +``` +trainer -> CJ : HELLO (identity, attach/session id, rank, protocol version) +CJ -> trainer : HELLO_CHALLENGE (nonce) [or ERROR] +trainer -> CJ : HELLO_PROOF (token proof over the nonce; see Appendix B) +CJ -> trainer : HELLO_ACCEPTED (or ERROR) +``` + +**Per task (every round):** + +``` +CJ -> trainer : TASK_READY (task id, task name, FLModel ref, params) +trainer -> CJ : TASK_ACCEPTED +...trainer pulls task payload, trains... +trainer -> CJ : TASK_FAILED (task id, reason — e.g. task payload download failed) [failure path] +trainer -> CJ : RESULT_READY (result_id, transfer_id, manifest) +CJ -> trainer : RESULT_ACCEPTED (or RESULT_REJECTED) +...payload transfer runs (see Payload Lifecycle)... +``` + +**Throughout:** + +``` +trainer -> CJ : LOG (metrics/log lines) +both : HEARTBEAT (liveness/session lease) +``` + +**Teardown / failure:** + +``` +CJ -> trainer : ABORT | SHUTDOWN +trainer -> CJ : BYE +either : ERROR +``` + +Notes on use: + +- **Session setup is universal, not attach-only.** Owning the launched PID does not tell the CJ when the trainer's Cell is connected, which FQCN it bound, or that `flare.init()` completed — and the first per-task message is a CJ→trainer push. So external_process performs the same HELLO handshake as attach, proving possession of a launch-scoped session token that the executor generated into the bootstrap config at launch time. Attach differs only in token delivery (out-of-band, by the platform/operator) and auth policy (Appendix B; a one-round variant that folds the proof into HELLO is available where the channel already provides confidentiality). One handshake thus provides readiness signaling, stale-process rejection (a leftover trainer from a previous run holds a stale token and is refused), and version negotiation in all out-of-process modes. This generalizes what the legacy stack did implicitly: today the CJ waits for the subprocess's first Pipe heartbeat before sending tasks, and CellPipe embeds a job-scoped token in the FQCN to reject strange peers. +- **Protocol version** is carried in HELLO in both modes. Version skew is actually most likely in external_process — the launched command routinely runs a different nvflare install (a user venv/conda env or container image). On mismatch the executor rejects the HELLO with a clear error and fails the task; V1 supports exactly one protocol version, and the version field exists so later versions can define a compatibility window. +- TASK_READY/TASK_ACCEPTED and RESULT_READY/RESULT_ACCEPTED are the request/reply core, one cycle per `flare.receive()`/`flare.send()`. +- **TASK_READY idempotency.** TASK_READY carries the task id; the trainer backend treats a duplicate/redelivered TASK_READY for the current task id as idempotent and replies with its current task state instead of double-delivering the task to user code. Because Cell gives request/reply semantics, the CJ does not blind-resend TASK_READY while the session is alive; redelivery arises only around reconnect/retry edges. +- LOG and HEARTBEAT flow on the same Cell connection — no separate metrics pipe. (This is not a new co-mingling: today's "metrics pipe" is a second logical CellPipe channel on the same underlying Cell connection.) +- PAYLOAD_ACQUIRED/TRANSFER_COMPLETE/TRANSFER_FAILED are not sent by the Client API backend as wire messages; they are payload-transfer states it observes from the lower layer (next section). + +The payload-transfer states are detailed in the next section. The one thing to note here: RESULT_ACCEPTED only means the control message was accepted — not that the payload finished transferring. A producer-facing blocking facade (`flare.send()` returning only at terminal transfer state — the default for single-receiver results; declared fan-out results return at RESULT_ACCEPTED and drain in the background, see the session invariant) must block only that call, not Cell dispatcher threads, download workers, or the CJ relay path; the lower layer stays asynchronous internally — heartbeats and LOG messages continue autonomously while a send blocks. This does not cost throughput — bytes still stream through Cell/FOBS/F3 — it only fixes lifecycle ownership. + +#### Receive-Side Contract + +The send side gets a state machine below; the receive side needs an explicit contract too, because the protocol is push-style while the public API is a blocking `receive(timeout=None)`: + +- The trainer backend queues TASK_READY messages as they arrive; `flare.receive(timeout)` blocks on that queue. On timeout it returns None with the session still healthy (matching today's `Optional[FLModel]` signature). +- On clean end — SHUTDOWN received, or the job ends and the session is closed in an orderly way — `flare.receive()` returns None and `flare.is_running()` returns False. This is the contract the batch-artifact wrapper loop (`if model is None: break`) relies on. +- On ABORT, the current task is cancelled: a blocked `receive()`/`send()` raises a session exception (the Cell-backend analog of today's `AgentClosed`), and `flare.is_running()` returns False. +- On session loss (heartbeat timeout / Cell disconnect, see Heartbeat and Liveness), blocked calls raise the same session exception; user code is expected to exit its training loop. + +TASK_ACCEPTED acknowledges control receipt of the task message only — it does not mean the task payload has been materialized (see Forward Path below). + +#### Heartbeat and Liveness + +- Both sides send HEARTBEAT on the session's Cell connection. Recommended defaults follow the legacy Pipe values: interval 5 s, miss timeout 30 s, tunable in the bootstrap config. +- In external_process the executor has two liveness signals: process exit (waitpid on the process tree it owns) and heartbeat. Process exit is authoritative for "trainer is gone"; heartbeat covers a live-but-wedged trainer. +- In attach the heartbeat lease is the only liveness signal the executor has. +- **Precedence rule for transfers:** active data-plane transfer progress counts as session liveness. While a session has an in-flight transfer (a result in WAIT_PAYLOAD_ACQUIRED/WAIT_TRANSFER_COMPLETE, including background fan-out drains), the session lease does not expire on missed control heartbeats alone; the transfer's own idle/progress policy (shared F3 layer) governs, and only transfer failure/timeout or an explicit abort ends the session. This resolves the otherwise-contradictory pair "heartbeat timeout invalidates the session" vs "the executor must not revoke the session before terminal payload state" — and heartbeat false-positives during multi-GB transfers are precisely the failure class this design exists to remove. (The legacy stack solved this with a dedicated STREAM_PROGRESS topic feeding transfer waits, deliberately separate from peer liveness; the same separation is preserved here, just owned by the shared transfer layer.) + +#### CJ Failure (Owner Death) + +The rules above are written from the executor's viewpoint; the inverse failure — the CJ dying while the trainer is alive — needs its own contract, because lifecycle ownership is this design's defining axis: + +- **Trainer side, both out-of-process modes:** the trainer backend treats sustained heartbeat loss or Cell disconnect as CJ death. The same in-flight-transfer precedence rule applies on this side too — a trainer serving an active, progressing transfer does not self-terminate on missed control heartbeats alone. In external_process it then self-terminates the trainer process group after a bounded grace period (configured in the bootstrap config) — the process tree must not outlive its owner. In attach mode it raises the session exception out of blocked Client API calls and stops serving the session; whether the external platform tears the trainer down is that platform's decision. +- **Producer mid-transfer:** a producer whose CJ/session has died aborts its local transfer resources (cancels the DownloadService transaction) rather than continuing to serve pulls for a dead job, unless the manifest already satisfies a persistent cleanup policy (see Cleanup Policy). +- **Site-level reaping (defense-in-depth):** the CJ records the launched trainer's process-group id in the job workspace at launch; the client CP reaps any recorded trainer group whose CJ is gone. This covers SIGKILL/OOM of the CJ, where the trainer-side grace logic may be the only survivor and GPUs would otherwise leak. +- **Attach after CJ restart:** in V1, a CJ restart fails the attach session — the session table and token digest died with the executor, and the single-session token cannot be replayed to the new CJ. The platform re-runs its attach delivery against the restarted job (fresh token) or the task fails and is rescheduled. Automatic token redelivery/renegotiation on restart is a Future Enhancement. + +This replaces, and must be at least as strong as, the legacy stack's PEER_GONE topic, which today gives both sides an explicit peer-death signal. + +### Payload Lifecycle State Machine + +The payload lifecycle is normative for out-of-process Client API modes, but it should be implemented by consuming lower-layer Cell/FOBS/F3 transfer states rather than by reimplementing streaming waits in Client API. It applies in **both directions**; the result direction is specified first because it is where producer cleanup is gated, then the forward (task delivery) direction, which is where the known large-model failures first appear. + +Identifiers: + +- `session_id`: one attached/local trainer connection. +- `task_id`: task currently owned by the session. +- `result_id`: idempotency key for one `flare.send()` call. +- `transfer_id`: id for payload transfer and cleanup coordination. +- `payload_id`: one manifest entry inside a result. +- `download_tx_id`: F3 DownloadService transaction id, when applicable. +- `download_ref_id`: F3 object ref id inside a download transaction. + +For Cell/FOBS-backed streaming, the Client API backend should set: + +- `MessageHeaderKey.MSG_ROOT_ID = transfer_id` +- `MessageHeaderKey.MSG_ROOT_TTL = transfer_timeout` +- DownloadService `tx_id = transfer_id` when a single transaction is used + +If multiple DownloadService transactions are needed, the manifest must carry the mapping from transfer_id to all download_tx_id and download_ref_id values. + +**Result send state machine.** The happy path runs down the left; any state can branch to a terminal failure on the right. The producer (trainer) is held alive until a terminal state is reached. + +``` +flare.send(result) +READY_TO_SEND ───────────── validation fails ──────► RESULT_REJECTED [terminal] + │ RESULT_READY(result_id, transfer_id, manifest) +WAIT_RESULT_ACCEPTED ─────── rejected ─────────────► RESULT_REJECTED [terminal] + │ RESULT_ACCEPTED (control message accepted — NOT payload done) +WAIT_PAYLOAD_ACQUIRED ────── fail / timeout ───────► TRANSFER_FAILED [terminal] + │ PAYLOAD_ACQUIRED (consumer registered/began the pull — bytes not done) +WAIT_TRANSFER_COMPLETE ───── fail / timeout ───────► TRANSFER_FAILED [terminal] + │ TRANSFER_COMPLETE (transfer-layer terminal outcome) +DONE_CLEANUP_ALLOWED [terminal — producer may exit and free payload] + +ABORT at any state ────────────────────────────────► ABORTED [terminal] +``` + +**Terminal states are DONE_CLEANUP_ALLOWED (success), TRANSFER_FAILED, RESULT_REJECTED, and ABORTED.** (TRANSFER_COMPLETE / TRANSFER_FAILED are the transfer-layer outcomes the machine consumes; TRANSFER_COMPLETE moves the machine to DONE_CLEANUP_ALLOWED. Earlier drafts used the two vocabularies interchangeably; they are now distinct.) RESULT_ACCEPTED is reached well before cleanup is allowed — that gap is the whole point: an accepted control message is not a completed payload transfer. + +Logical ownership: + +- RESULT_READY is emitted by the trainer-side Client API backend. +- RESULT_ACCEPTED is emitted by the executor/CJ after validating session, task id, result id, and manifest. +- PAYLOAD_ACQUIRED means the final consumer has registered/pinned the transfer — it issued the download request and the producer's DownloadService transaction is now active, or the consumer otherwise committed to pulling. Bytes are not yet fully transferred. Its value is liveness: if PAYLOAD_ACQUIRED never fires within a bound, no consumer ever showed up, and the result can fail fast instead of waiting the full transfer timeout. For inline payloads it is immediate. +- **Observability note:** F3 already exposes a public producer-side `progress_cb` on `new_transaction`/ObjectDownloader that fires an ACTIVE-state event with receiver identity on each receiver's first download request. PAYLOAD_ACQUIRED in V1 is implemented by consuming that signal per receiver (no new F3 surface needed), with a per-receiver acquire budget for fail-fast (see Per-Receiver Budgets). The load-bearing signal remains TRANSFER_COMPLETE. +- TRANSFER_COMPLETE means all required bytes have been pulled successfully by every expected receiver and the producer may free resources. For DownloadService, this requires transaction termination plus an aggregate all-receivers-success outcome; the current FINISHED status alone means the transaction reached its receiver count and must not be treated as proof of success. For blob streams, successful StreamFuture completion supplies the success outcome. This — not PAYLOAD_ACQUIRED — is what gates `Downloadable.release()` and producer/subprocess exit. + +So the two states are not interchangeable: PAYLOAD_ACQUIRED answers "did a consumer commit to pulling?" (liveness / fail-fast), TRANSFER_COMPLETE answers "are the bytes safely out?" (cleanup gate). For a large model these can be seconds-to-minutes apart, which is exactly why both exist. + +**Terminal transfer outcome — the first contract to build.** The backend exposes exactly one normalized terminal outcome per transfer: TRANSFER_COMPLETE or TRANSFER_FAILED. It derives that outcome from a DownloadService terminal callback together with aggregate receiver outcomes, or from StreamFuture completion/error for blob streams. The current DownloadService callback statuses (FINISHED/TIMEOUT/DELETED) do not by themselves distinguish all-receivers-success from a transaction that reached its receiver count with a receiver failure, so the shared payload layer must expose that distinction first. Three refinements are part of that shared-layer work, because today's per-receiver SUCCESS is recorded when the producer serves the final chunk (produce() returns EOF) — i.e., it is producer-served, not receiver-confirmed: + +1. **Receiver-confirmed terminal status.** The receiver reports its own terminal outcome (consume/finalization success or failure — e.g. a disk-offload write error after the last chunk) back to the producer's transaction as a small completion message, and `downloaded_to_one` reflects receiver truth rather than served-EOF. Without this, a receiver that fails after its last pull is invisible to a producer that has already resolved TRANSFER_COMPLETE. +2. **Bounded post-completion linger.** The producer does not exit at the instant of transaction_done; it lingers for a bounded grace matched to the receiver-side chunk-retry budget and the finished-ref tombstone window, so a receiver whose final EOF reply was lost can retry against a still-live producer instead of failing after the producer resolved success. (Today the tombstone healing only works while the producer process is alive; immediate exit at transaction_done would defeat it.) +3. **Retry-aware per-receiver accounting.** A receiver's terminal status is recorded after its retry budget is exhausted, not at first failure, so data-plane retries do not prematurely mark a receiver failed. + +The producer-facing wait is then an awaitable facade over those signals: `flare.send()` awaits it for single-receiver results, while for declared fan-out results the same awaitable gates session close and process release instead (see the session invariant). These are two linked implementation tasks (the normalized aggregate outcome with the three refinements above, and the awaitable facade); without both, the payload lifecycle is not enforceable. + +**Retry rules:** + +- RESULT_READY is retried with the same result_id and transfer_id. +- The executor must treat duplicate RESULT_READY messages as idempotent. +- If the result was already accepted, reply with the current state rather than creating a second result or second transfer. +- Payload download retries happen inside the data-plane mechanism and must not create a new result_id. +- TASK_READY redelivery is idempotent by task id (see Control Protocol). + +**Failure and timeout rules:** + +- If validation fails before result acceptance, executor emits RESULT_REJECTED with a reason and the producer may cleanup. +- If payload acquisition fails or times out, observer emits TRANSFER_FAILED with status and reason. +- If the producer exits before terminal state, executor treats the result as failed unless the manifest already satisfies the declared cleanup policy (see Cleanup Policy). +- If abort/shutdown arrives, producer cancels local transfer resources and reports terminal failure if possible. +- If the CJ/session dies before terminal state, the producer aborts its local transfer (see CJ Failure). + +**Cleanup rule:** + +Producer-owned resources must stay alive until one terminal state is reached. + +- external_process executors must not stop the trainer before terminal state. +- Attach executors must not revoke the session before terminal state unless aborting (and see the heartbeat precedence rule — an in-flight transfer keeps the lease alive). + +Process ownership determines how strongly NVFlare can enforce this rule: + +- external_process owns the producer process tree and must keep it alive until terminal state or explicitly fail/abort the task. +- attach owns the attach session and lease, but not always the producer process. If the attached trainer exits early, the executor marks the transfer failed unless the manifest already satisfies the cleanup policy. + +#### Cleanup Policy + +`cleanup_policy` appears in the result envelope and is load-bearing in the failure rules above, so it needs concrete semantics. V1 defines two values, per payload manifest entry: + +- **LIVE_PULL (default):** the payload is served from producer-owned live resources (DownloadService refs, stream sources). The producer must survive to terminal transfer state; early producer exit fails the result. +- **PERSISTENT_ARTIFACT:** the payload is fully materialized in a durable location named by the manifest (file/NFS path, object-store reference) before RESULT_READY is sent. Producer exit after RESULT_ACCEPTED is safe; consumers pull from the durable location on their own schedule. + +"The manifest satisfies the cleanup policy" means: every payload entry in the manifest is PERSISTENT_ARTIFACT (or inline). A result mixing LIVE_PULL and PERSISTENT_ARTIFACT entries is governed by the strictest entry. Scheduler batch-artifact flows are the intended users of PERSISTENT_ARTIFACT; live tensor streaming is always LIVE_PULL. + +#### Forward Path: Task Payload Delivery + +The forward direction — global model from server/CJ to the trainer — is where the known large-model field failures first appear (the 5 GB × 16-client failure manifests on task_payload_download, with the CJ resending while the subprocess is still materializing). The same lifecycle discipline applies, with the roles reversed: the CJ/server side is the producer, the trainer is the consumer. + +- **TASK_READY is control-only.** It carries the task metadata and FLModel lazy refs; TASK_ACCEPTED acknowledges the control message, before any payload materialization — the same "accepted ≠ transferred" caveat as RESULT_ACCEPTED. +- **The trainer's pull is governed by the same shared transfer policy.** `flare.receive()` materializes (or lazily holds) the model by pulling through the shared F3 path; idle/progress/timeout policy is the shared layer's, and the upstream producer (CJ relay or server job process) is held to the same producer-liveness rule — its refs stay alive until the trainer's pull reaches a terminal state. +- **Trainer-side download failure is explicit.** If the task payload pull fails or times out, the trainer backend sends TASK_FAILED with the task id and reason; the executor fails or retries the task at the workflow's discretion. There is no silent hang and no ambiguous half-delivered task. +- **No blind resend.** The CJ does not resend TASK_READY on a timer while the session is alive (Cell request/reply plus TASK_READY idempotency replace the Pipe resend loop that caused duplicate-delivery races). + +Relationship to `progress_aware_streaming.md`: that design's Phase-1 wait policies are hosted today in TaskExchanger (CJ-side forward waits) and FlareAgent (trainer-side reverse waits) over an internal Pipe topic — exactly the components this design retires. The progress-tracking substrate it defines (per-transfer progress in the shared F3 layer) is the part that survives and is what the waits here consume; the Pipe-topic delivery and TaskExchanger/FlareAgent wait owners die with the legacy stack. The forward-path contract above is the re-homed replacement: readiness comes from HELLO, delivery from Cell request/reply, and materialization waits from the shared transfer layer instead of resend suppression. + +#### Per-Receiver Budgets and Timeouts + +DownloadService's transaction timeout is inactivity-based, and any receiver's activity resets the whole transaction's timer — so a progressing receiver can mask a stalled one, and (for staged multi-receiver workflows) a legitimately late receiver is indistinguishable from a dead one under a single transaction-wide bound. The shared payload layer therefore tracks budgets per (transfer_id, receiver): + +- **Acquire budget:** how long each expected receiver has to issue its first pull. For CCWF, the workflow supplies an expected-pull window per receiver stage (an evaluator that pulls only when its evaluate task is scheduled gets a window derived from the workflow's stage schedule, not a generic streaming timeout). +- **Idle budget:** per-receiver progress timeout once pulling has started — a stalled receiver is failed individually without resetting or being masked by others. +- The transaction-wide TTL (MSG_ROOT_TTL / transfer_timeout) is derived as an envelope over the per-receiver windows, not tuned independently — this is what keeps the design from reintroducing the coupled-timeout tuning it lists as a pain point. + +A receiver that exhausts its acquire or idle budget is marked failed for the aggregate outcome (freeing the producer per the fan-out policy) without waiting for the full transaction TTL. + +### Data Plane + +Payloads should be represented by a manifest rather than implied by the control transport. + +Possible payload locations: inline payload, Cell streaming reference, lazy download reference, disk offload reference, NFS/shared file reference, object store reference. + +Example conceptual result envelope: + +``` +ResultEnvelope + task_id + result_id + return_code + metrics + payload_manifest # entries: payload_id, location type, refs, size + cleanup_policy # LIVE_PULL | PERSISTENT_ARTIFACT (per entry; see Cleanup Policy) +``` + +File/NFS remains useful, especially for HPC, but only as a data plane. + +### Tensor Streaming And Disk Offload + +The new design preserves tensor streaming, lazy download, and tensor disk offload. What changes is ownership. + +Today, large-payload behavior is coupled to Pipe mechanics: + +- forward path: CJ receives lazy refs instead of materializing global model tensors, then re-emits those refs to the subprocess +- reverse path: subprocess-side `CellPipe.pass_through_on_send=True` makes CJ receive lazy refs for result tensors, then the server downloads from the subprocess +- subprocess exit is gated on a download-complete callback, with `download_complete_timeout` as the bound/fallback and a deferred stop polling for natural exit +- tensor disk offload is controlled through receiver-side Cell FOBS context + +These mechanisms should remain. The problem is that Pipe ACK, process exit, lazy-reference lifetime, retry, and tensor cleanup are entangled. + +In the new model, PASS_THROUGH becomes lower-layer payload-transfer behavior, not a Pipe feature or a new user-facing Client API concept. The transfer metadata must make multi-hop ownership explicit, especially for: + +``` +external trainer process + -> Client Job process as relay + -> Server Job process as final consumer +``` + +In that path, the CJ may intentionally skip tensor materialization and forward lazy references so the server-side job process streams tensors directly from the producer's DownloadService. The important rule is that the producer's resources remain alive until the final consumer's transfer reaches a terminal state, not merely until the CJ ACKs the control message. + +For external_process and attach modes, the producer must stay alive until the terminal transfer outcome. For scheduler batch artifact mode, use explicit file/NFS or object-store payload manifests (PERSISTENT_ARTIFACT) instead of live pass-through. + +### How Client API maps onto existing F3 features + +Concretely, each capability reuses what nvflare/fuel/f3 already provides — the Client API backend wires them together rather than adding new transport code: + +| Need | Existing F3 feature used | Client API backend responsibility | +|---|---|---| +| send a result blob / large tensor | `stream_cell.send_blob` (returns StreamFuture); DownloadService ref for lazy/large objects | put the ref in the result manifest; wait on the future/transaction for terminal state | +| lazy download (don't materialize) | DownloadService transactions; manifest holds download_tx_id/download_ref_id | carry refs in the manifest; do not pull bytes on the control path | +| pass-through relay (CJ forwards refs without materializing) | PASS_THROUGH already lives in the Cell layer — `MessageHeaderKey.PASS_THROUGH` per message, `cell.decode_pass_through_channels` per channel, carried in the FOBS decode context | set/honor PASS_THROUGH on the relay channel; do not reimplement forwarding in Client API | +| tensor disk offload | receiver-side Cell FOBS decode context | leave as a receiver-side storage policy; unchanged by execution mode | +| consumer-began-pulling liveness | producer-side `progress_cb` ACTIVE events with receiver identity | map to PAYLOAD_ACQUIRED per receiver; enforce acquire budgets | +| keep producer alive until consumer done | DownloadService terminal callback + aggregate receiver outcome + `Downloadable.release()` | resolve the normalized terminal outcome (with receiver confirmation and post-completion linger), then release the producer/session | + +So pass-through stays in the Cell layer (it already exists there); the Client API change is to stop treating it as a CellPipe feature and instead set it on the relay channel and consume the terminal-state callback. Tensor disk offload is untouched — it remains a receiver-side FOBS storage policy, independent of whether the producer is a launched trainer process or an attached trainer. + +### Payload Transfer Boundary + +This proposal should not define a new streaming subsystem. Detailed streaming progress behavior belongs in shared Cell/FOBS/F3 payload-transfer code so Client API, Collab API, and lower-level Executor/Controller paths can use the same behavior. Streaming liveness is an end-to-end property: both the source and the final consumer must stay alive across every relay hop, not just until the next control ACK (a CJ ACK means the relay accepted the result metadata, not that the final consumer finished downloading). + +F3 already provides most required primitives: transaction termination, timeout/deletion, per-download status callbacks, per-receiver first-pull progress events, and receiver identity. It does not yet expose one aggregate terminal outcome that distinguishes all-receivers-success from completed-with-receiver-failure, nor receiver-confirmed terminal statuses, nor per-receiver budgets. The shared payload layer must normalize those primitives before Client API consumes them. Given that, the contract is: + +> The Client API Cell protocol preserves the transfer ids and waits for the lower-layer terminal state before releasing the producer; it does not reimplement streaming progress or completion. + +This execution-mode design only requires the following boundary contract: + +- control ACK is not payload completion; the control-ACK timeout is separate from the payload-transfer timeout — in both directions (task delivery and result upload) +- streamed/offloaded payloads have a stable transfer identity, and Client API binds its result lifecycle to that transfer_id +- relay roles are explicit, so a CJ can forward lazy refs without becoming the final payload consumer +- producer cleanup waits for terminal transfer state; the result fails if the producer, relay, or final consumer disappears before then +- retries are idempotent by stable task/result/transfer ids +- receiver identity is available when multiple final consumers may pull the same source ref, and liveness/budgets are tracked per receiver + +Who observes completion, and how it reaches `flare.send()`: because consumers pull directly from the producer's DownloadService (the ref carries the producer's fqcn + ref_id; a relay only forwards the ref), the producer observes final-consumer termination locally — its transaction callback fires when the expected pulls have reached terminal outcomes, and (with the receiver-confirmation refinement) those outcomes reflect receiver truth. The producer combines that callback with aggregate receiver results to resolve TRANSFER_COMPLETE or TRANSFER_FAILED. The relay does not forward a completion signal back, and there is no multi-hop signal to thread. The awaitable facade waits on that normalized local outcome — in the producer's `flare.send()` for single-receiver results, or gating session close/process release for declared fan-out results (see the session invariant). This is why relay depth does not add lifecycle complexity: the terminal outcome is still resolved at the producer. + +The API surface may differ, but the lower-level transfer contract should remain shared: + +``` +Client API / Collab API / Executor-Controller APIs + -> shared payload lifecycle and streaming progress contract + -> Cell + -> F3 streaming and transport drivers +``` + +### CellNet Boundary and Dependencies + +Decision: build the Client API Cell backend on top of existing CellNet/F3 — primarily an integration effort, not a transport build-out. Existing F3 supplies the transport and transaction primitives, but the shared payload layer must add an aggregate terminal success/failure outcome (receiver-confirmed, with post-completion linger and per-receiver budgets) before Client API can safely gate cleanup. No broad CellNet rewrite is in scope. + +New work belongs above CellNet: + +- define Client API Cell protocol topics +- maintain sessions; validate session tokens (attach tokens and launch-scoped tokens) and job/site/task identity +- translate Client API calls into protocol messages +- call the shared payload-transfer APIs with explicit transfer ids and wait semantics, and map terminal transfer states to result success/failure, cleanup, and process/session release + +Two linked implementation tasks remain; the other dependencies are settled for V1: + +- **Aggregate terminal outcome and awaitable facade.** First, the shared payload layer must combine transaction termination with receiver-confirmed receiver outcomes so it can report TRANSFER_COMPLETE only when every expected receiver succeeded, and TRANSFER_FAILED otherwise — including the post-completion linger and retry-aware accounting described in Payload Lifecycle. Second, a producer-facing wait must be layered over the resulting DownloadService/StreamFuture signals — consumed by `flare.send()` for single-receiver results and by session close/process release for declared fan-out results (see the session invariant). Neither piece exists as the required end-to-end contract today. (The acquisition signal, by contrast, already exists: the public producer-side `progress_cb` fires ACTIVE with receiver identity on first pull.) +- **Receiver count (settled for V1).** downloaded_to_all requires a known num_receivers. Workflow-driven cases supply it directly — CCWF declares it (aggregator = 1, broadcast/CSE = N; see CCWF Transfer Path), and ordinary client→server is always 1. The count reaches the producer through the existing FOBS-context NUM_RECEIVERS mechanism when the result is registered for download. Only a genuinely unknown fan-out (consumer set not known at produce time) would need count discovery; no such case is in V1 scope, and it would fall back to the transaction timeout. + +If a future deployment needs control messages over shared storage (no usable network path), model that below Cell as a filesystem-backed CellNet/F3 transport driver or relay in a separate design/PR — the Client API protocol still sees Cell semantics. Likewise, if implementation uncovers a genuinely missing CellNet capability, capture it as a separate transport-focused design/PR (see Non-Goals). + +### Configuration Surface + +The executor being replaced exposes a large surface (~30 constructor params on ClientAPILauncherExecutor, plus five components ScriptRunner wires: task CellPipe, metric CellPipe, MetricRelay, SubprocessLauncher, ExternalConfigurator). The new surface must be explicit about what an implementer writes and where each legacy knob goes. + +**ClientAPIExecutor arguments (V1):** + +```python +ClientAPIExecutor( + execution_mode="in_process" | "external_process" | "attach", + # external_process only + command="python train.py ...", # or "torchrun ...", "deepspeed ..." + launch_once=True, # launch per job (default) vs per task + launch_timeout=..., shutdown_timeout=..., stop_grace_period=..., + # in_process only — the trainer script NVFlare runs in the CJ process + task_script_path=..., task_script_args=..., + # session / protocol + heartbeat_interval=5.0, heartbeat_timeout=30.0, # out-of-process only + task_wait_timeout=..., result_wait_timeout=..., + # task-name mapping — powers flare.is_train()/is_evaluate()/is_submit_model() + train_task_name="train", evaluate_task_name="validate", + submit_model_task_name="submit_model", train_with_evaluation=False, + # memory management (carried forward from ScriptRunner) + memory_gc_rounds=0, cuda_empty_cache=False, + # attach only + attach_timeout=..., allow_reconnect=False, +) +``` + +Mode-scoped arguments are validated at construction: an argument set for a mode +that ignores it (e.g. `command` in in_process, `heartbeat_interval` in in_process, +`attach_timeout` outside attach) is rejected with a clear error rather than +silently dropped. `task_script_path`/`task_script_args` (the in_process trainer +entry point), the task-name mapping, and the memory knobs are carried forward from +today's InProcessClientAPIExecutor/ScriptRunner so existing jobs map without loss. + +**No parameter converters on the executor (per FLARE-2698).** The `params_exchange_format`, +`params_transfer_type`, `server_expected_format`, and `from/to_nvflare_converter_id` arguments +of the legacy executors are intentionally absent. Conversion between the framework-agnostic +aggregation representation (numpy) and the framework-native training representation +(`torch.Tensor`, Keras weights) moves out of the executor into **send/receive filters at the +client edge** — the same DXO-transformation mechanism NVFlare already uses (e.g. the PT +quantizer/dequantizer filters). The intermediate layers (executor, Cell) pass through +untransformed. Recipes/ScriptRunner auto-wire the framework's conversion filter (as the *last* +task-data filter and *first* task-result filter, so privacy/DP/HE filters still operate on +numpy). Because these filters run client-side on task receipt, `flare.receive()` still hands the +training script native tensors — the ergonomic is preserved, the executor surface is not. +Transfer type (FULL/DIFF) is a separate axis handled by the Client API's model_registry, not a +converter; it is decided independently of this removal. + +Example client job config (external_process): + +```json +{ + "executor": { + "id": "client_api_executor", + "path": "nvflare.app_common.executors.client_api_executor.ClientAPIExecutor", + "args": { + "execution_mode": "external_process", + "command": "torchrun --nproc_per_node=2 custom/train.py" + } + } +} +``` + +One executor component, no pipes, no launcher, no MetricRelay: LOG messages arrive on the session's Cell connection and the executor's Cell backend converts them into `fed.analytix_log_stats` analytics events on the CJ side (the role MetricRelay plays today for ex-process, and the in-process executor's log callback plays for in-process). ExternalConfigurator's job — writing the trainer bootstrap config — moves into the external_process backend's launch step. + +**Legacy knob disposition (summary):** + +| Legacy knob/component | Disposition | +|---|---| +| launch_external_process, command | `execution_mode` / `command` | +| launch_once, launch_timeout, shutdown_timeout | kept, same meaning | +| external_pre_init_timeout | replaced by HELLO wait (launch_timeout covers it) | +| heartbeat_interval / heartbeat_timeout | kept, session heartbeat | +| peer_read_timeout, max_resends | dropped — Cell request/reply + idempotent TASK_READY replace Pipe resend | +| submit_result_timeout, last_result_transfer_timeout, download_complete_timeout, streaming_idle_timeout | replaced by the shared transfer layer's per-receiver budgets and transfer TTL (result_wait_timeout is the control-side bound) | +| pipe_connect_type, task/metric CellPipe, PipeHandler | dropped — one Cell session | +| MetricRelay, metric pipe | dropped — LOG on the session connection; executor fires analytics events | +| SubprocessLauncher | internal process runner (not public surface) | +| ExternalConfigurator | folded into external_process launch (bootstrap config) | +| params_exchange_format / server_expected_format / from_nvflare_converter_id / to_nvflare_converter_id | removed from executor (FLARE-2698) — conversion moves to send/receive filters at the client edge | +| params_transfer_type (FULL/DIFF) | not a converter — stays a Client API concern (model_registry); decided separately | +| task_script_path / task_script_args | kept (in_process trainer entry point) | +| train_task_name / evaluate_task_name / submit_model_task_name / train_with_evaluation | kept — power flare.is_train()/is_evaluate()/is_submit_model() | +| memory_gc_rounds / cuda_empty_cache | kept, same meaning | + +### Observability + +Each session exposes its lifecycle state for operators: session state (waiting HELLO / idle / task active / waiting transfer), current task_id/result_id/transfer_id, per-receiver transfer progress (from the shared layer's progress events), and last heartbeat. These surface through the standard job stats/log channels (CJ logs at state transitions with the ids above; stats pollable via the existing cell/job info mechanisms), so "why is this producer still alive" and "which receiver is stalled" are answerable from the site without a debugger. + +## Scenario Coverage + +These sections show the design holds up across the demanding real-world scenarios, using the contracts defined in the Proposal (execution modes, the rank contract defined below, payload lifecycle, the F3 transport boundary). Two of them (the rank contract and the attach auth contract) are normative additions in their own right and are marked as such. + +### Multi-GPU and torchrun (Rank Contract — normative) + +V1 supports torchrun, Deepspeed, and similar distributed process tools with a single NVFlare control rank per trainer process group. + +Rank policy: + +- RANK is the distributed global rank and determines the NVFlare control rank. +- LOCAL_RANK is only for local device binding and must not determine NVFlare control ownership. +- WORLD_SIZE describes the trainer process group size. +- If `flare.init(rank=...)` is called, that value overrides RANK for NVFlare rank policy. +- If neither explicit rank nor RANK is set, rank defaults to 0. +- Default control rank is global rank 0. + +Client API contract: + +- Only the control rank creates the Cell connection to NVFlare. +- Only the control rank may call NVFlare control APIs directly: `flare.receive()`, `flare.send()`, `flare.log()`, `flare.is_running()`, `flare.get_task_name()`, `flare.is_train()`, `flare.is_evaluate()`, and `flare.is_submit_model()`. +- Non-control ranks may call `flare.init(rank=...)` to establish local rank context, but they do not create a Cell connection. +- Non-control ranks should get FL task state, model parameters, and stop/abort decisions through the training framework's distributed primitives. +- The new backend should fail fast with a clear error if a non-control rank calls a direct NVFlare control API, unless it is using an explicit NVFlare distributed helper that performs rank-0 communication and broadcasts results. + +Model sharing contract: + +- NVFlare delivers the FLModel to the control rank. +- The training script or framework is responsible for sharing it with other ranks. +- Valid sharing mechanisms include torch.distributed broadcast, framework-managed broadcast, checkpoint plus barrier, or a future NVFlare helper that wraps those mechanisms. +- NVFlare should not assume how the training framework shards, broadcasts, or reloads model state. + +Failure contract: + +- external_process owns the whole local process tree. +- If any rank exits nonzero, the local process runner should treat the training command as failed and the executor should fail or abort the current task. +- If a non-control rank reports an error through distributed collectives and the control rank sends an error result, that result is handled normally. +- If the process group fails before RESULT_READY, the executor returns a task failure and cleans up process/session resources. +- If the control rank exits before payload terminal state, payload lifecycle rules decide whether the result is failed or already safe (see Cleanup Policy). +- Abort/shutdown must be delivered to the control rank and then propagated to the process group by the process runner, framework, or user script. + +Attach mode uses the same rank policy. For V1, only the control rank attaches to NVFlare. Multiple rank Cells attaching for the same attach_id should be rejected unless a future multi-attach protocol is explicitly designed. + +Current examples show two patterns: + +- `examples/advanced/multi-gpu/pt` launches with torch.distributed.run and uses rank 0 for model load/send. This example should be updated to make rank-0-only `is_running()`/`receive()` behavior explicit and broadcast control decisions to nonzero ranks. +- `examples/advanced/qwen3-vl` uses a clearer pattern: rank 0 calls Client API, then broadcasts running state, round state, and errors to other ranks. +- Lightning integration already wraps this style by receiving on rank 0 and broadcasting through the trainer strategy. + +### Attach Topology and Auth (normative) + +Attach mode works only if the external trainer can discover and authenticate the Client API endpoint for the current job. attach creates a job/session-scoped bootstrap config when it starts; the AV platform, operator, or scheduler delivers it to the trainer before `flare.init()`. The trainer then creates a Cell from that config and sends HELLO to the target CJ. Trainer code stays just `flare.init()` (config via a new `NVFLARE_CLIENT_API_CONFIG` environment variable or the existing `flare.init(config_file=...)`); users never construct FQCNs or instantiate IPCAgent/Cell/CellPipe. + +The full connection topology, bootstrap-config field list, and attach-token auth contract are in Appendix B: Attach Topology and Auth. + +### Scheduler and HPC + +Scheduler support has two patterns. + +**Batch Artifact Pattern** + +The Client API wrapper runs in the NVFlare job runtime, usually on a control/login/admin node. The scheduled compute job does not call `flare.init()`. NVFlare proposes nothing new here beyond the existing Client API — the user follows this flow with their own scheduler scripts. `write_input_artifacts`, `submit_sbatch`, `wait_for_slurm_job`, and `read_output_artifacts` below are illustrative user steps, not proposed NVFlare APIs: + +```python +import nvflare.client as flare + +flare.init() +while flare.is_running(): + model = flare.receive() + if model is None: + break + write_input_artifacts(model, work_dir) # user code + job_id = submit_sbatch(work_dir) # user code + wait_for_slurm_job(job_id) # user code + result = read_output_artifacts(work_dir) # user code + flare.send(result) +``` + +Use this when compute nodes cannot open Cell connections back to NVFlare, or when site policy requires compute nodes to communicate only inside the cluster. Results here use PERSISTENT_ARTIFACT manifests. + +This can run under in_process or external_process. + +**Live Attach Pattern** + +The scheduled trainer (submitted by the scheduler or operator) calls `flare.init()` and connects back to the CJ over Cell, under attach. Use this only when the scheduled trainer can reach the required Cell endpoint over a confidential transport (see Appendix B). + +### CCWF Transfer Path + +Client-controlled workflows (swarm, cyclic, CSE) are the historically fragile case: the final consumer of a trained model is a peer client, not the server, reached multi-hop. Today that path is held together by app-layer machinery — LazyDownloadRef handling, `_resolve_lazy_refs` FOBS round-trips, and per-path branching in swarm_client_ctl, with PASS_THROUGH channel registration performed by ClientAPILauncherExecutor at initialization — plus a subprocess lifetime gated by a download-complete callback with `download_complete_timeout` as the bound and a deferred stop_task. It accreted many fixes and disk offload was disabled as too fragile. + +This is not a missing-transport problem. DownloadService already provides what is needed: a transaction with num_receivers, per-receiver completion (downloaded_to_one), an all-done signal (downloaded_to_all / transaction_done), and a timeout backstop. The model is direct pull — the ref carries the producer subprocess's fqcn + ref_id, the peer downloads straight from that subprocess, and the CJ that forwards the ref is a relay, not a receiver. So num_receivers counts the real consumers, and the relay hop does not need to be counted. + +The fix is therefore to use that contract instead of reimplementing it above the transport: + +- The workflow declares num_receivers when the result is produced — it always knows the consumer set: aggregator = 1 (swarm), next client = 1 (cyclic), all clients = N (broadcast best / CSE). This is the concrete, tractable form of the "dynamic receiver count" item — it is workflow-declared, not unknown. The count reaches the producer via the existing FOBS-context NUM_RECEIVERS mechanism when the result is registered. +- The executor gates subprocess stop on the normalized terminal transfer outcome for those receivers, replacing download_complete_timeout + deferred stop. This is the same normalized-outcome contract the Client API consumes — one mechanism observed at two points. For single-receiver results (swarm aggregator, cyclic next-client) the trainer's `flare.send()` itself returns at the terminal outcome. For fan-out results, send returns at RESULT_ACCEPTED and the executor's stop-gate is the enforcement point: the trainer process stays resident (typically blocked in its next `flare.receive()`), serving downloads in the background, and is released only when the aggregate outcome resolves (see the session invariant). +- This removes the LazyDownloadRef / PASS_THROUGH / `_resolve_lazy_refs` / per-path branching from the workflow controller: it hands the transfer layer a result + receiver count and waits for terminal state. + +This is an explicit refactor, not an automatic benefit of the executor change — and it is scheduled as Migration Plan step 6. Today the same "when do the real bytes get pulled" concern is solved three different ways in swarm_client_ctl: + +- remote aggregator: the result is sent over the AUX channel and the FOBS encode/decode during the send implicitly triggers the peer to download from the producer's DownloadService; +- local aggregation (aggr == self): no send happens, so the controller does an explicit `_resolve_lazy_refs()` FOBS round-trip to force the download before the gatherer sees the result; +- the gatherer: keeps a defensive `_has_lazy_refs()` re-resolve in case a ref slipped through. + +These three are not the same moment — they fire at different workflow stages — so the fix unifies the mechanism, not the timing. Each consumer still pulls whenever its stage arrives; they just all use one transfer contract instead of three bespoke materialization tricks. The producer registers the result in a DownloadService transaction with num_receivers = the workflow-declared consumer set; every consumer (local or remote, early or late) pulls through that same transaction; and the producer is released only when the aggregate outcome resolves, i.e. after all num_receivers have pulled — which is precisely what spans consumers arriving at different stages. The local-vs-remote split also stops mattering: a local consumer is just a receiver whose slot happens to be the same site (and under in_process there is no separate process, so no download). The controller then stops resolving refs at all — no implicit FOBS-triggered download, no `_resolve_lazy_refs` round-trip, no `_has_lazy_refs` guard. Until this step lands, SWARM keeps its current behavior. + +Two CCWF-specific policies still need to be set (not new transport): + +- **Fan-out (broadcast/CSE).** The producer stays alive until all N receivers pull; a stuck or dead receiver must not pin it forever. This is enforced with the per-receiver budgets above: each receiver stage gets a workflow-supplied expected-pull window, a receiver that exhausts its budget is marked failed for the aggregate outcome, and the workflow decides whether a partial fan-out (N-1 of N) is a usable result or a task failure. The transaction TTL is the envelope over the stage windows, plus a defined abort path. +- **Disk offload.** Safe to re-enable once the offloaded artifact is deleted only on the normalized terminal outcome (not on subprocess exit); requires explicit validation given its history. + +## Alternatives Considered + +**Keep Pipe As The Main Abstraction.** This preserves current behavior, but it keeps control-plane lifecycle coupled to payload transfer and Pipe ACK semantics. It also leaves users exposed to FlareAgent, TaskExchanger, CellPipe, and FilePipe. Decision: reject as the future recommended path. Keep temporarily for compatibility. + +**Extend JobLauncher To Launch Training Code.** JobLauncherSpec already launches CJ/SJ job runtime processes. Reusing it to also launch per-task trainer code would blur two lifecycle layers and make Docker/K8s/SLURM placement harder to reason about. It is also the wrong shape for the mode's primary use: external_process exists to shell out to a distributed launcher (torchrun/Deepspeed/Horovod/`mpirun`) so NVFlare does not reimplement inter-rank comm — the command NVFlare runs *is* the trainer launcher, and NVFlare only talks to rank 0. Decision: reject for this design. Keep JobLauncherSpec focused on job runtime launch; external_process owns the trainer command via its internal process runner. + +**Authenticate The Trainer With Transport-Layer mTLS Instead Of A Session Token.** Rather than an app-layer HELLO token/proof, the trainer cell could present a site-issued client certificate and let CellNet's existing mTLS + FQCN↔CN binding authenticate it for free — reusing proven crypto with no new auth code. Rejected as the default because it breaks the ergonomic that motivates external_process and attach: external trainers (torchrun subprocesses, AV systems, SLURM jobs) are not provisioned NVFlare participants and today's IPCAgent/CellPipe ad-hoc cells deliberately load only the root CA (no client identity). Requiring per-trainer cert issuance/rotation just to run `flare.init()` pushes PKI management onto every trainer and every ephemeral launch. Decision: use a bootstrap-delivered session token, scoped by mode — a bare launch-scoped token over the localhost connection NVFlare itself created for external_process, and challenge-response with single-session/expiry for attach (Appendix B), where the trainer is started by an untrusted party over a possibly-remote channel. Deployments that *do* provision trainer certs may still run over mTLS; the token contract is layered above whatever transport security exists, not a replacement for it. (A leaked token is only replayable by a party that can also reach the cell — see the transport/threat discussion — which is why the token strength is scaled to how NVFlare obtains and delivers it per mode.) + +**Expose A General Client API Launcher.** A general launcher abstraction would recreate the same ambiguity that exists today between LauncherExecutor, SubprocessLauncher, and JobLauncherSpec. The external_process backend only needs an internal process runner. Attach mode only needs an optional start/poll/cancel helper. Decision: reject. Do not expose a general Client API Launcher extension point. If attach mode ever needs to start the trainer, that is a narrow optional start/poll/cancel hook (Future Enhancements), not a general launcher. + +**Use File/NFS As Control Plane.** This can be convenient on HPC systems, but file polling makes lifecycle, timeouts, retries, abort, and result ownership difficult. It also complicates large tensor streaming and CCWF behavior. Decision: reject as a Client API control-plane abstraction. Keep file/NFS/object storage as payload data-plane mechanisms. If a site truly needs control messages over a shared filesystem because there is no usable network path, implement that as a filesystem-backed CellNet/F3 transport driver or relay in a separate design/PR. This preserves one Client API control model instead of reintroducing a parallel Pipe protocol. + +**Let Every Distributed Rank Attach To NVFlare.** This could make each rank independently visible to NVFlare, but it multiplies sessions, auth, task ownership, retries, and duplicate-result handling. Decision: reject for V1. Use one control rank per trainer process group. + +**Redesign CellNet First.** CellNet already provides request/reply, streaming, FOBS context, pass-through, and DownloadService transactions. A transport rewrite would expand scope and delay the Client API cleanup. Decision: reject for this proposal. Any proven CellNet gaps should be handled in separate designs/PRs. + +## Discussion + +### Pain Point Coverage + +The new design addresses the current pain points if the Cell/FOBS/F3 payload transfer boundary is honored. + +| Current pain point | Proposed resolution | +|---|---| +| Pipe ACK does not mean payload transfer is complete | Lower-layer payload transfer provides terminal state; Client API waits for that state before cleanup — on both task delivery and result upload | +| PASS_THROUGH lifecycle is split across FlareAgent, CellPipe, FOBS, LauncherExecutor, and timeout config | Treat lazy forwarding as lower-layer payload-transfer behavior, not as a Client API Pipe feature | +| Subprocess can be killed while server/peer is still pulling tensors | external_process releases the process only after the terminal transfer outcome, with a bounded post-completion linger for late retries | +| Attached trainer may exit before downstream payload pull completes | `flare.send()` in attach mode returns only after terminal transfer state or failure (declared fan-out results gate session close instead); external platform must honor that contract | +| Many timeout knobs must be tuned together | Client API exposes lifecycle intent; the shared transfer layer owns per-receiver acquire/idle budgets, with the transaction TTL derived from them rather than tuned independently | +| Retries can create duplicate payload transactions or stale state | Stable result_id and transfer_id make retries idempotent; TASK_READY is idempotent by task id | +| CJ or trainer death leaves the other side undefined | Explicit owner-death contract: trainer self-terminates on CJ loss (external_process), session exceptions on either side, CP-level reaping as backstop | +| LauncherExecutor/SubprocessLauncher overlaps with JobLauncherSpec | Keep JobLauncherSpec for job runtime launch; use Client API execution modes for training execution | +| AV/manual trainers need IPCAgent/FlareAgent knowledge | attach hides attach topology behind `flare.init()` and bootstrap config | +| FilePipe mixes control and data plane | Keep file/NFS/object refs as payload data plane; future filesystem control belongs under CellNet/F3 transport | + +These resolutions all rest on the shared Cell/FOBS/F3 transfer boundary; the primitives exist today, and the two remaining implementation tasks (the normalized aggregate outcome and the awaitable facade) are in "CellNet Boundary and Dependencies." + +### Use Case Coverage + +| Current use case | Current components | Proposed coverage | +|---|---|---| +| Simple Client API script | InProcessClientAPIExecutor, DataBus | in_process | +| Local subprocess Client API | ClientAPILauncherExecutor, LauncherExecutor, SubprocessLauncher, CellPipe | external_process | +| Local PyTorch DDP | ScriptRunner(..., command="torchrun ...") | external_process(command="torchrun client.py") | +| Multinode PyTorch/Deepspeed where CJ starts the local rank group | custom wrapper or scheduler command | external_process(command="torchrun/deepspeed ..."); external scheduler/platform provides rendezvous and remote ranks | +| Deepspeed / wrapper | subprocess Client API path | external_process(command="...") | +| AV-owned trainer | IPCAgent/IPCExchanger or FlareAgent/TaskExchanger | attach | +| Manual/resident trainer | low-level agent or custom scripts | attach | +| SLURM batch artifact training | wrapper calls Client API and submits sbatch | in_process or external_process | +| SLURM live attach training | scheduled trainer calls flare.init() | attach (scheduler starts the trainer) | +| Docker/K8s job runtime | DockerJobLauncher, K8sJobLauncher | unchanged; combine with any Client API mode | +| Large tensor streaming | pass-through in Pipe path | explicit Cell payload lifecycle | +| FilePipe/NFS transfer | FilePipe as control and data channel | file/NFS as payload manifest entry (PERSISTENT_ARTIFACT) | +| Experiment tracking metrics | MetricRelay plus metrics pipe | same Cell control connection carries log/metric messages; executor fires analytics events | + +### Compatibility + +Trainer code using `flare.init()`, `flare.receive()`, `flare.send()`, and `flare.log()` should continue to work. + +Existing ScriptRunner knobs can map forward to execution modes: + +- `launch_external_process=False` → `execution_mode="in_process"` +- `launch_external_process=True` → `execution_mode="external_process"` + +For ScriptRunner compatibility, the explicit mode spelling should be: + +```python +ScriptRunner(..., execution_mode="in_process") +ScriptRunner(..., execution_mode="external_process", command="torchrun ...") +``` + +ScriptRunner maps only to in_process and external_process — it exists to run a script that NVFlare starts, and attach mode has no script to launch. Attach jobs configure `ClientAPIExecutor(execution_mode="attach", ...)` directly. + +Keep the current Pipe-based subprocess path during migration. "Retire as recommended / keep legacy" concretely means: the legacy classes remain in place and importable with deprecation warnings; ScriptRunner defaults flip to the new executor only when the coverage gate below is met; existing job configs referencing the legacy classes keep working for at least one release after the flip; removal is a separate later decision. Mark the legacy path deprecated only after the new Cell backend covers external_process python, torchrun, metrics/logging, tensor streaming, tensor disk offload, abort/shutdown, and attach mode. + +Manual multi-rank examples should be updated to follow the rank contract: rank 0 calls NVFlare control APIs; other ranks receive state through distributed framework communication. + +### Acceptance Tests + +The new executor/backends should include targeted tests for these contracts. Tags mark which Migration Plan step delivers each group. + +Core external_process and rank contract (steps 3–4): + +- external_process python script can receive, send, log, and shutdown +- external_process `torchrun --nproc_per_node=2` creates one NVFlare session from rank 0 +- non-control rank direct receive/send/log/is_running fails with a clear error +- rank 0 can receive an FLModel, broadcast it to rank 1, train, and send one result +- nonzero rank process failure causes task failure/abort instead of hanging +- rank 0 failure before RESULT_READY causes task failure and cleanup +- rank 0 failure after RESULT_READY follows payload lifecycle terminal-state rules +- LOCAL_RANK controls device placement but does not make a process the NVFlare control rank + +Session and failure contracts (steps 3–4): + +- session setup with a wrong/stale session token is rejected (the proof fails at HELLO_PROOF); a stale trainer from a previous run cannot bind +- HELLO with a mismatched protocol version fails fast with a clear error +- duplicate TASK_READY delivery does not double-deliver the task to user code +- trainer-side task payload download failure produces TASK_FAILED, not a hang +- CJ kill (SIGKILL) leads to trainer process-group self-termination within the grace period; CP reaping covers a disabled trainer-side grace +- heartbeat loss during an active multi-GB transfer does not revoke the session while the transfer is progressing; transfer failure/timeout does +- `flare.receive()` returns None on SHUTDOWN/job end; raises the session exception on ABORT and on session loss + +Payload lifecycle (steps 2, 4): + +- tensor streaming/offload test covers the terminal transfer outcome before external_process trainer exit +- a receiver whose final EOF reply is lost recovers within the post-completion linger window (producer still alive) +- a receiver-side finalization failure after the last pull yields TRANSFER_FAILED, not silent success +- a receiver that never pulls exhausts its acquire budget and fails fast without waiting the full transaction TTL + +Attach (step 5): + +- attach mode accepts one control-rank trainer and rejects duplicate trainers for the same attach id +- attach replay: a captured token proof cannot be replayed (challenge-response), and a second attach with the same token is rejected +- ABORT/SHUTDOWN from a sender other than the bound CJ session is rejected by the trainer backend + +CCWF (step 6): + +- CCWF swarm: a large-model result streams from a producer subprocess to a peer aggregator, and the producer is released only after the aggregate terminal outcome (no download_complete_timeout guess) +- CCWF broadcast/CSE fan-out: producer stays alive until all N receivers pull, and a non-pulling receiver exhausts its per-receiver budget/abort path instead of pinning the producer +- CCWF fan-out send: `flare.send()` for a declared multi-receiver result returns at RESULT_ACCEPTED, the trainer proceeds to its next `flare.receive()` while the transfer drains, and `flare.shutdown()`/process stop block until the aggregate outcome resolves (no deadlock across mutually-consuming peers) +- CCWF with tensor disk offload: the offloaded artifact is deleted only on the terminal outcome, and a peer can download it before producer exit + +### Open Questions + +- Exact public argument names and migration aliases. +- Scheduler batch helper library vs. standard wrapper component. +- Exact factoring of shared code with IPCAgent/IPCExchanger. +- Compatibility flag for selecting the legacy Pipe path during migration. +- Whether partial fan-out (N-1 of N receivers succeeded) should be surfaceable to CCWF workflows as a usable result or always a task failure. + +## Migration Plan + +1. Land design alignment. +2. **Shared payload layer (F3):** aggregate terminal outcome (receiver-confirmed statuses, post-completion linger, retry-aware accounting, per-receiver budgets) and the awaitable producer-facing facade. This is the prerequisite for every step below; nothing user-visible ships here. +3. **ClientAPIExecutor skeleton:** the executor, the in_process backend (consolidating InProcessClientAPIExecutor), ScriptRunner `execution_mode` wiring, and the trainer-side Cell protocol engine (session setup, receive/send contracts, heartbeat, owner-death handling) replacing FlareAgent. +4. **Cell backend for external_process:** launch/HELLO/process-group ownership; support python, torchrun, rank-0 NVFlare communication, metrics/logging, tensor streaming, and explicit transfer completion. Validate in simulator and POC as part of this step. +5. **Passive attach mode**, including the auth contract in Appendix B. +6. **CCWF refactor:** move swarm/cyclic/CSE controllers onto the transfer contract (workflow-declared num_receivers, terminal-outcome-gated release, remove `_resolve_lazy_refs`/`_has_lazy_refs`/per-path branching), and validate disk offload re-enable. This delivers the CCWF goals and their acceptance tests. +7. Add scheduler support as batch artifact helpers, and optionally an attach-starter hook (see Future Enhancements). +8. Update docs and deprecate Pipe-based Client API execution once the Compatibility coverage gate is met. + +## Future Enhancements + +- Add convenience FedJob/ScriptRunner helpers around the execution_mode option if needed. +- Add a small NVFlare distributed helper for common rank-0 receive/broadcast and collect/send patterns. +- An optional, narrow AttachStarter hook so attach mode can also start/poll/cancel the external trainer (e.g. SlurmAttachStarter, K8sAttachStarter, or a custom one), plus scheduler batch-helper APIs. It is never the task/result transport; attach mode stays fully usable without it. +- Automatic attach token redelivery/renegotiation after CJ restart. +- Add optional token refresh for long-lived attach sessions if needed. +- Consider a filesystem-backed CellNet/F3 transport for sites that have shared storage but no usable network path between relevant processes. +- Consider a more general payload-policy header if PASS_THROUGH grows beyond lazy-reference forwarding. +- Move CellNet improvements, if any, into separate transport-focused designs. +- Update multi-GPU examples so manual PyTorch DDP follows the rank contract as clearly as Qwen and Lightning already do. + +## Recommendation + +Proceed with one public ClientAPIExecutor configured by execution mode: + +- **in_process:** same process, DataBus +- **external_process:** external trainer process tree, Cell, owns PID/process lifecycle +- **attach:** external trainer attaches by Cell (NVFlare does not start it) + +The most important change is the boundary, not a class hierarchy: remove Pipe as the recommended Client API control-plane abstraction and make payload lifecycle explicit — in both directions, with owner death covered on both sides. + +## Appendix A: Current Integration Paths + +The integration components that exist today, for reference. (Rendered as the wiring ScriptRunner assembles; the components cooperate rather than forming a strict linear stack.) + +**In-process Client API:** + +``` +ScriptRunner(launch_external_process=False) + -> InProcessClientAPIExecutor + -> InProcessClientAPI + -> DataBus / EventManager +``` + +**Subprocess Client API** (ScriptRunner assembles: ClientAPILauncherExecutor, a task CellPipe, a metric CellPipe + MetricRelay, SubprocessLauncher, ExternalConfigurator): + +``` +ScriptRunner(launch_external_process=True) + CJ side: ClientAPILauncherExecutor -> LauncherExecutor -> TaskExchanger -> PipeHandler -> task CellPipe + MetricRelay -> metric CellPipe + SubprocessLauncher (starts the trainer) + trainer side: ExProcessClientAPI -> FlareAgentWithFLModel -> PipeHandler -> task CellPipe +``` + +**Older AV-style integration:** + +``` +IPCExchanger + -> IPCAgent + -> direct F3 Cell communication +``` + +**Current third-party docs:** + +``` +TaskExchanger + -> FlareAgent + -> CellPipe or FilePipe +``` + +**Older non-Client-API multi-process path:** + +``` +PTMultiProcessExecutor + -> MultiProcessExecutor + -> torch.distributed.run + -> sub_worker_process +``` + +## Appendix B: Attach Topology and Auth + +V1 topology should follow the existing IPCAgent connection shape: + +``` +CJ/site job cell + -> waits on Client API attach channel + +external trainer process + -> creates a Cell using bootstrap config + -> connects to the configured parent/relay URL + -> sends HELLO to the target CJ/job cell FQCN +``` + +The trainer Cell should use a deterministic child FQCN under the site, scoped by attach id, for example: + +``` +.-client_api_ +``` + +The `-` child-name convention follows the existing IPCAgent-style ad-hoc connection pattern. The exact helper should be internal; user code should not construct FQCNs. Note the FQCN is a routing name, not an identity: authentication comes from the token contract below, never from the FQCN alone. + +Low-level attach details such as parent/root URL, target CJ FQCN, trainer FQCN, token, secure-mode settings, and heartbeat timeouts belong in the bootstrap config. Bootstrap config should include: + +- job id, site name, and workspace context +- attach id/session id and expected task stream +- trainer Cell FQCN and target CJ/job Cell FQCN +- Cell endpoint or CP/root/relay route +- secure-mode settings +- attach token or certificate/key references (plus the executor nonce, when the one-round proof variant is used) +- heartbeat interval and attach/read/result timeouts, and the CJ-loss grace period +- rank policy for torchrun or other multi-process launchers +- payload policy and supported payload reference types +- protocol version + +**Bootstrap config protection.** The bootstrap config carries the session's primary authenticator, so its handling is part of the contract: it must be delivered over a confidential channel, written owner-only (0600) when it lands on disk, be per-job ephemeral (invalid after job end/attach_timeout), and never be placed world-readable on a shared filesystem or passed through process arguments/environment visible to co-tenants. Where the platform has a secrets mechanism (K8s Secrets, scheduler credential stores), the config should reference secrets rather than embed them. This applies to the external_process bootstrap config as well — today's equivalent (`client_api_config.json`) embeds live auth tokens with no permission handling, which this design corrects. + +**Attach auth contract:** + +- The executor generates a high-entropy attach token per attach id. The token is held in executor memory for the session lifetime; anything persisted stores only its digest. +- The token is scoped to job id, site name, attach id, target FQCN, trainer FQCN, and allowed rank policy. +- **Token proof is challenge-response, not bearer presentation.** HELLO carries the attach id; the executor replies with HELLO_CHALLENGE (a nonce); the trainer proves possession in HELLO_PROOF with an HMAC over (nonce, attach id, job id, trainer FQCN, protocol version) keyed by the token. The raw token never crosses the wire, so observing attach traffic does not permit replay. (A one-round variant — the proof folded into HELLO, computed over an executor nonce delivered in the bootstrap config plus a trainer nonce — is acceptable where the channel already provides confidentiality, e.g. external_process on localhost or TLS; raw-token presentation is acceptable only over a confidential channel, and never in POC/insecure mode across a network.) +- **Attach requires a confidential transport.** The trainer's Cell connection to the CP/relay endpoint must be TLS-protected in secure mode; attach across a network in insecure/POC mode is for local development only. The ad-hoc trainer cell has no mTLS client identity in the existing stack (it loads only the root CA), so the token contract — not transport identity — is what authenticates the trainer; the transport requirement exists to protect the exchange, and the FQCN it claims is treated as unauthenticated routing data. +- The token is single-session by default. +- HELLO must include attach id, job id, site name, trainer FQCN, rank info, and protocol version; the token proof arrives in HELLO_PROOF over the executor's nonce (or inside HELLO in the one-round variant). +- The executor accepts attach only when token proof, scope, peer FQCN, job id, site name, and rank policy all match. +- A second trainer with the same attach id is rejected while a session is active. +- Reconnect is allowed only if explicitly enabled and the previous session has been marked disconnected by heartbeat timeout. On reconnect the executor issues a fresh token through the same delivery path — the original token is not reusable, preserving the single-session property. +- Tokens expire if the trainer does not attach before attach_timeout. +- V1 does not require mid-session token refresh; after a trainer attaches, the session is governed by heartbeat, job lifetime, and explicit revoke. +- On job end, abort, attach timeout, heartbeat timeout (subject to the in-flight-transfer precedence rule), or explicit revoke, the executor invalidates the token/session and rejects future HELLO. +- **Teardown is authenticated in both directions.** The trainer backend accepts ABORT/SHUTDOWN/BYE only from the bound CJ session (sender FQCN equals the session's CJ FQCN and the message carries the session id); teardown from any other origin is logged and ignored. This closes the gap in today's IPCAgent, which acts on abort/bye from any sender. Symmetrically, the executor accepts trainer messages only from the bound session. +- **Listener hygiene.** The attach channel is an inbound endpoint on the CJ: HELLO handling is rate-limited per source, failed proofs count toward invalidating the attach id (bounded attempts), and the listener accepts only the message shapes of the session-setup exchange before authentication. + +This is similar to the bootstrap data that IPCAgent needed, but hidden behind Client API. Implementation may reuse IPCAgent-style topology or helpers, but IPCAgent is not part of the trainer-facing contract. external_process uses the same session-token machinery with local delivery (the executor writes the bootstrap config directly into the trainer's launch environment), which is why session setup is one protocol rather than an attach-only feature. diff --git a/docs/design/client_api_execution_modes_plan.md b/docs/design/client_api_execution_modes_plan.md new file mode 100644 index 0000000000..63ea1560c0 --- /dev/null +++ b/docs/design/client_api_execution_modes_plan.md @@ -0,0 +1,150 @@ +# Client API Execution Modes — 2.9 Implementation Plan + +Companion to `client_api_execution_modes.md` (design). Tracks Epic **FLARE-2698** (Client API and 3rd party integration refactoring, 2.9.0). Decomposes the design's 8-step Migration Plan into **37 PRs** with dependencies, sizes, and a release cut line. Scoped against the codebase post-2.8.0; granularity calibrated against the repo's merged-PR history (2026-07-01, see Calibration below). + +Size guide: **S** <300 changed LOC, **M** 300–800, **L** 800–2000. + +## Granularity calibration + +Measured against the last ~300 merged PRs on main: median merged PR is ~140 LOC / 4 files; 65% land at ≤300 LOC; only ~13% fall in the 300–800 band. Recent comparable programs shipped as many small PRs over weeks: **recipe API ~28 PRs, tensor-offload/streaming reliability ~20, distributed provisioning/multicloud ~18** — each roughly half this program's scope. So ~36 PRs is in line with how this repo actually ships large features, and the plan's M-heavy granularity is already on the *large* side of repo norms (p75–p85). Consolidating further would create PR shapes the repo rarely merges for core code. + +An adversarial consolidation pass was run on nine candidate merges; four survived (applied below), five were rejected. The pattern: **safe merges are same-owner/same-module dedups of work two tracks scoped twice, or two halves of one contract with no independent consumer. Bad merges drag an early foundation behind a later integration point, or weld a revert-sensitive change (interface freeze, wire-protocol version skew, security fix) to unrelated code.** Deliberate non-merges are listed in Guardrails at the end. + +## Interface freezes (do these first, review across all track owners) + +1. **Protocol vocabulary** (`nvflare/client/cell/defs.py`) — the Cell control-protocol Topics/MsgKeys/CHANNEL/version, consumed by the trainer engine, external_process, and attach. Pure constants, no I/O. (The session-token/HMAC-proof *helpers* are NOT frozen here — see the auth-model note below: they land with EP-3, where the host-trust/auth-strength decision is actually made.) +2. **ClientAPIExecutor skeleton + `ClientAPIBackendSpec`** (`nvflare/app_common/executors/client_api_executor.py`). Freeze the full V1 constructor from the design's Configuration Surface even though only in_process works initially. +3. **F3 aggregate terminal outcome** (`TransferOutcome`, additive `outcome_cb`). Reuse `TransferProgressState` terminal names; `transaction_done_cb` signature untouchable (benign-TIMEOUT callers: `hci/server/binary_transfer.py`, `app_opt/job_launcher/workspace_cell_transfer.py`). + +## Wave plan + +Tracks: **F3** payload layer · **EX** executor/ScriptRunner · **TE** trainer engine · **EP** external_process · **AT** attach · **CC** CCWF · **CT** compat/docs/tests. + +### Wave 0 — foundations (no cross-deps; start immediately, fully parallel) + +| PR | Track | Size | Notes | +|---|---|---|---| +| PR-0 Land design doc + this implementation plan in docs/design/ | all | S | **Lands first, merges fast** — the design is already approved; this just puts the reference material in-repo so every subsequent PR links it. Interface-freeze sign-offs happen on TE-1/EX-2/F3-1 themselves | +| F3-1 Aggregate all-receivers terminal transfer outcome | F3 | M | Interface freeze #3. Purely additive next to FINISHED/TIMEOUT/DELETED | +| TE-1 Protocol vocabulary (`client/cell/defs.py`) | TE | S | Interface freeze #1 — the Cell control-protocol Topics/MsgKeys/CHANNEL/version only. The auth **mechanism** is deliberately NOT frozen here: the token is 3 roles (rendezvous, anti-mixup, auth) and whether external_process needs a *secret* HMAC proof at all is a host-trust decision (single-tenant: no; multi-tenant: yes) that EP-3 makes. So the proof helpers (TokenScope, compute/verify_hello_proof, combine_nonces) land with **EP-3**, and the generic generators (generate_session_token/nonce, token_digest) go to the fuel/sec consolidation (**FLARE-3017**). The stateful SessionTokenManager is attach-only (**AT-2**) | +| EX-2 ClientAPIExecutor skeleton + backend spec + analytics-event ownership | EX | M | Interface freeze #2 | +| TE-2 Bootstrap config schema + NVFLARE_CLIENT_API_CONFIG resolution | TE | M | Additive ConfigKeys; consumes EP-1's 0600 writer. Kept separate from TE-1: touches legacy-shared `client/config.py`, different revert profile | +| EP-1 0600 permissions for Client API config files | EP | S | Fixes live exposure in today's `client_api_config.json`; standalone + backportable by design | +| EP-2 TrainerProcessRunner (process-group lifecycle + PGID records) | EP | M | SIGTERM→grace→SIGKILL; preserves SubprocessLauncher's natural-exit window; Windows path platform-guarded | +| EX-1 Export `flare.get_task_name` | EX | S | Ship-today one-liner; kept out of the churn-prone skeleton PR | + +### Wave 1 — parallel tracks behind foundations + +| PR | Track | Size | Depends | +|---|---|---|---| +| F3-2 Receiver-confirmed completion + retry-aware accounting | F3 | M | F3-1. The version-skew wire change — lands as early as possible for maximum soak; capability-flag gated, both skews interop-tested | +| F3-3 Per-(transfer, receiver) acquire/idle budgets | F3 | M | F3-1. Unconditional per-receiver activity tracking. Must also settle the quorum surface for fan-out: workflows with min_responses-style policy (k-of-N receivers suffices) need either an optional min_receivers on the transaction/facade or a documented pattern of evaluating TransferOutcome.refs against their own threshold — `completed` stays the strict all-receivers certificate either way | +| F3-4 Awaitable producer transfer facade + PAYLOAD_ACQUIRED (via existing progress_cb) | F3 | M | F3-1. Must CHAIN existing DOWNLOAD_COMPLETE_CB, not replace | +| EX-3 in_process backend (consolidate InProcessClientAPIExecutor) | EX | L | EX-2. Behavior-parity bar: "nothing user-visible" | +| TE-3 TrainerCellSession engine (handshake, heartbeat, owner-death, trainer-side authenticated teardown) | TE | L | TE-1, TE-2. Injectable clock + kill hook; AT owner co-reviews the teardown-auth tests | +| EP-5 CP-side orphan reaping of trainer PGIDs | EP | M | EP-2. PID-reuse guard via start-time record | + +### Wave 2 — contracts and wiring + +| PR | Track | Size | Depends | +|---|---|---|---| +| F3-5 Bounded post-completion linger before producer release | F3 | S | F3-2, F3-4. Gates only the new `wait_released`; kept separate from F3-2 so the wire change isn't welded to exit-timing policy | +| TE-4 TrainerCellSession task/result contracts (receive queue + TASK_READY idempotency + TASK_FAILED; RESULT_READY flow + terminal-outcome blocking + fan-out drain/shutdown gating) | TE | L | TE-3; F3-4 via a narrow stubbed wait-protocol interface. Merged from two Ms: same class, same owner, no independent consumer of either half. Guard: split fan-out drain back out if the diff passes ~1800 LOC | +| EP-3 external_process auth + CJ-side HELLO acceptance + session-scoped message enforcement | EP | M | TE-1 (vocabulary). **Owns the auth-model decision**: a per-launch rendezvous id always; a *secret* launch token proven by one-round HMAC on multi-tenant hosts (rendezvous-only + OS isolation may suffice single-tenant). Introduces the proof helpers (TokenScope, compute/verify_hello_proof, combine_nonces) here — reused later by AT-2. Session-scoped message enforcement (accept trainer messages only from the bound session) closes the live IPCAgent any-sender gap; a P0 control, not the P2 attach track | +| EX-4 ScriptRunner `execution_mode` param + launch_external_process mapping | EX | M | EX-2/3. Convert ~22 internal recipe call sites in the same PR | +| EX-5 Converter→filter migration (FLARE-2698 bullet 2) | EX | M | EX-3, EP-4. Replace executor-owned ParamsConverters with PT/TF send+receive conversion filters at the client edge (last task-data / first task-result filter); recipes auto-wire per framework; Client API boundary passes through (RAW). Removes params_exchange_format/server_expected_format/converter-id from the surface (already excluded from EX-2's freeze). Transfer type FULL/DIFF stays in model_registry, decided separately | +| CC-1 CCWF transfer-declaration plumbing (receiver sets, stage windows, aux passthrough) | CC | M | F3-3. Declaration-only; absent headers preserve today's defaults — its behavior-neutrality is what de-risks the CC track | +| CT-8 Session observability (state-transition logs + StatsPoolManager view) | CT | M | EX-2; extends as backends land | + +### Wave 3 — integration point + +| PR | Track | Size | Depends | +|---|---|---|---| +| TE-5 CellClientAPI backend (flare.* on the new engine, backend selection) | TE | M | TE-4, TE-2. Opt-in only; legacy defaults untouched | +| EP-4 external_process backend for ClientAPIExecutor | EP | L | EP-2/3, EX-2/3, TE-5, F3-4. The step-4 integration point; watch for XL creep — split dispatch if needed | +| EP-6 Rank contract (torchrun multi-rank, non-control-rank fail-fast) | EP | M | EP-4, TE-5 | +| CT-1 Acceptance-test harness + core external_process suite + simulator/POC smoke | CT | L | EP-4. Absorbs the EP track's E2E validation (incl. POC-mode smoke); EP owner is co-reviewer as the track's sign-off point. tests/integration_test/fast (Blossom premerge) + xdist-safe unit subset | + +### Wave 4 — validation, workflows, attach + +| PR | Track | Size | Depends | +|---|---|---|---| +| CT-2 torchrun 2-rank rank-contract CI tests (CPU/gloo) | CT | M | CT-1, EP-4. EP owner co-reviews | +| CT-3 Owner-death (CJ-kill) + payload-lifecycle E2E tests | CT | L | CT-1, F3 track, EP-4. Covers CJ-SIGKILL self-termination + CP reaping; EP owner co-reviews | +| CC-2 Swarm onto the transfer contract (remove lazy-ref machinery) | CC | L | CC-1, F3 track, EP-4. Highest-risk PR in the program; maximally isolated revert unit; retires test_lazy_ref_local_aggr / test_msg_root_ttl | +| CC-3 Cyclic onto the transfer contract | CC | S | CC-1, EP-4. Kept out of CC-1 (would drag the behavior-neutral foundation behind EP-4) and out of CC-2 (swarm revert must not drag cyclic); shared broadcast_final_result declaration coordinates behind the CC-1 helper | +| CC-4 CSE / broadcast-best fan-out (N receivers, per-receiver budgets) | CC | L | CC-1/2, TE-4 fan-out drain | +| CC-5 Re-enable CCWF tensor disk offload (terminal-outcome-gated cleanup) | CC | M | CC-2/4. Deliberately last in track; flag experimental in 2.9 | +| AT-2 Attach session manager backend (CJ side) + attach-side session enforcement | AT | L | EX-2, TE-3, EP-3 (reuses EP-3's proof helpers). Adds the stateful `SessionTokenManager` (single-use nonce issuance, attach-window expiry, single-session, invalidation) — required because attach's token is delivered out-of-band by an untrusted starter over a possibly-remote channel, unlike external_process's localhost launch | +| AT-3 Trainer-side attach flow (bootstrap config, ad-hoc cell, HELLO proof) | AT | M | AT-2, TE-5. Connects via CP parent_url (ad-hoc listeners default-disabled) | +| CT-5 Rank-contract example updates (multi-gpu/pt, pt-ddp-docker) + SLURM batch-artifact example | CT | M | EP-4. multi-gpu/pt violates the rank contract today; qwen3-vl is the reference | + +### Wave 5 — release gate + +| PR | Track | Size | Depends | +|---|---|---|---| +| AT-4 Attach hardening (rate limit, bounded proof attempts, reconnect rotation) + E2E smoke + job-config example + delivery docs | AT | L | AT-2/3. Merged from two Ms: same owner, sequential, both inside the P2 tail so they slip together. Full negative-case matrix lives once in CT-4, not here | +| CT-4 Attach + CCWF system-level acceptance suites | CT | L | CT-1, AT track, CC track. Owns the attach negative-case matrix (replay, duplicate attach_id, spoofed teardown) — written once here, deduped from AT-4. CCWF multi-site runs in slow/ (nightly) | +| CT-6 Client API docs overhaul (client_api.rst rewrite, 3rd-party/agent docs) | CT | L | Arg names frozen (steps 3–5 merged). Written against merged code, not the design doc | +| CT-7 Legacy-stack deprecation warnings | CT | S | Coverage gate met. Warnings only in 2.9; the ScriptRunner **default flip is a separate 2.10 PR** by design (different ship dates) | + +## Program PR conventions + +Every PR in the program uses this description skeleton so reviewers always have the map: + +```markdown +## What + + +## Program context +Client API Execution Modes (2.9) — PR , Wave of the plan. +Design: docs/design/client_api_execution_modes.md § +Plan: docs/design/client_api_execution_modes_plan.md +Depends on: · Unblocks: + +## Design contracts implemented + + +## Out of scope (and where it lands instead) + +``` + +Notes: the one-paragraph "why" for the whole program lives in PR-0 and gets linked, not pasted. Interface-freeze PRs (TE-1, EX-2, F3-1) additionally name the tracks that consume the frozen surface and require sign-off from those owners before merge. + +## Critical path + +F3-1 → F3-4 → TE-4 → TE-5 → EP-4 → CT-1 → CT-3 → release gate. The TE-4 merge removed one review cycle from the path. **EP-4** remains the schedule risk to watch — it's where executor, trainer engine, auth, process runner, and the F3 facade meet; its prerequisites are deliberately extracted to keep it L. + +## 2.9 cut line (recommendation) + +- **P0 — commit for 2.9** (25 PRs): F3 track (5) + EX track (4) + TE track (5) + EP track (6) + CT-1/2/3/6/8. Headline: *external_process and in_process on the new Cell stack with an enforceable payload lifecycle, owner-death handling, session-scoped message enforcement, and the config-permission fix*. +- **P1 — strongly target for 2.9** (6 PRs): CC track (5) + the CCWF half of CT-4. Fully severable (SWARM keeps current behavior until CC-2 lands); ship CC-5 flagged experimental. +- **P2 — stretch for 2.9, else 2.9.x/2.10** (4 PRs): AT track (3) + the attach half of CT-4. Attach is the most self-contained step; nothing in P0/P1 depends on it (its reusable auth/proof helpers ship in P0 via EP-3, and session enforcement via EP-3, regardless). +- **CT-7 warnings** (1 PR) close 2.9. **Explicitly deferred to 2.10**: the ScriptRunner default flip and any legacy-class removal — ship 2.9 opt-in with warnings; flip after a release of field soak. + +## Effort and staffing + +36 PRs ≈ 6 S + 18 M + 12 L. At review-inclusive rates (S ≈ 2 days, M ≈ 1 week, L ≈ 2 weeks) that is ≈ 44 engineer-weeks total; ≈ 34 for P0+P1. With 4–5 engineers owning tracks (F3, TE, EP+EX, CC, AT+CT), full parallelism after Wave 0; calendar floor is the critical path — realistically **10–14 weeks to P0+P1 complete** plus stabilization. If 2.9 code freeze is nearer, cut P2 first, then CC-4/CC-5. + +## Guardrails — deliberate non-merges + +These pairs are tempting to consolidate and must stay separate: + +- **EP-1 (0600 fix) with anything**: live credential exposure on today's path; standalone, cherry-pickable, surgical revert. +- **EX-3 (in_process backend) + EX-4 (ScriptRunner)**: ScriptRunner is the most-used public entry point (golden exported-config tests, ~22 recipe conversions); a backend parity revert must not drag the public parameter surface. +- **Anything into CC-2 (swarm refactor)**: highest-risk PR in the program; must remain a maximally isolated revert unit. +- **F3-3 (budgets) + F3-4 (facade)**: same module, but F3-4 is on the critical path every track waits on; F3-3 serves only CC-1. +- **F3-2 (receiver-confirm wire change) + F3-5 (linger policy)**: the version-skew change wants early landing and a surgical revert; don't weld it to exit-timing policy. +- **EP-3 + AT-2**: same CJ-side HELLO/session machinery, but they straddle the P0/P2 boundary — AT-2 consumes EP-3's module instead. +- **EP-2 (runner) + EP-5 (CP reaping)**: different processes and blast radii; EP-5's false-positive-reap risk needs an independent revert path. +- **CT-7 warnings + default flip**: different ship dates by design (2.9 vs 2.10). + +## Cross-cutting risks and constraints (from code scouting) + +- **CI**: GitHub premerge is ubuntu-only, CPU-only, xdist — all acceptance tests must be CPU-feasible and port-dynamic; integration tests run only via Blossom (`/build`, fast/ premerge, slow/ nightly); fork PRs get no integration signal, so protocol contracts need cheap unit-level duplicates. torchrun tests: gloo only; multi-GPU/nccl validation is manual. +- **No Windows CI**: process-group termination Windows path ships platform-guarded with skipped tests — flag as a release-note caveat. +- **Version-skew interop** (F3-2): capability-flag gating with explicit old-producer/new-receiver and new-producer/old-receiver tests; confirm sends fire-and-forget. +- **Behavior parity**: in_process consolidation and ScriptRunner wiring have golden-config tests (exported job JSON byte-compare) to protect the most-used public entry point. +- **0600 writer is single-sourced**: EP-1 owns the hardened writer; TE-2 and attach bootstrap tests consume it rather than re-implementing permission handling. +- **Pass-through registration ownership** moves from ClientAPILauncherExecutor.initialize() into the new backend — coordinate EP-4 with CC-1 to avoid double registration during migration. +- **Trainer exit hang**: FlareAgent's atexit/main-thread-join watcher for non-daemon F3 threads solves a real problem (scripts that never call flare.shutdown()); TE-3/TE-5 must carry an equivalent or trainers hang at exit. diff --git a/nvflare/fuel/f3/streaming/download_service.py b/nvflare/fuel/f3/streaming/download_service.py index 5c5ab7574b..38756016cb 100644 --- a/nvflare/fuel/f3/streaming/download_service.py +++ b/nvflare/fuel/f3/streaming/download_service.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools import threading import time import uuid @@ -23,6 +24,14 @@ from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode from nvflare.fuel.f3.cellnet.utils import make_reply, new_cell_message from nvflare.fuel.f3.message import Message +from nvflare.fuel.f3.streaming.transfer_outcome import ( # noqa: F401 (re-exported legacy names) + DownloadStatus, + RefOutcome, + TransactionDoneStatus, + TransferOutcome, + compute_transfer_outcome, + terminal_state_for_done_status, +) from nvflare.fuel.f3.streaming.transfer_progress import TransferProgressState from nvflare.fuel.utils.log_utils import get_obj_logger from nvflare.security.logging import secure_format_exception @@ -191,21 +200,32 @@ def mark_active(self): self.tx.mark_active() def obj_downloaded(self, to_receiver: str, status: str): - if to_receiver in self.receiver_statuses: - return + # Status recording is guarded so terminal-outcome snapshots taken on the + # monitor thread never observe a half-updated map; user callbacks run + # outside the lock. + with self._progress_lock: + if to_receiver in self.receiver_statuses: + return + + self.receiver_statuses[to_receiver] = status + self.num_receivers_done = len(self.receiver_statuses) - self.receiver_statuses[to_receiver] = status - self.num_receivers_done = len(self.receiver_statuses) + assert isinstance(self.tx, _Transaction) + all_done = 0 < self.tx.num_receivers <= self.num_receivers_done and not self._downloaded_to_all_called + if all_done: + self._downloaded_to_all_called = True assert isinstance(self.obj, Downloadable) self.obj.downloaded_to_one(to_receiver, status) - assert isinstance(self.tx, _Transaction) - if 0 < self.tx.num_receivers <= self.num_receivers_done and not self._downloaded_to_all_called: + if all_done: # this object is done for all receivers - self._downloaded_to_all_called = True self.obj.downloaded_to_all() + def snapshot_receiver_statuses(self) -> dict: + with self._progress_lock: + return dict(self.receiver_statuses) + def emit_progress( self, *, @@ -334,19 +354,17 @@ class ProduceRC: EOF = "eof" -class DownloadStatus: - """Constants for object download status.""" +def _invoke_cb_safely(logger, what: str, cb, *args, **kwargs): + """Invoke a user callback without letting its exception escape. - SUCCESS = "success" - FAILED = "failed" - - -class TransactionDoneStatus: - """Constants for transaction completion status.""" - - FINISHED = "finished" - TIMEOUT = "timeout" - DELETED = "deleted" + Termination callbacks run on the transaction monitor thread; a propagating + exception would kill that thread and stop all future transactions from + finishing or expiring, and would skip outcome recording and source release. + """ + try: + cb(*args, **kwargs) + except Exception as ex: + logger.warning(f"{what} failed: {secure_format_exception(ex)}") class _FinishedRef: @@ -370,13 +388,18 @@ def __init__( cb_kwargs=None, progress_cb: Optional[Callable] = None, progress_interval: float = 30.0, + outcome_cb: Optional[Callable] = None, ): """Constructor of the transaction object. Args: timeout: amount of time since last activity - num_receivers: number of receivers. 0 means unlimited. + num_receivers: number of receivers. 0 means unknown/unbounded: such a + transaction is never certified finished (is_finished() returns False) — + it terminates via timeout or deletion, and its aggregate outcome can + never be COMPLETED (all-receivers-success cannot be certified). tx_id: if provided, use it; otherwise create one + outcome_cb: called with the aggregate TransferOutcome after transaction_done_cb fires """ if tx_id: self.tid = tx_id @@ -390,6 +413,7 @@ def __init__( self._stats_lock = threading.Lock() self.transaction_done_cb = transaction_done_cb self.cb_kwargs = cb_kwargs or {} + self.outcome_cb = outcome_cb self.progress_cb = progress_cb if progress_interval < 0: raise ValueError(f"progress_interval must be non-negative, got {progress_interval}") @@ -459,9 +483,33 @@ def is_finished(self): return False return True - def transaction_done(self, status: str): - """Called when the transaction is finished.""" + def transaction_done(self, status: str, on_outcome=None) -> TransferOutcome: + """Called when the transaction is finished. + + Returns the aggregate TransferOutcome (see transfer_outcome.py): COMPLETED only + when every expected receiver succeeded — TransactionDoneStatus.FINISHED alone + does not certify that. The existing transaction_done_cb contract is unchanged + except that callback exceptions no longer propagate (they would kill the + monitor thread and skip source release); outcome_cb (if set) fires after it + with the computed outcome. on_outcome (used by DownloadService to record the + outcome) is invoked right after the outcome is computed — before the + potentially slow user callbacks — so pollers see the terminal outcome as soon + as the transaction stops being active. + """ refs = self.snapshot_refs() + + # Compute (and record via on_outcome) the aggregate outcome first, from + # locked per-receiver snapshots, before any user callback runs. + outcome = compute_transfer_outcome( + tx_id=self.tid, + done_status=status, + num_receivers=self.num_receivers, + refs=[RefOutcome(ref_id=ref.rid, receiver_statuses=ref.snapshot_receiver_statuses()) for ref in refs], + timestamp=time.time(), + ) + if on_outcome: + _invoke_cb_safely(self.logger, f"outcome recording for tx {self.tid}", on_outcome, outcome) + progress_state = self._progress_state_for_transaction_status(status) if progress_state: for ref in refs: @@ -485,10 +533,27 @@ def transaction_done(self, status: str): for ref in refs: obj = ref.obj assert isinstance(obj, Downloadable) - obj.transaction_done(self.tid, status) + _invoke_cb_safely( + self.logger, + f"transaction_done of {type(obj)} for tx {self.tid}", + obj.transaction_done, + self.tid, + status, + ) if self.transaction_done_cb: - self.transaction_done_cb(self.tid, status, base_objs, **self.cb_kwargs) + _invoke_cb_safely( + self.logger, + f"transaction done callback for tx {self.tid}", + self.transaction_done_cb, + self.tid, + status, + base_objs, + **self.cb_kwargs, + ) + + if self.outcome_cb: + _invoke_cb_safely(self.logger, f"transfer outcome callback for tx {self.tid}", self.outcome_cb, outcome) # Release source objects after the callback so the callback can still # reference them. This drops the last infrastructure reference to @@ -496,6 +561,8 @@ def transaction_done(self, status: str): for ref in refs: ref.obj.release() + return outcome + def emit_progress_event(self, event: dict): if not self.progress_cb: return @@ -510,19 +577,14 @@ def emit_progress_event(self, event: dict): @staticmethod def _progress_state_for_transaction_status(status: str) -> Optional[str]: - if status == TransactionDoneStatus.TIMEOUT: - return TransferProgressState.FAILED - if status == TransactionDoneStatus.DELETED: - return TransferProgressState.ABORTED - if status == TransactionDoneStatus.FINISHED: - return TransferProgressState.COMPLETED - return None + return terminal_state_for_done_status(status) class TransactionInfo: """This structure contains public info of a transaction: timeout value of the transaction; - number of receivers that objects in the transaction will be downloaded to. 0 means unknown. + number of receivers that objects in the transaction will be downloaded to. 0 means unknown/unbounded + (the transaction is never certified finished and terminates via timeout or deletion); objects that are added to the transaction. """ @@ -541,6 +603,17 @@ class DownloadService: # transaction has been cleaned up without turning a completed transfer into a fatal missing-ref error. _finished_refs = {} FINISHED_REFS_TTL = 1800.0 + # Terminal outcomes of finished/expired/deleted transactions, kept for a bounded + # time so producers can query the aggregate result after termination. Guarded by + # its own lock so outcome polling never contends with the chunk-serving _tx_lock. + _tx_outcomes = {} + # Current live incarnation per tx_id (registered by new_transaction), so a + # transaction that terminates concurrently with a same-id retry cannot record + # its outcome over the new incarnation. Guarded by _outcome_lock. + _tx_incarnations = {} + _outcome_lock = threading.Lock() + _accept_outcomes = True + TX_OUTCOME_TTL = 1800.0 _logger = None _tx_monitor = None _tx_lock = threading.Lock() @@ -556,6 +629,9 @@ def _initialize(cls, cell: Cell): cls._tx_monitor = threading.Thread(target=cls._monitor_tx, daemon=True) cls._tx_monitor.start() + # re-enable outcome recording after a prior shutdown() + cls._accept_outcomes = True + initialized = cls._initialized_cells.get(cell) if not initialized: # register CBs @@ -576,6 +652,7 @@ def new_transaction( transaction_done_cb=None, progress_cb: Optional[Callable] = None, progress_interval: float = 30.0, + outcome_cb: Optional[Callable] = None, **cb_kwargs, ): cls._initialize(cell) @@ -587,9 +664,16 @@ def new_transaction( cb_kwargs, progress_cb=progress_cb, progress_interval=progress_interval, + outcome_cb=outcome_cb, ) with cls._tx_lock: cls._tx_table[tx.tid] = tx + with cls._outcome_lock: + # a reused explicit tx_id must not surface the previous incarnation's + # outcome: purge any recorded outcome and register this incarnation as + # current so a concurrently-terminating older incarnation cannot record + cls._tx_outcomes.pop(tx.tid, None) + cls._tx_incarnations[tx.tid] = tx return tx.tid @classmethod @@ -622,7 +706,7 @@ def delete_transaction(cls, transaction_id: str): cls._delete_tx(tx) if tx: - tx.transaction_done(TransactionDoneStatus.DELETED) + tx.transaction_done(TransactionDoneStatus.DELETED, on_outcome=functools.partial(cls._record_outcome, tx=tx)) @classmethod def shutdown(cls): @@ -637,6 +721,13 @@ def shutdown(cls): cls._delete_tx(tx) cls._finished_refs.clear() + with cls._outcome_lock: + # stop recording (a concurrent monitor iteration may be mid-termination) + # and drop recorded outcomes; recording re-enables on next _initialize + cls._accept_outcomes = False + cls._tx_outcomes.clear() + cls._tx_incarnations.clear() + with cls._init_lock: # Shutdown resets callback-registration state even when a cell is still # strongly held, so a later isolated service setup registers callbacks again. @@ -654,10 +745,48 @@ def _delete_tx(cls, tx: _Transaction, tombstone_finished_refs: bool = False): for r in tx.snapshot_refs(): cls._ref_table.pop(r.rid, None) if tombstone_finished_refs: - cls._finished_refs[r.rid] = _FinishedRef(dict(r.receiver_statuses), now) + cls._finished_refs[r.rid] = _FinishedRef(r.snapshot_receiver_statuses(), now) else: cls._finished_refs.pop(r.rid, None) + @classmethod + def _record_outcome(cls, outcome: TransferOutcome, tx: Optional[_Transaction] = None): + with cls._outcome_lock: + current = cls._tx_incarnations.get(outcome.tx_id) + if tx is not None and current is not None and current is not tx: + # a newer incarnation of this tx_id (a retry) registered while this + # transaction was terminating; its outcome must not shadow the live one + return + if current is tx: + cls._tx_incarnations.pop(outcome.tx_id, None) + if not cls._accept_outcomes: + return + cls._tx_outcomes[outcome.tx_id] = outcome + + @classmethod + def get_transaction_outcome(cls, transaction_id: str) -> Optional[TransferOutcome]: + """Get the aggregate terminal outcome of a terminated transaction. + + Returns None if the transaction is unknown, still active, or its outcome + record has expired (TX_OUTCOME_TTL). + """ + with cls._outcome_lock: + outcome = cls._tx_outcomes.get(transaction_id) + if outcome and outcome.expired(time.time(), cls.TX_OUTCOME_TTL): + cls._tx_outcomes.pop(transaction_id, None) + return None + return outcome + + @classmethod + def _expire_outcomes(cls, now: float): + with cls._outcome_lock: + # full scan: concurrent recorders (monitor + delete_transaction) can insert + # slightly out of timestamp order, so an early-break is not safe; the scan + # is one float comparison per record + expired = [tid for tid, outcome in cls._tx_outcomes.items() if outcome.expired(now, cls.TX_OUTCOME_TTL)] + for tid in expired: + cls._tx_outcomes.pop(tid, None) + @classmethod def _expire_finished_refs(cls, now: float): if not cls._finished_refs: @@ -803,11 +932,17 @@ def _monitor_tx(cls): cls._expire_finished_refs(now) + cls._expire_outcomes(now) + for tx in expired_tx: - tx.transaction_done(TransactionDoneStatus.TIMEOUT) + tx.transaction_done( + TransactionDoneStatus.TIMEOUT, on_outcome=functools.partial(cls._record_outcome, tx=tx) + ) for tx in finished_tx: - tx.transaction_done(TransactionDoneStatus.FINISHED) + tx.transaction_done( + TransactionDoneStatus.FINISHED, on_outcome=functools.partial(cls._record_outcome, tx=tx) + ) time.sleep(5.0) diff --git a/nvflare/fuel/f3/streaming/obj_downloader.py b/nvflare/fuel/f3/streaming/obj_downloader.py index 52dfb8b199..7fdb7ad8ca 100644 --- a/nvflare/fuel/f3/streaming/obj_downloader.py +++ b/nvflare/fuel/f3/streaming/obj_downloader.py @@ -27,6 +27,7 @@ def __init__( transaction_done_cb=None, progress_cb=None, progress_interval: float = 30.0, + outcome_cb=None, **cb_kwargs, ): """Constructor of ObjectDownloader. @@ -40,6 +41,9 @@ def __init__( transaction_done_cb: the callback to be called when the transaction is done. progress_cb: optional callback for source-side per-ref/per-receiver progress. progress_interval: minimum seconds between active progress callback events for advancing counters. + outcome_cb: optional callback called with the aggregate TransferOutcome after transaction_done_cb. + Unlike the transaction done status, the outcome distinguishes all-receivers-success (COMPLETED) + from termination with a receiver failure (FAILED). Signature: outcome_cb(outcome: TransferOutcome). **cb_kwargs: kwargs to be passed to transaction_done_cb. Notes: the CB signature is: @@ -62,6 +66,7 @@ def __init__( transaction_done_cb=transaction_done_cb, progress_cb=progress_cb, progress_interval=progress_interval, + outcome_cb=outcome_cb, **cb_kwargs, ) diff --git a/nvflare/fuel/f3/streaming/transfer_outcome.py b/nvflare/fuel/f3/streaming/transfer_outcome.py new file mode 100644 index 0000000000..45ed3736ba --- /dev/null +++ b/nvflare/fuel/f3/streaming/transfer_outcome.py @@ -0,0 +1,203 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Normalized aggregate terminal outcome for DownloadService transactions. + +TransactionDoneStatus.FINISHED only means a transaction reached its receiver count: +a receiver that FAILED still counts toward num_receivers, and the transaction_done_cb +carries no per-receiver outcomes. The Client API payload lifecycle +(docs/design/client_api_execution_modes.md, "Terminal transfer outcome") requires the +distinction between "every expected receiver succeeded" and "the transaction merely +terminated". This module provides that distinction additively: TransactionDoneStatus +values and the transaction_done_cb contract are unchanged. + +Receiver truth wins over known termination mechanics: a transaction whose expected +receivers all succeeded resolves COMPLETED even if it was terminated by routine +cleanup (delete_transaction) or a late timeout. Everything else fails closed — +including a FINISHED transaction with no objects (a mid-assembly race must not +certify success) and any unknown/future termination status (validated before +receiver truth is considered). + +Outcome status values reuse the TransferProgressState terminal vocabulary +(completed / failed / aborted) rather than introducing another status set. + +Known limits, resolved by later PRs of the same design (see +docs/design/client_api_execution_modes_plan.md): +- per-receiver statuses are producer-served (recorded when produce() returns EOF), + not receiver-confirmed — a receiver-side finalization failure after the last chunk + is not visible here until receiver-confirmed completion (plan PR F3-2) lands; +- num_receivers is a count without receiver identity — expected-receiver identity + checks arrive with per-receiver budgets (plan PR F3-3); +- the outcome covers the refs present at termination; adding objects to a + transaction after receivers already finished the earlier ones is not supported. +""" + +import time +from dataclasses import dataclass +from typing import Dict, List, Optional + +from nvflare.fuel.f3.streaming.transfer_progress import TransferProgressState + + +class DownloadStatus: + """Constants for object download status.""" + + SUCCESS = "success" + FAILED = "failed" + + +class TransactionDoneStatus: + """Constants for transaction completion status.""" + + FINISHED = "finished" + TIMEOUT = "timeout" + DELETED = "deleted" + + +def terminal_state_for_done_status(done_status: str) -> Optional[str]: + """Map a TransactionDoneStatus value to the TransferProgressState terminal vocabulary.""" + if done_status == TransactionDoneStatus.FINISHED: + return TransferProgressState.COMPLETED + if done_status == TransactionDoneStatus.TIMEOUT: + return TransferProgressState.FAILED + if done_status == TransactionDoneStatus.DELETED: + return TransferProgressState.ABORTED + return None + + +class TransferOutcomeReason: + """Constants explaining how a TransferOutcome status was determined.""" + + ALL_RECEIVERS_SUCCEEDED = "all_receivers_succeeded" + RECEIVER_FAILED = "receiver_failed" + NO_OBJECTS = "no_objects" + TIMEOUT = "timeout" + DELETED = "deleted" + UNKNOWN_RECEIVER_COUNT = "unknown_receiver_count" + UNKNOWN_DONE_STATUS = "unknown_done_status" + + +@dataclass(frozen=True) +class RefOutcome: + """Per-object terminal outcome. + + receiver_statuses maps receiver FQCN to a DownloadStatus value (success / failed) + as recorded by the producer side. Treat the dict as read-only: the same instance + is shared with every consumer of the outcome. + """ + + ref_id: str + receiver_statuses: Dict[str, str] + + +@dataclass(frozen=True) +class TransferOutcome: + """Aggregate terminal outcome of one download transaction. + + status is a TransferProgressState terminal value: COMPLETED only when every + expected receiver of every ref succeeded; FAILED on any receiver failure, + missing receiver, no objects, timeout, or unknown receiver count; ABORTED on + explicit deletion before full success. done_status carries the raw + TransactionDoneStatus for callers that need the untranslated termination cause. + """ + + tx_id: str + status: str # a TransferProgressState terminal value + reason: str # a TransferOutcomeReason value + done_status: str # the raw TransactionDoneStatus value + num_receivers: int + refs: List[RefOutcome] + timestamp: float + + @property + def completed(self) -> bool: + return self.status == TransferProgressState.COMPLETED + + def expired(self, now: float, ttl: float) -> bool: + return now - self.timestamp > ttl + + +def _all_receivers_succeeded(num_receivers: int, refs: List[RefOutcome]) -> bool: + if num_receivers <= 0 or not refs: + return False + for r in refs: + if len(r.receiver_statuses) < num_receivers: + return False + if any(s != DownloadStatus.SUCCESS for s in r.receiver_statuses.values()): + return False + return True + + +_KNOWN_DONE_STATUSES = ( + TransactionDoneStatus.FINISHED, + TransactionDoneStatus.TIMEOUT, + TransactionDoneStatus.DELETED, +) + + +def compute_transfer_outcome( + tx_id: str, + done_status: str, + num_receivers: int, + refs: List[RefOutcome], + timestamp: Optional[float] = None, +) -> TransferOutcome: + """Compute the aggregate terminal outcome for a terminated transaction. + + The termination status is validated first: an unknown/future done_status fails + closed even if every receiver succeeded. For known statuses, receiver truth + wins: if every expected receiver of every ref succeeded, the outcome is + COMPLETED regardless of how the transaction terminated (FINISHED, or routine + cleanup via DELETED, or a late TIMEOUT). Otherwise the outcome fails closed + based on the termination cause. + + Args: + tx_id: ID of the terminated transaction. + done_status: the TransactionDoneStatus value the transaction terminated with. + num_receivers: the transaction's expected receiver count (0 means unknown). + refs: per-ref receiver statuses snapshotted at termination. + timestamp: termination time; defaults to now. + + Returns: a TransferOutcome. + """ + if timestamp is None: + timestamp = time.time() + + if done_status not in _KNOWN_DONE_STATUSES: + status, reason = TransferProgressState.FAILED, TransferOutcomeReason.UNKNOWN_DONE_STATUS + elif _all_receivers_succeeded(num_receivers, refs): + status, reason = TransferProgressState.COMPLETED, TransferOutcomeReason.ALL_RECEIVERS_SUCCEEDED + elif done_status == TransactionDoneStatus.DELETED: + status, reason = TransferProgressState.ABORTED, TransferOutcomeReason.DELETED + elif done_status == TransactionDoneStatus.TIMEOUT: + status, reason = TransferProgressState.FAILED, TransferOutcomeReason.TIMEOUT + else: + # FINISHED without full receiver success + if num_receivers <= 0: + # unknown receiver count: all-receivers-success can never be certified + status, reason = TransferProgressState.FAILED, TransferOutcomeReason.UNKNOWN_RECEIVER_COUNT + elif not refs: + # a FINISHED transaction with no objects must not certify success + status, reason = TransferProgressState.FAILED, TransferOutcomeReason.NO_OBJECTS + else: + status, reason = TransferProgressState.FAILED, TransferOutcomeReason.RECEIVER_FAILED + + return TransferOutcome( + tx_id=tx_id, + status=status, + reason=reason, + done_status=done_status, + num_receivers=num_receivers, + refs=refs, + timestamp=timestamp, + ) diff --git a/tests/unit_test/fuel/f3/streaming/download_service_test.py b/tests/unit_test/fuel/f3/streaming/download_service_test.py index d0a573db48..aeba5f04ff 100644 --- a/tests/unit_test/fuel/f3/streaming/download_service_test.py +++ b/tests/unit_test/fuel/f3/streaming/download_service_test.py @@ -13,8 +13,7 @@ # limitations under the License. import threading -import weakref -from typing import Any, Tuple +from typing import Any from unittest.mock import Mock, patch import pytest @@ -24,57 +23,22 @@ from nvflare.fuel.f3.cellnet.utils import new_cell_message from nvflare.fuel.f3.streaming.download_service import ( Consumer, - Downloadable, DownloadService, DownloadStatus, ProduceRC, TransactionDoneStatus, ) from nvflare.fuel.utils.network_utils import get_open_ports +from tests.unit_test.fuel.f3.streaming.download_test_utils import ( + MockDownloadable, + make_isolated_download_service, + run_monitor_once, +) - -class MockDownloadable(Downloadable): - """Mock downloadable for testing.""" - - def __init__(self, data_chunks: list, fail_on_chunk: int = -1): - super().__init__(data_chunks) - self.data_chunks = data_chunks - self.fail_on_chunk = fail_on_chunk - self.current_chunk = 0 - self.downloaded_to_one_calls = [] - self.downloaded_to_all_called = False - self.downloaded_to_all_call_count = 0 - self.transaction_done_calls = [] - self.tx_id = None - self.ref_id = None - - def set_transaction(self, tx_id: str, ref_id: str): - self.tx_id = tx_id - self.ref_id = ref_id - - def produce(self, state: dict, requester: str) -> Tuple[str, Any, dict]: - if not state: - chunk_idx = 0 - else: - chunk_idx = state.get("chunk_idx", 0) - - if self.fail_on_chunk >= 0 and chunk_idx == self.fail_on_chunk: - return ProduceRC.ERROR, None, {} - - if chunk_idx >= len(self.data_chunks): - return ProduceRC.EOF, None, {} - - return ProduceRC.OK, self.data_chunks[chunk_idx], {"chunk_idx": chunk_idx + 1} - - def downloaded_to_one(self, to_receiver: str, status: str): - self.downloaded_to_one_calls.append((to_receiver, status)) - - def downloaded_to_all(self): - self.downloaded_to_all_called = True - self.downloaded_to_all_call_count += 1 - - def transaction_done(self, transaction_id: str, status: str): - self.transaction_done_calls.append((transaction_id, status)) +# local aliases: the helpers moved to download_test_utils so isolated-service +# state stays defined in one place +_make_isolated_download_service = make_isolated_download_service +_run_monitor_once = run_monitor_once class MockConsumer(Consumer): @@ -103,18 +67,6 @@ def download_failed(self, ref_id: str, reason: str): self.failure_reason = reason -def _make_isolated_download_service(): - class IsolatedDownloadService(DownloadService): - _tx_table = {} - _ref_table = {} - _finished_refs = {} - _logger = Mock() - _tx_lock = threading.Lock() - _initialized_cells = weakref.WeakKeyDictionary() - - return IsolatedDownloadService - - def _make_download_request(ref_id: str, requester: str, state: dict = None): payload = {"ref_id": ref_id} if state is not None: @@ -122,34 +74,6 @@ def _make_download_request(ref_id: str, requester: str, state: dict = None): return new_cell_message(headers={MessageHeaderKey.ORIGIN: requester}, payload=payload) -def _run_monitor_once(service_cls, now): - from nvflare.fuel.f3.streaming import download_service as download_service_module - - class MonitorIterationDone(Exception): - pass - - monitor_thread = threading.current_thread() - real_time = download_service_module.time.time - real_sleep = download_service_module.time.sleep - - def test_thread_time(): - if threading.current_thread() is monitor_thread: - return now - return real_time() - - def test_thread_sleep(seconds): - if threading.current_thread() is monitor_thread: - raise MonitorIterationDone - real_sleep(seconds) - - with ( - patch.object(download_service_module.time, "time", side_effect=test_thread_time), - patch.object(download_service_module.time, "sleep", side_effect=test_thread_sleep), - ): - with pytest.raises(MonitorIterationDone): - service_cls._monitor_tx() - - class TestDownloadService: """Test suite for DownloadService.""" diff --git a/tests/unit_test/fuel/f3/streaming/download_test_utils.py b/tests/unit_test/fuel/f3/streaming/download_test_utils.py new file mode 100644 index 0000000000..1e218027da --- /dev/null +++ b/tests/unit_test/fuel/f3/streaming/download_test_utils.py @@ -0,0 +1,121 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Shared DownloadService test helpers. + +Used by download_service_test.py and transfer_outcome_test.py so the isolated +service subclass stays in one place: every class-level table DownloadService +grows must be overridden here, or an "isolated" test subclass silently shares +production state. +""" + +import threading +import weakref +from typing import Any, Tuple +from unittest.mock import Mock, patch + +import pytest + +from nvflare.fuel.f3.streaming.download_service import Downloadable, DownloadService, ProduceRC + + +class MockDownloadable(Downloadable): + """Mock downloadable for testing.""" + + def __init__(self, data_chunks: list, fail_on_chunk: int = -1): + super().__init__(data_chunks) + self.data_chunks = data_chunks + self.fail_on_chunk = fail_on_chunk + self.current_chunk = 0 + self.downloaded_to_one_calls = [] + self.downloaded_to_all_called = False + self.downloaded_to_all_call_count = 0 + self.transaction_done_calls = [] + self.released = False + self.tx_id = None + self.ref_id = None + + def set_transaction(self, tx_id: str, ref_id: str): + self.tx_id = tx_id + self.ref_id = ref_id + + def produce(self, state: dict, requester: str) -> Tuple[str, Any, dict]: + if not state: + chunk_idx = 0 + else: + chunk_idx = state.get("chunk_idx", 0) + + if self.fail_on_chunk >= 0 and chunk_idx == self.fail_on_chunk: + return ProduceRC.ERROR, None, {} + + if chunk_idx >= len(self.data_chunks): + return ProduceRC.EOF, None, {} + + return ProduceRC.OK, self.data_chunks[chunk_idx], {"chunk_idx": chunk_idx + 1} + + def downloaded_to_one(self, to_receiver: str, status: str): + self.downloaded_to_one_calls.append((to_receiver, status)) + + def downloaded_to_all(self): + self.downloaded_to_all_called = True + self.downloaded_to_all_call_count += 1 + + def transaction_done(self, transaction_id: str, status: str): + self.transaction_done_calls.append((transaction_id, status)) + + def release(self): + self.released = True + + +def make_isolated_download_service(): + class IsolatedDownloadService(DownloadService): + _tx_table = {} + _ref_table = {} + _finished_refs = {} + _tx_outcomes = {} + _tx_incarnations = {} + _outcome_lock = threading.Lock() + _accept_outcomes = True + _logger = Mock() + _tx_lock = threading.Lock() + _initialized_cells = weakref.WeakKeyDictionary() + + return IsolatedDownloadService + + +def run_monitor_once(service_cls, now): + from nvflare.fuel.f3.streaming import download_service as download_service_module + + class MonitorIterationDone(Exception): + pass + + monitor_thread = threading.current_thread() + real_time = download_service_module.time.time + real_sleep = download_service_module.time.sleep + + def test_thread_time(): + if threading.current_thread() is monitor_thread: + return now + return real_time() + + def test_thread_sleep(seconds): + if threading.current_thread() is monitor_thread: + raise MonitorIterationDone + real_sleep(seconds) + + with ( + patch.object(download_service_module.time, "time", side_effect=test_thread_time), + patch.object(download_service_module.time, "sleep", side_effect=test_thread_sleep), + ): + with pytest.raises(MonitorIterationDone): + service_cls._monitor_tx() diff --git a/tests/unit_test/fuel/f3/streaming/transfer_outcome_test.py b/tests/unit_test/fuel/f3/streaming/transfer_outcome_test.py new file mode 100644 index 0000000000..49fc9d00ea --- /dev/null +++ b/tests/unit_test/fuel/f3/streaming/transfer_outcome_test.py @@ -0,0 +1,367 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import dataclasses +import time + +import pytest + +from nvflare.fuel.f3.streaming.download_service import DownloadStatus, TransactionDoneStatus, _Transaction +from nvflare.fuel.f3.streaming.transfer_outcome import ( + RefOutcome, + TransferOutcomeReason, + compute_transfer_outcome, + terminal_state_for_done_status, +) +from nvflare.fuel.f3.streaming.transfer_progress import TransferProgressState +from tests.unit_test.fuel.f3.streaming.download_test_utils import ( + MockDownloadable, + make_isolated_download_service, + run_monitor_once, +) + + +def _stub_obj(): + return MockDownloadable([b"chunk"]) + + +class TestComputeTransferOutcome: + """Aggregation rules: COMPLETED only when every expected receiver succeeded; receiver truth wins.""" + + def _refs(self, statuses_per_ref): + return [RefOutcome(ref_id=f"R{i}", receiver_statuses=s) for i, s in enumerate(statuses_per_ref)] + + def test_finished_all_success_is_completed(self): + refs = self._refs([{"r1": DownloadStatus.SUCCESS, "r2": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 2, refs, 100.0) + assert outcome.status == TransferProgressState.COMPLETED + assert outcome.reason == TransferOutcomeReason.ALL_RECEIVERS_SUCCEEDED + assert outcome.completed + assert outcome.done_status == TransactionDoneStatus.FINISHED + + def test_finished_with_failed_receiver_is_failed(self): + # the distinction FINISHED alone cannot express: receiver-count reached, one receiver FAILED + refs = self._refs([{"r1": DownloadStatus.SUCCESS, "r2": DownloadStatus.FAILED}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 2, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.RECEIVER_FAILED + assert not outcome.completed + + def test_finished_with_missing_receiver_is_failed(self): + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 2, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.RECEIVER_FAILED + + def test_failed_receiver_on_any_ref_fails_the_transfer(self): + refs = self._refs( + [ + {"r1": DownloadStatus.SUCCESS}, + {"r1": DownloadStatus.FAILED}, + ] + ) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 1, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + + def test_finished_with_no_refs_fails_closed(self): + # a mid-assembly race (tx terminated before add_object) must not certify success + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 2, [], 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.NO_OBJECTS + + def test_deleted_after_full_success_is_completed(self): + # routine cleanup via delete_transaction after all receivers succeeded: receiver truth wins + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.DELETED, 1, refs, 100.0) + assert outcome.status == TransferProgressState.COMPLETED + assert outcome.reason == TransferOutcomeReason.ALL_RECEIVERS_SUCCEEDED + assert outcome.done_status == TransactionDoneStatus.DELETED + + def test_timeout_after_full_success_is_completed(self): + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.TIMEOUT, 1, refs, 100.0) + assert outcome.status == TransferProgressState.COMPLETED + + def test_timeout_without_full_success_is_failed(self): + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.TIMEOUT, 2, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.TIMEOUT + + def test_deleted_without_full_success_is_aborted(self): + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.DELETED, 1, [], 100.0) + assert outcome.status == TransferProgressState.ABORTED + assert outcome.reason == TransferOutcomeReason.DELETED + + def test_unknown_receiver_count_cannot_complete(self): + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.FINISHED, 0, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.UNKNOWN_RECEIVER_COUNT + + def test_unknown_done_status_fails_closed(self): + outcome = compute_transfer_outcome("T1", "bogus", 1, [], 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.UNKNOWN_DONE_STATUS + assert outcome.done_status == "bogus" + + def test_unknown_done_status_with_successful_receivers_still_fails_closed(self): + # status validation precedes receiver truth: an unknown/future termination + # status must not certify success even when every receiver succeeded + refs = self._refs([{"r1": DownloadStatus.SUCCESS}]) + outcome = compute_transfer_outcome("T1", "future-status", 1, refs, 100.0) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.UNKNOWN_DONE_STATUS + + def test_outcome_is_frozen(self): + outcome = compute_transfer_outcome("T1", TransactionDoneStatus.DELETED, 1, [], 100.0) + with pytest.raises(dataclasses.FrozenInstanceError): + outcome.status = TransferProgressState.COMPLETED + + def test_terminal_state_mapping(self): + assert terminal_state_for_done_status(TransactionDoneStatus.FINISHED) == TransferProgressState.COMPLETED + assert terminal_state_for_done_status(TransactionDoneStatus.TIMEOUT) == TransferProgressState.FAILED + assert terminal_state_for_done_status(TransactionDoneStatus.DELETED) == TransferProgressState.ABORTED + assert terminal_state_for_done_status("bogus") is None + + +class TestTransactionOutcome: + """transaction_done() computes/records the outcome first and never lets callbacks break termination.""" + + def test_transaction_done_returns_outcome_with_receiver_map(self): + tx = _Transaction(timeout=10.0, num_receivers=1) + obj = _stub_obj() + ref = tx.add_object(obj) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + + outcome = tx.transaction_done(TransactionDoneStatus.FINISHED) + + assert outcome.completed + assert outcome.tx_id == tx.tid + assert outcome.refs[0].receiver_statuses == {"r1": DownloadStatus.SUCCESS} + assert obj.released + + def test_on_outcome_fires_before_user_callbacks(self): + order = [] + tx = _Transaction( + timeout=10.0, + num_receivers=1, + transaction_done_cb=lambda *a, **kw: order.append("done_cb"), + outcome_cb=lambda outcome: order.append("outcome_cb"), + ) + obj = _stub_obj() + ref = tx.add_object(obj) + ref.obj_downloaded("r1", DownloadStatus.FAILED) + + tx.transaction_done(TransactionDoneStatus.FINISHED, on_outcome=lambda outcome: order.append("recorded")) + + assert order == ["recorded", "done_cb", "outcome_cb"] + + def test_raising_callbacks_do_not_break_recording_or_release(self): + # a raising transaction_done_cb must not skip outcome recording, source + # release, or (in production) kill the monitor thread + recorded = [] + + def bad_done_cb(*args, **kwargs): + raise RuntimeError("done_cb boom") + + def bad_outcome_cb(outcome): + raise RuntimeError("outcome_cb boom") + + tx = _Transaction(timeout=10.0, num_receivers=1, transaction_done_cb=bad_done_cb, outcome_cb=bad_outcome_cb) + obj = _stub_obj() + ref = tx.add_object(obj) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + + outcome = tx.transaction_done(TransactionDoneStatus.FINISHED, on_outcome=recorded.append) + + assert outcome.completed + assert recorded and recorded[0] is outcome + assert obj.released + + def test_transaction_done_cb_contract_unchanged(self): + received = {} + + def done_cb(tx_id, status, base_objs, **cb_kwargs): + received.update(tx_id=tx_id, status=status, base_objs=base_objs, kwargs=cb_kwargs) + + tx = _Transaction(timeout=10.0, num_receivers=1, transaction_done_cb=done_cb, cb_kwargs={"k": "v"}) + obj = _stub_obj() + ref = tx.add_object(obj) + ref.obj_downloaded("r1", DownloadStatus.FAILED) + + tx.transaction_done(TransactionDoneStatus.FINISHED) + + # a receiver failure does not change what transaction_done_cb sees + assert received["tx_id"] == tx.tid + assert received["status"] == TransactionDoneStatus.FINISHED + assert received["base_objs"] == [[b"chunk"]] + assert received["kwargs"] == {"k": "v"} + + +class TestServiceOutcomeTable: + """DownloadService records and serves outcomes for terminated transactions.""" + + def _add_tx(self, service, num_receivers=1, timeout=10.0): + tx = _Transaction(timeout=timeout, num_receivers=num_receivers) + with service._tx_lock: + service._tx_table[tx.tid] = tx + obj = _stub_obj() + rid = service.add_object(tx.tid, obj) + with service._tx_lock: + ref = service._ref_table[rid] + return tx, ref, obj + + def test_monitor_records_completed_outcome(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + + run_monitor_once(service, now=time.time()) + + outcome = service.get_transaction_outcome(tx.tid) + assert outcome is not None + assert outcome.completed + assert obj.released + + def test_monitor_records_failed_outcome_on_receiver_failure(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + ref.obj_downloaded("r1", DownloadStatus.FAILED) + + run_monitor_once(service, now=time.time()) + + outcome = service.get_transaction_outcome(tx.tid) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.RECEIVER_FAILED + + def test_monitor_records_timeout_outcome(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=2, timeout=10.0) + now = time.time() + tx.last_active_time = now - 11.0 + + run_monitor_once(service, now=now) + + outcome = service.get_transaction_outcome(tx.tid) + assert outcome.status == TransferProgressState.FAILED + assert outcome.reason == TransferOutcomeReason.TIMEOUT + + def test_delete_before_success_records_aborted(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + + service.delete_transaction(tx.tid) + + outcome = service.get_transaction_outcome(tx.tid) + assert outcome.status == TransferProgressState.ABORTED + assert outcome.reason == TransferOutcomeReason.DELETED + + def test_delete_after_success_records_completed(self): + # the routine producer pattern: all receivers succeeded, then cleanup via + # delete_transaction before the monitor tick — must not read as aborted + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + + service.delete_transaction(tx.tid) + + outcome = service.get_transaction_outcome(tx.tid) + assert outcome.completed + assert outcome.done_status == TransactionDoneStatus.DELETED + + def test_reused_tx_id_does_not_surface_stale_outcome(self): + # retry with the same explicit tx_id (design: tx_id = transfer_id, retries reuse it) + from unittest.mock import Mock + + service = make_isolated_download_service() + service._tx_monitor = Mock() # suppress real monitor thread start + cell = Mock() + try: + first = service.new_transaction(cell=cell, timeout=10.0, num_receivers=1, tx_id="TX-REUSE") + service.delete_transaction(first) + assert service.get_transaction_outcome("TX-REUSE") is not None + + second = service.new_transaction(cell=cell, timeout=10.0, num_receivers=1, tx_id="TX-REUSE") + assert second == "TX-REUSE" + # the stale terminal outcome of the previous incarnation is purged + assert service.get_transaction_outcome("TX-REUSE") is None + finally: + service.shutdown() + + def test_stale_incarnation_cannot_record_over_live_retry(self): + # the record-after-purge race: termination removes the old tx from _tx_table, + # a retry registers the same tx_id, THEN the old incarnation records its + # outcome — it must not shadow the live retry + from functools import partial + from unittest.mock import Mock + + service = make_isolated_download_service() + service._tx_monitor = Mock() # suppress real monitor thread start + cell = Mock() + try: + service.new_transaction(cell=cell, timeout=10.0, num_receivers=1, tx_id="TX-RACE") + with service._tx_lock: + old_tx = service._tx_table.pop("TX-RACE") # termination step 1, as the monitor does + + # the retry registers the same id before the old incarnation records + service.new_transaction(cell=cell, timeout=10.0, num_receivers=1, tx_id="TX-RACE") + + # the old incarnation now finishes terminating and tries to record + old_tx.transaction_done( + TransactionDoneStatus.DELETED, on_outcome=partial(service._record_outcome, tx=old_tx) + ) + + # the live retry is unaffected: no stale terminal outcome surfaces + assert service.get_transaction_outcome("TX-RACE") is None + finally: + service.shutdown() + + def test_unknown_transaction_has_no_outcome(self): + service = make_isolated_download_service() + assert service.get_transaction_outcome("no-such-tx") is None + + def test_outcome_expires_after_ttl(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + now = time.time() + run_monitor_once(service, now=now) + assert service.get_transaction_outcome(tx.tid) is not None + + # a later monitor pass past the TTL removes the record + run_monitor_once(service, now=now + service.TX_OUTCOME_TTL + 1.0) + with service._outcome_lock: + assert tx.tid not in service._tx_outcomes + + def test_get_transaction_outcome_expires_lazily(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + ref.obj_downloaded("r1", DownloadStatus.SUCCESS) + run_monitor_once(service, now=time.time() - service.TX_OUTCOME_TTL - 1.0) + + assert service.get_transaction_outcome(tx.tid) is None + + def test_shutdown_clears_outcomes_and_stops_recording(self): + service = make_isolated_download_service() + tx, ref, obj = self._add_tx(service, num_receivers=1) + service.delete_transaction(tx.tid) + assert service.get_transaction_outcome(tx.tid) is not None + + service.shutdown() + + assert service.get_transaction_outcome(tx.tid) is None + # a monitor iteration that was mid-termination during shutdown cannot repopulate + late = compute_transfer_outcome("T-LATE", TransactionDoneStatus.FINISHED, 1, [], time.time()) + service._record_outcome(late) + assert service.get_transaction_outcome("T-LATE") is None