Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 169 additions & 1 deletion app/frontend_management.py

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Way too many long comments.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair — trimmed in 0a2b9f8. Net -76 lines of prose across the two files:

  • Killed all narrative/scenario comments inside test bodies and helper functions (they restated the test setup).
  • Removed the multi-line block comments on AUTO_MANAGED_VERSION_SPECIFIERS and the regex constant — kept tight one-liners.
  • Collapsed the init_frontend_unsafe "promote/prune" comments into nothing — the method names already say it.
  • Shortened docstrings on _read/_prune/_untrack_auto_managed_versions to one line each.

Kept just five short comments that aren't restating code:

  • AUTO_MANAGED_VERSION_SPECIFIERS constant (says why this tuple exists vs explicit pins)
  • The regex (security: explains the allowlist purpose)
  • The atomic-write step (non-obvious tmp+rename pattern)
  • The defense-in-depth _is_safe_version_dirname re-check in _prune_auto_managed_versions (would otherwise look redundant and get "simplified" away)

33/33 tests still pass.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cut further in 273314a — only one comment remains in the new code:

# Re-check even though read already filters: keeps rmtree provably
# bounded under provider_dir regardless of caller.
if not cls._is_safe_version_dirname(stale_version):
    ...

This one I'd push back on removing: the _is_safe_version_dirname re-check inside _prune_auto_managed_versions looks redundant (the read path already filters), and without a comment a future maintainer will simplify it away. That would re-open the path-traversal vector Oracle flagged in the first review pass. The test test_prune_refuses_to_delete_outside_provider_dir would still pass without the re-check (because it relies on the resolve-and-check below it), so the comment is the only thing protecting the layered defense.

Everything else — class-constant explanations, regex purpose, atomic-write idiom, all docstrings on the new private helpers — is gone. Happy to delete this last one too if you'd rather rename the helper or merge the check into _is_safe_version_dirname's docstring instead.

33/33 tests still pass.

Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import argparse
import json
import logging
import os
import re
import shutil
import sys
import tempfile
import zipfile
Expand Down Expand Up @@ -205,6 +207,155 @@ def download_release_asset_zip(release: Release, destination_path: str) -> None:
class FrontendManager:
CUSTOM_FRONTENDS_ROOT = str(Path(__file__).parents[1] / "web_custom_versions")

AUTO_MANAGED_VERSION_SPECIFIERS = ("latest", "prerelease")
AUTO_MANAGED_METADATA_FILENAME = ".auto_managed.json"
_VERSION_DIRNAME_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")

@classmethod
def _provider_dir(cls, repo_owner: str, repo_name: str) -> Path:
return Path(cls.CUSTOM_FRONTENDS_ROOT) / f"{repo_owner}_{repo_name}"

@classmethod
def _auto_managed_metadata_path(cls, repo_owner: str, repo_name: str) -> Path:
return cls._provider_dir(repo_owner, repo_name) / cls.AUTO_MANAGED_METADATA_FILENAME

@classmethod
def _is_safe_version_dirname(cls, name: str) -> bool:
if not isinstance(name, str):
return False
if name in (".", "..") or "/" in name or "\\" in name or "\x00" in name:
return False
return bool(cls._VERSION_DIRNAME_PATTERN.match(name))

@classmethod
def _read_auto_managed_versions(cls, repo_owner: str, repo_name: str) -> list[str]:
metadata_path = cls._auto_managed_metadata_path(repo_owner, repo_name)
if not metadata_path.exists():
return []
try:
with open(metadata_path, "r", encoding="utf-8") as fh:
data = json.load(fh)
except (OSError, ValueError) as exc:
logging.warning(
"Could not read frontend auto-managed metadata at %s: %s",
metadata_path,
exc,
)
return []
if not isinstance(data, dict):
logging.warning(
"Frontend auto-managed metadata at %s has unexpected shape; ignoring.",
metadata_path,
)
return []
versions = data.get("auto_managed", [])
if not isinstance(versions, list):
return []
return [v for v in versions if cls._is_safe_version_dirname(v)]

