"""Pipeline-Grundbausteine: Agent-Races (mit Grace), Single-Slot, Schemata, Prompts, Guide-Status. Hält den mutablen Pipeline-Zustand (Generierungs-Semaphore, Cancel-Set). Zugriff auf das Cancel-Set NUR über die Funktionen hier — kopierte Referenzen in anderen Modulen würden bei einem Re-Assign auseinanderlaufen. """ import asyncio import logging from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Callable from agents import run_agent, kill_process from config import MAX_CONCURRENT_GENERATIONS, TEMPLATES_DIR, TIMEOUTS from database import update_guide from jsonio import read_json_file as _json_datei log = logging.getLogger("creator.pipeline") _semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS) _cancelled: set[str] = set() async def cancel_guide(guide_id: str) -> bool: _cancelled.add(guide_id) kill_process(guide_id) now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now) return True def is_guide_cancelled(guide_id: str) -> bool: return guide_id in _cancelled def clear_guide_cancelled(guide_id: str) -> None: _cancelled.discard(guide_id) async def _set_progress(guide_id: str, progress: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, progress=progress, updated_at=now) async def _set_step(guide_id: str, step: int, progress: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, step=step, progress=progress, updated_at=now) async def _fail(guide_id: str, msg: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now) def _prompt(name: str, **kwargs) -> str: template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8") return template.format(**kwargs) def _extra(instructions: str) -> str: return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else "" def _log(topic: str, msg: str) -> None: log.info("[%s] %s", topic, msg) def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str: stderr = (stderr or "").strip() if stderr: return f"{label}: {stderr[:1000]}" tail = (stdout or "").strip()[-500:] if tail: return f"{label} (exit {returncode}, stderr leer): …{tail}" return f"{label} (exit {returncode}, ohne Ausgabe)" def _gather_error(label: str, results: list) -> str: for r in results: if isinstance(r, BaseException): return f"{label}: {type(r).__name__}: {r}" returncode, stdout, stderr = r if returncode != 0: return _claude_error(label, returncode, stdout, stderr) return f"{label}: kein verwertbares Ergebnis" def _timeout(step: str, n: int = 0) -> int: base, per = TIMEOUTS[step] return base + per * n def _probleme_schema(data): """{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None.""" if not isinstance(data, dict): return None if data.get("ok") is True: return [] p = data.get("probleme") if not isinstance(p, list) or not p: return None out = [str(x).strip() for x in p if str(x).strip()] return out or None def _str_liste(val) -> list[str] | None: """Liste nicht-leerer Strings → gestrippte Liste (leer erlaubt) · sonst None.""" if not isinstance(val, list) or not all(isinstance(x, str) for x in val): return None out = [x.strip() for x in val] return None if any(not x for x in out) else out def _rest_schema(data): """{"uebernehmen": [str]} → Liste (leer erlaubt) · sonst None.""" if not isinstance(data, dict): return None return _str_liste(data.get("uebernehmen")) def _runde_schema(data, final: bool = False): """{"aufnehmen": [str], "rest": [str]} → (aufnehmen, rest) · sonst None. final=True: letzte Klärungs-Runde — ein nicht-leerer Rest ist ungültig. """ if not isinstance(data, dict): return None aufnehmen = _str_liste(data.get("aufnehmen")) rest = _str_liste(data.get("rest")) if aufnehmen is None or rest is None or (final and rest): return None return aufnehmen, rest _MAX_RESTARTS = 2 async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None, *, grace: int | None = None) -> list | None: """Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse. Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)` prüft die Gültigkeit und liefert das Slot-Ergebnis oder None. Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt. `cancelled()` → True bricht ab (keine Restarts, Rückgabe None). Mit `grace` wird `quorum` zum Minimum: Das erste gültige Ergebnis startet einen Timer von `grace` Sekunden. Nach dessen Ablauf werden laufende Agenten nur gekillt, wenn das Minimum steht — sonst läuft das Race samt Restarts weiter, bis es steht. Rückgabe: `quorum` bis `len(slots)` Ergebnisse. """ attempts = {i: 0 for i in range(len(slots))} tasks: dict[asyncio.Task, int] = {} loop = asyncio.get_running_loop() deadline: float | None = None def spawn(i: int) -> None: slot = slots[i] task = asyncio.create_task(run_agent( slot["key"], slot["prompt"], timeout, provider=provider, role=slot["role"], capabilities=slot["capabilities"], )) tasks[task] = i for i in range(len(slots)): spawn(i) results: list = [] try: while tasks: if cancelled and cancelled(): return None if deadline is not None and len(results) >= quorum and loop.time() >= deadline: return results # Grace gesetzt und Minimum erreicht → nur bis zum Deadline-Rest warten wait_timeout = None if deadline is not None and len(results) >= quorum: wait_timeout = max(0.0, deadline - loop.time()) done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED, timeout=wait_timeout) if not done: continue for task in done: i = tasks.pop(task) payload, err = None, None try: result = task.result() if result[0] != 0: err = _claude_error("Fehler", *result) else: payload = slots[i]["payload"](result) if payload is None: err = "Ergebnis ungültig/nicht parsebar" except asyncio.TimeoutError: err = f"Timeout nach {timeout}s" except Exception as e: err = f"{type(e).__name__}: {e}" if payload is not None: results.append(payload) if grace is not None and deadline is None: deadline = loop.time() + grace _log(topic, f"{label}: erstes Ergebnis — Grace {grace}s läuft") if on_update: on_update(len(results)) if len(results) >= quorum and (grace is None or loop.time() >= deadline): return results continue _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}") attempts[i] += 1 # Steht das Minimum schon, sind Restarts sinnlos — der Neustart # würde am Grace-Ende ohnehin gekillt. satt = grace is not None and len(results) >= quorum if attempts[i] <= _MAX_RESTARTS and not satt and not (cancelled and cancelled()): spawn(i) if len(results) >= quorum: # alle Slots durch, Minimum steht (nur mit grace erreichbar) return results _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)") return None finally: for task, i in tasks.items(): kill_process(slots[i]["key"]) task.cancel() if tasks: await asyncio.gather(*tasks.keys(), return_exceptions=True) @dataclass class GenContext: """Durchgereichte Pipeline-Parameter — erspart lange Argument-Signaturen.""" topic: str provider: str is_cancelled: Callable[[], bool] guide_id: str | None = None # Ergebnis-Status von run_single_slot OK, CANCELLED, FAILED = "ok", "cancelled", "failed" async def run_single_slot( ctx: GenContext, label: str, *, key: str, prompt: str, role: str, capabilities: str, payload, timeout: int, ) -> tuple[str, object]: """Ein Agent, ein gültiges Ergebnis (Race mit Quorum 1). → (OK, wert) | (CANCELLED, None) | (FAILED, None) """ slots = [{"key": key, "prompt": prompt, "role": role, "capabilities": capabilities, "payload": payload}] res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider, cancelled=ctx.is_cancelled) if ctx.is_cancelled(): return CANCELLED, None if res is None: return FAILED, None return OK, res[0]