diff --git a/docs/release_notes/flare_280.rst b/docs/release_notes/flare_280.rst index 67c79d9ce9..0b29bf8e09 100644 --- a/docs/release_notes/flare_280.rst +++ b/docs/release_notes/flare_280.rst @@ -400,6 +400,22 @@ Compatibility and Migration Notes - The deprecated FLAdminAPI surface has been removed. Use the FLARE API, Recipe environments, and ``nvflare`` CLI workflows for new automation. - HA/Overseer code has been removed from the 2.8 branch. +- CellPipe cell names now keep the runtime token and pipe mode in one + explicitly marked FQCN leaf segment (``site-1.cellpipe-<token>_active``, + or ``<relay>.cellpipe-alias-<site>_<token>_active`` behind a relay) so a + pipe cell's FQCN parent matches the cell it actually connects to and pipe + names can never be confused with other cell names. As part of this change, + CellPipe validates tokens at construction: tokens must be non-empty, may + not start with ``alias-``, may not contain ``.`` when the pipe connects to + the site's own CP or a relay, and may not contain ``_`` or ``.`` when + connected through a relay. Custom ``FlareAgentWithCellPipe`` agent ids that + violate these rules now fail fast with a ``ValueError`` instead of + producing unroutable cell names. +- Both ends of a CellPipe pair derive each other's cell names independently, + so a Client Job process and an external training process must run the same + NVFlare naming scheme. A training environment pinned to an older NVFlare + fails with "peer FQCN mismatch" when paired with a 2.8 CJ; align the + training environment's NVFlare version with the site's. See the :ref:`migration_guide` for additional API and configuration migration notes. 
diff --git a/nvflare/app_common/ccwf/client_ctl.py b/nvflare/app_common/ccwf/client_ctl.py index 9c8f34dddb..b8df6e5305 100644 --- a/nvflare/app_common/ccwf/client_ctl.py +++ b/nvflare/app_common/ccwf/client_ctl.py @@ -461,7 +461,10 @@ def _do_learn(self): try: self.do_learn_task(t.task_name, t.task_data, t.fl_ctx, t.abort_signal) except: - self.logger.log(f"exception from do_learn_task: {secure_format_traceback()}") + self.log_exception(t.fl_ctx, "exception from do_learn_task") + # report the failure to the server so the job ends with an + # error status instead of FINISHED:COMPLETED + self.update_status(action="do_learn_task", error=ReturnCode.EXECUTION_EXCEPTION) finally: # force garbage collection gc.collect() diff --git a/nvflare/fuel/f3/cellnet/core_cell.py b/nvflare/fuel/f3/cellnet/core_cell.py index 52fdb75396..a030efa47a 100644 --- a/nvflare/fuel/f3/cellnet/core_cell.py +++ b/nvflare/fuel/f3/cellnet/core_cell.py @@ -647,7 +647,7 @@ def _set_bb_for_client_child(self, parent_url: str, create_internal_listener: bo self._create_bb_external_connector() elif not parent_url and self.root_url: # A cell configured with only a root URL (e.g. a CellPipe cell named - # .. that joins the cellnet at the root) has no + # .cellpipe-_ that joins the cellnet at the root) has no # other way to connect, regardless of its generation. self._create_bb_external_connector() @@ -1178,10 +1178,13 @@ def _try_find_ep(self, target_fqcn: str, for_msg: Message) -> Union[None, Endpoi agent = self.agents.get(parent_fqcn) if agent: return agent.endpoint - # I'm not connected to my FQCN parent: cells with hierarchical - # names (e.g. CellPipe cells named ..) connect - # to an ancestor or to the root instead. Fall through to the - # generic resolution below. + # I'm not connected to my FQCN parent: some hierarchical cells + # connect to an ancestor or to the root instead (e.g. CellPipe + # cells named .cellpipe-_ that connect to + # the server root). 
This fall-through is load-bearing for such cells - see + # test_pipe_cell_reaches_peer_through_server_root in + # core_cell_routing_test.py. Fall through to the generic + # resolution below. self.logger.debug(f"{self.my_info.fqcn}: no connection to parent {parent_fqcn}") # not the same family, or no direct path within the family diff --git a/nvflare/fuel/f3/cellnet/fqcn.py b/nvflare/fuel/f3/cellnet/fqcn.py index 1312ba8c92..7abd7fce53 100644 --- a/nvflare/fuel/f3/cellnet/fqcn.py +++ b/nvflare/fuel/f3/cellnet/fqcn.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional, Tuple + from nvflare.fuel.common.fqn import FQN @@ -18,6 +20,75 @@ class FQCN(FQN): pass +# CellPipe cells use explicitly marked leaf segments so pipe names can never +# be confused with other cell names or with each other: +# - plain leaf "cellpipe-<token>_<mode>" for pipes connected to the server +# root or to the site's own CP; +# - alias leaf "cellpipe-alias-<site>_<token>_<mode>" for pipes connected +# through another cell (e.g. a relay). The alias maps the cell to the +# owning site for mTLS identity resolution and stream message +# authentication. Both directions of the alias grammar live here so they +# cannot drift apart. +# +# CellPipe cell-name schemes, in historical order: +# 1. flat (pre-2.7): the whole FQCN is "<site>_<token>_<mode>", a root-level +# sibling of the site cell. +# 2. hierarchical (#4801, never released): "<site>.<token>.<mode>". Replaced +# because the extra segments created unconnected FQCN parents that broke +# routing (NVBug 6371056). +# 3. topology (current): a single prefixed leaf segment under the FQCN of +# the cell the pipe actually connects to, +# "<parent>.cellpipe-<token>_<mode>", or +# "<parent>.cellpipe-alias-<site>_<token>_<mode>" when connected +# through another cell. 
+# Mixed-version notes: scheme-1 aliases are still accepted by identity +# resolution and stream auth (as whole-FQCN aliases via the bare grammar), +# but the two ends of one pipe pair must run the same scheme - each end +# derives the peer's name from its own code, so a CJ and a training +# subprocess on different schemes fail with "peer FQCN mismatch". +CELL_PIPE_LEAF_PREFIX = "cellpipe-" +CELL_PIPE_ALIAS_PREFIX = "cellpipe-alias-" +CELL_PIPE_ALIAS_MODES = ("active", "passive") + + +def make_cell_pipe_alias(owner: str, runtime_id: str, mode: str) -> str: + return f"{CELL_PIPE_ALIAS_PREFIX}{owner}_{runtime_id}_{mode}" + + +def parse_cell_pipe_alias(segment: str) -> Optional[Tuple[str, str, str]]: + """Parse a CellPipe alias leaf segment into (owner, runtime_id, mode). + + Two shapes are accepted: + - the current explicit form "cellpipe-alias-__"; + - the bare legacy form "__" used by pre-2.8 + flat CellPipe names, where the whole FQCN is the alias. Callers decide + where the bare form is acceptable; it is normally restricted to + single-segment FQCNs so an unmarked "_" leaf inside a + longer FQCN is never misread as an alias. + + In both shapes the runtime_id must be non-empty and contain no "." or + "_": parsing from the right makes the interpretation unambiguous, so + "site-a_x__active" can only belong to "site-a_x", never to "site-a" + with a runtime id of "x_". + + Returns None if the segment is not a valid alias. + """ + if segment.startswith(CELL_PIPE_ALIAS_PREFIX): + segment = segment[len(CELL_PIPE_ALIAS_PREFIX) :] + + head, sep, mode = segment.rpartition("_") + if not sep or mode not in CELL_PIPE_ALIAS_MODES: + return None + + # rpartition splits on the last "_", so runtime_id can never contain "_"; + # only the "." constraint needs an explicit check. + owner, sep, runtime_id = head.rpartition("_") + if not sep or not owner or not runtime_id or "." 
in runtime_id: + return None + + return owner, runtime_id, mode + + class FqcnInfo: def __init__(self, fqcn: str): self.fqcn = fqcn diff --git a/nvflare/fuel/f3/cellnet/identity.py b/nvflare/fuel/f3/cellnet/identity.py index b29abd7273..b19888b5f5 100644 --- a/nvflare/fuel/f3/cellnet/identity.py +++ b/nvflare/fuel/f3/cellnet/identity.py @@ -18,7 +18,7 @@ from cryptography.x509.oid import NameOID from nvflare.apis.fl_constant import ConnectionSecurity -from nvflare.fuel.f3.cellnet.fqcn import FQCN +from nvflare.fuel.f3.cellnet.fqcn import CELL_PIPE_ALIAS_PREFIX, CELL_PIPE_LEAF_PREFIX, FQCN, parse_cell_pipe_alias from nvflare.fuel.f3.drivers.driver_params import DriverParams from nvflare.fuel.f3.drivers.net_utils import SECURE_SCHEMES from nvflare.fuel.utils.admin_name_utils import is_valid_admin_client_name @@ -126,27 +126,13 @@ def _resolve_local_child_identity(self, fqcn: str) -> Optional[str]: @staticmethod def _get_cell_pipe_alias_owner(segment: str) -> Optional[str]: - # CellPipe cells from older NVFlare versions use sibling names like - # "site-1__active" but authenticate with the owning site's - # certificate. Current versions name these cells .., - # which resolves through the normal FQCN hierarchy; this parser is kept - # for backward compatibility with peers running older versions. Only the - # constrained form __(active|passive) with a non-empty - # runtime_id that contains no "." or "_" is treated as an alias: parsing - # from the right makes the interpretation unambiguous, so - # "site-a_x__active" can only belong to "site-a_x", never to - # "site-a" with a runtime id of "x_". - head, sep, mode = segment.rpartition("_") - if not sep or mode not in ("active", "passive"): - return None - - # rpartition splits on the last "_", so runtime_id can never contain "_"; - # only the "." constraint needs an explicit check. - owner, sep, runtime_id = head.rpartition("_") - if not sep or not owner or not runtime_id or "." 
in runtime_id: - return None - - return owner + # CellPipe cells connected through another cell use an alias leaf like + # "cellpipe-alias-site-1__active" but authenticate with + # the owning site's certificate. Older NVFlare versions used the bare + # sibling form "site-1__active" as the whole FQCN. See + # parse_cell_pipe_alias for the alias grammar. + parsed = parse_cell_pipe_alias(segment) + return parsed[0] if parsed else None def resolve(self, fqcn: str) -> Optional[str]: if not fqcn: @@ -167,13 +153,29 @@ def resolve(self, fqcn: str) -> Optional[str]: if identity: return identity - # This legacy-alias check intentionally precedes _resolve_local_child_identity: - # an old-format CellPipe alias cell may connect as a direct child of this - # local cell, but it authenticates with the owning site's certificate, not - # with a certificate named after the alias segment itself. - alias_owner = self._get_cell_pipe_alias_owner(parts[-1]) if parts else None - if alias_owner: - return self.resolve(alias_owner) + # The alias interpretation applies to the two shapes that carry an + # alias: the explicitly marked leaf "cellpipe-alias-..." at any depth, + # or a legacy (pre-2.8 flat naming) cell whose whole FQCN is the bare + # alias. An unmarked "_"-shaped leaf inside a longer FQCN + # is never an alias - its token may itself contain "_" (e.g. + # "site-1.cellpipe-ext_trainer_active"), and alias-parsing it would + # fabricate a wrong owner such as "ext". This check intentionally + # precedes _resolve_local_child_identity: an alias cell connected as a + # direct child of this local cell authenticates with the owning site's + # certificate, not with a certificate named after the alias segment. 
+ leaf = parts[-1] + if leaf.startswith(CELL_PIPE_ALIAS_PREFIX) or len(parts) == 1: + alias_owner = self._get_cell_pipe_alias_owner(leaf) + if alias_owner: + return self.resolve(alias_owner) + + # A plain CellPipe leaf ("cellpipe-_") authenticates with + # the identity of the cell it is named under, so resolve its parent. + # This must also precede _resolve_local_child_identity: a CP resolving + # its own pipe child expects the site's identity, not one named after + # the leaf segment. + if len(parts) > 1 and leaf.startswith(CELL_PIPE_LEAF_PREFIX): + return self.resolve(FQCN.join(parts[:-1])) identity = self._resolve_local_child_identity(fqcn) if identity: diff --git a/nvflare/fuel/utils/pipe/cell_pipe.py b/nvflare/fuel/utils/pipe/cell_pipe.py index 3bf6ed8c01..530bc20f23 100644 --- a/nvflare/fuel/utils/pipe/cell_pipe.py +++ b/nvflare/fuel/utils/pipe/cell_pipe.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import queue import threading import time @@ -22,7 +23,7 @@ from nvflare.fuel.f3.cellnet.cell import Cell from nvflare.fuel.f3.cellnet.cell import Message as CellMessage from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode -from nvflare.fuel.f3.cellnet.fqcn import FQCN +from nvflare.fuel.f3.cellnet.fqcn import CELL_PIPE_LEAF_PREFIX, FQCN, make_cell_pipe_alias from nvflare.fuel.f3.cellnet.net_agent import NetAgent from nvflare.fuel.f3.cellnet.utils import make_reply from nvflare.fuel.f3.drivers.driver_params import DriverParams @@ -44,24 +45,81 @@ _HEADER_START_TIME = _PREFIX + "start" _HEADER_HB_SEQ = _PREFIX + "hb_seq" +_logger = logging.getLogger(__name__) + def _cell_fqcn(mode, site_name, token, parent_fqcn): # The FQCN of the cell must be unique in the whole cellnet. - # The cell is named .., scoped under the FQCN of the - # cell it connects to. 
The hierarchical form keeps the owning site as a - # leading segment, so mTLS identity resolution and message routing follow - # the normal FQCN rules without any alias parsing. + # The runtime token and pipe mode are kept in one leaf segment to avoid + # introducing an unconnected FQCN parent such as .. + # When connecting to the site's own CP or to a relay, the FQCN parent is + # the actually connected cell, so normal parent routing applies. When + # connecting to the server root (e.g. simulator, or pipes configured with + # a root url), the cell is named . while physically connected + # to the root: its FQCN parent is NOT connected, and routing relies + # on the fall-through in CoreCell._try_find_ep() that resolves the next + # leg through the root when the FQCN parent is absent. That fall-through + # is load-bearing for these cells (covered by + # test_pipe_cell_reaches_peer_through_server_root in + # core_cell_routing_test.py). # The two peer pipes on the same site share the same site_name and token, # but are differentiated by their modes. + if not token: + # The configured token (e.g. "{JOB_ID}") resolved to an empty string. + # An empty token cannot uniquely name the cell (all such pipes on the + # site would collide), so fail fast. A generated fallback is not an + # option: the two ends of a pipe pair derive each other's names + # independently, so any per-process unique value would break the + # pair's rendezvous. + raise ValueError("invalid CellPipe token: token must be a non-empty string") + if token.startswith("alias-"): + # a plain leaf "cellpipe-alias-..._" would collide with the + # relay alias namespace and could parse to a fabricated owner + raise ValueError(f"invalid CellPipe token '{token}': the 'alias-' prefix is reserved for alias cell names") + + cell_name = f"{CELL_PIPE_LEAF_PREFIX}{token}_{mode}" if parent_fqcn == FQCN.ROOT_SERVER: + # A "." 
in the token adds phantom FQCN segments, but root-connected + # cells route through the root fall-through regardless of depth, so + # dotted user-chosen tokens (e.g. agent ids) keep working as they did + # before the topology naming. prefix = site_name - elif FQCN.split(parent_fqcn)[-1] == site_name: + elif parent_fqcn and FQCN.split(parent_fqcn)[-1] == site_name: # connecting to the site's own CP: the site is already the last segment + if "." in token: + # a "." would put an unconnected FQCN parent between the CP and + # the cell, making the cell unreachable + raise ValueError( + f"invalid CellPipe token '{token}': '.' would split the cell name into extra FQCN segments" + ) + prefix = parent_fqcn + elif parent_fqcn: + # Connecting to another cell (e.g. a relay): use the alias leaf + # cellpipe-alias-__ so mTLS identity resolution and + # stream auth map the cell to the owning site (see + # parse_cell_pipe_alias). The token is the alias runtime id and must + # not contain "_" or ".", or the alias would parse to the wrong owner + # (or not parse at all) and the connection/messages would be rejected. + # Tokens are normally job ids (UUIDs), so this only affects custom + # tokens such as FlareAgentWithCellPipe agent ids. + if "_" in token or "." in token: + raise ValueError( + f"invalid CellPipe token '{token}': must not contain '_' or '.' " + f"when connected through another cell ({parent_fqcn})" + ) prefix = parent_fqcn + cell_name = make_cell_pipe_alias(site_name, token, mode) else: - # connecting to another cell (e.g. a relay): scope the name to the site - prefix = FQCN.join([parent_fqcn, site_name]) - return FQCN.join([prefix, token, mode]) + # Missing parent FQCN: keep the cell under the owning site so routing + # still has a topology-shaped parent instead of creating .. + # No in-tree caller hits this today; warn so a misconfiguration that + # produces a correct-looking name is still diagnosable. 
+ _logger.warning( + f"CellPipe conn props have no parent FQCN; naming cell under site '{site_name}' " + f"and relying on root routing" + ) + prefix = site_name + return FQCN.join([prefix, cell_name]) def _to_cell_message(msg: Message, extra=None) -> CellMessage: diff --git a/nvflare/private/fed/authenticator.py b/nvflare/private/fed/authenticator.py index dcef96eaa2..b5cdde4e63 100644 --- a/nvflare/private/fed/authenticator.py +++ b/nvflare/private/fed/authenticator.py @@ -28,7 +28,7 @@ from nvflare.fuel.f3.cellnet.defs import IdentityChallengeKey, MessageHeaderKey from nvflare.fuel.f3.cellnet.defs import ReturnCode from nvflare.fuel.f3.cellnet.defs import ReturnCode as F3ReturnCode -from nvflare.fuel.f3.cellnet.fqcn import FQCN +from nvflare.fuel.f3.cellnet.fqcn import CELL_PIPE_ALIAS_PREFIX, FQCN, parse_cell_pipe_alias from nvflare.fuel.f3.message import Message from nvflare.fuel.f3.message import Message as CellMessage from nvflare.fuel.f3.streaming.stream_const import STREAM_CHANNEL @@ -322,25 +322,33 @@ def _origin_matches_fqcn(origin: str, fqcn: str, channel: Optional[str] = None) if origin == fqcn or FQCN.is_ancestor(fqcn, origin): return True - # CellPipe stream cells from older NVFlare versions use sibling names such - # as "site-1__active" and "site-1__passive", but their auth - # token is issued to the registered site FQCN ("site-1"). Current versions - # name these cells .., which the descendant check above - # already covers. Treat only the legacy stream aliases as the owning site; - # normal server-command origins remain bound to the exact registered - # FQCN/descendant relationship above. - if channel != STREAM_CHANNEL or not origin.startswith(f"{fqcn}_"): + # CellPipe stream cells can use an alias leaf such as + # "cellpipe-alias-site-1__active" when connected through another + # cell, but their auth token is issued to the owning site FQCN. 
Treat only stream aliases + # under the same FQCN parent as the owning site; normal server-command + # origins remain bound to the exact registered FQCN/descendant relationship. + if channel != STREAM_CHANNEL: return False - runtime_id, sep, mode = origin[len(fqcn) + 1 :].rpartition("_") - if not sep or mode not in {"active", "passive"}: + origin_parent = FQCN.get_parent(origin) + fqcn_parent = FQCN.get_parent(fqcn) + if origin_parent != fqcn_parent: return False - # Deployed CellPipe aliases use the job UUID as the runtime id. Do not allow - # FQCN separators or alias separators inside this portion: otherwise a token - # for "site" could validate an origin such as "site_x__active" when - # "site_x" is also a valid client FQCN. - return bool(runtime_id) and "." not in runtime_id and "_" not in runtime_id + # The bare alias grammar is only valid for single-segment origins (legacy + # pre-2.8 flat CellPipe names); at any depth the alias must carry the + # explicit cellpipe-alias- marker, so an unmarked "_" leaf + # can never be misread as an alias. + origin_leaf = FQCN.split(origin)[-1] + if origin_parent and not origin_leaf.startswith(CELL_PIPE_ALIAS_PREFIX): + return False + + # parse_cell_pipe_alias parses from the right and rejects "." or "_" in the + # runtime id, so a token for "site" can never validate an origin such as + # "site_x__active" when "site_x" is also a valid client FQCN. 
+ owner = FQCN.split(fqcn)[-1] + parsed = parse_cell_pipe_alias(origin_leaf) + return parsed is not None and parsed[0] == owner def validate_auth_headers( diff --git a/tests/integration_test/data/test_configs/standalone_job/ext_process_streaming.yml b/tests/integration_test/data/test_configs/standalone_job/ext_process_streaming.yml new file mode 100644 index 0000000000..0bbb0464d8 --- /dev/null +++ b/tests/integration_test/data/test_configs/standalone_job/ext_process_streaming.yml @@ -0,0 +1,43 @@ +# Regression tests for the ext-process Client API + streaming path (NVBug 6371056): +# the training script runs in a subprocess whose CellPipe cell must route streamed +# data (including download-service tensor transfers) to server.. These two +# test cases also run as part of the client_api group; this group exists so the +# streaming routing path can be exercised on its own: +# NVFLARE_TEST_FRAMEWORK=ext_process_streaming pytest system_test.py +n_servers: 1 +n_clients: 2 +jobs_root_dir: ./data/jobs +cleanup: True + + +tests: + - test_name: "run np-loop-cell-pipe" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job np_loop_cell_pipe" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + - test_name: "run pt-large-model-pass-through" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job pt_large_model_pass_through" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } diff --git a/tests/integration_test/test_configs.yml b/tests/integration_test/test_configs.yml index 8d0828313f..dbae735a85 100644 --- 
a/tests/integration_test/test_configs.yml +++ b/tests/integration_test/test_configs.yml @@ -28,3 +28,5 @@ test_configs: - ./data/test_configs/standalone_job/client_api_qa.yml model_controller_api: - ./data/test_configs/standalone_job/model_controller_api.yml + ext_process_streaming: + - ./data/test_configs/standalone_job/ext_process_streaming.yml diff --git a/tests/unit_test/app_common/ccwf/client_ctl_test.py b/tests/unit_test/app_common/ccwf/client_ctl_test.py new file mode 100644 index 0000000000..664aec5aa3 --- /dev/null +++ b/tests/unit_test/app_common/ccwf/client_ctl_test.py @@ -0,0 +1,57 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from unittest.mock import MagicMock + +from nvflare.apis.fl_constant import ReturnCode +from nvflare.apis.fl_context import FLContext +from nvflare.apis.shareable import Shareable +from nvflare.apis.signal import Signal +from nvflare.app_common.ccwf.client_ctl import ClientSideController, _LearnTask + + +class _FailingClientSideController(ClientSideController): + def __init__(self): + super().__init__( + task_name_prefix="test", + learn_task_check_interval=0.01, + learn_task_ack_timeout=1.0, + learn_task_abort_timeout=1.0, + final_result_ack_timeout=1.0, + ) + + def start_workflow(self, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: + raise NotImplementedError + + def do_learn_task(self, name: str, data: Shareable, fl_ctx: FLContext, abort_signal: Signal): + self.asked_to_stop = True + raise RuntimeError("learn failed") + + +def test_do_learn_logs_exception_from_learn_task(): + ctl = _FailingClientSideController() + ctl.logger = MagicMock() + ctl.learn_task = _LearnTask("train", Shareable(), FLContext()) + + ctl._do_learn() + + # log_exception logs the contextualized message, then the traceback + assert ctl.logger.error.call_count == 2 + assert "exception from do_learn_task" in ctl.logger.error.call_args_list[0].args[0] + assert ctl.learn_task is None + # the failure must be recorded so the next status report tells the server + # to end the job with an error status instead of FINISHED:COMPLETED + assert ctl.current_status.error == ReturnCode.EXECUTION_EXCEPTION + report = ctl._get_status_report() + assert report is not None and report.error == ReturnCode.EXECUTION_EXCEPTION diff --git a/tests/unit_test/fuel/f3/cellnet/core_cell_routing_test.py b/tests/unit_test/fuel/f3/cellnet/core_cell_routing_test.py index f6b80dde40..4db7ec36e2 100644 --- a/tests/unit_test/fuel/f3/cellnet/core_cell_routing_test.py +++ b/tests/unit_test/fuel/f3/cellnet/core_cell_routing_test.py @@ -12,14 +12,7 @@ # See the License for the specific language 
governing permissions and # limitations under the License. -"""Endpoint resolution for cells whose FQCN parent is not their physical parent. - -CellPipe cells are named .. but physically connect to the -site's CP, a relay, or the server root rather than to their literal FQCN -parent (the job cell). _try_find_ep must fall through to generic path -resolution (target's ancestor chain, then the server root) when the FQCN -parent is not connected. -""" +"""Endpoint resolution for topology-shaped CellPipe FQCNs.""" import logging @@ -42,11 +35,18 @@ def _routing_cell(fqcn, connected): def test_pipe_cell_reaches_peer_through_connected_cp(): - # The pipe cell connects to the CP ("site-1"), not to its FQCN parent - # ("site-1.job-123"); the peer must be routed through the CP. - cell = _routing_cell("site-1.job-123.active", ["site-1"]) + cell = _routing_cell("site-1.cellpipe-job-123_active", ["site-1"]) - ep = cell._try_find_ep("site-1.job-123.passive", None) + ep = cell._try_find_ep("site-1.cellpipe-job-123_passive", None) + + assert ep is not None + assert ep.name == "site-1" + + +def test_pipe_cell_reaches_server_job_through_connected_cp(): + cell = _routing_cell("site-1.cellpipe-job-123_active", ["site-1"]) + + ep = cell._try_find_ep("server.job-123", None) assert ep is not None assert ep.name == "site-1" @@ -55,16 +55,36 @@ def test_pipe_cell_reaches_peer_through_connected_cp(): def test_pipe_cell_reaches_peer_through_server_root(): # With pipe_connect_type VIA_ROOT the pipe cell connects only to the # server root; the same-family peer must be routed through it. 
- cell = _routing_cell("site-1.job-123.active", ["server"]) + cell = _routing_cell("site-1.cellpipe-job-123_active", ["server"]) - ep = cell._try_find_ep("site-1.job-123.passive", None) + ep = cell._try_find_ep("site-1.cellpipe-job-123_passive", None) assert ep is not None assert ep.name == "server" +def test_relay_alias_pipe_cell_reaches_peer_through_connected_relay(): + # A pipe cell behind a relay is named .cellpipe-alias-__: its + # FQCN parent is the connected relay, so normal parent routing applies. + cell = _routing_cell("relay-1.cellpipe-alias-site-1_job-123_active", ["relay-1"]) + + ep = cell._try_find_ep("relay-1.cellpipe-alias-site-1_job-123_passive", None) + + assert ep is not None + assert ep.name == "relay-1" + + +def test_relay_alias_pipe_cell_reaches_server_job_through_connected_relay(): + cell = _routing_cell("relay-1.cellpipe-alias-site-1_job-123_active", ["relay-1"]) + + ep = cell._try_find_ep("server.job-123", None) + + assert ep is not None + assert ep.name == "relay-1" + + def test_pipe_cell_reaches_site_ancestor_through_connected_cp(): - cell = _routing_cell("site-1.job-123.active", ["site-1"]) + cell = _routing_cell("site-1.cellpipe-job-123_active", ["site-1"]) ep = cell._try_find_ep("site-1", None) @@ -83,6 +103,6 @@ def test_same_family_routing_still_prefers_fqcn_parent(): def test_pipe_cell_with_no_connection_is_unreachable(): - cell = _routing_cell("site-1.job-123.active", []) + cell = _routing_cell("site-1.cellpipe-job-123_active", []) - assert cell._try_find_ep("site-1.job-123.passive", None) is None + assert cell._try_find_ep("site-1.cellpipe-job-123_passive", None) is None diff --git a/tests/unit_test/fuel/f3/cellnet/fqcn_test.py b/tests/unit_test/fuel/f3/cellnet/fqcn_test.py new file mode 100644 index 0000000000..f2058271fd --- /dev/null +++ b/tests/unit_test/fuel/f3/cellnet/fqcn_test.py @@ -0,0 +1,70 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Direct tests for the CellPipe alias grammar shared by naming, mTLS +identity resolution, and stream message authentication.""" + +import pytest + +from nvflare.fuel.f3.cellnet.fqcn import CELL_PIPE_ALIAS_PREFIX, make_cell_pipe_alias, parse_cell_pipe_alias + + +class TestCellPipeAliasGrammar: + @pytest.mark.parametrize( + "owner,runtime_id,mode", + [ + ("site-1", "job-123", "active"), + ("site-1", "8cb50f16-8158-46f6-a8d7-ec85b1f06c53", "passive"), + ("site_a", "job", "active"), # owners may contain "_" + ], + ) + def test_round_trip(self, owner, runtime_id, mode): + alias = make_cell_pipe_alias(owner, runtime_id, mode) + assert alias.startswith(CELL_PIPE_ALIAS_PREFIX) + assert parse_cell_pipe_alias(alias) == (owner, runtime_id, mode) + + def test_legacy_bare_alias_is_still_parsed(self): + # pre-2.8 flat CellPipe names are whole-FQCN aliases with no prefix + assert parse_cell_pipe_alias("site-1_job-123_active") == ("site-1", "job-123", "active") + + @pytest.mark.parametrize("segment", ["site-a_x_job-123_active", "cellpipe-alias-site-a_x_job-123_active"]) + def test_owner_with_underscores_parses_from_the_right(self, segment): + # right-anchored parsing: the runtime id can never contain "_", so the + # only valid owner of this alias is "site-a_x", never "site-a" + assert parse_cell_pipe_alias(segment) == ("site-a_x", "job-123", "active") + + @pytest.mark.parametrize( + "segment", + [ + "site-1_active", # too few parts 
+ "site-1__active", # empty runtime id + "_job_active", # empty owner + "site-1_job-123_idle", # unknown mode + "site-1_job.x_active", # "." in runtime id + "job-123", # no mode suffix + "cellpipe-alias-x_active", # marked, but no owner/runtime split + "cellpipe-alias-site-1_job-123_idle", # marked, unknown mode + "cellpipe-job-123_active", # plain pipe leaf, no owner/runtime split + "", + ], + ) + def test_invalid_segments_are_rejected(self, segment): + assert parse_cell_pipe_alias(segment) is None + + def test_plain_leaf_with_underscore_token_bare_parses_so_callers_must_gate(self): + # The bare legacy grammar cannot tell a plain leaf with an underscore + # token from an alias - that is exactly why callers only accept the + # bare form for whole-FQCN (single-segment) names and require the + # cellpipe-alias- marker everywhere else. + assert parse_cell_pipe_alias("cellpipe-simulate_job_active") == ("cellpipe-simulate", "job", "active") diff --git a/tests/unit_test/fuel/f3/cellnet/identity_binding_test.py b/tests/unit_test/fuel/f3/cellnet/identity_binding_test.py index 62231066b0..5adc92a1b5 100644 --- a/tests/unit_test/fuel/f3/cellnet/identity_binding_test.py +++ b/tests/unit_test/fuel/f3/cellnet/identity_binding_test.py @@ -199,18 +199,49 @@ def test_identity_resolver_rejects_admin_like_endpoint_without_authenticated_ide resolver.require_match("_admin_not-a-uuid", "admin@nvidia.com", "connection admin") -def test_identity_resolver_maps_hierarchical_cell_pipe_cell_to_owner_identity(): - # current CellPipe naming: .. 
resolves via the FQCN hierarchy +def test_identity_resolver_maps_topology_cell_pipe_cell_to_owner_identity(): resolver = CellIdentityResolver(local_fqcn="server", prefix_identity_map={"site-1": "site-1"}) - assert resolver.resolve("site-1.8cb50f16-8158-46f6-a8d7-ec85b1f06c53.active") == "site-1" - assert resolver.resolve("site-1.8cb50f16-8158-46f6-a8d7-ec85b1f06c53.passive") == "site-1" + assert resolver.resolve("site-1.cellpipe-8cb50f16-8158-46f6-a8d7-ec85b1f06c53_active") == "site-1" + assert resolver.resolve("site-1.cellpipe-8cb50f16-8158-46f6-a8d7-ec85b1f06c53_passive") == "site-1" -def test_identity_resolver_maps_hierarchical_cell_pipe_cell_behind_relay_to_owner_identity(): +def test_identity_resolver_maps_underscore_token_pipe_cell_to_site_identity(): + # A root-connected pipe cell may carry "_" in its user-chosen token (e.g. + # FlareAgentWithCellPipe agent_id="ext_trainer"). The plain cellpipe- leaf + # is never alias-parsed: with a sparse identity map (provisioning omits + # identities equal to the name), the cell must resolve to its site, not to + # a fabricated alias owner such as "ext". + resolver = CellIdentityResolver(local_fqcn="server") + + assert resolver.resolve("site-1.cellpipe-ext_trainer_active") == "site-1" + assert resolver.resolve("site-1.cellpipe-simulate_job_passive") == "site-1" + + +def test_identity_resolver_cp_resolves_own_underscore_token_child_to_site_identity(): + # The explicit leaf prefixes remove the old ambiguity: a CP resolving its + # own pipe child with an underscore token sees a plain (non-alias) leaf + # and resolves it to the site's identity, never to a fabricated alias + # owner such as "simulate". 
+ resolver = CellIdentityResolver(local_fqcn="site-1") + + assert resolver.resolve("site-1.cellpipe-simulate_job_active") == "site-1" + + +def test_identity_resolver_maps_relay_cell_pipe_cell_to_owner_identity(): resolver = CellIdentityResolver(local_fqcn="relay-1", exact_identity_map={"relay-1": "relay-1"}) - assert resolver.resolve("relay-1.site-1.job-123.active") == "site-1" + assert resolver.resolve("relay-1.cellpipe-alias-site-1_job-123_active") == "site-1" + assert resolver.resolve("relay-1.site-1.cellpipe-job-123_active") == "site-1" + + +def test_identity_resolver_maps_relay_cell_pipe_alias_from_a_distant_cell(): + # The explicit alias marker is authoritative at any depth, so a cell that + # is not the connected relay (e.g. the server during cert exchange) also + # resolves the alias to the owning site. + resolver = CellIdentityResolver(local_fqcn="server") + + assert resolver.resolve("relay-1.cellpipe-alias-site-1_job-123_active") == "site-1" def test_identity_resolver_maps_legacy_cell_pipe_alias_to_owner_identity(): @@ -237,7 +268,7 @@ def test_identity_resolver_maps_cell_pipe_alias_owner_with_underscores_from_the_ def test_identity_resolver_maps_dotted_cell_pipe_alias_to_owner_identity(): resolver = CellIdentityResolver(local_fqcn="site-1") - assert resolver.resolve("site-1.site-1_job-123_passive") == "site-1" + assert resolver.resolve("site-1.cellpipe-alias-site-1_job-123_passive") == "site-1" def test_identity_resolver_does_not_treat_unconstrained_names_as_cell_pipe_aliases(): @@ -284,14 +315,14 @@ def test_mtls_handshake_rejects_spoofed_endpoint_identity(): assert conn.closed -def test_mtls_handshake_accepts_hierarchical_cell_pipe_cell_with_site_cert_identity(): +def test_mtls_handshake_accepts_topology_cell_pipe_cell_with_site_cert_identity(): manager = _conn_manager(identity_map={"site-1": "site-1"}) conn = _FakeConnection(peer_cn="site-1") sfm_conn = SfmConnection(conn, Endpoint("server")) - manager.update_endpoint(sfm_conn, 
{HandshakeKeys.ENDPOINT_NAME: "site-1.job-123.active"}) + manager.update_endpoint(sfm_conn, {HandshakeKeys.ENDPOINT_NAME: "site-1.cellpipe-job-123_active"}) - assert "site-1.job-123.active" in manager.sfm_endpoints + assert "site-1.cellpipe-job-123_active" in manager.sfm_endpoints assert not conn.closed diff --git a/tests/unit_test/fuel/utils/pipe/cell_pipe_test.py b/tests/unit_test/fuel/utils/pipe/cell_pipe_test.py index 6bd6f8adf0..297bd182a3 100644 --- a/tests/unit_test/fuel/utils/pipe/cell_pipe_test.py +++ b/tests/unit_test/fuel/utils/pipe/cell_pipe_test.py @@ -113,23 +113,71 @@ def _make_msg(topic="train", data="payload"): class TestCellFqcnFormat: - """CellPipe cells are named <site>.<token>.<mode>, scoped under the cell - they connect to, so identity resolution and routing follow the normal - FQCN hierarchy.""" + """CellPipe cells are named under the cell they connect to, with the token + and mode in one leaf segment so the FQCN parent matches the topology.""" def test_connect_to_root_server(self): - assert _cell_fqcn("active", "site-1", "job-123", FQCN.ROOT_SERVER) == "site-1.job-123.active" + assert _cell_fqcn("active", "site-1", "job-123", FQCN.ROOT_SERVER) == "site-1.cellpipe-job-123_active" def test_connect_to_own_cp(self): - assert _cell_fqcn("passive", "site-1", "job-123", "site-1") == "site-1.job-123.passive" + assert _cell_fqcn("passive", "site-1", "job-123", "site-1") == "site-1.cellpipe-job-123_passive" def test_connect_to_relay(self): - assert _cell_fqcn("active", "site-1", "job-123", "relay-1") == "relay-1.site-1.job-123.active" + # The passive and active pipes for a given pair are built from the same + # root_url, so they share the same parent FQCN. Relay and CP-behind-relay + # names can differ because they describe different physical parents. 
+ assert _cell_fqcn("active", "site-1", "job-123", "relay-1") == "relay-1.cellpipe-alias-site-1_job-123_active" + assert _cell_fqcn("passive", "site-1", "job-123", "relay-1") == "relay-1.cellpipe-alias-site-1_job-123_passive" def test_connect_to_cp_behind_relay(self): - # same name as connecting to the relay itself, so the two peer pipes - # agree on each other's FQCN regardless of which of the two they use - assert _cell_fqcn("active", "site-1", "job-123", "relay-1.site-1") == "relay-1.site-1.job-123.active" + assert _cell_fqcn("active", "site-1", "job-123", "relay-1.site-1") == "relay-1.site-1.cellpipe-job-123_active" + assert _cell_fqcn("passive", "site-1", "job-123", "relay-1.site-1") == "relay-1.site-1.cellpipe-job-123_passive" + + @pytest.mark.parametrize("parent_fqcn", ["", None]) + def test_missing_parent_fqcn_falls_back_to_site_parent(self, parent_fqcn): + assert _cell_fqcn("active", "site-1", "job-123", parent_fqcn) == "site-1.cellpipe-job-123_active" + + @pytest.mark.parametrize("parent_fqcn", ["site-1", "relay-1", "relay-1.site-1"]) + def test_token_with_dot_is_rejected_behind_connected_parent(self, parent_fqcn): + # a "." would split the cell name into extra FQCN segments, recreating + # an unconnected FQCN parent between the connected cell and the pipe + with pytest.raises(ValueError): + _cell_fqcn("active", "site-1", "my.token", parent_fqcn) + + @pytest.mark.parametrize("parent_fqcn", [FQCN.ROOT_SERVER, ""]) + def test_token_with_dot_is_allowed_for_root_connection(self, parent_fqcn): + # root-connected cells route via the root fall-through regardless of + # phantom segments, so dotted user tokens (e.g. agent ids) keep working + assert _cell_fqcn("active", "site-1", "agent.v2", parent_fqcn) == "site-1.cellpipe-agent.v2_active" + + @pytest.mark.parametrize("token", ["my_token", "my.token"]) + def test_bad_alias_token_is_rejected_behind_relay(self, token): + # behind another cell the token is the alias runtime id, which must be + # free of "_" or "." 
or the alias parses to the wrong owner + with pytest.raises(ValueError): + _cell_fqcn("active", "site-1", token, "relay-1") + + @pytest.mark.parametrize("parent_fqcn", [FQCN.ROOT_SERVER, "site-1", "relay-1", ""]) + def test_empty_token_is_rejected(self, parent_fqcn): + # an empty token (e.g. "{JOB_ID}" resolving to nothing) cannot uniquely + # name the cell: all such pipes on a site would collide, and both pipe + # ends derive names independently so no generated fallback can keep + # the pair in agreement + with pytest.raises(ValueError): + _cell_fqcn("active", "site-1", "", parent_fqcn) + + @pytest.mark.parametrize("parent_fqcn", [FQCN.ROOT_SERVER, "site-1", "relay-1", ""]) + def test_reserved_alias_token_prefix_is_rejected(self, parent_fqcn): + # a token starting with "alias-" would make the plain leaf + # "cellpipe-alias-..." collide with the relay alias namespace + with pytest.raises(ValueError): + _cell_fqcn("active", "site-1", "alias-x_y", parent_fqcn) + + @pytest.mark.parametrize("parent_fqcn", [FQCN.ROOT_SERVER, "site-1", ""]) + def test_underscore_token_is_allowed_when_not_aliased(self, parent_fqcn): + # the simulator uses "simulate_job" as the token; only the alias form + # used behind another cell restricts "_" + assert _cell_fqcn("active", "site-1", "simulate_job", parent_fqcn) == "site-1.cellpipe-simulate_job_active" # --------------------------------------------------------------------------- diff --git a/tests/unit_test/private/fed/authenticator_test.py b/tests/unit_test/private/fed/authenticator_test.py index e6ffc4d10d..a828ee17b6 100644 --- a/tests/unit_test/private/fed/authenticator_test.py +++ b/tests/unit_test/private/fed/authenticator_test.py @@ -51,7 +51,7 @@ def _validate(origin, client_fqcn_resolver, channel=CellChannel.SERVER_COMMAND, ) -@pytest.mark.parametrize("origin", ["site-a", "site-a.job-1", "site-a.site-a-child.job-1"]) +@pytest.mark.parametrize("origin", ["site-a", "site-a.job-1", "site-a.job-1_active", 
"site-a.site-a-child.job-1"]) def test_validate_auth_headers_accepts_token_from_registered_origin(origin): # Job and hierarchical child cells under a registered site are still part of that site's trust boundary. assert _validate(origin, lambda _client_name, _token: "site-a") is None @@ -76,6 +76,25 @@ def test_validate_auth_headers_accepts_direct_cell_pipe_stream_alias(origin): ) +@pytest.mark.parametrize( + "origin, registered_fqcn", + [ + ("relay-1.cellpipe-alias-site-a_8065f1c4-fd35-47ef-b945-800f4d0d5176_active", "relay-1.site-a"), + ("relay-1.cellpipe-alias-site-a_x_8065f1c4-fd35-47ef-b945-800f4d0d5176_passive", "relay-1.site-a_x"), + ], +) +def test_validate_auth_headers_accepts_relay_cell_pipe_stream_alias(origin, registered_fqcn): + assert ( + _validate( + origin, + lambda _client_name, _token: registered_fqcn, + channel=STREAM_CHANNEL, + topic=STREAM_DATA_TOPIC, + ) + is None + ) + + def test_validate_auth_headers_rejects_token_from_different_origin(): reply = _validate("site-b", lambda _client_name, _token: "site-a") @@ -93,6 +112,30 @@ def test_validate_auth_headers_rejects_cell_pipe_alias_for_different_client(): assert reply.get_header(MessageHeaderKey.RETURN_CODE) == ReturnCode.UNAUTHENTICATED +def test_validate_auth_headers_rejects_relay_cell_pipe_alias_under_different_parent(): + reply = _validate( + "relay-1.cellpipe-alias-site-a_8065f1c4-fd35-47ef-b945-800f4d0d5176_passive", + lambda _client_name, _token: "relay-2.site-a", + channel=STREAM_CHANNEL, + topic=STREAM_DATA_TOPIC, + ) + + assert reply.get_header(MessageHeaderKey.RETURN_CODE) == ReturnCode.UNAUTHENTICATED + + +def test_validate_auth_headers_rejects_unmarked_alias_shape_at_depth(): + # the bare alias grammar is only honored for whole-FQCN (legacy flat) + # origins; at any depth the origin must carry the cellpipe-alias- marker + reply = _validate( + "relay-1.site-a_8065f1c4-fd35-47ef-b945-800f4d0d5176_passive", + lambda _client_name, _token: "relay-1.site-a", + channel=STREAM_CHANNEL, + 
topic=STREAM_DATA_TOPIC, + ) + + assert reply.get_header(MessageHeaderKey.RETURN_CODE) == ReturnCode.UNAUTHENTICATED + + def test_validate_auth_headers_rejects_cell_pipe_alias_on_non_stream_channel(): reply = _validate( "site-a_8065f1c4-fd35-47ef-b945-800f4d0d5176_passive",