Files
creator/backend/pipeline.py
2026-06-12 17:18:42 +02:00

260 lines
9.4 KiB
Python

"""Pipeline-Grundbausteine: Agent-Races (mit Grace), Single-Slot, Schemata, Prompts, Guide-Status.
Hält den mutablen Pipeline-Zustand (Generierungs-Semaphore, Cancel-Set).
Zugriff auf das Cancel-Set NUR über die Funktionen hier — kopierte Referenzen
in anderen Modulen würden bei einem Re-Assign auseinanderlaufen.
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from agents import run_agent, kill_process
from config import MAX_CONCURRENT_GENERATIONS, TEMPLATES_DIR, TIMEOUTS
from database import update_guide
from jsonio import read_json_file as _json_datei
log = logging.getLogger("creator.pipeline")
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
_cancelled.add(guide_id)
kill_process(guide_id)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now)
return True
def is_guide_cancelled(guide_id: str) -> bool:
return guide_id in _cancelled
def clear_guide_cancelled(guide_id: str) -> None:
_cancelled.discard(guide_id)
async def _set_progress(guide_id: str, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, progress=progress, updated_at=now)
async def _set_step(guide_id: str, step: int, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, step=step, progress=progress, updated_at=now)
async def _fail(guide_id: str, msg: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
def _prompt(name: str, **kwargs) -> str:
template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
return template.format(**kwargs)
def _extra(instructions: str) -> str:
return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
def _log(topic: str, msg: str) -> None:
log.info("[%s] %s", topic, msg)
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
stderr = (stderr or "").strip()
if stderr:
return f"{label}: {stderr[:1000]}"
tail = (stdout or "").strip()[-500:]
if tail:
return f"{label} (exit {returncode}, stderr leer): …{tail}"
return f"{label} (exit {returncode}, ohne Ausgabe)"
def _gather_error(label: str, results: list) -> str:
for r in results:
if isinstance(r, BaseException):
return f"{label}: {type(r).__name__}: {r}"
returncode, stdout, stderr = r
if returncode != 0:
return _claude_error(label, returncode, stdout, stderr)
return f"{label}: kein verwertbares Ergebnis"
def _timeout(step: str, n: int = 0) -> int:
base, per = TIMEOUTS[step]
return base + per * n
def _probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = [str(x).strip() for x in p if str(x).strip()]
return out or None
def _str_liste(val) -> list[str] | None:
"""Liste nicht-leerer Strings → gestrippte Liste (leer erlaubt) · sonst None."""
if not isinstance(val, list) or not all(isinstance(x, str) for x in val):
return None
out = [x.strip() for x in val]
return None if any(not x for x in out) else out
def _rest_schema(data):
"""{"uebernehmen": [str]} → Liste (leer erlaubt) · sonst None."""
if not isinstance(data, dict):
return None
return _str_liste(data.get("uebernehmen"))
def _runde_schema(data, final: bool = False):
"""{"aufnehmen": [str], "rest": [str]} → (aufnehmen, rest) · sonst None.
final=True: letzte Klärungs-Runde — ein nicht-leerer Rest ist ungültig.
"""
if not isinstance(data, dict):
return None
aufnehmen = _str_liste(data.get("aufnehmen"))
rest = _str_liste(data.get("rest"))
if aufnehmen is None or rest is None or (final and rest):
return None
return aufnehmen, rest
_MAX_RESTARTS = 2
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None, *, grace: int | None = None) -> list | None:
"""Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse.
Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)`
prüft die Gültigkeit und liefert das Slot-Ergebnis oder None.
Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das
Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt.
`cancelled()` → True bricht ab (keine Restarts, Rückgabe None).
Mit `grace` wird `quorum` zum Minimum: Das erste gültige Ergebnis startet
einen Timer von `grace` Sekunden. Nach dessen Ablauf werden laufende
Agenten nur gekillt, wenn das Minimum steht — sonst läuft das Race samt
Restarts weiter, bis es steht. Rückgabe: `quorum` bis `len(slots)` Ergebnisse.
"""
attempts = {i: 0 for i in range(len(slots))}
tasks: dict[asyncio.Task, int] = {}
loop = asyncio.get_running_loop()
deadline: float | None = None
def spawn(i: int) -> None:
slot = slots[i]
task = asyncio.create_task(run_agent(
slot["key"], slot["prompt"], timeout,
provider=provider, role=slot["role"], capabilities=slot["capabilities"],
))
tasks[task] = i
for i in range(len(slots)):
spawn(i)
results: list = []
try:
while tasks:
if cancelled and cancelled():
return None
if deadline is not None and len(results) >= quorum and loop.time() >= deadline:
return results
# Grace gesetzt und Minimum erreicht → nur bis zum Deadline-Rest warten
wait_timeout = None
if deadline is not None and len(results) >= quorum:
wait_timeout = max(0.0, deadline - loop.time())
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED, timeout=wait_timeout)
if not done:
continue
for task in done:
i = tasks.pop(task)
payload, err = None, None
try:
result = task.result()
if result[0] != 0:
err = _claude_error("Fehler", *result)
else:
payload = slots[i]["payload"](result)
if payload is None:
err = "Ergebnis ungültig/nicht parsebar"
except asyncio.TimeoutError:
err = f"Timeout nach {timeout}s"
except Exception as e:
err = f"{type(e).__name__}: {e}"
if payload is not None:
results.append(payload)
if grace is not None and deadline is None:
deadline = loop.time() + grace
_log(topic, f"{label}: erstes Ergebnis — Grace {grace}s läuft")
if on_update:
on_update(len(results))
if len(results) >= quorum and (grace is None or loop.time() >= deadline):
return results
continue
_log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
attempts[i] += 1
# Steht das Minimum schon, sind Restarts sinnlos — der Neustart
# würde am Grace-Ende ohnehin gekillt.
satt = grace is not None and len(results) >= quorum
if attempts[i] <= _MAX_RESTARTS and not satt and not (cancelled and cancelled()):
spawn(i)
if len(results) >= quorum: # alle Slots durch, Minimum steht (nur mit grace erreichbar)
return results
_log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
return None
finally:
for task, i in tasks.items():
kill_process(slots[i]["key"])
task.cancel()
if tasks:
await asyncio.gather(*tasks.keys(), return_exceptions=True)
@dataclass
class GenContext:
"""Durchgereichte Pipeline-Parameter — erspart lange Argument-Signaturen."""
topic: str
provider: str
is_cancelled: Callable[[], bool]
guide_id: str | None = None
# Ergebnis-Status von run_single_slot
OK, CANCELLED, FAILED = "ok", "cancelled", "failed"
async def run_single_slot(
ctx: GenContext, label: str, *,
key: str, prompt: str, role: str, capabilities: str, payload, timeout: int,
) -> tuple[str, object]:
"""Ein Agent, ein gültiges Ergebnis (Race mit Quorum 1).
→ (OK, wert) | (CANCELLED, None) | (FAILED, None)
"""
slots = [{"key": key, "prompt": prompt, "role": role, "capabilities": capabilities, "payload": payload}]
res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider, cancelled=ctx.is_cancelled)
if ctx.is_cancelled():
return CANCELLED, None
if res is None:
return FAILED, None
return OK, res[0]