"""Pipeline building blocks: agent races, single slot, check→fix, prompts, guide status.

Holds the mutable pipeline state (generation semaphore, cancel set). Access the
cancel set ONLY through the functions in this module — a reference copied into
another module would silently diverge if the name were ever re-assigned here.
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable

from agents import run_agent, kill_process
from config import MAX_CONCURRENT_GENERATIONS, TEMPLATES_DIR, TIMEOUTS
from database import update_guide
from jsonio import read_json_file as _json_datei

log = logging.getLogger("creator.pipeline")

# Caps concurrent generations. Not acquired in this module — presumably used by
# the generation entry points elsewhere (verify).
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
# IDs of guides whose cancellation was requested (see module docstring).
_cancelled: set[str] = set()


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (value for `updated_at`)."""
    return datetime.now(timezone.utc).isoformat()


async def cancel_guide(guide_id: str) -> bool:
    """Request cancellation of a guide's pipeline run.

    Adds the guide to the cancel set, calls kill_process(guide_id) and persists
    status "error" with a message that progress is kept. Always returns True.
    """
    _cancelled.add(guide_id)
    kill_process(guide_id)
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg="Abgebrochen — Fortschritt bleibt erhalten",
        updated_at=_utc_now_iso(),
    )
    return True


def is_guide_cancelled(guide_id: str) -> bool:
    """Return True if cancellation was requested for `guide_id`."""
    return guide_id in _cancelled


def clear_guide_cancelled(guide_id: str) -> None:
    """Remove `guide_id` from the cancel set (no-op if absent)."""
    _cancelled.discard(guide_id)


async def _set_progress(guide_id: str, progress: str) -> None:
    """Persist the progress text shown for the guide."""
    await update_guide(guide_id, progress=progress, updated_at=_utc_now_iso())


async def _set_step(guide_id: str, step: int, progress: str) -> None:
    """Persist the current step number together with its progress text."""
    await update_guide(guide_id, step=step, progress=progress, updated_at=_utc_now_iso())


async def _fail(guide_id: str, msg: str) -> None:
    """Mark the guide as failed with `msg` and clear its progress text."""
    await update_guide(
        guide_id, status="error", progress=None, error_msg=msg, updated_at=_utc_now_iso()
    )


def _prompt(name: str, **kwargs) -> str:
    """Load TEMPLATES_DIR/Prompt/<name>.md and fill its placeholders via str.format.

    Literal braces in a template must therefore be doubled ({{ }}).
    """
    template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
    return template.format(**kwargs)


def _extra(instructions: str) -> str:
    """Render user-supplied extra instructions as a prompt section ("" if none)."""
    return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""


def _log(topic: str, msg: str) -> None:
    """Log `msg` at INFO level, prefixed with the topic."""
    log.info("[%s] %s", topic, msg)


def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
    """Build a readable error message for a failed agent run.

    Prefers stderr (first 1000 chars); otherwise shows the last 500 chars of
    stdout; otherwise just the exit code.
    """
    stderr = (stderr or "").strip()
    if stderr:
        return f"{label}: {stderr[:1000]}"
    tail = (stdout or "").strip()[-500:]
    if tail:
        return f"{label} (exit {returncode}, stderr leer): …{tail}"
    return f"{label} (exit {returncode}, ohne Ausgabe)"


def _gather_error(label: str, results: list) -> str:
    """Describe the first failure in a list of agent results.

    Each entry is either an exception or a (returncode, stdout, stderr) tuple,
    i.e. the shape asyncio.gather(..., return_exceptions=True) produces here.
    """
    for r in results:
        if isinstance(r, BaseException):
            return f"{label}: {type(r).__name__}: {r}"
        returncode, stdout, stderr = r
        if returncode != 0:
            return _claude_error(label, returncode, stdout, stderr)
    return f"{label}: kein verwertbares Ergebnis"


def _timeout(step: str, n: int = 0) -> int:
    """Timeout for `step`: base + per * n, with TIMEOUTS[step] == (base, per)."""
    base, per = TIMEOUTS[step]
    return base + per * n


def _probleme_schema(data):
    """Validate a check result.

    {"ok": true} → [] · {"probleme": [str, ...]} → list of non-blank stripped
    strings · anything else (incl. an empty/blank problem list) → None.
    """
    if not isinstance(data, dict):
        return None
    if data.get("ok") is True:
        return []
    p = data.get("probleme")
    if not isinstance(p, list) or not p:
        return None
    cleaned = [s for x in p if (s := str(x).strip())]
    return cleaned or None


# Restarts per slot after a failed/invalid attempt (so up to 1 + _MAX_RESTARTS runs).
_MAX_RESTARTS = 2


