diff --git a/.gitignore b/.gitignore index 5eb9616c8c..8304f1e311 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ uv.lock # IDE and editors .vscode .idea +.zed/ # Logs and temporary files botpy.log diff --git a/astrbot/cli/commands/cmd_plug.py b/astrbot/cli/commands/cmd_plug.py index 462c8e8b9e..17bb864af0 100644 --- a/astrbot/cli/commands/cmd_plug.py +++ b/astrbot/cli/commands/cmd_plug.py @@ -10,6 +10,7 @@ check_astrbot_root, get_astrbot_root, get_git_repo, + install_local_plugin, manage_plugin, ) @@ -143,12 +144,32 @@ def list(all: bool) -> None: @plug.command() -@click.argument("name") +@click.argument("name", required=False) +@click.option( + "--editable", + "-e", + "local_path", + type=click.Path(exists=True, file_okay=False, path_type=Path), + help="Install a plugin from a local directory as a symlink", +) @click.option("--proxy", help="Proxy server address") -def install(name: str, proxy: str | None) -> None: +def install(name: str | None, local_path: Path | None, proxy: str | None) -> None: """Install a plugin""" base_path = _get_data_path() plug_path = base_path / "plugins" + + if local_path is not None: + install_local_plugin(local_path, plug_path, editable=True) + return + + if name is None: + raise click.ClickException("Missing plugin name or local plugin path") + + local_name_path = Path(name).expanduser() + if local_name_path.exists() and local_name_path.is_dir(): + install_local_plugin(local_name_path, plug_path, editable=False) + return + plugins = build_plug_list(base_path / "plugins") plugin = next( diff --git a/astrbot/cli/utils/__init__.py b/astrbot/cli/utils/__init__.py index 3830682f0d..f46a9b940e 100644 --- a/astrbot/cli/utils/__init__.py +++ b/astrbot/cli/utils/__init__.py @@ -3,7 +3,13 @@ check_dashboard, get_astrbot_root, ) -from .plugin import PluginStatus, build_plug_list, get_git_repo, manage_plugin +from .plugin import ( + PluginStatus, + build_plug_list, + get_git_repo, + install_local_plugin, + manage_plugin, +) from .version_comparator import VersionComparator __all__ = [ @@ -14,5 +20,6 @@ "check_dashboard", "get_astrbot_root", "get_git_repo", + "install_local_plugin", "manage_plugin", ] diff --git a/astrbot/cli/utils/plugin.py b/astrbot/cli/utils/plugin.py index c06dda3500..fb89c5723f 100644 --- a/astrbot/cli/utils/plugin.py +++ b/astrbot/cli/utils/plugin.py @@ -1,5 +1,6 @@ import shutil import tempfile +import uuid from enum import Enum from io import BytesIO from pathlib import Path @@ -19,6 +20,35 @@ class PluginStatus(str, Enum): NOT_PUBLISHED = "unpublished" +LOCAL_PLUGIN_COPY_IGNORE = shutil.ignore_patterns( + ".git", + "__pycache__", + "*.pyc", + ".venv", + "venv", + ".idea", + ".vscode", + ".zed", +) + + +def _validate_plugin_dir_name(plugin_name: str, source_path: Path) -> str: + plugin_name = plugin_name.strip() + plugin_path = Path(plugin_name) + has_separator = "/" in plugin_name or "\\" in plugin_name + if ( + not plugin_name + or plugin_name in {".", ".."} + or plugin_path.is_absolute() + or has_separator + or plugin_path.name != plugin_name + ): + raise click.ClickException( + f"Local plugin {source_path} metadata.yaml has invalid name: {plugin_name}" + ) + return plugin_name + + def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None: """Download code from a Git repository and extract to the specified path""" temp_dir = Path(tempfile.mkdtemp()) @@ -190,6 +220,78 @@ def build_plug_list(plugins_dir: Path) -> list: return result +def _cleanup_local_plugin_target(target_path: Path) -> None: + if target_path.is_symlink() or target_path.is_file(): + target_path.unlink(missing_ok=True) + elif target_path.exists(): + shutil.rmtree(target_path, ignore_errors=True) + + +def _copy_local_plugin(source_path: Path, plugins_dir: Path, target_path: Path) -> None: + temp_target = plugins_dir / f".{target_path.name}.tmp-{uuid.uuid4().hex}" + try: + shutil.copytree(source_path, temp_target, ignore=LOCAL_PLUGIN_COPY_IGNORE) + temp_target.rename(target_path) + except FileExistsError: + raise click.ClickException( + f"Plugin {target_path.name} already exists" + ) from None + except Exception: + raise + finally: + if temp_target.exists() or temp_target.is_symlink(): + _cleanup_local_plugin_target(temp_target) + + +def install_local_plugin( + source_path: Path, + plugins_dir: Path, + editable: bool = False, +) -> None: + """Install a plugin from a local directory.""" + source_path = source_path.expanduser().resolve() + plugins_dir = plugins_dir.resolve() + + if not source_path.exists() or not source_path.is_dir(): + raise click.ClickException(f"Local plugin path does not exist: {source_path}") + + metadata = load_yaml_metadata(source_path) + plugin_name = metadata.get("name") + if not isinstance(plugin_name, str) or not plugin_name.strip(): + raise click.ClickException( + f"Local plugin {source_path} must contain metadata.yaml with a valid name" + ) + plugin_name = _validate_plugin_dir_name(plugin_name, source_path) + + target_path = plugins_dir / plugin_name + if target_path.exists(): + raise click.ClickException(f"Plugin {plugin_name} already exists") + + try: + plugins_dir.mkdir(parents=True, exist_ok=True) + if editable: + try: + target_path.symlink_to(source_path, target_is_directory=True) + except OSError as e: + raise click.ClickException( + f"Failed to create symlink for editable install: {e}. " + "On Windows, you may need to run as Administrator or enable Developer Mode." + ) from e + else: + _copy_local_plugin(source_path, plugins_dir, target_path) + click.echo(f"Plugin {plugin_name} installed successfully from {source_path}") + except FileExistsError: + raise click.ClickException(f"Plugin {plugin_name} already exists") from None + except click.ClickException: + raise + except Exception as e: + if editable and target_path.is_symlink(): + _cleanup_local_plugin_target(target_path) + raise click.ClickException( + f"Error installing local plugin {plugin_name}: {e}" + ) from e + + def manage_plugin( plugin: dict, plugins_dir: Path, diff --git a/tests/test_cli_plugin.py b/tests/test_cli_plugin.py new file mode 100644 index 0000000000..d52e4e5b62 --- /dev/null +++ b/tests/test_cli_plugin.py @@ -0,0 +1,190 @@ +from pathlib import Path + +import pytest +from click import ClickException +from click.testing import CliRunner + +import astrbot.cli.utils.plugin as plugin_utils +from astrbot.cli.commands.cmd_plug import plug + + +def _write_plugin(path: Path, name: str = "astrbot_plugin_local_demo") -> None: + path.mkdir(parents=True) + (path / "metadata.yaml").write_text( + "\n".join( + [ + f"name: {name}", + "desc: Local plugin", + "version: 1.0.0", + "author: AstrBot", + "repo: https://example.com/local-plugin", + ], + ), + encoding="utf-8", + ) + (path / "main.py").write_text("PLUGIN_LOADED = True\n", encoding="utf-8") + + +def _write_ignored_plugin_files(path: Path) -> None: + for ignored_dir in [".git", ".venv", "__pycache__", ".idea", ".vscode", ".zed"]: + ignored_path = path / ignored_dir + ignored_path.mkdir() + (ignored_path / "ignored.txt").write_text("ignored\n", encoding="utf-8") + (path / "__pycache__" / "main.pyc").write_bytes(b"ignored") + + +def _write_astrbot_root(path: Path) -> None: + (path / ".astrbot").touch() + (path / "data" / "plugins").mkdir(parents=True) + + +def test_plugin_install_editable_symlinks_local_plugin( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + source = tmp_path / "source-plugin" + root.mkdir() + _write_astrbot_root(root) + _write_plugin(source) + monkeypatch.chdir(root) + + result = CliRunner().invoke( + plug, + ["install", "-e", str(source)], + catch_exceptions=False, + ) + + target = root / "data" / "plugins" / "astrbot_plugin_local_demo" + target = root / "data" / "plugins" / "astrbot_plugin_local_demo" + assert result.exit_code == 0 + assert target.is_symlink() + assert (target / "metadata.yaml").exists() + assert (target / "main.py").read_text(encoding="utf-8") == "PLUGIN_LOADED = True\n" + + +def test_plugin_install_accepts_local_path_without_editable_flag( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + source = tmp_path / "source-plugin" + root.mkdir() + _write_astrbot_root(root) + _write_plugin(source) + _write_ignored_plugin_files(source) + monkeypatch.chdir(root) + + result = CliRunner().invoke(plug, ["install", str(source)]) + + target = root / "data" / "plugins" / "astrbot_plugin_local_demo" + assert result.exit_code == 0 + assert not target.is_symlink() + assert (target / "metadata.yaml").exists() + assert not (target / ".git").exists() + assert not (target / ".venv").exists() + assert not (target / "__pycache__").exists() + assert not (target / ".idea").exists() + assert not (target / ".vscode").exists() + assert not (target / ".zed").exists() + + +def test_plugin_install_editable_rejects_existing_plugin( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + source = tmp_path / "source-plugin" + root.mkdir() + _write_astrbot_root(root) + _write_plugin(source) + _write_plugin(root / "data" / "plugins" / "astrbot_plugin_local_demo") + monkeypatch.chdir(root) + + result = CliRunner().invoke(plug, ["install", "-e", str(source)]) + + assert result.exit_code != 0 + assert "already exists" in result.output + + +def test_plugin_install_rejects_plugin_name_with_path_separator( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + source = tmp_path / "source-plugin" + root.mkdir() + _write_astrbot_root(root) + _write_plugin(source, name="../bad_plugin") + monkeypatch.chdir(root) + + result = CliRunner().invoke(plug, ["install", str(source)]) + + assert result.exit_code != 0 + assert "invalid name" in result.output + assert not (root / "data" / "bad_plugin").exists() + + +def test_plugin_install_copy_does_not_delete_existing_target_on_race( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + source = tmp_path / "source-plugin" + root.mkdir() + _write_astrbot_root(root) + _write_plugin(source) + monkeypatch.chdir(root) + + target = root / "data" / "plugins" / "astrbot_plugin_local_demo" + target.mkdir() + marker = target / "keep.txt" + marker.write_text("keep\n", encoding="utf-8") + + result = CliRunner().invoke(plug, ["install", str(source)]) + + assert result.exit_code != 0 + assert "already exists" in result.output + assert marker.read_text(encoding="utf-8") == "keep\n" + + +def test_plugin_install_copy_does_not_delete_concurrently_created_target( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source = tmp_path / "source-plugin" + plugins_dir = tmp_path / "plugins" + _write_plugin(source) + + target = plugins_dir / "astrbot_plugin_local_demo" + + def create_target_then_fail( + _source_path: Path, + _plugins_dir: Path, + _target_path: Path, + ) -> None: + target.mkdir(parents=True) + (target / "keep.txt").write_text("keep\n", encoding="utf-8") + raise FileExistsError + + monkeypatch.setattr(plugin_utils, "_copy_local_plugin", create_target_then_fail) + + with pytest.raises(ClickException, match="already exists"): + plugin_utils.install_local_plugin(source, plugins_dir) + + assert (target / "keep.txt").read_text(encoding="utf-8") == "keep\n" + + +def test_plugin_install_requires_name_or_editable_path( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + root = tmp_path / "root" + root.mkdir() + _write_astrbot_root(root) + monkeypatch.chdir(root) + + result = CliRunner().invoke(plug, ["install"]) + + assert result.exit_code != 0 + assert "Missing plugin name or local plugin path" in result.output