Files
creator/backend/agents.py
team3 63280d88d6 Backend: globale Agent-Semaphores (batch 12 / interactive 4)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 07:53:09 +02:00

173 lines
6.2 KiB
Python

"""Provider-Schicht: führt Agent-Aufrufe über die Claude-CLI oder OpenCode (MiniMax) aus.
Beide Runner sind unabhängig. Fehlt ein Binary/Key, schlägt nur der
jeweilige Provider fehl — der andere läuft unverändert weiter.
"""
import asyncio
import logging
import os
import re
import shutil
import tempfile
import time
import urllib.request
from pathlib import Path
from config import PROVIDERS, DEFAULT_PROVIDER, MAX_CONCURRENT_AGENTS, MAX_CONCURRENT_INTERACTIVE
log = logging.getLogger("creator.agents")

# Running CLI subprocesses by agent key: _communicate() registers/deregisters
# them, kill_process() looks them up here.
_active_processes: dict[str, asyncio.subprocess.Process] = {}

# Caps on the real CLI processes, independent of the pipeline semaphore in
# generator.py. run_agent() acquires them BEFORE spawning, so time spent waiting
# in the queue does not count against the agent timeout.
# (Since Python 3.10 asyncio primitives bind to an event loop only on first use,
# so creating them at import time is fine as long as one loop uses them.)
_batch_sem = asyncio.Semaphore(MAX_CONCURRENT_AGENTS)
_interactive_sem = asyncio.Semaphore(MAX_CONCURRENT_INTERACTIVE)
# Capability → Claude --allowedTools
_CLAUDE_TOOLS = {
"full": "Write,Bash,Read,WebSearch,WebFetch",
"files": "Read,Bash,Write",
"read": "Read",
"none": None,
}
# Capability → OpenCode-Agent (Tool-Rechte in dev-ops/opencode.json definiert)
_OPENCODE_AGENTS = {
"full": "full",
"files": "files",
"read": "readonly",
"none": "text",
}
def provider_available(provider: str) -> bool:
    """Return True if *provider* is configured and currently looks usable.

    Checks, in order: *provider* has an entry in PROVIDERS, its CLI binary is
    on PATH, the environment variable named by its optional ``env_key`` is set
    and non-empty, and its optional ``check_url`` can be opened with a 1 s
    socket timeout. The first failing check returns False.

    Note: the URL probe is a blocking call; invoked from a coroutine it stalls
    the event loop for up to the timeout.
    """
    cfg = PROVIDERS.get(provider)
    if not cfg:
        return False
    if shutil.which(cfg["cli"]) is None:
        return False
    env_key = cfg.get("env_key")
    if env_key and not os.environ.get(env_key):
        return False
    check_url = cfg.get("check_url")
    if check_url:
        try:
            # Close the response immediately; the bare call leaked one open
            # connection per probe.
            with urllib.request.urlopen(check_url, timeout=1):
                pass
        except Exception:
            # Deliberate best-effort probe: unreachable host, HTTP error status
            # or malformed URL all simply mean "not available".
            return False
    return True
def kill_process(agent_key_prefix: str) -> None:
    """Kill every tracked process whose agent key starts with *agent_key_prefix*.

    A single prefix therefore also covers derived keys such as ``<key>-plan`` or
    ``<key>-w1``. While scanning, registry entries whose process has already
    exited are dropped regardless of their key. Killed processes are left in
    the registry; the _communicate() call that owns them removes them.
    """
    # NOTE(review): plain prefix match, so "job1" also hits "job10-..." —
    # confirm the callers' key scheme makes that impossible.
    # Iterate over a snapshot because the registry is mutated inside the loop.
    for key, proc in tuple(_active_processes.items()):
        if proc.returncode is not None:
            _active_processes.pop(key, None)
        elif key.startswith(agent_key_prefix):
            log.debug("kill agent %s", key)
            proc.kill()
async def run_agent(
    agent_key: str,
    prompt: str,
    timeout: int,
    provider: str = DEFAULT_PROVIDER,
    role: str = "fast",
    capabilities: str = "none",
    lane: str = "batch",
) -> tuple[int, str, str]:
    """Run one agent call via *provider* and return ``(returncode, stdout, stderr)``.

    An unknown provider or a CLI binary missing from PATH is reported as
    returncode 1 with an explanatory stderr text instead of raising. Otherwise
    the call first waits for a slot in its lane ("interactive"; any other value
    uses the batch lane) and only then dispatches to the OpenCode or Claude CLI
    runner, so queueing time does not consume *timeout*.
    """
    if provider not in PROVIDERS:
        return 1, "", f"Unbekannter Provider: {provider}"
    cli = PROVIDERS[provider]["cli"]
    if shutil.which(cli) is None:
        return 1, "", f"CLI '{cli}' nicht installiert (Provider: {provider})"

    lane_sem = _interactive_sem if lane == "interactive" else _batch_sem
    async with lane_sem:
        if cli == "opencode":
            return await _run_opencode(agent_key, prompt, timeout, provider, role, capabilities)
        return await _run_claude_cli(agent_key, prompt, timeout, role, capabilities)
