7 changes: 3 additions & 4 deletions .github/actions/assert-is-collaborator/action.yml
@@ -5,7 +5,7 @@ inputs:
description: The GitHub username to check
required: true
initiating-pr-number:
description: The PR number that the check may be associated with; if provided, the action will comment on the PR in case of failures
required: false
runs:
using: "composite"
@@ -22,7 +22,7 @@ runs:
script: |
  try {
    const username = "${{ inputs.username }}";
    const result = await github.rest.repos.checkCollaborator({
      owner: context.repo.owner,
      repo: context.repo.repo,
      username: username
@@ -41,12 +41,11 @@ runs:
      console.log(`Error checking collaborator status: ${error.message}`);
    }
  }

Comment on lines 22 to +44

Semgrep identified an issue in your code:

GitHub Actions input ${{ inputs.username }} is directly interpolated into JavaScript code in actions/github-script, allowing command injection if an attacker provides malicious input.

More details about this

The script directly interpolates ${{ inputs.username }} into a JavaScript string using GitHub Actions' expression syntax. This creates a command injection vulnerability in the actions/github-script step.

Attack Scenario:

  1. An attacker opens a pull request or triggers a workflow and provides a malicious username input containing JavaScript code, such as: test"; process.env.GITHUB_TOKEN = ""; //

  2. When this workflow runs, the string interpolation happens in the YAML parsing phase before the script executes, so the malicious input is directly embedded into the JavaScript code like this:

    const username = "test"; process.env.GITHUB_TOKEN = ""; //";
  3. The attacker's injected JavaScript code (process.env.GITHUB_TOKEN = "";) now executes with full access to the GitHub Actions runner environment, including all secrets available to the workflow.

  4. The attacker can steal GITHUB_TOKEN, access repository secrets, modify code, or exfiltrate sensitive data that the runner has access to.

The vulnerability exists because inputs.username is user-controlled (can be set by anyone triggering the workflow) and is directly embedded into the script without sanitization or escaping.

To resolve this comment:


Suggested change
env:
  GITHUB_TOKEN: ${{ inputs.github-token }}
  USERNAME: ${{ inputs.username }} # Move untrusted input into the env block
script: |
  try {
    const username = process.env.USERNAME; // Reference the username from env for safety
    const result = await github.rest.repos.checkCollaborator({
      owner: context.repo.owner,
      repo: context.repo.repo,
      username: username
    });
    if (result.status === 204) {
      console.log(`${username} is a collaborator.`);
    } else {
      throw new Error(`${username} is NOT a collaborator.`);
    }
  } catch (error) {
    if (error.status === 404) {
      throw new Error(`${username} is NOT a collaborator.`);
    } else if (error.status === 403) {
      console.log(`User is not a collaborator: ${error.message}`);
    } else {
      console.log(`Error checking collaborator status: ${error.message}`);
    }
  }
Step-by-step instructions:
  1. Move the untrusted input ${{ inputs.username }} from the script: section to the env: section of the step. For example, add USERNAME: ${{ inputs.username }} under env:.
  2. Update the code inside the script: block to use process.env.USERNAME instead of "${{ inputs.username }}". Change the line to const username = process.env.USERNAME;.
  3. In shell contexts, always double-quote environment variable references; that is not needed in this JavaScript context, but it matters for safety if the value is used elsewhere.

This prevents any user input from being injected directly as code into the script. Using process.env.USERNAME reads the value at runtime, reducing the risk of code injection.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by github-script-injection.

You can view more details about this finding in the Semgrep AppSec Platform.

- name: Comment workflow permissions
  if: ${{ failure() && steps.assert-is-collaborator.conclusion == 'failure' && inputs.initiating-pr-number != '' }}
  uses: snapchat/gigl/.github/actions/comment-on-pr@main
  with:
    pr_number: ${{ inputs.initiating-pr-number }}
    message: |
      🔒 User ${{ inputs.username }} does not have permissions to run this workflow

4 changes: 2 additions & 2 deletions .github/actions/comment-on-pr/action.yml
@@ -16,7 +16,7 @@ outputs:
comment_id:
description: 'The ID of the created or updated comment'
value: ${{steps.comment.outputs.result}}


runs:
using: 'composite'
@@ -67,4 +67,4 @@ runs:
});

return response.data.id;
}
2 changes: 1 addition & 1 deletion .github/actions/get-pr-src-branch/action.yml
@@ -30,4 +30,4 @@ runs:
});
const branch_name = pr.data.head.ref;
console.log("Branch name is:", branch_name);
return branch_name;
118 changes: 118 additions & 0 deletions docs/plans/20260429-add-env-var-injection-to-kfp-runner.md
@@ -0,0 +1,118 @@
# Plan: Add env-var injection to the KFP runner

## 1. Background

Today, every container launched by the GiGL Kubeflow pipeline (config_validator, config_populator, data_preprocessor, subgraph_sampler, split_generator, trainer, inferencer, post_processor — see `SPECED_COMPONENTS` at `gigl/orchestration/kubeflow/kfp_pipeline.py:32-41`) inherits its environment from the image. There is no first-class way for a caller of `runner.py` to set arbitrary `ENV` values on those containers at compile time.

Wiring each downstream consumer in as a typed flag (e.g. one flag per image URI, one per Python-interop toggle, one per protobuf-implementation switch) does not scale and leaks consumer-specific concepts into the GiGL surface area. A single generic `--env KEY=VALUE` flag — threaded into `KfpOrchestrator.compile(...)` and applied per-task via the KFP v2 SDK's [`PipelineTask.set_env_variable(name, value)`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PipelineTask.set_env_variable) — is the cleanest carrier: GiGL knows nothing about the contents, callers own the meaning. GiGL's own code paths that need env config will continue to use the `GIGL_*` prefix; everything else is opaque transport.

## 2. API surface

**New CLI flag on `runner.py`** — repeatable, mirrors `--run_labels` exactly:

```
--env KEY=VALUE
```

Argparse spec — added next to `--run_labels` in `_get_parser` (`runner.py:319-328`):

- `action="append"`, `default=[]`, `help` describes the format and that values flow to every component at compile time.
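A minimal argparse sketch of that spec (the help text here is illustrative, not final):

```python
import argparse

parser = argparse.ArgumentParser()
# Repeatable flag; argparse appends each "KEY=VALUE" string to a list.
parser.add_argument(
    "--env",
    action="append",
    default=[],
    help="KEY=VALUE environment variable applied to every component container "
    "at compile time; repeatable. GiGL does not interpret the values.",
)

args = parser.parse_args(["--env", "FOO=bar", "--env", "BAZ=qux"])
# args.env == ["FOO=bar", "BAZ=qux"]
```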

**Parser helper** — added next to `_parse_labels` (`runner.py:212-225`); identical signature shape:

```
def _parse_env_vars(env_vars: list[str]) -> dict[str, str]
```

The template to mirror is the existing `_parse_labels` body verbatim — `split("=", 1)` on each entry, populate `dict[str, str]`, log the parsed result. Same error semantics: a malformed entry (no `=`) raises `ValueError` from `str.split` unpacking, exactly like `_parse_labels` does today.

**Plumbing path:**

1. `runner.py:__main__` calls `parsed_env_vars = _parse_env_vars(args.env)` alongside `parsed_additional_job_args` / `parsed_labels` (`runner.py:346-347`).
2. Both `KfpOrchestrator.compile` call sites (`runner.py:385` for the RUN action, `runner.py:412` for the COMPILE action) gain `env_vars=parsed_env_vars`.
3. `KfpOrchestrator.compile` (`kfp_orchestrator.py:51-109`) gains `env_vars: Optional[dict[str, str]] = None`, packs it into `CommonPipelineComponentConfigs` (a new field — see §3), and passes it through `generate_pipeline(...)`.
4. `kfp_pipeline.py:_generate_component_task` (`kfp_pipeline.py:54-128`) — after `add_task_resource_requirements(...)` is called and before `return component_task`, loop the dict and call `component_task.set_env_variable(name=k, value=v)` per entry.

Per the [KFP v2 API](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PipelineTask.set_env_variable), `set_env_variable` takes one `name`/`value` pair per call and returns the task for chaining; we must loop, not pass a dict.


## 3. Where the env vars get applied

**All eight components in `SPECED_COMPONENTS`.** Justification:

- The whole point of a generic carrier is uniformity — callers expect that if they say `--env FOO=bar`, *every* container they're paying for sees `FOO=bar`.
- `_generate_component_task` is the single funnel through which every `PipelineTask` in `SPECED_COMPONENTS` is constructed (`kfp_pipeline.py:54-128`). Adding the loop there covers config_validator, config_populator, data_preprocessor, subgraph_sampler, split_generator, trainer, inferencer, post_processor in one place.
- The two non-`SPECED_COMPONENTS` tasks in the pipeline are `check_glt_backend_eligibility_component` (`utils/glt_backend.py`) and `log_metrics_to_ui` (`utils/log_metrics.py`). Decision: include them too, for a complete answer to "every container my pipeline launches sees these vars." Concretely, both are constructed inside `kfp_pipeline.py` (`_generate_component_tasks` for the GLT check, `_create_trainer_task_op` / `_create_post_processor_task_op` for log metrics). Apply the same loop right after each is built.
- Implementation choice that keeps the callsite count low: introduce a private helper `_apply_env_vars(task, env_vars)` inside `kfp_pipeline.py` (parallel in spirit to `add_task_resource_requirements`) and call it from `_generate_component_task` plus the two non-SPECED sites. One source of truth, no risk of drift.

The `env_vars` dict rides on `CommonPipelineComponentConfigs` (`gigl/common/types/resource_config.py:8-17`), added as `env_vars: dict[str, str] = field(default_factory=dict)` — same shape as the existing `additional_job_args` field.

## 4. Compile-time vs run-time

**Compile-time bake-in.** `set_env_variable` records the name/value into the compiled pipeline IR (the YAML at `dst_compiled_pipeline_path`); the values are baked at compile time, not resolved per run. That is the right choice here:

- For `--action=run`, `runner.py:384-395` *recompiles* on every invocation before calling `orchestrator.run`, so each run gets a fresh bake of whatever `--env` values were passed on that invocation. No UX loss.
- For `--action=compile`, `runner.py:411-422` produces a static artifact — the user explicitly wanted those values frozen in.
- For `--action=run_no_compile`, the user is opting into a pre-compiled pipeline; whatever env values were baked at compile time are what run. This is consistent with how `additional_job_args` already behaves.

A run-time-resolved alternative would require declaring a KFP pipeline parameter (e.g. `env_vars: dict`) on the `@kfp.dsl.pipeline` function and passing it through every component op. KFP v2 has poor ergonomics for `dict` pipeline parameters and the immediate use case (callers injecting a config value per compile) does not benefit from late binding. We do *not* take that path; if a future caller needs per-run env, that's a separate proposal.

## 5. Validation / failure modes

Mirror `_parse_labels` exactly so behavior is predictable across all three flags:

- Malformed entry (no `=`): `str.split("=", 1)` returns one element, the unpack to `(name, value)` raises `ValueError`. Same as `_parse_labels` today — propagate, do not catch.
- Empty value (`--env FOO=`): valid, `value=""`. Mirrors `_parse_labels` — that flag also accepts empty values, so we stay consistent.
- Empty key (`--env =bar`): not validated by `_parse_labels` either; KFP's own `set_env_variable` will reject it downstream. Consistent failure surface; do not pre-empt.
- Duplicate keys across multiple `--env` invocations: last one wins (dict overwrite), same as `_parse_labels`.
- Reserved names: do *not* enforce a denylist inside GiGL. KFP itself reserves a small set (e.g. `KFP_*`); leaving validation to KFP keeps GiGL generic. Document the caveat in the help text.
- Cross-flag conflict: `_assert_required_flags` (`runner.py:134-172`) does not need a new check. `--env` is valid for all three actions (`run`, `run_no_compile`, `compile`), unlike `--run_labels` which is run-only. Document this difference.
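The parsing-level edge cases above can be pinned down with nothing but `str.split`:

```python
# Empty value is legal.
name, value = "FOO=".split("=", 1)
assert (name, value) == ("FOO", "")

# Empty key survives parsing; KFP rejects it downstream.
name, value = "=bar".split("=", 1)
assert (name, value) == ("", "bar")

# Duplicate keys: last one wins via dict overwrite.
merged: dict[str, str] = {}
for entry in ["K=a", "K=b"]:
    k, v = entry.split("=", 1)
    merged[k] = v
assert merged == {"K": "b"}
```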

## 6. Step-by-step implementation plan

Each bullet is one commit-sized unit:

1. **Add `env_vars` field to `CommonPipelineComponentConfigs`** at `gigl/common/types/resource_config.py:8-17`. Default-empty dict.
2. **Add `_apply_env_vars(task, env_vars)` helper in `kfp_pipeline.py`** — single-responsibility, loops the dict and calls `task.set_env_variable(name=k, value=v)`.
3. **Wire the helper into `_generate_component_task`** at `kfp_pipeline.py:54-128`, right after `add_task_resource_requirements(...)`. Also call it on `check_glt_backend_eligibility_component` and `log_metrics_to_ui` task results.
4. **Thread `env_vars: Optional[dict[str, str]] = None`** through `KfpOrchestrator.compile` (`kfp_orchestrator.py:51-109`) into `CommonPipelineComponentConfigs(...)` (existing site at `kfp_orchestrator.py:83-88`).
5. **Add `--env` arg to `_get_parser`** (`runner.py:_get_parser`) — `action="append"`, `default=[]`, help text describing the format and noting GiGL does not interpret values.
6. **Add `_parse_env_vars` helper** next to `_parse_labels` (`runner.py:212-225`).
7. **Plumb `parsed_env_vars` into both `KfpOrchestrator.compile(...)` call sites** in `runner.py` (`runner.py:385-395` and `runner.py:411-422`).
8. **Update the `runner.py` module docstring** (lines 1-71): document `--env` under both `RUN` and `COMPILE` action sections; note compile-time bake-in semantics.
9. **Unit test: parser** in `tests/unit/orchestration/kubeflow/runner_test.py` (new file, mirror layout of `kfp_orchestrator_test.py`). Cover: single var, multiple vars, value containing `=`, malformed entry raises `ValueError`, empty list returns empty dict.
10. **Unit test: compile-time injection** extending `kfp_orchestrator_test.py:KfpOrchestratorTest` — call `KfpOrchestrator.compile(..., env_vars={"FOO": "bar"})` writing to a tmp path, parse the resulting YAML, assert that every component spec under the compiled pipeline IR has an `env` entry with `name: FOO`, `value: bar`. The IR shape is stable for KFP v2 (`deploymentSpec.executors.<id>.container.env`).

## 7. Test strategy

- **Parser unit test** (step 9 above): pure function, no I/O. Exhaustive on malformed inputs since this is user-facing CLI.
- **Compile integration test** (step 10): writes the compiled pipeline to a temp file, loads with `yaml.safe_load`, walks every executor in `pipelineSpec.deploymentSpec.executors`, asserts the env list contains all expected pairs. This proves the loop ran on every component, not just one. Run via `make unit_test_py PY_TEST_FILES="kfp_orchestrator_test.py"`.
- **Smoke test, manual**: from a downstream `Makefile`, add `--env=KEY1=value1 --env=KEY2=value2` to one `compile_*_kubeflow_pipeline` target, compile, and `grep -A2 "name: KEY1" build/gigl_pipeline_gnn.yaml` to confirm presence on every executor.
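The executor walk in the compile test can be a small pure function. In the real test the `spec` dict comes from `yaml.safe_load` on the compiled artifact; the `deploymentSpec.executors.<id>.container.env` key names are an assumption to verify against an actual compile, and the sample fragment below is made up:

```python
def executors_missing_env(spec: dict, expected: dict[str, str]) -> list[str]:
    """Return executor ids whose container env lacks any expected pair."""
    missing = []
    for exec_id, executor in spec["deploymentSpec"]["executors"].items():
        env = {
            e["name"]: e["value"]
            for e in executor.get("container", {}).get("env", [])
        }
        if any(env.get(k) != v for k, v in expected.items()):
            missing.append(exec_id)
    return missing


# Hypothetical two-executor IR fragment for illustration:
sample = {
    "deploymentSpec": {
        "executors": {
            "exec-trainer": {"container": {"env": [{"name": "FOO", "value": "bar"}]}},
            "exec-inferencer": {"container": {"env": []}},
        }
    }
}
assert executors_missing_env(sample, {"FOO": "bar"}) == ["exec-inferencer"]
```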

## 8. Rollout

GiGL-side callers of `gigl.orchestration.kubeflow.runner` to inventory:

- `Makefile:258 compile_gigl_kubeflow_pipeline`
- `Makefile:283 run_dev_gnn_kubeflow_pipeline`
- `Makefile:307 compile_simple_gigl_kubeflow_pipeline`
- `Makefile:328-ish run_dev_simple_kubeflow_pipeline` (the simple-GiGL run target — verify exact line)
- `Makefile:352, 360, 368, 376, 384, 392, 400, 407` — the e2e test targets that depend on `compile_gigl_kubeflow_pipeline`. They inherit whatever the compile target sets; no per-target change unless they need different envs.

GiGL repo: no callers beyond this `runner.py` module need changes. The flag is opt-in, default empty, fully backward-compatible.

Downstream repos that vendor GiGL as a submodule: a separate follow-up commit (out of scope of the GiGL PR) wires `--env=...` into whichever Makefile targets need it. That commit lives in the consumer repo, not GiGL.

## 9. Open questions

1. **Pipeline-parameter env**: do we expect any caller to want envs that vary *per run* of the same compiled pipeline? Not for the immediate use case driving this work; flag if that changes.
2. **VertexNotificationEmailOp env**: `_generate_component_tasks` wraps everything in an `ExitHandler(VertexNotificationEmailOp(...))` (`kfp_pipeline.py:252-256`). The notification op is a Google-managed component; should env vars be applied to it? Default answer: no — it's a managed op outside the user's control surface, applying envs is at best inert and at worst rejected. Confirm during implementation by checking whether `set_env_variable` on it raises.
3. **Naming — `--env` vs `--env_var` vs `--env_vars`**: `--env` is shortest and matches `docker run --env`; `--env_var` is more discoverable in `--help`; `--env_vars` would be most consistent with the existing plural `--run_labels`. Repeatable flags in this file use both styles (`--run_labels` plural, `--notification_emails` plural, `--additional_job_args` plural). Suggest `--env_vars` and document it as repeatable. Confirm with reviewer.
4. **Should the helper land on `CommonPipelineComponentConfigs` or be a separate parameter to `generate_pipeline`?** Going via `CommonPipelineComponentConfigs` is consistent with `additional_job_args` and minimizes signature churn. No open question; calling out the design choice.

### Critical Files for Implementation

- `gigl/orchestration/kubeflow/runner.py`
- `gigl/orchestration/kubeflow/kfp_orchestrator.py`
- `gigl/orchestration/kubeflow/kfp_pipeline.py`
- `gigl/common/types/resource_config.py`
- `tests/unit/orchestration/kubeflow/kfp_orchestrator_test.py`
62 changes: 62 additions & 0 deletions gigl/common/omegaconf_resolvers.py
@@ -5,6 +5,7 @@
"""

import subprocess
from collections.abc import Mapping
from datetime import datetime, timedelta

from omegaconf import OmegaConf
@@ -16,6 +17,12 @@

_SUPPORTED_UNITS = ("weeks", "days", "seconds", "minutes", "hours")

# Module-level value dict consumed by the ``gigl`` resolver. Populated
# by ``set_gigl_resolver_values`` (typically called by ``launch_custom``
# right before re-resolving a ``CustomResourceConfig``'s ``command`` /
# ``args`` strings).
_GIGL_RESOLVER_VALUES: dict[str, str] = {}


def now_resolver(*args: str) -> str:
"""Resolver that creates a string representing the current time (with optional offset) using strftime.
@@ -157,6 +164,53 @@ def git_hash_resolver() -> str:
return ""


def gigl_resolver(key: str) -> str:
    """Resolve ``${gigl:<key>}`` from a module-level value dict.

    Registered resolvers are invoked with colon syntax in OmegaConf
    (``${gigl:foo}``), not dotted (``${gigl.foo}``). The fallback string
    therefore mirrors that form so the placeholder can round-trip through
    a first-pass YAML load and be re-resolved later once runtime values
    are populated via ``set_gigl_resolver_values``.

    Args:
        key: The runtime-value key being looked up
            (e.g. ``cuda_docker_image``).

    Returns:
        The runtime value if ``set_gigl_resolver_values`` has populated
        ``key``; otherwise the literal placeholder string
        ``"${gigl:<key>}"`` so the early YAML pass
        (``ProtoUtils.read_proto_from_yaml``) is lossless.

    Example:
        In a YAML loaded by ``ProtoUtils.read_proto_from_yaml``:

        ```yaml
        custom_trainer_config:
          command: "${gigl:task_config_uri}"
        ```

        ``launch_custom`` later sets ``task_config_uri`` via
        ``set_gigl_resolver_values`` and re-resolves the field.
    """
    return _GIGL_RESOLVER_VALUES.get(key, f"${{gigl:{key}}}")


def set_gigl_resolver_values(values: Mapping[str, str]) -> None:
    """Replace the module-level dict the ``gigl`` resolver reads from.

    The dict is cleared before the new values are written so stale
    runtime values from a prior ``launch_custom`` call cannot leak into
    a subsequent invocation.

    Args:
        values: New key→value mapping.
    """
    _GIGL_RESOLVER_VALUES.clear()
    _GIGL_RESOLVER_VALUES.update(values)


def register_resolvers() -> None:
"""Register all custom OmegaConf resolvers.

@@ -178,3 +232,11 @@ def register_resolvers() -> None:
        logger.debug(
            "OmegaConf resolver 'git_hash' already registered, skipping registration"
        )

    if not OmegaConf.has_resolver("gigl"):
        logger.info("Registering OmegaConf resolver 'gigl'")
        OmegaConf.register_new_resolver("gigl", gigl_resolver)
    else:
        logger.debug(
            "OmegaConf resolver 'gigl' already registered, skipping registration"
        )
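The placeholder round-trip this resolver relies on can be rehearsed without OmegaConf at all — a self-contained copy of the two module-level pieces from the diff (the bucket path is made up):

```python
from collections.abc import Mapping

_GIGL_RESOLVER_VALUES: dict[str, str] = {}


def gigl_resolver(key: str) -> str:
    # Before values are populated, emit the literal "${gigl:<key>}" so the
    # first YAML pass is lossless; afterwards, emit the runtime value.
    return _GIGL_RESOLVER_VALUES.get(key, f"${{gigl:{key}}}")


def set_gigl_resolver_values(values: Mapping[str, str]) -> None:
    # Clear first so stale values from a prior launch cannot leak through.
    _GIGL_RESOLVER_VALUES.clear()
    _GIGL_RESOLVER_VALUES.update(values)


# First pass: value unknown, the placeholder survives verbatim.
assert gigl_resolver("task_config_uri") == "${gigl:task_config_uri}"

# launch_custom populates runtime values and re-resolves.
set_gigl_resolver_values({"task_config_uri": "gs://example-bucket/task_config.yaml"})
assert gigl_resolver("task_config_uri") == "gs://example-bucket/task_config.yaml"
```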
4 changes: 4 additions & 0 deletions gigl/common/types/resource_config.py
@@ -15,3 +15,7 @@ class CommonPipelineComponentConfigs:
    additional_job_args: dict[GiGLComponents, dict[str, str]] = field(
        default_factory=dict
    )
    # Environment variables baked into every GiGL-owned container at compile time.
    # Applied uniformly across all SPECED_COMPONENTS plus the GLT eligibility check
    # and log_metrics_to_ui tasks. The managed VertexNotificationEmailOp is excluded.
    env_vars: dict[str, str] = field(default_factory=dict)