Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/iris/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
controller = [
"duckdb>=1.0.0",
"pyarrow>=19.0.0",
"kubernetes>=31.0.0,<36",
"kubernetes>=31.0.0,<37",
]
worker = []

Expand All @@ -53,7 +53,7 @@ examples = [
dev = [
"duckdb>=1.0.0",
"pyarrow>=19.0.0",
"kubernetes>=31.0.0,<36",
"kubernetes>=31.0.0,<37",
"ipykernel>=7.1.0",
"jupyter>=1.1.1",
"nbconvert>=7.16.6",
Expand Down
69 changes: 62 additions & 7 deletions lib/iris/src/iris/cluster/providers/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from contextlib import AbstractContextManager, contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Protocol, runtime_checkable
from typing import Any, Protocol, runtime_checkable

try:
import kubernetes
Expand Down Expand Up @@ -56,6 +56,45 @@
_SLOW_THRESHOLD_MS: int = 2000


def _sync_bearer_token_alias(configuration: kubernetes.client.Configuration) -> None:
token = configuration.api_key.get("authorization")
if token is not None:
# kubernetes-client/python v36 Configuration.auth_settings() reads
# api_key["BearerToken"], while the config loaders still write bearer
# tokens to api_key["authorization"].
configuration.api_key["BearerToken"] = token


def _keep_bearer_token_alias_fresh(configuration: kubernetes.client.Configuration) -> None:
refresh_hook = configuration.refresh_api_key_hook
_sync_bearer_token_alias(configuration)

if refresh_hook is None:
return

def refresh_with_bearer_token_alias(client_configuration: kubernetes.client.Configuration) -> None:
refresh_hook(client_configuration)
_sync_bearer_token_alias(client_configuration)
client_configuration.refresh_api_key_hook = refresh_with_bearer_token_alias

configuration.refresh_api_key_hook = refresh_with_bearer_token_alias


def _pod_log_response_text(response: Any) -> str:
data = getattr(response, "data", response)
if isinstance(data, bytes):
return data.decode("utf-8", errors="replace")
if isinstance(data, str):
return data
return str(data)


def _release_pod_log_response(response: Any) -> None:
release_conn = getattr(response, "release_conn", None)
if callable(release_conn):
release_conn()


@runtime_checkable
class K8sService(Protocol):
"""Protocol for Kubernetes operations.
Expand Down Expand Up @@ -198,15 +237,21 @@ def __post_init__(self) -> None:

def create_api_client(self) -> kubernetes.client.ApiClient:
if self.kubeconfig_path:
return kubernetes.config.new_client_from_config(
api_client = kubernetes.config.new_client_from_config(
config_file=self.kubeconfig_path,
)
_keep_bearer_token_alias_fresh(api_client.configuration)
return api_client

try:
kubernetes.config.load_incluster_config()
return kubernetes.client.ApiClient()
config = kubernetes.client.Configuration()
kubernetes.config.load_incluster_config(client_configuration=config)
_keep_bearer_token_alias_fresh(config)
return kubernetes.client.ApiClient(config)
except kubernetes.config.ConfigException:
return kubernetes.config.new_client_from_config()
api_client = kubernetes.config.new_client_from_config()
_keep_bearer_token_alias_fresh(api_client.configuration)
return api_client

def _resource_api(self, resource: K8sResource):
"""Get the DynamicClient resource handle for a K8sResource enum member."""
Expand Down Expand Up @@ -476,11 +521,16 @@ def logs(self, pod_name: str, *, container: str | None = None, tail: int = 50, p
"namespace": self.namespace,
"tail_lines": tail,
"previous": previous,
"_preload_content": False,
"_request_timeout": self.timeout,
}
if container:
kwargs["container"] = container
return self._core_v1.read_namespaced_pod_log(**kwargs)
response = self._core_v1.read_namespaced_pod_log(**kwargs)
try:
return _pod_log_response_text(response)
finally:
_release_pod_log_response(response)
except ApiException as e:
if e.status == 404:
return ""
Expand Down Expand Up @@ -509,6 +559,7 @@ def stream_logs(
"name": pod_name,
"namespace": self.namespace,
"timestamps": True,
"_preload_content": False,
"_request_timeout": 15.0,
}
if container:
Expand All @@ -520,7 +571,11 @@ def stream_logs(
if limit_bytes is not None:
kwargs["limit_bytes"] = limit_bytes

raw = self._core_v1.read_namespaced_pod_log(**kwargs)
response = self._core_v1.read_namespaced_pod_log(**kwargs)
try:
raw = _pod_log_response_text(response)
finally:
_release_pod_log_response(response)
except ApiException as e:
if e.status == 404:
return KubectlLogResult(lines=[], last_timestamp=since_time)
Expand Down
110 changes: 110 additions & 0 deletions lib/iris/tests/cluster/providers/k8s/test_cloud_k8s_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,32 @@

from __future__ import annotations

from datetime import datetime, timezone

import pytest
from iris.cluster.providers.k8s import service as k8s_service
from iris.cluster.providers.k8s.types import K8sResource


class _LogResponse:
def __init__(self, data: bytes):
self.data = data
self.released = False

def release_conn(self):
self.released = True


class _CoreV1WithLogResponse:
def __init__(self, response: _LogResponse):
self.response = response
self.kwargs = None

def read_namespaced_pod_log(self, **kwargs):
self.kwargs = kwargs
return self.response


# Test item_path construction for namespaced resources
@pytest.mark.parametrize(
"resource,name,namespace,expected",
Expand Down Expand Up @@ -126,3 +148,91 @@ def test_api_base_paths():
assert K8sResource.DEPLOYMENTS.api_base() == "/apis/apps/v1"
assert K8sResource.CLUSTER_ROLES.api_base() == "/apis/rbac.authorization.k8s.io/v1"
assert K8sResource.NODE_POOLS.api_base() == "/apis/compute.coreweave.com/v1alpha1"


def test_bearer_token_alias_is_added_for_incluster_auth():
if k8s_service.kubernetes is None:
pytest.skip("kubernetes client is not installed")

config = k8s_service.kubernetes.client.Configuration()
config.api_key["authorization"] = "bearer token-1"

k8s_service._keep_bearer_token_alias_fresh(config)

assert config.api_key["BearerToken"] == "bearer token-1"
assert config.get_api_key_with_prefix("BearerToken") == "bearer token-1"


def test_bearer_token_alias_tracks_incluster_token_refresh():
if k8s_service.kubernetes is None:
pytest.skip("kubernetes client is not installed")

config = k8s_service.kubernetes.client.Configuration()
config.api_key["authorization"] = "bearer token-1"

def refresh(client_configuration):
client_configuration.api_key["authorization"] = "bearer token-2"
client_configuration.refresh_api_key_hook = refresh

config.refresh_api_key_hook = refresh
k8s_service._keep_bearer_token_alias_fresh(config)

assert config.get_api_key_with_prefix("BearerToken") == "bearer token-2"
assert config.api_key["BearerToken"] == "bearer token-2"
assert config.refresh_api_key_hook is not refresh


def test_create_api_client_syncs_bearer_token_for_kubeconfig(monkeypatch):
if k8s_service.kubernetes is None:
pytest.skip("kubernetes client is not installed")

config = k8s_service.kubernetes.client.Configuration()
config.api_key["authorization"] = "bearer token-1"
api_client = k8s_service.kubernetes.client.ApiClient(config)

def new_client_from_config(config_file=None):
assert config_file == "/tmp/kubeconfig"
return api_client

monkeypatch.setattr(k8s_service.kubernetes.config, "new_client_from_config", new_client_from_config)
service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService)
service.kubeconfig_path = "/tmp/kubeconfig"

assert service.create_api_client() is api_client
assert config.api_key["BearerToken"] == "bearer token-1"


def test_logs_decode_raw_response_bytes():
response = _LogResponse(b"line 1\nline 2\n")
core_v1 = _CoreV1WithLogResponse(response)
service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService)
service.namespace = "test-ns"
service.timeout = 60.0
service._core_v1 = core_v1

assert service.logs("pod-1", container="task", tail=10) == "line 1\nline 2\n"
assert core_v1.kwargs["_preload_content"] is False
assert core_v1.kwargs["container"] == "task"
assert response.released


def test_stream_logs_decodes_raw_response_bytes():
response = _LogResponse(
b"2026-05-22T16:45:12.158721540Z I20260522 16:45:12 iris.test.verbose info-marker\n"
b"2026-05-22T16:45:12.158728857Z W20260522 16:45:12 iris.test.verbose warning-marker\n"
)
core_v1 = _CoreV1WithLogResponse(response)
service = k8s_service.CloudK8sService.__new__(k8s_service.CloudK8sService)
service.namespace = "test-ns"
service._core_v1 = core_v1

result = service.stream_logs("pod-1", container="task", limit_bytes=100_000)

assert core_v1.kwargs["_preload_content"] is False
assert core_v1.kwargs["container"] == "task"
assert response.released
assert [line.data for line in result.lines] == [
"I20260522 16:45:12 iris.test.verbose info-marker",
"W20260522 16:45:12 iris.test.verbose warning-marker",
]
assert result.last_timestamp == datetime(2026, 5, 22, 16, 45, 12, 158728, tzinfo=timezone.utc)
11 changes: 6 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading