Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fe0c848
feat(kfp-api): support s3-integrator as an object storage backend
mvlassis Jun 18, 2026
514a98a
ci(kfp-api): run s3 in default integration env, add integration-objec…
mvlassis Jun 18, 2026
3cf4b5c
Update tox docs
mvlassis Jun 18, 2026
97ac343
Make object storage relations optional
mvlassis Jun 19, 2026
2cde759
Merge branch 'main' into kf-8704-kfp-api
mvlassis Jun 19, 2026
bfc9d87
Update comments
mvlassis Jun 19, 2026
33b96c4
Create bucket if missing for s3-integrator
mvlassis Jun 22, 2026
2e066f3
Use bucket from config
mvlassis Jun 22, 2026
5e3573d
Update ci
mvlassis Jun 22, 2026
f9eee71
Add CA certification
mvlassis Jun 22, 2026
3c1144a
Merge branch 'main' into kf-8704-kfp-api
mvlassis Jun 23, 2026
2362dab
Add option for bucket
mvlassis Jun 23, 2026
d09715c
Add limit to object storage relations
mvlassis Jun 23, 2026
b85fdce
Add upgrade test
mvlassis Jun 23, 2026
a9fd8cd
Fix integration tests
mvlassis Jun 23, 2026
8897aa8
Remove certs
mvlassis Jun 23, 2026
b744ee1
Update tox.ini
mvlassis Jun 24, 2026
61e6054
Best-effort cleanup for `ca-chain`
mvlassis Jun 24, 2026
79d3841
Fix comment
mvlassis Jun 24, 2026
e419e06
Error handling for host
mvlassis Jun 24, 2026
2b3c801
Fix linting
mvlassis Jun 24, 2026
7f33cbe
Also show missing fields in s3-credentials relation
mvlassis Jun 24, 2026
11b7726
Better variable name
mvlassis Jun 24, 2026
684dcd0
Emit BlockedStatus on a TLS error
mvlassis Jun 24, 2026
5698acb
Use mysql instead of mariadb
mvlassis Jun 24, 2026
9e1c851
Return None tls-ca-chain for minio
mvlassis Jun 24, 2026
f97978a
fix: add connect/read timeouts to cached S3 client property
Copilot Jun 24, 2026
0058b73
Address comments
mvlassis Jun 25, 2026
0782d0f
Update charms/kfp-api/src/services/s3.py
mvlassis Jun 25, 2026
088a9c8
Address comments
mvlassis Jun 25, 2026
714f587
Address comments
mvlassis Jun 25, 2026
1102e34
Address comments
mvlassis Jun 25, 2026
d164ce5
Address comments
mvlassis Jun 25, 2026
a222d7a
fix: Address comments
mvlassis Jun 25, 2026
e858c78
Use profile: testing for mysql-k8s
mvlassis Jun 25, 2026
26d21b3
Remove uneccessary minio-service
mvlassis Jun 25, 2026
f44ddfd
Mocked resource handlere
mvlassis Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ jobs:
with open(os.environ["GITHUB_OUTPUT"], "a") as f:
f.write(f"track={track}\n")
f.write(f"channel={channel}\n")

print(f"Track: {track}")
print(f"Channel: {channel}")

Expand Down Expand Up @@ -199,13 +199,29 @@ jobs:
test-type:
- integration
- integration-ambient
- integration-object-storage
exclude:
- charm: kfp-schedwf
test-type: integration-ambient
- charm: kfp-viewer
test-type: integration-ambient
- charm: kfp-metadata-writer
test-type: integration-ambient
# TODO: Update with other charms that have an integration via object-storage
- charm: kfp-persistence
test-type: integration-object-storage
- charm: kfp-profile-controller
test-type: integration-object-storage
- charm: kfp-schedwf
test-type: integration-object-storage
- charm: kfp-viewer
test-type: integration-object-storage
- charm: kfp-ui
test-type: integration-object-storage
- charm: kfp-metadata-writer
test-type: integration-object-storage
- charm: kfp-viz
test-type: integration-object-storage
steps:
# Ideally we'd use self-hosted runners, but this effort is still not stable
# This action will remove unused software (dotnet, haskell, android libs, codeql,
Expand Down
4 changes: 2 additions & 2 deletions bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ applications:
metacontroller-operator: { charm: ch:metacontroller-operator, channel: latest/edge, scale: 1, trust: true }
minio: { charm: ch:minio, channel: latest/edge, scale: 1 }
kfp-api: { charm: ch:kfp-api, channel: latest/edge, scale: 1, trust: true}
kfp-db: { charm: charmed-osm-mariadb-k8s, channel: latest/stable, scale: 1, options: { database: mlpipeline } }
kfp-db: { charm: mysql-k8s, channel: 8.0/stable, scale: 1, trust: true, options: { profile: testing } }
kfp-profile-controller: { charm: ch:kfp-profile-controller, channel: latest/edge, scale: 1 }
kfp-persistence: { charm: ch:kfp-persistence, channel: latest/edge, scale: 1 }
kfp-schedwf: { charm: ch:kfp-schedwf, channel: latest/edge, scale: 1 }
Expand All @@ -14,7 +14,7 @@ applications:
kfp-viz: { charm: ch:kfp-viz, channel: latest/edge, scale: 1 }
relations:
- [argo-controller:object-storage, minio:object-storage]
- [kfp-api, kfp-db]
- [kfp-api:relational-db, kfp-db:database]
- [kfp-api:kfp-api, kfp-persistence:kfp-api]
- [kfp-api:kfp-api, kfp-ui:kfp-api]
- [kfp-api:kfp-viz, kfp-viz:kfp-viz]
Expand Down
6 changes: 6 additions & 0 deletions charms/kfp-api/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@ requires:
relational-db:
interface: mysql_client
limit: 1
s3-credentials:
interface: s3
optional: true
limit: 1
object-storage:
interface: object-storage
optional: true
limit: 1
schema:
v1:
provides:
Expand Down
1,589 changes: 788 additions & 801 deletions charms/kfp-api/poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions charms/kfp-api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ optional = true

