Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions gigl/common/services/vertex_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ class VertexAiJobConfig:
reservation_affinity: Optional ``ReservationAffinity`` that maps to
``MachineSpec.reservation_affinity``. ``None`` uses the Vertex
AI default (no reservation).
base_output_dir: Optional CustomJob base output directory. When set,
Vertex AI derives ``AIP_MODEL_DIR``, ``AIP_CHECKPOINT_DIR``, and
``AIP_TENSORBOARD_LOG_DIR`` from this directory. Setting this is
how GiGL trainers learn where to write TensorBoard events; the
chief-rank uploader (started inside the trainer) is what streams
them to a Vertex AI ``TensorboardExperiment`` for cross-job
comparison. See
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec.
"""

job_name: str
Expand All @@ -153,6 +161,7 @@ class VertexAiJobConfig:
enable_web_access: bool = True
scheduling_strategy: Optional[aiplatform.gapic.Scheduling.Strategy] = None
reservation_affinity: Optional[ReservationAffinity] = None
base_output_dir: Optional[str] = None


class VertexAIService:
Expand Down Expand Up @@ -347,6 +356,7 @@ def _submit_job(
location=self._location,
labels=job_config.labels,
staging_bucket=self._staging_bucket,
base_output_dir=job_config.base_output_dir,
)
job.submit(
service_account=self._service_account,
Expand Down
2 changes: 1 addition & 1 deletion gigl/src/common/constants/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ def get_tensorboard_logs_gcs_path(
"""
return GcsUri.join(
get_trainer_asset_dir_gcs_path(applied_task_identifier=applied_task_identifier),
"tensorboard_logs/",
"logs/",
)


Expand Down
196 changes: 182 additions & 14 deletions gigl/src/common/vertex_ai_launcher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Shared functionality for launching Vertex AI jobs for training and inference."""

import datetime
import re
from collections.abc import Mapping
from typing import Final, Optional

from google.cloud import aiplatform
from google.cloud.aiplatform_v1.types import (
ReservationAffinity,
Scheduling,
Expand Down Expand Up @@ -39,6 +42,75 @@
{"NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"}
)

# The SDK TensorBoard uploader rewrites run names by replacing every char
# outside this character class with ``-``
# (.venv/.../tensorboard/uploader_utils.py:46). We pre-sanitize the GCS
# subdir name to match what the SDK will produce, so the directory and
# the resulting TensorboardRun ID agree.
_VERTEX_RUN_NAME_REPLACE_PATTERN: Final[re.Pattern[str]] = re.compile(
r"[^a-zA-Z0-9\n-]"
)

# Captures the project/location/tensorboard_id pieces of a fully-qualified
# Vertex AI TensorBoard resource name. Used to build the TensorBoard UI URL.
_TENSORBOARD_RESOURCE_NAME_PATTERN: Final[re.Pattern[str]] = re.compile(
r"^projects/(?P<project>[^/]+)"
r"/locations/(?P<location>[^/]+)"
r"/tensorboards/(?P<tensorboard_id>[^/]+)$"
)


def _maybe_log_tensorboard_url(
    vertex_ai_resource_config: VertexAiResourceConfig,
) -> None:
    """Log the cross-job TensorBoard UI URL when the experiment is configured.

    The chief-rank uploader inside the trainer container also logs this URL,
    but that only surfaces in Vertex AI job logs (which take a minute to
    materialize). Logging it here means the URL appears in the launcher's
    local stdout immediately at submit time.
    """
    resource_name = vertex_ai_resource_config.tensorboard_resource_name
    experiment_name = vertex_ai_resource_config.tensorboard_experiment_name
    # Both metaparams must be set for a cross-job experiment to exist.
    if not (resource_name and experiment_name):
        return
    parsed = _TENSORBOARD_RESOURCE_NAME_PATTERN.match(resource_name)
    if parsed is None:
        # Malformed resource name: nothing sensible to link to.
        return
    url = (
        f"https://{parsed['location']}.tensorboard.googleusercontent.com/experiment/"
        f"projects+{parsed['project']}"
        f"+locations+{parsed['location']}"
        f"+tensorboards+{parsed['tensorboard_id']}"
        f"+experiments+{experiment_name}"
    )
    logger.info(
        f"View TensorBoard (cross-job comparison, experiment={experiment_name!r}): "
        f"{url}"
    )


def _sanitize_for_vertex_run(value: str) -> str:
    """Coerce ``value`` into the SDK's TensorboardRun-name character class.

    Mirrors ``google.cloud.aiplatform.tensorboard.uploader_utils.reformat_run_name``
    so the GCS subdir we create and the SDK-derived run name match.
    """
    # ``re.sub`` accepts a pre-compiled pattern; every disallowed char maps to "-".
    sanitized = re.sub(_VERTEX_RUN_NAME_REPLACE_PATTERN, "-", value)
    return sanitized


def _build_unique_run_name(job_name: str) -> str:
    """Return a launch-unique, sanitized run name for ``job_name``.

    The display ``job_name`` is not guaranteed unique across reruns of the
    same task identifier, and the SDK reuses an existing
    ``TensorboardRun`` by name (silently merging events). We append a UTC
    timestamp so two launches of the same task always produce two distinct
    runs in a shared experiment.

    Args:
        job_name: Display name of the Vertex AI job being launched.

    Returns:
        ``"<job_name>-<YYYYmmdd-HHMMSS>"`` after run-name sanitization.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware UTC
    # ``now()`` produces the identical string under this strftime pattern.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y%m%d-%H%M%S"
    )
    return _sanitize_for_vertex_run(f"{job_name}-{timestamp}")


