From 079fbdb389812f2ad74e284865048fb854ce1119 Mon Sep 17 00:00:00 2001 From: avdunn Date: Fri, 15 May 2026 10:58:20 -0700 Subject: [PATCH 1/7] First draft of FIC support --- msal/application.py | 60 ++++++++ msal/oauth2cli/oauth2.py | 73 ++++++++- msal/token_cache.py | 5 + tests/test_application.py | 303 +++++++++++++++++++++++++++++++++++++- 4 files changed, 435 insertions(+), 6 deletions(-) diff --git a/msal/application.py b/msal/application.py index 084f9bf3..cadba9b2 100644 --- a/msal/application.py +++ b/msal/application.py @@ -242,6 +242,7 @@ class ClientApplication(object): ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" ACQUIRE_TOKEN_INTERACTIVE = "169" + ACQUIRE_TOKEN_BY_USER_FIC_ID = "950" GET_ACCOUNTS_ID = "902" REMOVE_ACCOUNT_ID = "903" @@ -2572,3 +2573,62 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP telemetry_context.update_telemetry(response) return response + + def acquire_token_by_user_federated_identity_credential( + self, scopes, assertion, username=None, user_object_id=None, + claims_challenge=None, **kwargs): + """Acquires a user-scoped token using the ``user_fic`` grant type. + + This method exchanges a federated identity credential (typically an + agent instance token from Leg 2 of the agent identity protocol) for + a user-scoped access token, enabling an agent to act on behalf of + a specific user. + + :param list[str] scopes: Scopes required by downstream API (a resource). + :param str assertion: + The federated identity credential token (e.g. the instance token + obtained from Leg 2 of the agent identity flow). + :param str username: + The target user's UPN (User Principal Name). + Mutually exclusive with ``user_object_id``. + :param str user_object_id: + The target user's Object ID. + Mutually exclusive with ``username``. + :param claims_challenge: + The claims_challenge parameter requests specific claims requested by the resource provider + in the form of a claims_challenge directive in the www-authenticate header to be + returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. + It is a string of a JSON object which contains lists of claims being requested from these locations. + + :return: A dict representing the json response from Microsoft Entra: + + - A successful response would contain "access_token" key, + - an error response would contain "error" and usually "error_description". + """ + # Input validation + if not assertion: + raise ValueError("assertion is required and must be non-empty") + if not username and not user_object_id: + raise ValueError( + "Either username or user_object_id must be provided") + if username and user_object_id: + raise ValueError( + "username and user_object_id are mutually exclusive") + + telemetry_context = self._build_telemetry_context( + self.ACQUIRE_TOKEN_BY_USER_FIC_ID) + response = _clean_up(self.client.obtain_token_by_user_fic( + scope=self._decorate_scope(scopes), + assertion=assertion, + username=username, + user_object_id=user_object_id, + headers=telemetry_context.generate_headers(), + data=dict( + kwargs.pop("data", {}), + claims=_merge_claims_challenge_and_capabilities( + self._client_capabilities, claims_challenge)), + **kwargs)) + if "access_token" in response: + response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP + telemetry_context.update_telemetry(response) + return response diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index 68b0e84e..184946f2 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -7,6 +7,7 @@ except ImportError: from urlparse import parse_qs, urlparse, urlunparse from urllib import urlencode, quote_plus +import inspect import logging import warnings import time @@ -104,6 +105,11 @@ def __init__( or a raw JWT assertion in bytes (which we will relay to http layer). It can also be a callable (recommended), so that we will do lazy creation of an assertion. + + The callable may accept zero arguments (legacy) or one argument. + When it accepts one argument, it will receive a dict containing + ``"client_id"``, ``"token_endpoint"``, and optionally ``"fmi_path"`` + (when an FMI path is set on the current request). client_assertion_type (str): The type of your :attr:`client_assertion` parameter. It is typically the value of :attr:`CLIENT_ASSERTION_TYPE_SAML2` or @@ -168,6 +174,35 @@ def __init__( # A workaround for requests not supporting session-wide timeout self._http_client.request, timeout=timeout) + @staticmethod + def _accepts_context(func): + """Check if a callable accepts at least one positional argument.""" + try: + sig = inspect.signature(func) + params = [ + p for p in sig.parameters.values() + if p.kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) + ] + return len(params) >= 1 + except (ValueError, TypeError): + return False # Signature not inspectable; treat as zero-arg + + def _invoke_assertion_callable(self, assertion_callable, data=None): + """Invoke an assertion callable, passing context if it accepts one.""" + if self._accepts_context(assertion_callable): + context = { + "client_id": self.client_id, + "token_endpoint": self.configuration.get( + "token_endpoint", ""), + } + if data and data.get("fmi_path"): + context["fmi_path"] = data["fmi_path"] + return assertion_callable(context) + return assertion_callable() + def _build_auth_request_params(self, response_type, **kwargs): # response_type is a string defined in # https://tools.ietf.org/html/rfc6749#section-3.1.1 @@ -198,11 +233,11 @@ def _obtain_token( # The verb "obtain" is influenced by OAUTH2 RFC 6749 # See https://tools.ietf.org/html/rfc7521#section-4.2 encoder = self.client_assertion_encoders.get( self.default_body["client_assertion_type"], lambda a: a) - _data["client_assertion"] = encoder( - self.client_assertion() # Do lazy on-the-fly computation - if callable(self.client_assertion) else self.client_assertion - ) # The type is bytes, which is preferable. See also: - # https://github.com/psf/requests/issues/4503#issuecomment-455001070 + if callable(self.client_assertion): + raw = self._invoke_assertion_callable(self.client_assertion, data) + else: + raw = self.client_assertion + _data["client_assertion"] = encoder(raw) _data.update(self.default_body) # It may contain authen parameters _data.update(data or {}) # So the content in data param prevails @@ -770,6 +805,34 @@ class initialization. data.update(scope=scope) return self._obtain_token("client_credentials", data=data, **kwargs) + def obtain_token_by_user_fic( + self, scope, assertion, username=None, user_object_id=None, + **kwargs): + """Obtain token using the ``user_fic`` grant type. + + This exchanges a federated identity credential (e.g. an agent + instance token) for a user-scoped access token. + + :param scope: Scopes for the target resource (already decorated + with OIDC scopes by the caller). + :param str assertion: The federated identity credential token. + :param str username: The target user's UPN (mutually exclusive + with *user_object_id*). + :param str user_object_id: The target user's Object ID (mutually + exclusive with *username*). + """ + data = kwargs.pop("data", {}) + data.update( + scope=scope, + user_federated_identity_credential=assertion, + client_info="1", + ) + if user_object_id: + data["user_id"] = str(user_object_id) + elif username: + data["username"] = username + return self._obtain_token("user_fic", data=data, **kwargs) + def __init__(self, server_configuration, client_id, on_obtaining_tokens=lambda event: None, # event is defined in _obtain_token(...) diff --git a/msal/token_cache.py b/msal/token_cache.py index d6e2a2b1..cd02849c 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -65,6 +65,11 @@ "token_type", "req_cnf", "key_id", + # user_fic grant parameters — these are standard body params for the + # user_fic flow; FIC tokens use normal user cache keys (not extended). + "user_federated_identity_credential", + "user_id", + "client_info", }) diff --git a/tests/test_application.py b/tests/test_application.py index 54da96c0..49a2285c 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1090,4 +1090,305 @@ def mock_post(url, headers=None, *args, **kwargs): result = app.acquire_token_for_client([scope]) self.assertEqual(result[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE) self.assertEqual("AT_with_valid_scope1_valid_scope2_scopes", result.get("access_token")) - self.assertIsNone(result.get("scope"), "scope field is not returned when token comes from cache") \ No newline at end of file + self.assertIsNone(result.get("scope"), "scope field is not returned when token comes from cache") + + +def _build_user_fic_response(uid="user_oid", utid="tenant_id", access_token="user_at"): + """Build a mock user_fic response with client_info and id_token.""" + client_info = base64.b64encode(json.dumps({ + "uid": uid, "utid": utid, + }).encode()).decode("utf-8") + id_token_claims = { + "iss": "https://login.microsoftonline.com/tenant_id/v2.0", + "sub": "subject", + "aud": "agent_app_id", + "exp": time.time() + 3600, + "iat": time.time(), + "oid": uid, + "preferred_username": "user@contoso.com", + "tid": utid, + } + id_token = "header.%s.signature" % base64.b64encode( + json.dumps(id_token_claims).encode()).decode("utf-8") + return json.dumps({ + "access_token": access_token, + "expires_in": 3600, + "token_type": "Bearer", + "client_info": client_info, + "id_token": id_token, + "refresh_token": "a_refresh_token", + }) + + +import base64 + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicProtocol(unittest.TestCase): + """Tests that acquire_token_by_user_federated_identity_credential sends correct POST body.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_sends_correct_grant_type_and_params(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="instance_token_t2", + username="user@contoso.com", + post=mock_post) + self.assertIn("access_token", result) + self.assertEqual("user_fic", captured_data.get("grant_type")) + self.assertEqual("instance_token_t2", + captured_data.get("user_federated_identity_credential")) + self.assertEqual("1", captured_data.get("client_info")) + self.assertEqual("agent_app_id", captured_data.get("client_id")) + + def test_scope_includes_oidc_scopes(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + scope_str = captured_data.get("scope", "") + for oidc_scope in ("openid", "offline_access", "profile"): + self.assertIn(oidc_scope, scope_str, + "OIDC scope '{}' should be present".format(oidc_scope)) + + def test_with_username_sends_username_not_user_id(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", username="user@contoso.com", post=mock_post) + self.assertEqual("user@contoso.com", captured_data.get("username")) + self.assertNotIn("user_id", captured_data, + "user_id should NOT be in body when username is provided") + + def test_with_oid_sends_user_id_not_username(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + user_object_id="00000000-0000-0000-0000-000000000001", + post=mock_post) + self.assertEqual("00000000-0000-0000-0000-000000000001", + captured_data.get("user_id")) + self.assertNotIn("username", captured_data, + "username should NOT be in body when user_object_id is provided") + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicCacheBehavior(unittest.TestCase): + """Tests that user_fic tokens are stored in user cache with account info.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_token_stored_in_user_cache_with_account(self): + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="fic_at")) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + self.assertIn("access_token", result) + + # Verify the account was created + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0, "Account should be created from user_fic response") + account = accounts[0] + self.assertEqual("user_oid.tenant_id", account["home_account_id"]) + + def test_token_not_stored_as_atext(self): + """user_fic tokens should use standard AccessToken type, not atext.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + # Check the raw cache for credential type + at_entries = list(app.token_cache.search( + msal.TokenCache.CredentialType.ACCESS_TOKEN, query={})) + self.assertTrue(len(at_entries) > 0, "AT should be cached") + self.assertNotIn("ext_cache_key", at_entries[0], + "user_fic tokens should NOT have ext_cache_key") + + def test_acquire_token_silent_returns_cached_fic_token(self): + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="cached_fic_at")) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + + # Silent call should return cached token without hitting network + silent_result = app.acquire_token_silent( + ["https://graph.microsoft.com/.default"], account=accounts[0]) + self.assertIn("access_token", silent_result) + self.assertEqual("cached_fic_at", silent_result["access_token"]) + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicInputValidation(unittest.TestCase): + """Tests that input validation rejects invalid parameters.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_empty_assertion_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="", username="user@contoso.com") + + def test_none_assertion_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion=None, username="user@contoso.com") + + def test_no_user_identifier_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2") + + def test_both_user_identifiers_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + username="user@contoso.com", + user_object_id="oid-123") + + def test_reserved_scopes_rejected(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["openid"], assertion="t2", username="user@contoso.com") + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestAssertionCallbackContext(unittest.TestCase): + """Tests that assertion callbacks receive context when they accept arguments.""" + + def test_context_aware_callback_receives_fmi_path(self): + received_context = {} + + def assertion_with_context(context): + received_context.update(context) + return "assertion_value" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_with_context}, + authority="https://login.microsoftonline.com/my_tenant") + + app.acquire_token_for_client( + ["scope"], fmi_path="agent_app_123", + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertEqual("client_id", received_context.get("client_id")) + self.assertIn("token_endpoint", received_context) + self.assertEqual("agent_app_123", received_context.get("fmi_path")) + + def test_context_aware_callback_omits_fmi_path_when_not_set(self): + received_context = {} + + def assertion_with_context(context): + received_context.update(context) + return "assertion_value" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_with_context}, + authority="https://login.microsoftonline.com/my_tenant") + + app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertEqual("client_id", received_context.get("client_id")) + self.assertNotIn("fmi_path", received_context) + + def test_legacy_zero_arg_callback_still_works(self): + call_count = [0] + + def legacy_callback(): + call_count[0] += 1 + return "legacy_assertion" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": legacy_callback}, + authority="https://login.microsoftonline.com/my_tenant") + + result = app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertIn("access_token", result) + self.assertEqual(1, call_count[0], "Legacy callback should be invoked once") + + def test_context_callback_type_error_not_swallowed(self): + """If a one-arg callback raises TypeError internally, it should propagate.""" + def buggy_callback(context): + raise TypeError("Bug inside callback") + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": buggy_callback}, + authority="https://login.microsoftonline.com/my_tenant") + + with self.assertRaises(TypeError, msg="Internal TypeError should propagate"): + app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) \ No newline at end of file From a5e9eb7954d9a2d2c2bd8f554f05d401e3dea2a4 Mon Sep 17 00:00:00 2001 From: avdunn Date: Fri, 15 May 2026 11:48:20 -0700 Subject: [PATCH 2/7] Small fixes and feedback --- msal/oauth2cli/oauth2.py | 18 ++++++++---- msal/token_cache.py | 1 + tests/test_application.py | 60 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 10 deletions(-) diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index 184946f2..33caca73 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -176,17 +176,23 @@ def __init__( @staticmethod def _accepts_context(func): - """Check if a callable accepts at least one positional argument.""" + """Check if a callable requires at least one positional argument. + + Returns True only when the callable has a positional parameter + **without** a default value. This ensures that legacy zero-arg + callables — including ``lambda token=token: token`` patterns + where every positional param has a default — are still invoked + with no arguments. + """ try: sig = inspect.signature(func) - params = [ - p for p in sig.parameters.values() + for p in sig.parameters.values(): if p.kind in ( inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD, - ) - ] - return len(params) >= 1 + ) and p.default is inspect.Parameter.empty: + return True + return False except (ValueError, TypeError): return False # Signature not inspectable; treat as zero-arg diff --git a/msal/token_cache.py b/msal/token_cache.py index cd02849c..78999292 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -306,6 +306,7 @@ def make_clean_copy(dictionary, sensitive_fields): # Masks sensitive info event, data=make_clean_copy(event.get("data", {}), ( "password", "client_secret", "refresh_token", "assertion", + "user_federated_identity_credential", )), response=make_clean_copy(event.get("response", {}), ( "id_token_claims", # Provided by broker diff --git a/tests/test_application.py b/tests/test_application.py index 49a2285c..e56be943 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1,5 +1,6 @@ # Note: Since Aug 2019 we move all e2e tests into test_e2e.py, # so this test_application file contains only unit tests without dependency. +import base64 import json import logging import sys @@ -1120,9 +1121,6 @@ def _build_user_fic_response(uid="user_oid", utid="tenant_id", access_token="use }) -import base64 - - @patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) class TestUserFicProtocol(unittest.TestCase): """Tests that acquire_token_by_user_federated_identity_credential sends correct POST body.""" @@ -1265,6 +1263,34 @@ def mock_post(url, headers=None, data=None, *args, **kwargs): self.assertIn("access_token", silent_result) self.assertEqual("cached_fic_at", silent_result["access_token"]) + def test_oid_path_token_stored_and_retrievable_via_silent(self): + """user_fic with user_object_id should cache and retrieve like username.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="oid_fic_at")) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", user_object_id="user_oid", post=mock_post) + self.assertIn("access_token", result) + + # Verify no ext_cache_key on cached token + at_entries = list(app.token_cache.search( + msal.TokenCache.CredentialType.ACCESS_TOKEN, query={})) + self.assertTrue(len(at_entries) > 0, "AT should be cached") + self.assertNotIn("ext_cache_key", at_entries[0], + "OID-path user_fic tokens should NOT have ext_cache_key") + + # Verify account and silent retrieval + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + silent_result = app.acquire_token_silent( + ["https://graph.microsoft.com/.default"], account=accounts[0]) + self.assertIn("access_token", silent_result) + self.assertEqual("oid_fic_at", silent_result["access_token"]) + @patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) class TestUserFicInputValidation(unittest.TestCase): @@ -1391,4 +1417,30 @@ def buggy_callback(context): ["scope"], post=lambda url, **kwargs: MinimalResponse( status_code=200, text=json.dumps({ - "access_token": "an_at", "expires_in": 3600}))) \ No newline at end of file + "access_token": "an_at", "expires_in": 3600}))) + + def test_lambda_with_defaulted_param_treated_as_zero_arg(self): + """A lambda like ``lambda token=token: token`` should be treated as + zero-arg because all its positional params have defaults.""" + captured_value = "my_assertion_value" + assertion_callable = lambda token=captured_value: token # noqa: E731 + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_callable}, + authority="https://login.microsoftonline.com/my_tenant") + + captured_data = {} + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600})) + + result = app.acquire_token_for_client(["scope"], post=mock_post) + self.assertIn("access_token", result) + # The assertion should be the string value, not a dict context object + self.assertEqual( + captured_value, captured_data.get("client_assertion"), + "Lambda with defaulted params should return its default value, " + "not receive a context dict") \ No newline at end of file From 214ec744f848ad6e6863bf06f1db4ecab3fe9f63 Mon Sep 17 00:00:00 2001 From: avdunn Date: Mon, 18 May 2026 13:46:46 -0700 Subject: [PATCH 3/7] PR feedback --- msal/application.py | 8 ++++++- msal/oauth2cli/oauth2.py | 10 +++++--- msal/token_cache.py | 2 +- tests/test_application.py | 49 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/msal/application.py b/msal/application.py index cadba9b2..f98d77c1 100644 --- a/msal/application.py +++ b/msal/application.py @@ -2617,12 +2617,18 @@ def acquire_token_by_user_federated_identity_credential( telemetry_context = self._build_telemetry_context( self.ACQUIRE_TOKEN_BY_USER_FIC_ID) + headers = telemetry_context.generate_headers() + if username: + headers["X-AnchorMailbox"] = "upn:{}".format(username) + elif user_object_id: + headers["X-AnchorMailbox"] = "Oid:{}@{}".format( + user_object_id, self.authority.tenant) response = _clean_up(self.client.obtain_token_by_user_fic( scope=self._decorate_scope(scopes), assertion=assertion, username=username, user_object_id=user_object_id, - headers=telemetry_context.generate_headers(), + headers=headers, data=dict( kwargs.pop("data", {}), claims=_merge_claims_challenge_and_capabilities( diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index 33caca73..4590d52d 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -106,9 +106,13 @@ def __init__( It can also be a callable (recommended), so that we will do lazy creation of an assertion. - The callable may accept zero arguments (legacy) or one argument. - When it accepts one argument, it will receive a dict containing - ``"client_id"``, ``"token_endpoint"``, and optionally ``"fmi_path"`` + The callable may accept zero arguments (legacy) or one + required positional argument. Callables whose positional + parameters all have default values (e.g. + ``lambda token=token: token``) are treated as zero-arg. + When the callable declares a required positional parameter, + it will receive a dict containing ``"client_id"``, + ``"token_endpoint"``, and optionally ``"fmi_path"`` (when an FMI path is set on the current request). client_assertion_type (str): The type of your :attr:`client_assertion` parameter. diff --git a/msal/token_cache.py b/msal/token_cache.py index 78999292..0ca250df 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -416,7 +416,7 @@ def __add(self, event, now=None): } grant_types_that_establish_an_account = ( _GRANT_TYPE_BROKER, "authorization_code", "password", - Client.DEVICE_FLOW["GRANT_TYPE"]) + Client.DEVICE_FLOW["GRANT_TYPE"], "user_fic") if event.get("grant_type") in grant_types_that_establish_an_account: account["account_source"] = event["grant_type"] self.modify(self.CredentialType.ACCOUNT, account, account) diff --git a/tests/test_application.py b/tests/test_application.py index e56be943..8d278b45 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1197,6 +1197,37 @@ def mock_post(url, headers=None, data=None, *args, **kwargs): self.assertNotIn("username", captured_data, "username should NOT be in body when user_object_id is provided") + def test_ccs_routing_header_with_username(self): + app = self._make_app() + captured_headers = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_headers.update(headers or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", username="user@contoso.com", post=mock_post) + self.assertEqual("upn:user@contoso.com", + captured_headers.get("X-AnchorMailbox"), + "CCS routing header should use UPN format for username path") + + def test_ccs_routing_header_with_oid(self): + app = self._make_app() + captured_headers = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_headers.update(headers or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + user_object_id="user_oid_123", post=mock_post) + self.assertIn("X-AnchorMailbox", captured_headers, + "CCS routing header should be present for OID path") + self.assertTrue( + captured_headers["X-AnchorMailbox"].startswith("Oid:"), + "CCS routing header should use Oid format for user_object_id path") + @patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) class TestUserFicCacheBehavior(unittest.TestCase): @@ -1291,6 +1322,24 @@ def mock_post(url, headers=None, data=None, *args, **kwargs): self.assertIn("access_token", silent_result) self.assertEqual("oid_fic_at", silent_result["access_token"]) + def test_account_source_is_set_to_user_fic(self): + """Accounts created by user_fic should have account_source set.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id")) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + self.assertEqual("user_fic", accounts[0].get("account_source"), + "FIC accounts should have account_source='user_fic' to avoid " + "broker path misrouting") + @patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) class TestUserFicInputValidation(unittest.TestCase): From a654737ede4eea235f84992731a32d56bb2f48f7 Mon Sep 17 00:00:00 2001 From: avdunn Date: Mon, 18 May 2026 16:48:02 -0700 Subject: [PATCH 4/7] PR feedback --- msal/throttled_http_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/msal/throttled_http_client.py b/msal/throttled_http_client.py index 7c64fbf6..8cabe261 100644 --- a/msal/throttled_http_client.py +++ b/msal/throttled_http_client.py @@ -126,7 +126,8 @@ def __init__(self, *args, default_throttle_time=None, **kwargs): # TODO: We may want to disable it for confidential client, though _extract_data(kwargs, "refresh_token", # "account" during refresh _extract_data(kwargs, "code", # "account" of auth code grant - _extract_data(kwargs, "username")))), # "account" of ROPC + _extract_data(kwargs, "username", # "account" of ROPC + _extract_data(kwargs, "user_id"))))), # "account" of user_fic (OID path) ), expires_in=RetryAfterParser(default_throttle_time or 5).parse, )(self.post) From 68cfaaf5c665b90346ef3cf3c1a86fd3ccfff179 Mon Sep 17 00:00:00 2001 From: avdunn Date: Wed, 20 May 2026 10:26:35 -0700 Subject: [PATCH 5/7] Add integration tests --- tests/test_agentic_e2e.py | 288 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 tests/test_agentic_e2e.py diff --git a/tests/test_agentic_e2e.py b/tests/test_agentic_e2e.py new file mode 100644 index 00000000..7114131c --- /dev/null +++ b/tests/test_agentic_e2e.py @@ -0,0 +1,288 @@ +"""End-to-end tests for agentic (agent identity) scenarios. + +These tests verify the full agent identity flow using MSAL Python APIs: +1. Assertion callback context propagation (fmi_path flows to callback) +2. Agent app-only token acquisition using FMI-sourced client assertion (Leg 2) +3. Full 3-leg flow: FMI → assertion → user_fic → user-scoped token +4. Cache isolation between app-only and user-scoped tokens + +Corresponds to: +- .NET: Agentic.cs +- Java: AgenticIT.java + +Test configuration uses the same lab infrastructure as test_fmi_e2e.py. +Requires LAB_APP_CLIENT_CERT_PFX_PATH environment variable. +""" + +import logging +import os +import sys +import unittest + +import msal +from tests.http_client import MinimalHttpClient +from tests.lab_config import get_client_certificate +from tests.test_e2e import LabBasedTestCase + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO) + +# ============================================================================= +# Test configuration — matches .NET/Java agentic test constants +# ============================================================================= +_TENANT_ID = "10c419d4-4a50-45b2-aa4e-919fb84df24f" +_BLUEPRINT_CLIENT_ID = "aab5089d-e764-47e3-9f28-cc11c2513821" +_AGENT_APP_ID = "ab18ca07-d139-4840-8b3b-4be9610c6ed5" +_RMA_CLIENT_ID = "3bf56293-fbb5-42bd-a407-248ba7431a8c" +_USER_UPN = "agentuser1@id4slab1.onmicrosoft.com" +_TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default" +_FMI_EXCHANGE_SCOPE = "api://AzureFMITokenExchange/.default" +_GRAPH_SCOPE = "https://graph.microsoft.com/.default" +_FMI_PATH = "SomeFmiPath/FmiCredentialPath" +_AUTHORITY = "https://login.microsoftonline.com/" + _TENANT_ID + + +# ============================================================================= +# Helpers — mirror .NET GetAppCredentialAsync / Java acquireFmiCredentialForAgent +# ============================================================================= + +def _acquire_fmi_credential_for_agent(agent_app_id): + """Leg 1: Blueprint app acquires FMI credential (T1) for the given agent. + + Uses certificate authentication with SNI (sendX5C) and fmi_path set to + the agent app ID — matching .NET and Java helper methods. + """ + blueprint_app = msal.ConfidentialClientApplication( + _BLUEPRINT_CLIENT_ID, + client_credential=get_client_certificate(), + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = blueprint_app.acquire_token_for_client( + [_TOKEN_EXCHANGE_SCOPE], fmi_path=agent_app_id) + if "access_token" not in result: + raise RuntimeError( + "Leg 1 failed — could not acquire FMI credential: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +def _acquire_fmi_credential_from_rma(): + """Acquire an FMI credential from the RMA app using certificate credentials. + + Mirrors Java's acquireFmiCredentialFromRma and Python's test_fmi_e2e helper. + Used for assertion callback context tests where the callback just needs to + return a valid FMI token (not specifically for an agent app). + """ + rma_app = msal.ConfidentialClientApplication( + _RMA_CLIENT_ID, + client_credential=get_client_certificate(), + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = rma_app.acquire_token_for_client( + [_FMI_EXCHANGE_SCOPE], fmi_path=_FMI_PATH) + if "access_token" not in result: + raise RuntimeError( + "RMA FMI credential acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +def _acquire_instance_token_for_agent(): + """Leg 1 + Leg 2: Acquire an instance token (T2) for the agent app. + + 1. Blueprint → T1 (FMI credential via fmi_path) + 2. Agent uses T1 as client_assertion → T2 (instance token) + + T2 is used as user_federated_identity_credential in Leg 3 (user_fic). + """ + t1 = _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": t1}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = agent_app.acquire_token_for_client([_TOKEN_EXCHANGE_SCOPE]) + if "access_token" not in result: + raise RuntimeError( + "Leg 2 failed — could not acquire instance token: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +# ============================================================================= +# Tests +# ============================================================================= + +class TestAssertionCallbackContext(LabBasedTestCase): + """Verify assertion callback receives correct context when fmi_path is set. + + Corresponds to Java's assertionCallback_ReceivesFmiPathContext. + """ + + def test_assertion_callback_receives_fmi_path(self): + captured_context = {} + + def assertion_callback(context): + captured_context.update(context) + return _acquire_fmi_credential_from_rma() + + app = msal.ConfidentialClientApplication( + "urn:microsoft:identity:fmi", + client_credential={"client_assertion": assertion_callback}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + result = app.acquire_token_for_client( + [_FMI_EXCHANGE_SCOPE], fmi_path=_AGENT_APP_ID) + self.assertIn("access_token", result, + "acquire_token_for_client failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + + # Verify context was passed to callback + self.assertEqual(_AGENT_APP_ID, captured_context.get("fmi_path"), + "fmi_path should flow to assertion callback context") + self.assertEqual("urn:microsoft:identity:fmi", captured_context.get("client_id"), + "client_id should be in assertion callback context") + self.assertTrue(captured_context.get("token_endpoint"), + "token_endpoint should be in assertion callback context") + + +class TestAgentAppToken(LabBasedTestCase): + """Agent acquires app-only token for Graph using FMI-sourced assertion. + + Corresponds to .NET's AgentGetsAppTokenForGraphTest and + Java's agentGetsAppToken_UsingFmiAssertion. + + Flow: Blueprint → T1 (assertion callback) → Agent CCA → app token + """ + + def test_agent_gets_app_token_for_graph(self): + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + result = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertIn("access_token", result, + "Agent app token acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + self.assertTrue(result["access_token"], + "Access token should not be empty") + + +class TestAgentUserIdentity(LabBasedTestCase): + """Full 3-leg agent identity flow: FMI → assertion → user_fic → user token. + + Corresponds to .NET's AgentUserIdentityGetsTokenForGraphTest and + Java's agentUserIdentity_GetsTokenForGraph. + + Flow: + 1. Blueprint → T1 (FMI credential) + 2. Agent uses T1 → T2 (instance token) + 3. Agent exchanges T2 via user_fic → user-scoped Graph token + 4. Verify token is cached and retrievable via acquire_token_silent + """ + + def test_agent_user_identity_gets_token_for_graph(self): + # Get instance token (T2) for user_fic exchange + t2 = _acquire_instance_token_for_agent() + + # Build agent CCA with assertion callback + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + # Exchange T2 for user-scoped token via user_fic grant + result = agent_app.acquire_token_by_user_federated_identity_credential( + [_GRAPH_SCOPE], assertion=t2, username=_USER_UPN) + self.assertIn("access_token", result, + "user_fic token acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + self.assertTrue(result["access_token"], + "Access token should not be empty") + + # Verify account was created + accounts = agent_app.get_accounts() + self.assertTrue(len(accounts) > 0, + "Account should be created from user_fic response") + + # Verify silent retrieval works (token should be cached) + account = accounts[0] + silent_result = agent_app.acquire_token_silent( + [_GRAPH_SCOPE], account=account) + self.assertIsNotNone(silent_result, + "acquire_token_silent should return cached token") + self.assertIn("access_token", silent_result) + self.assertEqual(result["access_token"], silent_result["access_token"], + "Silent call should return the same cached token") + + +class TestAgentCacheIsolation(LabBasedTestCase): + """App-only and user-scoped tokens are isolated in cache on the same CCA. + + Corresponds to Java's agentCca_AppAndUserTokens_CacheIsolation. + """ + + def test_app_and_user_tokens_are_isolated(self): + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + # Acquire app-only token + app_result = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertIn("access_token", app_result, + "App token acquisition failed: {}: {}".format( + app_result.get("error"), app_result.get("error_description"))) + + # Acquire user token via user_fic + t2 = _acquire_instance_token_for_agent() + user_result = agent_app.acquire_token_by_user_federated_identity_credential( + [_GRAPH_SCOPE], assertion=t2, username=_USER_UPN) + self.assertIn("access_token", user_result, + "User token acquisition failed: {}: {}".format( + user_result.get("error"), user_result.get("error_description"))) + + # Tokens should be different (app-scoped vs user-scoped) + self.assertNotEqual(app_result["access_token"], user_result["access_token"], + "App token and user token should be different") + + # Verify both are independently retrievable + # App token: second call should return from cache + app_cached = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertEqual("cache", app_cached.get("token_source"), + "App token should be returned from cache on second call") + self.assertEqual(app_result["access_token"], app_cached["access_token"]) + + # User token: silent call should return from cache + accounts = agent_app.get_accounts() + self.assertTrue(len(accounts) > 0) + user_cached = agent_app.acquire_token_silent( + [_GRAPH_SCOPE], account=accounts[0]) + self.assertIsNotNone(user_cached) + self.assertEqual(user_result["access_token"], user_cached["access_token"]) + + +if __name__ == "__main__": + unittest.main() From 6dbf88cb9edb4c2be729ae049937319342bd0a4f Mon Sep 17 00:00:00 2001 From: avdunn Date: Wed, 20 May 2026 11:19:22 -0700 Subject: [PATCH 6/7] Adjust SNI behavior around .pfx files --- msal/application.py | 9 ++++++++- tests/test_agentic_e2e.py | 28 ++++++---------------------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/msal/application.py b/msal/application.py index f98d77c1..af34e35a 100644 --- a/msal/application.py +++ b/msal/application.py @@ -91,12 +91,19 @@ def _parse_pfx(pfx_path, passphrase_bytes): # Cert concepts https://security.stackexchange.com/a/226758/125264 from cryptography.hazmat.primitives.serialization import pkcs12 with open(pfx_path, 'rb') as f: - private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ + private_key, cert, additional_certs = pkcs12.load_key_and_certificates( + # cryptography 2.5+ # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates f.read(), passphrase_bytes) if not (private_key and cert): raise ValueError("Your PFX file shall contain both private key and cert") sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert) + # Per RFC 7515 §4.1.6, x5c should include the full certificate chain + # (leaf first, then intermediates) for SNI (Subject Name/Issuer) auth. + if additional_certs: + for extra_cert in additional_certs: + _, _, extra_x5c = _extract_cert_and_thumbprints(extra_cert) + x5c.extend(extra_x5c) return private_key, sha256_thumbprint, sha1_thumbprint, x5c diff --git a/tests/test_agentic_e2e.py b/tests/test_agentic_e2e.py index 7114131c..a23bed09 100644 --- a/tests/test_agentic_e2e.py +++ b/tests/test_agentic_e2e.py @@ -6,10 +6,6 @@ 3. Full 3-leg flow: FMI → assertion → user_fic → user-scoped token 4. Cache isolation between app-only and user-scoped tokens -Corresponds to: -- .NET: Agentic.cs -- Java: AgenticIT.java - Test configuration uses the same lab infrastructure as test_fmi_e2e.py. Requires LAB_APP_CLIENT_CERT_PFX_PATH environment variable. """ @@ -28,7 +24,7 @@ logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO) # ============================================================================= -# Test configuration — matches .NET/Java agentic test constants +# Test configuration — shared lab app registrations for agentic scenarios # ============================================================================= _TENANT_ID = "10c419d4-4a50-45b2-aa4e-919fb84df24f" _BLUEPRINT_CLIENT_ID = "aab5089d-e764-47e3-9f28-cc11c2513821" @@ -43,14 +39,14 @@ # ============================================================================= -# Helpers — mirror .NET GetAppCredentialAsync / Java acquireFmiCredentialForAgent +# Helpers # ============================================================================= def _acquire_fmi_credential_for_agent(agent_app_id): """Leg 1: Blueprint app acquires FMI credential (T1) for the given agent. Uses certificate authentication with SNI (sendX5C) and fmi_path set to - the agent app ID — matching .NET and Java helper methods. + the agent app ID. """ blueprint_app = msal.ConfidentialClientApplication( _BLUEPRINT_CLIENT_ID, @@ -70,7 +66,7 @@ def _acquire_fmi_credential_for_agent(agent_app_id): def _acquire_fmi_credential_from_rma(): """Acquire an FMI credential from the RMA app using certificate credentials. - Mirrors Java's acquireFmiCredentialFromRma and Python's test_fmi_e2e helper. + Uses the same RMA pattern as test_fmi_e2e._get_fmi_credential_from_rma(). Used for assertion callback context tests where the callback just needs to return a valid FMI token (not specifically for an agent app). """ @@ -118,10 +114,7 @@ def _acquire_instance_token_for_agent(): # ============================================================================= class TestAssertionCallbackContext(LabBasedTestCase): - """Verify assertion callback receives correct context when fmi_path is set. - - Corresponds to Java's assertionCallback_ReceivesFmiPathContext. - """ + """Verify assertion callback receives correct context when fmi_path is set.""" def test_assertion_callback_receives_fmi_path(self): captured_context = {} @@ -155,9 +148,6 @@ def assertion_callback(context): class TestAgentAppToken(LabBasedTestCase): """Agent acquires app-only token for Graph using FMI-sourced assertion. - Corresponds to .NET's AgentGetsAppTokenForGraphTest and - Java's agentGetsAppToken_UsingFmiAssertion. - Flow: Blueprint → T1 (assertion callback) → Agent CCA → app token """ @@ -183,9 +173,6 @@ def assertion_provider(context): class TestAgentUserIdentity(LabBasedTestCase): """Full 3-leg agent identity flow: FMI → assertion → user_fic → user token. - Corresponds to .NET's AgentUserIdentityGetsTokenForGraphTest and - Java's agentUserIdentity_GetsTokenForGraph. - Flow: 1. Blueprint → T1 (FMI credential) 2. Agent uses T1 → T2 (instance token) @@ -234,10 +221,7 @@ def assertion_provider(context): class TestAgentCacheIsolation(LabBasedTestCase): - """App-only and user-scoped tokens are isolated in cache on the same CCA. - - Corresponds to Java's agentCca_AppAndUserTokens_CacheIsolation. - """ + """App-only and user-scoped tokens are isolated in cache on the same CCA.""" def test_app_and_user_tokens_are_isolated(self): def assertion_provider(context): From 12174d97c1ad133c9507e541529e146f0bf155ff Mon Sep 17 00:00:00 2001 From: avdunn Date: Wed, 20 May 2026 11:32:01 -0700 Subject: [PATCH 7/7] Revert pfx changes and disable tests --- msal/application.py | 9 +-------- tests/test_agentic_e2e.py | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/msal/application.py b/msal/application.py index af34e35a..f98d77c1 100644 --- a/msal/application.py +++ b/msal/application.py @@ -91,19 +91,12 @@ def _parse_pfx(pfx_path, passphrase_bytes): # Cert concepts https://security.stackexchange.com/a/226758/125264 from cryptography.hazmat.primitives.serialization import pkcs12 with open(pfx_path, 'rb') as f: - private_key, cert, additional_certs = pkcs12.load_key_and_certificates( - # cryptography 2.5+ + private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates f.read(), passphrase_bytes) if not (private_key and cert): raise ValueError("Your PFX file shall contain both private key and cert") sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert) - # Per RFC 7515 §4.1.6, x5c should include the full certificate chain - # (leaf first, then intermediates) for SNI (Subject Name/Issuer) auth. - if additional_certs: - for extra_cert in additional_certs: - _, _, extra_x5c = _extract_cert_and_thumbprints(extra_cert) - x5c.extend(extra_x5c) return private_key, sha256_thumbprint, sha1_thumbprint, x5c diff --git a/tests/test_agentic_e2e.py b/tests/test_agentic_e2e.py index a23bed09..153d7fb7 100644 --- a/tests/test_agentic_e2e.py +++ b/tests/test_agentic_e2e.py @@ -149,8 +149,16 @@ class TestAgentAppToken(LabBasedTestCase): """Agent acquires app-only token for Graph using FMI-sourced assertion. Flow: Blueprint → T1 (assertion callback) → Agent CCA → app token + + Disabled in CI: The blueprint app (aab5089d) requires SNI authentication, + but the CI pipeline's PFX-based cert loading does not include intermediate + certs in the x5c chain, causing AADSTS700027. These tests pass locally + where the OS cert store can resolve the chain. """ + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") def test_agent_gets_app_token_for_graph(self): def assertion_provider(context): return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) @@ -178,8 +186,13 @@ class TestAgentUserIdentity(LabBasedTestCase): 2. Agent uses T1 → T2 (instance token) 3. Agent exchanges T2 via user_fic → user-scoped Graph token 4. Verify token is cached and retrievable via acquire_token_silent + + Disabled in CI: see TestAgentAppToken docstring. """ + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") def test_agent_user_identity_gets_token_for_graph(self): # Get instance token (T2) for user_fic exchange t2 = _acquire_instance_token_for_agent() @@ -221,8 +234,14 @@ def assertion_provider(context): class TestAgentCacheIsolation(LabBasedTestCase): - """App-only and user-scoped tokens are isolated in cache on the same CCA.""" + """App-only and user-scoped tokens are isolated in cache on the same CCA. + + Disabled in CI: see TestAgentAppToken docstring. + """ + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") def test_app_and_user_tokens_are_isolated(self): def assertion_provider(context): return _acquire_fmi_credential_for_agent(_AGENT_APP_ID)