From 1382c98a586e0e187768005fa8a4e51c51d61301 Mon Sep 17 00:00:00 2001 From: Romain Yon <1596570+yonromai@users.noreply.github.com> Date: Fri, 22 May 2026 09:30:24 -0700 Subject: [PATCH] Support kubernetes v36 auth in Iris --- lib/iris/pyproject.toml | 4 +- .../src/iris/cluster/providers/k8s/service.py | 69 +++++++++-- .../providers/k8s/test_cloud_k8s_service.py | 110 ++++++++++++++++++ uv.lock | 11 +- 4 files changed, 180 insertions(+), 14 deletions(-) diff --git a/lib/iris/pyproject.toml b/lib/iris/pyproject.toml index 63b65c0f8d..f07e917439 100644 --- a/lib/iris/pyproject.toml +++ b/lib/iris/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ controller = [ "duckdb>=1.0.0", "pyarrow>=19.0.0", - "kubernetes>=31.0.0,<36", + "kubernetes>=31.0.0,<37", ] worker = [] @@ -53,7 +53,7 @@ examples = [ dev = [ "duckdb>=1.0.0", "pyarrow>=19.0.0", - "kubernetes>=31.0.0,<36", + "kubernetes>=31.0.0,<37", "ipykernel>=7.1.0", "jupyter>=1.1.1", "nbconvert>=7.16.6", diff --git a/lib/iris/src/iris/cluster/providers/k8s/service.py b/lib/iris/src/iris/cluster/providers/k8s/service.py index fcd0309d7c..86dcd45ea4 100644 --- a/lib/iris/src/iris/cluster/providers/k8s/service.py +++ b/lib/iris/src/iris/cluster/providers/k8s/service.py @@ -15,7 +15,7 @@ from contextlib import AbstractContextManager, contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Protocol, runtime_checkable +from typing import Any, Protocol, runtime_checkable try: import kubernetes @@ -56,6 +56,45 @@ _SLOW_THRESHOLD_MS: int = 2000 +def _sync_bearer_token_alias(configuration: kubernetes.client.Configuration) -> None: + token = configuration.api_key.get("authorization") + if token is not None: + # kubernetes-client/python v36 Configuration.auth_settings() reads + # api_key["BearerToken"], while the config loaders still write bearer + # tokens to api_key["authorization"]. + configuration.api_key["BearerToken"] = token + + +def _keep_bearer_token_alias_fresh(configuration: kubernetes.client.Configuration) -> None: + refresh_hook = configuration.refresh_api_key_hook + _sync_bearer_token_alias(configuration) + + if refresh_hook is None: + return + + def refresh_with_bearer_token_alias(client_configuration: kubernetes.client.Configuration) -> None: + refresh_hook(client_configuration) + _sync_bearer_token_alias(client_configuration) + client_configuration.refresh_api_key_hook = refresh_with_bearer_token_alias + + configuration.refresh_api_key_hook = refresh_with_bearer_token_alias + + +def _pod_log_response_text(response: Any) -> str: + data = getattr(response, "data", response) + if isinstance(data, bytes): + return data.decode("utf-8", errors="replace") + if isinstance(data, str): + return data + return str(data) + + +def _release_pod_log_response(response: Any) -> None: + release_conn = getattr(response, "release_conn", None) + if callable(release_conn): + release_conn() + + @runtime_checkable class K8sService(Protocol): """Protocol for Kubernetes operations. @@ -198,15 +237,21 @@ def __post_init__(self) -> None: def create_api_client(self) -> kubernetes.client.ApiClient: if self.kubeconfig_path: - return kubernetes.config.new_client_from_config( + api_client = kubernetes.config.new_client_from_config( config_file=self.kubeconfig_path, ) + _keep_bearer_token_alias_fresh(api_client.configuration) + return api_client try: - kubernetes.config.load_incluster_config() - return kubernetes.client.ApiClient() + config = kubernetes.client.Configuration() + kubernetes.config.load_incluster_config(client_configuration=config) + _keep_bearer_token_alias_fresh(config) + return kubernetes.client.ApiClient(config) except kubernetes.config.ConfigException: - return kubernetes.config.new_client_from_config() + api_client = kubernetes.config.new_client_from_config() + _keep_bearer_token_alias_fresh(api_client.configuration) + return api_client def _resource_api(self, resource: K8sResource): """Get the DynamicClient resource handle for a K8sResource enum member.""" @@ -476,11 +521,16 @@ def logs(self, pod_name: str, *, container: str | None = None, tail: int = 50, p "namespace": self.namespace, "tail_lines": tail, "previous": previous, + "_preload_content": False, "_request_timeout": self.timeout, } if container: kwargs["container"] = container - return self._core_v1.read_namespaced_pod_log(**kwargs) + response = self._core_v1.read_namespaced_pod_log(**kwargs) + try: + return _pod_log_response_text(response) + finally: + _release_pod_log_response(response) except ApiException as e: if e.status == 404: return "" @@ -509,6 +559,7 @@ def stream_logs( "name": pod_name, "namespace": self.namespace, "timestamps": True, + "_preload_content": False, "_request_timeout": 15.0, } if container: @@ -520,7 +571,11 @@ def stream_logs( if limit_bytes is not None: kwargs["limit_bytes"] = limit_bytes - raw = self._core_v1.read_namespaced_pod_log(**kwargs) + response = self._core_v1.read_namespaced_pod_log(**kwargs) + try: + raw = _pod_log_response_text(response) + finally: + _release_pod_log_response(response) except ApiException as e: if e.status == 404: return KubectlLogResult(lines=[], last_timestamp=since_time) diff --git a/lib/iris/tests/cluster/providers/k8s/test_cloud_k8s_service.py b/lib/iris/tests/cluster/providers/k8s/test_cloud_k8s_service.py index ff0767af97..41eb82e387 100644 --- a/lib/iris/tests/cluster/providers/k8s/test_cloud_k8s_service.py +++ b/lib/iris/tests/cluster/providers/k8s/test_cloud_k8s_service.py @@ -5,10 +5,32 @@ from __future__ import annotations +from datetime import datetime, timezone + import pytest +from iris.cluster.providers.k8s import service as k8s_service from iris.cluster.providers.k8s.types import K8sResource +class _LogResponse: + def __init__(self, data: bytes): + self.data = data + self.released = False + + def release_conn(self): + self.released = True + + +class _CoreV1WithLogResponse: + def __init__(self, response: _LogResponse): + self.response = response + self.kwargs = None + + def read_namespaced_pod_log(self, **kwargs): + self.kwargs = kwargs + return self.response + + # Test item_path construction for namespaced resources @pytest.mark.parametrize( "resource,name,namespace,expected", @@ -126,3 +148,91 @@ def test_api_base_paths(): assert K8sResource.DEPLOYMENTS.api_base() == "/apis/apps/v1" assert K8sResource.CLUSTER_ROLES.api_base() == "/apis/rbac.authorization.k8s.io/v1" assert K8sResource.NODE_POOLS.api_base() == "/apis/compute.coreweave.com/v1alpha1" + + +def test_bearer_token_alias_is_added_for_incluster_auth(): + if k8s_service.kubernetes is None: + pytest.skip("kubernetes client is not installed") + + config = k8s_service.kubernetes.client.Configuration() + config.api_key["authorization"] = "bearer token-1" + + k8s_service._keep_bearer_token_alias_fresh(config) + + assert config.api_key["BearerToken"] == "bearer token-1" + assert config.get_api_key_with_prefix("BearerToken") == "bearer token-1" + + +def test_bearer_token_alias_tracks_incluster_token_refresh(): + if k8s_service.kubernetes is None: + pytest.skip("kubernetes client is not installed") + + config = k8s_service.kubernetes.client.Configuration() + config.api_key["authorization"] = "bearer token-1" + + def refresh(client_configuration): + client_configuration.api_key["authorization"] = "bearer token-2" + client_configuration.refresh_api_key_hook = refresh + + config.refresh_api_key_hook = refresh + k8s_service._keep_bearer_token_alias_fresh(config) + + assert config.get_api_key_with_prefix("BearerToken") == "bearer token-2" + assert config.api_key["BearerToken"] == "bearer token-2" + assert config.refresh_api_key_hook is not refresh + + +def test_create_api_client_syncs_bearer_token_for_kubeconfig(monkeypatch): + if k8s_service.kubernetes is None: + pytest.skip("kubernetes client is not installed") + + config = k8s_service.kubernetes.client.Configuration() + config.api_key["authorization"] = "bearer token-1" + api_client = k8s_service.kubernetes.client.ApiClient(config) + + def new_client_from_config(config_file=None): + assert config_file == "/tmp/kubeconfig" + return api_client + + monkeypatch.setattr(k8s_service.kubernetes.config, "new_client_from_config", new_client_from_config) + service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService) + service.kubeconfig_path = "/tmp/kubeconfig" + + assert service.create_api_client() is api_client + assert config.api_key["BearerToken"] == "bearer token-1" + + +def test_logs_decode_raw_response_bytes(): + response = _LogResponse(b"line 1\nline 2\n") + core_v1 = _CoreV1WithLogResponse(response) + service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService) + service.namespace = "test-ns" + service.timeout = 60.0 + service._core_v1 = core_v1 + + assert service.logs("pod-1", container="task", tail=10) == "line 1\nline 2\n" + assert core_v1.kwargs["_preload_content"] is False + assert core_v1.kwargs["container"] == "task" + assert response.released + + +def test_stream_logs_decodes_raw_response_bytes(): + response = _LogResponse( + b"2026-05-22T16:45:12.158721540Z I20260522 16:45:12 iris.test.verbose info-marker\n" + b"2026-05-22T16:45:12.158728857Z W20260522 16:45:12 iris.test.verbose warning-marker\n" + ) + core_v1 = _CoreV1WithLogResponse(response) + service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService) + service.namespace = "test-ns" + service._core_v1 = core_v1 + + result = service.stream_logs("pod-1", container="task", limit_bytes=100_000) + + assert core_v1.kwargs["_preload_content"] is False + assert core_v1.kwargs["container"] == "task" + assert response.released + assert [line.data for line in result.lines] == [ + "I20260522 16:45:12 iris.test.verbose info-marker", + "W20260522 16:45:12 iris.test.verbose warning-marker", + ] + assert result.last_timestamp == datetime(2026, 5, 22, 16, 45, 12, 158728, tzinfo=timezone.utc) diff --git a/uv.lock b/uv.lock index 8386b5e574..b49f7c1b4f 100644 --- a/uv.lock +++ b/uv.lock @@ -4400,9 +4400,10 @@ wheels = [ [[package]] name = "kubernetes" -version = "35.0.0" +version = "36.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "aiohttp" }, { name = "certifi" }, { name = "durationpy" }, { name = "python-dateutil" }, @@ -4413,9 +4414,9 @@ dependencies = [ { name = "urllib3" }, { name = "websocket-client" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/2c/8f/85bf51ad4150f64e8c665daf0d9dfe9787ae92005efb9a4d1cba592bd79d/kubernetes-35.0.0.tar.gz", hash = "sha256:3d00d344944239821458b9efd484d6df9f011da367ecb155dadf9513f05f09ee", size = 1094642, upload-time = "2026-01-16T01:05:27.76Z" } +sdist = { url = "https://files.pythonhosted.org/packages/bf/59/dc635e4e9afb3884bc5c57f14fe23783e4c04601aa20b835ac75c41d1625/kubernetes-36.0.0.tar.gz", hash = "sha256:027b606bb8032e6c6464a53236bdd9bd9a94c237e1063bc45a303c25b304ced9", size = 2346728, upload-time = "2026-05-20T20:44:24.28Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0c/70/05b685ea2dffcb2adbf3cdcea5d8865b7bc66f67249084cf845012a0ff13/kubernetes-35.0.0-py2.py3-none-any.whl", hash = "sha256:39e2b33b46e5834ef6c3985ebfe2047ab39135d41de51ce7641a7ca5b372a13d", size = 2017602, upload-time = "2026-01-16T01:05:25.991Z" }, + { url = "https://files.pythonhosted.org/packages/cd/d2/6f99ca9c7eb961dfdd45b9643101399a8ee20922c662c362c91e9cc7e832/kubernetes-36.0.0-py2.py3-none-any.whl", hash = "sha256:a766433357ec9f90db7565cccf52e28e7fca40b0ef366c80a6022adbc0ac0425", size = 4660469, upload-time = "2026-05-20T20:44:20.893Z" }, ] [[package]] @@ -5519,7 +5520,7 @@ requires-dist = [ { name = "grpcio", specifier = ">=1.76.0" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "humanfriendly", specifier = ">=10.0" }, - { name = "kubernetes", marker = "extra == 'controller'", specifier = ">=31.0.0,<36" }, + { name = "kubernetes", marker = "extra == 'controller'", specifier = ">=31.0.0,<37" }, { name = "marin-finelog", editable = "lib/finelog" }, { name = "marin-rigging", editable = "lib/rigging" }, { name = "pyarrow", marker = "extra == 'controller'", specifier = ">=19.0.0" }, @@ -5541,7 +5542,7 @@ dev = [ { name = "duckdb", specifier = ">=1.0.0" }, { name = "ipykernel", specifier = ">=7.1.0" }, { name = "jupyter", specifier = ">=1.1.1" }, - { name = "kubernetes", specifier = ">=31.0.0,<36" }, + { name = "kubernetes", specifier = ">=31.0.0,<37" }, { name = "nbconvert", specifier = ">=7.16.6" }, { name = "nbformat", specifier = ">=5.10.4" }, { name = "pillow", specifier = ">=10.0.0" },