async def _communicate(agent_key: str, cmd: list[str], stdin_data: bytes | None, timeout: int) -> tuple[int, str, str]:
start = time.monotonic()
process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE if stdin_data is not None else asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_active_processes[agent_key] = process
try:
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(input=stdin_data),
timeout=timeout,
)
except asyncio.TimeoutError:
process.kill()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
pass
log.info("agent %s: Timeout nach %ds", agent_key, timeout)
raise
log.info(
"agent %s: exit %s nach %.1fs (%d Bytes stdout)",
agent_key, process.returncode, time.monotonic() - start, len(stdout),
)
return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
finally:
# Pop nur bei Identität: ein Slot-Restart unter demselben Key darf den
# NEUEN Prozess nicht aus dem Tracking werfen.
if _active_processes.get(agent_key) is process:
del _active_processes[agent_key]
async def _run_claude_cli(agent_key: str, prompt: str, timeout: int, role: str, capabilities: str) -> tuple[int, str, str]:
    """Run *prompt* through the Claude CLI in print mode (``-p``), passing it on stdin.

    The model is ``PROVIDERS["claude"][role]``. *capabilities* is mapped via
    _CLAUDE_TOOLS to an ``--allowedTools`` value; when that mapping yields
    nothing (level "none" or an unknown level) the flag is left out.
    """
    claude_cfg = PROVIDERS["claude"]
    model = claude_cfg[role]
    allowed = _CLAUDE_TOOLS.get(capabilities)
    tool_args = ["--allowedTools", allowed] if allowed else []
    # NOTE(review): --dangerously-skip-permissions bypasses permission checks;
    # confirm the installed CLI still limits tools to --allowedTools in this
    # mode, otherwise the "none"/"read" capability levels are not enforced.
    argv = [claude_cfg["cli"], "-p", "--model", model, *tool_args, "--dangerously-skip-permissions"]
    return await _communicate(agent_key, argv, prompt.encode("utf-8"), timeout)
async def _run_opencode(agent_key: str, prompt: str, timeout: int, provider: str, role: str, capabilities: str) -> tuple[int, str, str]:
    """Run *prompt* via ``opencode run`` for *provider*.

    The prompt goes into a temporary ``.md`` file attached with ``-f`` instead
    of argv (protects against ARG_MAX with large project prompts). That file is
    deleted on every exit path, including failures while writing it or while
    building the command. *capabilities* selects the OpenCode agent through
    _OPENCODE_AGENTS (unknown levels fall back to "text").

    Returns:
        ``(returncode, stdout cleaned by _clean_opencode_output, stderr)``.
    """
    cfg = PROVIDERS[provider]
    # Resolve lookups that can raise (e.g. KeyError for an unknown role) BEFORE
    # the temp file exists, so they cannot leak it.
    model = cfg[role]
    agent = _OPENCODE_AGENTS.get(capabilities, "text")
    prompt_file = tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8")
    prompt_path = Path(prompt_file.name)
    try:
        with prompt_file:
            prompt_file.write(prompt)
        # The positional message MUST precede -f: -f is an array flag and would
        # otherwise swallow the text as a second file name ("File not found").
        cmd = [
            cfg["cli"], "run",
            "Folge exakt den Anweisungen in der angehängten Datei. Sie sind der vollständige Auftrag.",
            "-m", model,
            "--agent", agent,
            "--dangerously-skip-permissions",
            "-f", str(prompt_path),
        ]
        rc, stdout, stderr = await _communicate(agent_key, cmd, None, timeout)
        return rc, _clean_opencode_output(stdout), stderr
    finally:
        prompt_path.unlink(missing_ok=True)
# Terminal escape sequences: CSI sequences (colours/SGR, cursor movement,
# erase-line, show/hide cursor, ...) and OSC sequences (e.g. hyperlinks,
# window titles) terminated by BEL or ST. The previous pattern only matched
# SGR colour codes and let e.g. ESC[K or ESC[?25l through into agent output.
_ANSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")


def _clean_opencode_output(text: str) -> str:
    """Strip terminal escape codes and the leading banner ("> agent · model").

    After removing escape sequences, every leading line that is blank or that
    starts (ignoring indentation) with ">" is dropped. The remaining lines are
    rejoined with newlines and surrounding whitespace is stripped.
    """
    text = _ANSI_RE.sub("", text)
    lines = text.splitlines()
    # NOTE(review): this drops ALL leading ">" lines, so an answer that itself
    # opens with a Markdown blockquote loses it too — confirm banner format.
    start = 0
    while start < len(lines) and (not lines[start].strip() or lines[start].lstrip().startswith(">")):
        start += 1
    return "\n".join(lines[start:]).strip()