"""Provider layer: runs agent calls through the Claude CLI or OpenCode (MiniMax).

Both runners are independent. If a binary/key is missing, only the respective
provider fails — the other one keeps running unchanged.
"""
import asyncio
import logging
import os
import re
import shutil
import tempfile
import time
import urllib.request
from pathlib import Path

from config import PROVIDERS, DEFAULT_PROVIDER, MAX_CONCURRENT_AGENTS, MAX_CONCURRENT_INTERACTIVE

log = logging.getLogger("creator.agents")

# Currently tracked subprocesses, keyed by agent key.
_active_processes: dict[str, asyncio.subprocess.Process] = {}

# Caps the number of real CLI processes — independent of the pipeline
# semaphore in generator.py. The acquire happens BEFORE the spawn so that time
# spent waiting in the queue does not count against the agent timeout.
_batch_sem = asyncio.Semaphore(MAX_CONCURRENT_AGENTS)
_interactive_sem = asyncio.Semaphore(MAX_CONCURRENT_INTERACTIVE)

# Serialize OpenCode starts: processes that start at the same time collide on
# the internal session DB ("database is locked", exit after <1s). The short
# offset spreads out the starts; afterwards the processes run in parallel
# as usual.
_opencode_start_lock = asyncio.Lock()
# Seconds the start lock is held after each OpenCode spawn.
_OPENCODE_START_DELAY = 1.0

# Capability -> value passed to Claude's --allowedTools
_CLAUDE_TOOLS = {
    "full": "Write,Bash,Read,WebSearch,WebFetch",
    "files": "Read,Bash,Write",
    "read": "Read",
    "none": None,
}

# Capability -> OpenCode agent (tool permissions are defined in dev-ops/opencode.json)
_OPENCODE_AGENTS = {
    "full": "full",
    "files": "files",
    "read": "readonly",
    "none": "text",
}


def provider_available(provider: str) -> bool:
    """Return whether *provider* can currently be used.

    A provider counts as available when it has an entry in ``PROVIDERS``, its
    CLI binary is found on ``PATH``, the environment variable named by
    ``env_key`` (if configured) is set and non-empty, and ``check_url`` (if
    configured) can be opened within one second.
    """
    cfg = PROVIDERS.get(provider)
    if not cfg:
        return False
    if shutil.which(cfg["cli"]) is None:
        return False
    env_key = cfg.get("env_key")
    if env_key and not os.environ.get(env_key):
        return False
    check_url = cfg.get("check_url")
    if check_url:
        try:
            # Context manager closes the response; only reachability matters.
            with urllib.request.urlopen(check_url, timeout=1):
                pass
        except Exception:
            # Deliberately broad: any failure of the probe means "not available".
            return False
    return True


def _kill_quietly(process: asyncio.subprocess.Process) -> None:
    """Send SIGKILL to *process*, ignoring the case that it is already gone."""
    try:
        process.kill()
    except ProcessLookupError:
        # The process exited between the liveness check and the signal.
        pass


def kill_process(agent_key_prefix: str) -> None:
    """Kill all active processes whose key starts with the prefix.

    Prefix matching also covers derived keys such as ``-plan`` / ``-w1…``.
    Entries whose process has already exited are removed from the tracking
    dict while iterating, regardless of their key.
    """
    # Iterate over a snapshot because entries are removed inside the loop.
    for key, process in list(_active_processes.items()):
        if process.returncode is not None:
            _active_processes.pop(key, None)
            continue
        if key.startswith(agent_key_prefix):
            log.debug("kill agent %s", key)
            _kill_quietly(process)


async def run_agent(
    agent_key: str,
    prompt: str,
    timeout: int,
    provider: str = DEFAULT_PROVIDER,
    role: str = "fast",
    capabilities: str = "none",
    lane: str = "batch",
) -> tuple[int, str, str]:
    """Run one agent call through the CLI of *provider*.

    Args:
        agent_key: Key under which the subprocess is tracked (see ``kill_process``).
        prompt: Full prompt text for the agent.
        timeout: Seconds the subprocess may run once it has been spawned.
        provider: Key into ``PROVIDERS``.
        role: Key in the provider config that holds the model name.
        capabilities: Key into the capability tables (tool permissions).
        lane: ``"interactive"`` uses the interactive semaphore; any other
            value uses the batch semaphore.

    Returns:
        ``(exit_code, stdout, stderr)``. An unknown provider or a missing CLI
        binary yields ``(1, "", <message>)`` without spawning anything.

    Raises:
        asyncio.TimeoutError: The subprocess exceeded *timeout*.
        KeyError: *role* is not present in the provider config.
    """
    if provider not in PROVIDERS:
        return 1, "", f"Unbekannter Provider: {provider}"
    cli = PROVIDERS[provider]["cli"]
    if shutil.which(cli) is None:
        return 1, "", f"CLI '{cli}' nicht installiert (Provider: {provider})"

    sem = _interactive_sem if lane == "interactive" else _batch_sem
    # Acquire before spawning so queue wait time is not part of the timeout.
    async with sem:
        if cli == "opencode":
            return await _run_opencode(agent_key, prompt, timeout, provider, role, capabilities)
        # Every non-OpenCode provider goes through the Claude runner, which
        # reads its settings from PROVIDERS["claude"].
        return await _run_claude_cli(agent_key, prompt, timeout, role, capabilities)