async def _race(
    topic: str,
    label: str,
    slots: list[dict],
    quorum: int,
    timeout: int,
    provider: str,
    on_update: Callable[[int], None] | None = None,
    cancelled: Callable[[], bool] | None = None,
) -> list | None:
    """Start all slots in parallel and collect `quorum` valid results.

    Slot spec: {key, prompt, role, capabilities, payload}. `payload(result)` gets the
    agent result (returncode, stdout, stderr) and returns the slot value or None.
    Error / timeout / invalid → the slot is restarted (at most _MAX_RESTARTS times).
    `on_update(n)` is called with the number of valid results after each new one.
    As soon as the quorum is met, the remaining agents are killed.

    Returns the collected slot values, or None if the quorum was missed or
    `cancelled()` returned True (no restarts are spawned once cancelled).
    """
    attempts = {i: 0 for i in range(len(slots))}
    tasks: dict[asyncio.Task, int] = {}

    def spawn(i: int) -> None:
        slot = slots[i]
        task = asyncio.create_task(run_agent(
            slot["key"], slot["prompt"], timeout,
            provider=provider, role=slot["role"], capabilities=slot["capabilities"],
        ))
        tasks[task] = i

    for i in range(len(slots)):
        spawn(i)

    results: list = []
    try:
        while tasks:
            # asyncio.wait blocks until some agent finishes, so cancellation is
            # only noticed between completions.
            if cancelled and cancelled():
                return None
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                i = tasks.pop(task)
                payload, err = None, None
                try:
                    result = task.result()
                    if result[0] != 0:
                        err = _claude_error("Fehler", *result)
                    else:
                        payload = slots[i]["payload"](result)
                        if payload is None:
                            err = "Ergebnis ungültig/nicht parsebar"
                except asyncio.TimeoutError:
                    err = f"Timeout nach {timeout}s"
                except Exception as e:
                    err = f"{type(e).__name__}: {e}"

                if payload is not None:
                    results.append(payload)
                    if on_update:
                        on_update(len(results))
                    if len(results) >= quorum:
                        return results
                    continue

                _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
                attempts[i] += 1
                if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
                    spawn(i)
        _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
        return None
    finally:
        # Whatever is still pending (quorum met, cancelled, or an exception):
        # kill the agent process, cancel its task and wait so nothing is left dangling.
        for task, i in tasks.items():
            kill_process(slots[i]["key"])
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks.keys(), return_exceptions=True)


@dataclass
class GenContext:
    """Pipeline parameters passed down through the helpers — avoids long signatures."""

    topic: str  # used as log prefix
    provider: str  # forwarded to run_agent
    is_cancelled: Callable[[], bool]  # polled by _race between agent completions
    guide_id: str | None = None  # target row for progress/step updates


# Result status of run_single_slot / _check_then_fix
OK, CANCELLED, FAILED = "ok", "cancelled", "failed"


async def run_single_slot(
    ctx: GenContext,
    label: str,
    *,
    key: str,
    prompt: str,
    role: str,
    capabilities: str,
    payload,
    timeout: int,
) -> tuple[str, object]:
    """Run one agent until it yields one valid result (a race with quorum 1).

    Cancellation wins: if ctx.is_cancelled() is True after the race, the result
    is discarded.

    → (OK, value) | (CANCELLED, None) | (FAILED, None)
    """
    slots = [{"key": key, "prompt": prompt, "role": role,
              "capabilities": capabilities, "payload": payload}]
    res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider,
                      cancelled=ctx.is_cancelled)
    if ctx.is_cancelled():
        return CANCELLED, None
    if res is None:
        return FAILED, None
    return OK, res[0]


def _check_result_valid(check_path: Path) -> bool:
    """Return True if `check_path` holds a result that _probleme_schema accepts.

    Any read/parse error counts as invalid — like _race, which treats an
    exception from a payload validator as an invalid result.
    """
    try:
        return _probleme_schema(_json_datei(check_path)) is not None
    except Exception:  # read_json_file's failure modes are not visible here
        return False


async def _check_then_fix(
    ctx: GenContext,
    *,
    name: str,
    step: int,
    check_key: str,
    check_prompt: str,
    check_path: Path,
    check_timeout: int,
    fix_key: str,
    build_fix_prompt,
    fix_payload,
    fix_timeout: int,
    fix_role: str = "fast",
    fix_caps: str = "files",
    on_fix_invalid=None,
) -> tuple[str, object]:
    """Check→fix pattern: a check agent records problems (JSON), a fix agent resolves them.

    Resume: an existing check file with a valid result skips the whole step; an
    invalid/unreadable one (e.g. left by a failed or aborted check) is ignored
    and the check runs again. The check is fatal (FAILED), the fix is not — the
    original artifact stays; `on_fix_invalid` may write the canonical original
    back in case the fix agent corrupted the artifact file.

    The read check (multi-slot, section-exact) and the building-block selection
    check (patch semantics) deliberately do NOT fit this pattern.

    → (OK, new_artifact | None=unchanged) | (CANCELLED, None) | (FAILED, None)
    """
    if check_path.exists():
        if _check_result_valid(check_path):
            return OK, None
        # Don't let a broken leftover count as "checked"; the check payload
        # re-reads check_path after every attempt, so a fresh run replaces it.
        _log(ctx.topic, f"{name}-Prüfung: vorhandene Check-Datei ungültig — prüfe neu")

    await _set_step(ctx.guide_id, step, f"Prüfe {name}…")
    status, probleme = await run_single_slot(
        ctx, f"{name}-Prüfung",
        key=check_key, prompt=check_prompt, role="fast", capabilities="files",
        # The agent's stdout is ignored; the verdict is read from check_path.
        payload=lambda result: _probleme_schema(_json_datei(check_path)),
        timeout=check_timeout,
    )
    if status != OK:
        return status, None
    if not probleme:
        return OK, None

    _log(ctx.topic, f"{name}-Prüfung: {len(probleme)} Problem(e) notiert")
    await _set_step(ctx.guide_id, step, f"Passe {name} an…")
    status, fixed = await run_single_slot(
        ctx, f"{name}-Fix",
        key=fix_key, prompt=build_fix_prompt(probleme), role=fix_role,
        capabilities=fix_caps, payload=fix_payload, timeout=fix_timeout,
    )
    if status == CANCELLED:
        return CANCELLED, None
    if status == FAILED:
        # A failed fix is not fatal: keep the original (optionally restore it).
        _log(ctx.topic, f"{name}-Fix ungültig — Original bleibt")
        if on_fix_invalid:
            on_fix_invalid()
        return OK, None
    return OK, fixed