@classmethod
def _write_auto_managed_versions(
cls, repo_owner: str, repo_name: str, versions: list[str]
) -> None:
metadata_path = cls._auto_managed_metadata_path(repo_owner, repo_name)
metadata_path.parent.mkdir(parents=True, exist_ok=True)
safe_versions = [v for v in versions if cls._is_safe_version_dirname(v)]
payload = {"auto_managed": sorted(set(safe_versions))}
tmp_path = metadata_path.with_suffix(metadata_path.suffix + ".tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, indent=2, sort_keys=True)
os.replace(tmp_path, metadata_path)
except OSError as exc:
logging.warning(
"Could not write frontend auto-managed metadata at %s: %s",
metadata_path,
exc,
)
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass

@classmethod
def _prune_auto_managed_versions(
cls, repo_owner: str, repo_name: str, keep_version: str
) -> None:
tracked = cls._read_auto_managed_versions(repo_owner, repo_name)
if not tracked and keep_version is None:
return

provider_dir = cls._provider_dir(repo_owner, repo_name)
try:
provider_dir_resolved = provider_dir.resolve()
except OSError as exc:
logging.warning(
"Could not resolve provider directory %s for cleanup: %s",
provider_dir,
exc,
)
return

for stale_version in tracked:
if stale_version == keep_version:
continue
# Re-check even though read already filters: keeps rmtree provably
# bounded under provider_dir regardless of caller.
if not cls._is_safe_version_dirname(stale_version):
logging.warning(
"Refusing to clean up suspicious frontend version name: %r",
stale_version,
)
continue
stale_path = provider_dir / stale_version
if not stale_path.exists():
continue
try:
stale_resolved = stale_path.resolve()
except OSError as exc:
logging.warning(
"Could not resolve stale frontend path %s: %s",
stale_path,
exc,
)
continue
if (
stale_resolved == provider_dir_resolved
or provider_dir_resolved not in stale_resolved.parents
):
logging.warning(
"Refusing to remove path outside provider dir: %s (provider=%s)",
stale_resolved,
provider_dir_resolved,
)
continue
try:
shutil.rmtree(stale_path)
logging.info(
"Removed stale auto-managed frontend version: %s",
stale_path,
)
except OSError as exc:
logging.warning(
"Failed to remove stale frontend version at %s: %s",
stale_path,
exc,
)

new_tracked = [keep_version] if keep_version else []
cls._write_auto_managed_versions(repo_owner, repo_name, new_tracked)

@classmethod
def _untrack_auto_managed_version(
cls, repo_owner: str, repo_name: str, version: str
) -> None:
tracked = cls._read_auto_managed_versions(repo_owner, repo_name)
if version not in tracked:
return
tracked = [v for v in tracked if v != version]
cls._write_auto_managed_versions(repo_owner, repo_name, tracked)

@classmethod
def get_required_frontend_version(cls) -> str:
"""Get the required frontend package version."""
Expand Down Expand Up @@ -372,6 +523,7 @@ def init_frontend_unsafe(
return cls.default_frontend_path()

repo_owner, repo_name, version = cls.parse_version_string(version_string)
is_auto_managed = version in cls.AUTO_MANAGED_VERSION_SPECIFIERS

if version.startswith("v"):
expected_path = str(
Expand All @@ -383,6 +535,9 @@ def init_frontend_unsafe(
logging.info(
f"Using existing copy of specific frontend version tag: {repo_owner}/{repo_name}@{version}"
)
cls._untrack_auto_managed_version(
repo_owner, repo_name, version.lstrip("v")
)
return expected_path

logging.info(
Expand All @@ -396,7 +551,8 @@ def init_frontend_unsafe(
web_root = str(
Path(cls.CUSTOM_FRONTENDS_ROOT) / provider.folder_name / semantic_version
)
if not os.path.exists(web_root):
download_succeeded = os.path.exists(web_root)
if not download_succeeded:
try:
os.makedirs(web_root, exist_ok=True)
logging.info(
Expand All @@ -407,10 +563,22 @@ def init_frontend_unsafe(
)
logging.debug(release)
download_release_asset_zip(release, destination_path=web_root)
download_succeeded = True
finally:
# Clean up the directory if it is empty, i.e. the download failed
if not os.listdir(web_root):
os.rmdir(web_root)
download_succeeded = False

if download_succeeded:
if is_auto_managed:
cls._prune_auto_managed_versions(
repo_owner, repo_name, semantic_version
)
else:
cls._untrack_auto_managed_version(
repo_owner, repo_name, semantic_version
)

return web_root

Expand Down
Loading
Loading