def launch_single_pool_job(
vertex_ai_resource_config: VertexAiResourceConfig,
Expand All @@ -52,9 +124,14 @@ def launch_single_pool_job(
cuda_docker_uri: Optional[str],
component: GiGLComponents,
vertex_ai_region: str,
) -> None:
tensorboard_logs_uri: Optional[Uri] = None,
) -> aiplatform.CustomJob:
"""Launch a single pool job on Vertex AI.

The ``tensorboard_resource_name`` and ``tensorboard_experiment_name``
fields on ``vertex_ai_resource_config`` drive TensorBoard wiring; the
launcher reads them directly off the proto.

Args:
vertex_ai_resource_config: The Vertex AI resource configuration
job_name: Full name for the Vertex AI job
Expand All @@ -67,6 +144,12 @@ def launch_single_pool_job(
cuda_docker_uri: Docker image URI for GPU execution
component: The GiGL component (Trainer or Inferencer)
vertex_ai_region: The Vertex AI region to launch the job in
tensorboard_logs_uri: Optional TensorBoard log URI for trainer jobs

Returns:
The submitted ``aiplatform.CustomJob``. Useful for callers that need
the job's resource name to look up downstream artifacts (e.g. the
per-job ``TensorboardExperiment``).
"""
if component not in _LAUNCHABLE_COMPONENTS:
raise ValueError(
Expand All @@ -85,21 +168,23 @@ def launch_single_pool_job(
resource_config_uri=resource_config_uri,
command_str=process_command,
args=process_runtime_args,
use_cuda=not is_cpu_execution,
use_cuda=is_cpu_execution,
container_uri=container_uri,
vertex_ai_resource_config=vertex_ai_resource_config,
env_vars=[env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3")],
labels=resource_config_wrapper.get_resource_labels(component=component),
tensorboard_logs_uri=tensorboard_logs_uri,
)
logger.info(f"Launching {component.value} job with config: {job_config}")
_maybe_log_tensorboard_url(vertex_ai_resource_config)

vertex_ai_service = VertexAIService(
project=resource_config_wrapper.project,
location=vertex_ai_region,
service_account=resource_config_wrapper.service_account_email,
staging_bucket=resource_config_wrapper.temp_assets_regional_bucket_path.uri,
)
vertex_ai_service.launch_job(job_config=job_config)
return vertex_ai_service.launch_job(job_config=job_config)


def launch_graph_store_enabled_job(
Expand All @@ -115,9 +200,16 @@ def launch_graph_store_enabled_job(
cpu_docker_uri: Optional[str],
cuda_docker_uri: Optional[str],
component: GiGLComponents,
tensorboard_logs_uri: Optional[Uri] = None,
) -> None:
"""Launch a graph store enabled job on Vertex AI with separate storage and compute pools.

The ``compute_pool`` of ``vertex_ai_graph_store_config`` carries
``tensorboard_resource_name`` and ``tensorboard_experiment_name`` (the
same Vertex AI metaparams that single-pool reads off its own
``VertexAiResourceConfig``); the launcher reads them directly off the
proto.

Args:
vertex_ai_graph_store_config: The Vertex AI graph store configuration
job_name: Full name for the Vertex AI job
Expand All @@ -131,6 +223,7 @@ def launch_graph_store_enabled_job(
cpu_docker_uri: Docker image URI for CPU execution
cuda_docker_uri: Docker image URI for GPU execution
component: The GiGL component (Trainer or Inferencer)
tensorboard_logs_uri: Optional TensorBoard log URI for trainer jobs
"""
if component not in _LAUNCHABLE_COMPONENTS:
raise ValueError(
Expand All @@ -139,23 +232,21 @@ def launch_graph_store_enabled_job(
storage_pool_config = vertex_ai_graph_store_config.graph_store_pool
compute_pool_config = vertex_ai_graph_store_config.compute_pool

# Compute workers may use GPUs, but storage workers always run the CPU graph-store entrypoint.
is_compute_cpu_execution = _determine_if_cpu_execution(
# Determine if CPU or GPU based on compute pool
is_cpu_execution = _determine_if_cpu_execution(
vertex_ai_resource_config=compute_pool_config
)
cpu_docker_uri = cpu_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU
cuda_docker_uri = cuda_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA
compute_container_uri = (
cpu_docker_uri if is_compute_cpu_execution else cuda_docker_uri
)
container_uri = cpu_docker_uri if is_cpu_execution else cuda_docker_uri

logger.info(f"Running {component.value} with command: {compute_commmand}")

num_compute_processes = (
vertex_ai_graph_store_config.compute_cluster_local_world_size
)
if not num_compute_processes:
if is_compute_cpu_execution:
if is_cpu_execution:
num_compute_processes = 1
else:
num_compute_processes = vertex_ai_graph_store_config.compute_pool.gpu_limit
Expand All @@ -178,11 +269,12 @@ def launch_graph_store_enabled_job(
resource_config_uri=resource_config_uri,
command_str=compute_commmand,
args=compute_runtime_args,
use_cuda=not is_compute_cpu_execution,
container_uri=compute_container_uri,
use_cuda=is_cpu_execution,
container_uri=container_uri,
vertex_ai_resource_config=compute_pool_config,
env_vars=environment_variables,
labels=labels,
tensorboard_logs_uri=tensorboard_logs_uri,
)

# Create storage pool job config
Expand All @@ -192,8 +284,8 @@ def launch_graph_store_enabled_job(
resource_config_uri=resource_config_uri,
command_str=storage_command,
args=storage_args,
use_cuda=False,
container_uri=cpu_docker_uri,
use_cuda=is_cpu_execution,
container_uri=container_uri,
vertex_ai_resource_config=storage_pool_config,
env_vars=environment_variables,
labels=labels,
Expand All @@ -206,6 +298,8 @@ def launch_graph_store_enabled_job(
else resource_config_wrapper.region
)

_maybe_log_tensorboard_url(compute_pool_config)

vertex_ai_service = VertexAIService(
project=resource_config_wrapper.project,
location=region,
Expand All @@ -229,13 +323,19 @@ def _build_job_config(
vertex_ai_resource_config: VertexAiResourceConfig,
env_vars: list[env_var.EnvVar],
labels: Optional[dict[str, str]] = None,
tensorboard_logs_uri: Optional[Uri] = None,
) -> VertexAiJobConfig:
"""Build a VertexAiJobConfig for training or inference jobs.

This function constructs a configuration object for running GiGL training or inference
jobs on Vertex AI. It assembles job arguments, sets appropriate job naming conventions,
and configures resource specifications based on the provided parameters.

``tensorboard_resource_name`` and ``tensorboard_experiment_name`` come
from ``vertex_ai_resource_config`` directly — single-pool launches read
them off the trainer's ``VertexAiResourceConfig``; graph-store launches
pass ``compute_pool`` here, which carries the same fields.

Args:
job_name (str): The base name for the job. Will be prefixed with "gigl_train_" or "gigl_infer_".
is_inference (bool): Whether this is an inference job (True) or training job (False).
Expand All @@ -249,6 +349,7 @@ def _build_job_config(
machine type, GPU type, replica count, timeout, and scheduling strategy.
env_vars (list[env_var.EnvVar]): Environment variables to set in the container.
labels (Optional[dict[str, str]]): Labels to associate with the job. Defaults to None.
tensorboard_logs_uri (Optional[Uri]): TensorBoard log URI for trainer jobs.

Returns:
VertexAiJobConfig: A configuration object ready to be used with VertexAIService.launch_job().
Expand All @@ -264,13 +365,55 @@ def _build_job_config(
)

command = command_str.strip().split(" ")
base_output_dir = (
_get_base_output_dir_from_tensorboard_logs_uri(
tensorboard_logs_uri=tensorboard_logs_uri
)
if tensorboard_logs_uri is not None
else None
)

# When the user opted into a stable Vertex AI TensorboardExperiment, inject
# env vars into the worker so the chief-rank trainer can stream events
# directly to that experiment via ``aiplatform.start_upload_tb_log``.
# Validation guarantees ``tensorboard_resource_name`` and
# ``tensorboard_experiment_name`` are set together.
#
# ``GIGL_TENSORBOARD_RUN_NAME`` carries a launch-unique, sanitized run
# name. The writer creates a subdirectory of ``AIP_TENSORBOARD_LOG_DIR``
# with this name; the SDK ``LogdirLoader`` then surfaces it as a distinct
# ``TensorboardRun`` in the named experiment, so two jobs sharing the
# experiment name show up as two runs (instead of merging into one
# ``default`` run).
#
# References:
# https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-overview
# https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec
container_env_vars = list(env_vars)
if vertex_ai_resource_config.tensorboard_experiment_name:
container_env_vars.extend(
[
env_var.EnvVar(
name="GIGL_TENSORBOARD_RESOURCE_NAME",
value=vertex_ai_resource_config.tensorboard_resource_name,
),
env_var.EnvVar(
name="GIGL_TENSORBOARD_EXPERIMENT_NAME",
value=vertex_ai_resource_config.tensorboard_experiment_name,
),
env_var.EnvVar(
name="GIGL_TENSORBOARD_RUN_NAME",
value=_build_unique_run_name(job_name),
),
]
)

job_config = VertexAiJobConfig(
job_name=job_name,
container_uri=container_uri,
command=command,
args=job_args,
environment_variables=env_vars,
environment_variables=container_env_vars,
machine_type=vertex_ai_resource_config.machine_type,
accelerator_type=vertex_ai_resource_config.gpu_type.upper().replace("-", "_"),
accelerator_count=vertex_ai_resource_config.gpu_limit,
Expand All @@ -293,10 +436,35 @@ def _build_job_config(
reservation_affinity=_build_reservation_affinity(
vertex_ai_resource_config.reservation_affinity
),
base_output_dir=base_output_dir,
)
return job_config


def _get_base_output_dir_from_tensorboard_logs_uri(
    tensorboard_logs_uri: Uri,
) -> str:
    """Return the CustomJob base output directory for a TensorBoard log URI.

    Args:
        tensorboard_logs_uri: GiGL TensorBoard log URI. This is expected to
            point at the ``logs/`` directory underneath the trainer asset dir.

    Returns:
        The parent directory to use as ``base_output_dir``.

    Raises:
        ValueError: If the URI does not contain a parent directory.
    """
    trimmed_uri = tensorboard_logs_uri.uri.rstrip("/")
    # Everything before the final "/"-delimited segment is the parent dir.
    parent_dir, slash, _leaf = trimmed_uri.rpartition("/")
    if slash and parent_dir:
        return parent_dir
    raise ValueError(
        f"TensorBoard logs URI must include a parent directory, got {tensorboard_logs_uri.uri!r}."
    )


def _build_reservation_affinity(
affinity: VertexAiReservationAffinity,
) -> Optional[ReservationAffinity]:
Expand Down
Loading