[tool.poetry.group.charm.dependencies]
boto3 = ">=1.42.51"
charmed-kubeflow-chisme = "^0.4.25"
charmed-kubeflow-chisme = "^0.4.29"
cosl = ">=0.0.48"
lightkube = "^0.15.6"
oci-image = "^1.0.0"
object-storage-charmlib = "^1.0.0"
ops = "^2.17.1"
serialized-data-interface = "^0.7.0"
tenacity = "^9.0.0"
Expand Down Expand Up @@ -92,7 +93,7 @@ optional = true

[tool.poetry.group.integration.dependencies]
juju = "<4.0"
charmed-kubeflow-chisme = "^0.4.25"
charmed-kubeflow-chisme = "^0.4.29"
lightkube = "^0.15.6"
pytest-operator = "^0.38.0"
selenium = "^4.27.1"
Expand Down
184 changes: 157 additions & 27 deletions charms/kfp-api/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import logging
from pathlib import Path
from urllib.parse import urlparse

import botocore.exceptions
from charmed_kubeflow_chisme.exceptions import ErrorWithStatus, GenericCharmRuntimeError
Expand Down Expand Up @@ -44,6 +45,7 @@
from lightkube.resources.core_v1 import Service, ServiceAccount
from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding
from lightkube_extensions.types import AuthorizationPolicy
from object_storage import S3Requirer
from ops import main
from ops.charm import CharmBase
from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, ModelError, WaitingStatus
Expand All @@ -66,7 +68,6 @@
K8S_RESOURCE_FILES = [
"src/templates/auth_manifests.yaml.j2",
"src/templates/ml-pipeline-service.yaml.j2",
"src/templates/minio-service.yaml.j2",
]
MYSQL_WARNING = "Relation mysql is deprecated."
UNBLOCK_MESSAGE = "Remove deprecated mysql relation to unblock."
Expand Down Expand Up @@ -122,6 +123,9 @@ def __init__(self, *args):
self.framework.observe(self.on.apiserver_pebble_ready, self._on_event)
change_events = [
self.on["object-storage"].relation_changed,
self.on["object-storage"].relation_broken,
self.on["s3-credentials"].relation_changed,
self.on["s3-credentials"].relation_broken,
self.on["kfp-viz"].relation_changed,
self.on["kfp-api"].relation_changed,
self.on["kfp-api-grpc"].relation_changed,
Expand Down Expand Up @@ -159,6 +163,12 @@ def __init__(self, *args):
self.framework.observe(self.database.on.database_created, self._on_relational_db_relation)
self.framework.observe(self.database.on.endpoints_changed, self._on_relational_db_relation)

self.s3 = S3Requirer(
self,
relation_name="s3-credentials",
bucket=self.model.config["object-store-bucket-name"],
)

self.prometheus_provider = MetricsEndpointProvider(
charm=self,
relation_name=METRICS_ENDPOINT_RELATION_NAME,
Expand Down Expand Up @@ -200,20 +210,12 @@ def container(self):
@property
def _context(self):
"""Return the context used for generating kubernetes resources."""
interfaces = self._get_interfaces()
object_storage = self._get_object_storage(interfaces)

minio_url = f"{object_storage['service']}.{object_storage['namespace']}.svc.cluster.local"

context = {
"app_name": self._name,
"namespace": self._namespace,
"service": self._name,
"grpc_port": self._grcp_port,
"http_port": self._http_port,
# Must include .svc.cluster.local for DNS resolution
"minio_url": minio_url,
"minio_port": str(object_storage["port"]),
"ambient_enabled": self.is_ambient_mesh_enabled,
# The waypoint name format should match the format set by istio-beacon-k8s
# See how the label is generated:
Expand Down Expand Up @@ -360,7 +362,7 @@ def _generate_environment(self) -> dict:
try:
interfaces = self._get_interfaces()
db_data = self._get_db_data()
object_storage = self._get_object_storage(interfaces)
object_storage = self._get_object_storage_data(interfaces)
viz_data = self._get_viz(interfaces)
except ErrorWithStatus as error:
self.logger.error("Failed to generate container configuration.")
Expand All @@ -375,8 +377,9 @@ def _generate_environment(self) -> dict:
"KUBEFLOW_USERID_HEADER": "kubeflow-userid",
"KUBEFLOW_USERID_PREFIX": "",
"POD_NAMESPACE": self.model.name,
"OBJECTSTORECONFIG_SECURE": "false",
"OBJECTSTORECONFIG_BUCKETNAME": self.model.config["object-store-bucket-name"],
"OBJECTSTORECONFIG_SECURE": "true" if object_storage["secure"] else "false",
"OBJECTSTORECONFIG_BUCKETNAME": object_storage["bucket"]
or self.model.config["object-store-bucket-name"],
"DBCONFIG_CONMAXLIFETIME": "120s",
"DB_DRIVER_NAME": "mysql",
"DBCONFIG_MYSQLCONFIG_USER": db_data["db_username"],
Expand All @@ -402,17 +405,12 @@ def _generate_environment(self) -> dict:
"ARCHIVE_CONFIG_LOG_FILE_NAME": self.model.config["log-archive-filename"],
"ARCHIVE_CONFIG_LOG_PATH_PREFIX": self.model.config["log-archive-prefix"],
"CLUSTER_DOMAIN": "cluster.local",
# OBJECTSTORECONFIG_HOST and _PORT set the object storage configurations,
# taking precedence over configuration in the config.json or
# MINIO_SERVICE_SERVICE_* environment variables.
# NOTE: While OBJECTSTORECONFIG_HOST and _PORT control the object store
# that the apiserver connects to, other parts of kfp currently cannot use
# object stores with arbitrary names. See
# https://github.com/kubeflow/pipelines/issues/9689 and
# https://github.com/canonical/minio-operator/pull/151 for more details.
"OBJECTSTORECONFIG_HOST": f"{object_storage['service']}.{object_storage['namespace']}",
# OBJECTSTORECONFIG_HOST and _PORT set the object storage configuration
# that the apiserver connects to, taking precedence over the values in
# the config.json.
"OBJECTSTORECONFIG_HOST": object_storage["host"],
"OBJECTSTORECONFIG_PORT": str(object_storage["port"]),
"OBJECTSTORECONFIG_REGION": "",
"OBJECTSTORECONFIG_REGION": object_storage["region"],
# The following 3 configuration options change the behavior for pipeline pods,
# overriding an SDK-specified values.
# DEFAULT_SECURITY_CONTEXT_RUN_AS_USER: Change user for pipeline pods
Expand Down Expand Up @@ -551,6 +549,113 @@ def _get_object_storage(self, interfaces):
relation_name = "object-storage"
return self._validate_sdi_interface(interfaces, relation_name)

def _get_s3_data(self) -> dict:
"""Retrieve and validate data from the s3-credentials relation.

Raises:
ErrorWithStatus(..., Waiting) if the relation exists but required data
(access-key, secret-key, endpoint) is not yet available.
"""
relation = self.model.get_relation("s3-credentials")
info = self.s3.get_storage_connection_info(relation)
required_fields = ("access-key", "secret-key", "endpoint")
if not info:
raise ErrorWithStatus("Waiting for s3-credentials relation data", WaitingStatus)
missing = [field for field in required_fields if not info.get(field)]
if missing:
raise ErrorWithStatus(
f"Waiting for s3-credentials relation data, missing fields: {', '.join(missing)}",
WaitingStatus,
)
return info

def _get_object_storage_data(self, interfaces=None) -> dict:
"""Return normalized object storage data from the active storage relation.

Supports both the `object-storage` and `s3` interfaces, returning a common dict with
keys: access-key, secret-key, host, port, secure, region, bucket, tls-ca-chain, is_s3.

Exactly one of the `object-storage` or `s3-credentials` relations is expected.

Raises:
ErrorWithStatus(..., Blocked) if both relations are established at once.
ErrorWithStatus(..., Blocked) if neither relation is established.
ErrorWithStatus(..., Waiting) if the active relation has no data yet.
"""
has_object_storage = self.model.relations["object-storage"]
has_s3 = self.model.relations["s3-credentials"]

if has_object_storage and has_s3:
raise ErrorWithStatus(
"Too many object storage relations. Please relate to only one of "
"`object-storage` or `s3-credentials`.",
BlockedStatus,
)

if not has_object_storage and not has_s3:
raise ErrorWithStatus(
"Missing object storage relation. Please relate to one of "
"`object-storage` or `s3-credentials`.",
BlockedStatus,
)

if has_s3:
data = self._get_s3_data()
host, port, secure = self._parse_s3_endpoint(data["endpoint"])
if not host:
raise ErrorWithStatus(
f"Invalid s3 endpoint: {data['endpoint']!r}",
BlockedStatus,
)
return {
"access-key": data["access-key"],
"secret-key": data["secret-key"],
"host": host,
"port": port,
"secure": secure,
"region": data.get("region", ""),
"bucket": data.get("bucket", ""),
"tls-ca-chain": data.get("tls-ca-chain"),
"is_s3": True,
}

if interfaces is None:
interfaces = self._get_interfaces()
obj = self._get_object_storage(interfaces)
return {
"access-key": obj["access-key"],
"secret-key": obj["secret-key"],
"host": f"{obj['service']}.{obj['namespace']}",
"port": obj["port"],
"secure": obj["secure"],
"region": "",
"bucket": "",
"tls-ca-chain": None,
"is_s3": False,
}
Comment thread
Copilot marked this conversation as resolved.

@staticmethod
def _parse_s3_endpoint(endpoint: str) -> tuple:
"""Parse an s3 endpoint into a (host, port, secure) tuple.

The endpoint may be a full URL (e.g. "https://s3.example.com:443") or a bare
"host[:port]". kfp expects the host, port and TLS flag as separate values.

When a URL scheme is present it determines TLS and the default port.
When only a bare host[:port] is given, TLS is inferred from the port:
- 443 -> HTTPs
- Otherwise -> HTTP
"""
parsed_endpoint = urlparse(endpoint if "://" in endpoint else f"//{endpoint}")
if parsed_endpoint.scheme:
secure = True if parsed_endpoint.scheme == "https" else False
port = parsed_endpoint.port or (443 if secure else 80)
else:
# bare host[:port]: infer TLS from port
port = parsed_endpoint.port or 80
secure = True if port == 443 else False
return parsed_endpoint.hostname, port, secure

def _get_viz(self, interfaces):
"""Retrieve kfp-viz relation data, return default, if empty."""
relation_name = "kfp-viz"
Expand Down Expand Up @@ -871,18 +976,39 @@ def _check_config(self) -> None:

def _ensure_bucket_exists(self) -> None:
"""Ensure bucket on object storage exists by using a boto3 client."""
interfaces = self._get_interfaces()
obj = self._get_object_storage(interfaces)
obj = self._get_object_storage_data()

s3_wrapper = S3BucketWrapper(
access_key=obj.get("access-key"),
secret_access_key=obj.get("secret-key"),
s3_service=f"{obj['service']}.{obj['namespace']}",
s3_service=obj["host"],
s3_port=obj["port"],
secure=obj["secure"],
region=obj["region"],
tls_ca_chain=obj.get("tls-ca-chain"),
)

# Try creating the bucket we need for object storage
bucket_name = self.model.config["object-store-bucket-name"]
# The bucket name comes from the active object storage relation:
# - For s3-credentials, either through the provider side (s3-integrator) or through the
# `object-store-bucket-name` option. Provider side takes precedence.
# - For object-storage, through the `object-store-bucket-name` config option.
# Raise an error if no bucket is provided.
if obj["bucket"]:
bucket_name = obj["bucket"]
else:
bucket_name = self.model.config["object-store-bucket-name"]
if bucket_name:
relation_name = "s3-credentials" if obj["is_s3"] else "object-storage"
self.logger.info(
f"{relation_name} relation doesn't provide a bucket; using the "
f"'object-store-bucket-name' config option: '{bucket_name}'."
)
Comment thread
Copilot marked this conversation as resolved.
if not bucket_name:
raise ErrorWithStatus(
"No object storage bucket name available. Set the 'object-store-bucket-name' "
"config option or provide a bucket through the s3-credentials relation.",
BlockedStatus,
)
try:
self.unit.status = MaintenanceStatus(f"Checking if bucket {bucket_name} exists.")
# Check if bucket already exists
Expand All @@ -894,6 +1020,10 @@ def _ensure_bucket_exists(self) -> None:
s3_wrapper.create_bucket(bucket_name)
return

except botocore.exceptions.SSLError as e:
msg = "Object storage TLS verification failed. Check CA chain configuration."
self.logger.error(f"{msg}: {e}")
raise ErrorWithStatus(msg, BlockedStatus)
except (
botocore.exceptions.ClientError,
botocore.exceptions.ConnectTimeoutError,
Expand Down
Loading
Loading