async def _communicate(
    agent_key: str,
    cmd: list[str],
    stdin_data: bytes | None,
    timeout: int,
    stagger: bool = False,
) -> tuple[int, str, str]:
    """Spawn *cmd*, feed it *stdin_data* and collect its output.

    Args:
        agent_key: Key under which the process is registered in
            ``_active_processes`` while it runs.
        cmd: Program and arguments (executed without a shell).
        stdin_data: Bytes written to stdin; ``None`` connects stdin to DEVNULL.
        timeout: Seconds allowed for ``communicate()``.
        stagger: When true, spawn under ``_opencode_start_lock`` and hold the
            lock for ``_OPENCODE_START_DELAY`` seconds afterwards.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8
        (undecodable bytes are replaced).

    Raises:
        asyncio.TimeoutError: The process did not finish in time; it has been
            killed before the exception is re-raised.
    """
    start = time.monotonic()

    async def spawn() -> asyncio.subprocess.Process:
        return await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE if stdin_data is not None else asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    process: asyncio.subprocess.Process | None = None
    try:
        if stagger:
            async with _opencode_start_lock:
                process = await spawn()
                # Register right after the spawn so the process can be killed
                # (and is cleaned up on cancellation) during the start delay.
                _active_processes[agent_key] = process
                await asyncio.sleep(_OPENCODE_START_DELAY)
        else:
            process = await spawn()
            _active_processes[agent_key] = process

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=stdin_data),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            _kill_quietly(process)
            try:
                # Give the killed process a moment to be reaped.
                await asyncio.wait_for(process.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
            log.info("agent %s: Timeout nach %ds", agent_key, timeout)
            raise

        log.info(
            "agent %s: exit %s nach %.1fs (%d Bytes stdout)",
            agent_key,
            process.returncode,
            time.monotonic() - start,
            len(stdout),
        )
        return (
            process.returncode,
            stdout.decode("utf-8", errors="replace"),
            stderr.decode("utf-8", errors="replace"),
        )
    finally:
        if process is not None:
            # Still alive here means cancellation or an unexpected error; once
            # the entry is dropped below nothing could kill it any more.
            if process.returncode is None:
                _kill_quietly(process)
            # Pop only on identity: a slot restart under the same key must not
            # throw the NEW process out of the tracking dict.
            if _active_processes.get(agent_key) is process:
                del _active_processes[agent_key]


async def _run_claude_cli(
    agent_key: str,
    prompt: str,
    timeout: int,
    role: str,
    capabilities: str,
) -> tuple[int, str, str]:
    """Run the Claude CLI in print mode with the prompt supplied on stdin.

    The model comes from ``PROVIDERS["claude"][role]``. ``--allowedTools`` is
    only passed when the capability maps to a non-empty tool list.
    """
    cfg = PROVIDERS["claude"]
    cmd = [cfg["cli"], "-p", "--model", cfg[role]]
    tools = _CLAUDE_TOOLS.get(capabilities)
    if tools:
        cmd += ["--allowedTools", tools]
    cmd += ["--dangerously-skip-permissions"]
    return await _communicate(agent_key, cmd, prompt.encode("utf-8"), timeout)


async def _run_opencode(
    agent_key: str,
    prompt: str,
    timeout: int,
    provider: str,
    role: str,
    capabilities: str,
) -> tuple[int, str, str]:
    """Run ``opencode run`` with the prompt attached as a temporary file.

    Returns ``(returncode, cleaned stdout, stderr)``; stdout is passed through
    ``_clean_opencode_output``. The temporary prompt file is always removed.
    """
    cfg = PROVIDERS[provider]
    # Resolve config lookups first: a bad role must fail before any file exists.
    model = cfg[role]
    agent = _OPENCODE_AGENTS.get(capabilities, "text")

    prompt_path: Path | None = None
    try:
        # Prompt goes through a temp file instead of argv (ARG_MAX protection
        # for large project prompts).
        with tempfile.NamedTemporaryFile(
            "w", suffix=".md", delete=False, encoding="utf-8", dir=tempfile.gettempdir()
        ) as f:
            # Remember the path before writing so a failed write is cleaned up too.
            prompt_path = Path(f.name)
            f.write(prompt)

        # The positional message MUST come before -f: -f is an array flag and
        # would otherwise swallow the text as a second file name ("File not found").
        cmd = [
            cfg["cli"], "run",
            "Folge exakt den Anweisungen in der angehängten Datei. Sie sind der vollständige Auftrag.",
            "-m", model,
            "--agent", agent,
            "--dangerously-skip-permissions",
            "-f", str(prompt_path),
        ]
        rc, stdout, stderr = await _communicate(agent_key, cmd, None, timeout, stagger=True)
        return rc, _clean_opencode_output(stdout), stderr
    finally:
        if prompt_path is not None:
            prompt_path.unlink(missing_ok=True)


# Matches ANSI SGR (color/style) escape sequences.
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")


def _clean_opencode_output(text: str) -> str:
    """Remove ANSI codes and the leading banner ("> agent · model").

    After stripping the escape sequences, all leading lines that are blank or
    start with ``>`` are dropped; the remainder is returned stripped.
    """
    lines = _ANSI_RE.sub("", text).splitlines()
    first = 0
    # Index scan instead of repeated pop(0), which is quadratic on long output.
    while first < len(lines) and (
        not lines[first].strip() or lines[first].lstrip().startswith(">")
    ):
        first += 1
    return "\n".join(lines[first:]).strip()