update
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Pipeline-Grundbausteine: Agent-Races, Single-Slot, Check→Fix, Prompts, Guide-Status.
|
||||
"""Pipeline-Grundbausteine: Agent-Races (mit Grace), Single-Slot, Schemata, Prompts, Guide-Status.
|
||||
|
||||
Hält den mutablen Pipeline-Zustand (Generierungs-Semaphore, Cancel-Set).
|
||||
Zugriff auf das Cancel-Set NUR über die Funktionen hier — kopierte Referenzen
|
||||
@@ -105,10 +105,39 @@ def _probleme_schema(data):
|
||||
return out or None
|
||||
|
||||
|
||||
def _str_liste(val) -> list[str] | None:
|
||||
"""Liste nicht-leerer Strings → gestrippte Liste (leer erlaubt) · sonst None."""
|
||||
if not isinstance(val, list) or not all(isinstance(x, str) for x in val):
|
||||
return None
|
||||
out = [x.strip() for x in val]
|
||||
return None if any(not x for x in out) else out
|
||||
|
||||
|
||||
def _rest_schema(data):
|
||||
"""{"uebernehmen": [str]} → Liste (leer erlaubt) · sonst None."""
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
return _str_liste(data.get("uebernehmen"))
|
||||
|
||||
|
||||
def _runde_schema(data, final: bool = False):
|
||||
"""{"aufnehmen": [str], "rest": [str]} → (aufnehmen, rest) · sonst None.
|
||||
|
||||
final=True: letzte Klärungs-Runde — ein nicht-leerer Rest ist ungültig.
|
||||
"""
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
aufnehmen = _str_liste(data.get("aufnehmen"))
|
||||
rest = _str_liste(data.get("rest"))
|
||||
if aufnehmen is None or rest is None or (final and rest):
|
||||
return None
|
||||
return aufnehmen, rest
|
||||
|
||||
|
||||
_MAX_RESTARTS = 2
|
||||
|
||||
|
||||
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None:
|
||||
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None, *, grace: int | None = None) -> list | None:
|
||||
"""Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse.
|
||||
|
||||
Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)`
|
||||
@@ -116,9 +145,16 @@ async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout:
|
||||
Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das
|
||||
Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt.
|
||||
`cancelled()` → True bricht ab (keine Restarts, Rückgabe None).
|
||||
|
||||
Mit `grace` wird `quorum` zum Minimum: Das erste gültige Ergebnis startet
|
||||
einen Timer von `grace` Sekunden. Nach dessen Ablauf werden laufende
|
||||
Agenten nur gekillt, wenn das Minimum steht — sonst läuft das Race samt
|
||||
Restarts weiter, bis es steht. Rückgabe: `quorum` bis `len(slots)` Ergebnisse.
|
||||
"""
|
||||
attempts = {i: 0 for i in range(len(slots))}
|
||||
tasks: dict[asyncio.Task, int] = {}
|
||||
loop = asyncio.get_running_loop()
|
||||
deadline: float | None = None
|
||||
|
||||
def spawn(i: int) -> None:
|
||||
slot = slots[i]
|
||||
@@ -136,7 +172,15 @@ async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout:
|
||||
while tasks:
|
||||
if cancelled and cancelled():
|
||||
return None
|
||||
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
|
||||
if deadline is not None and len(results) >= quorum and loop.time() >= deadline:
|
||||
return results
|
||||
# Grace gesetzt und Minimum erreicht → nur bis zum Deadline-Rest warten
|
||||
wait_timeout = None
|
||||
if deadline is not None and len(results) >= quorum:
|
||||
wait_timeout = max(0.0, deadline - loop.time())
|
||||
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED, timeout=wait_timeout)
|
||||
if not done:
|
||||
continue
|
||||
for task in done:
|
||||
i = tasks.pop(task)
|
||||
payload, err = None, None
|
||||
@@ -155,16 +199,24 @@ async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout:
|
||||
|
||||
if payload is not None:
|
||||
results.append(payload)
|
||||
if grace is not None and deadline is None:
|
||||
deadline = loop.time() + grace
|
||||
_log(topic, f"{label}: erstes Ergebnis — Grace {grace}s läuft")
|
||||
if on_update:
|
||||
on_update(len(results))
|
||||
if len(results) >= quorum:
|
||||
if len(results) >= quorum and (grace is None or loop.time() >= deadline):
|
||||
return results
|
||||
continue
|
||||
|
||||
_log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
|
||||
attempts[i] += 1
|
||||
if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
|
||||
# Steht das Minimum schon, sind Restarts sinnlos — der Neustart
|
||||
# würde am Grace-Ende ohnehin gekillt.
|
||||
satt = grace is not None and len(results) >= quorum
|
||||
if attempts[i] <= _MAX_RESTARTS and not satt and not (cancelled and cancelled()):
|
||||
spawn(i)
|
||||
if len(results) >= quorum: # alle Slots durch, Minimum steht (nur mit grace erreichbar)
|
||||
return results
|
||||
_log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
|
||||
return None
|
||||
finally:
|
||||
@@ -184,7 +236,7 @@ class GenContext:
|
||||
guide_id: str | None = None
|
||||
|
||||
|
||||
# Ergebnis-Status von run_single_slot/_check_then_fix
|
||||
# Ergebnis-Status von run_single_slot
|
||||
OK, CANCELLED, FAILED = "ok", "cancelled", "failed"
|
||||
|
||||
|
||||
@@ -205,47 +257,3 @@ async def run_single_slot(
|
||||
return OK, res[0]
|
||||
|
||||
|
||||
async def _check_then_fix(
|
||||
ctx: GenContext, *, name: str, step: int,
|
||||
check_key: str, check_prompt: str, check_path: Path, check_timeout: int,
|
||||
fix_key: str, build_fix_prompt, fix_payload, fix_timeout: int,
|
||||
fix_role: str = "fast", fix_caps: str = "files",
|
||||
on_fix_invalid=None,
|
||||
) -> tuple[str, object]:
|
||||
"""Check→Fix-Muster: Prüf-Agent notiert Probleme (JSON), Fix-Agent behebt sie.
|
||||
|
||||
Resume: existierende Check-Datei überspringt den ganzen Schritt.
|
||||
Check ist fatal (FAILED), Fix nicht — Original bleibt; on_fix_invalid kann
|
||||
das kanonische Original zurückschreiben, falls der Fix-Agent die
|
||||
Artefakt-Datei zerschrieben hat.
|
||||
Lese-Check (Multi-Slot, Section-genau) und Bausteine-Auswahl-Check
|
||||
(Patch-Semantik) passen bewusst NICHT in dieses Muster.
|
||||
→ (OK, neues_artefakt | None=unverändert) | (CANCELLED, None) | (FAILED, None)
|
||||
"""
|
||||
if check_path.exists():
|
||||
return OK, None
|
||||
await _set_step(ctx.guide_id, step, f"Prüfe {name}…")
|
||||
status, probleme = await run_single_slot(
|
||||
ctx, f"{name}-Prüfung", key=check_key, prompt=check_prompt,
|
||||
role="fast", capabilities="files",
|
||||
payload=lambda result: _probleme_schema(_json_datei(check_path)),
|
||||
timeout=check_timeout,
|
||||
)
|
||||
if status != OK:
|
||||
return status, None
|
||||
if not probleme:
|
||||
return OK, None
|
||||
_log(ctx.topic, f"{name}-Prüfung: {len(probleme)} Problem(e) notiert")
|
||||
await _set_step(ctx.guide_id, step, f"Passe {name} an…")
|
||||
status, fixed = await run_single_slot(
|
||||
ctx, f"{name}-Fix", key=fix_key, prompt=build_fix_prompt(probleme),
|
||||
role=fix_role, capabilities=fix_caps, payload=fix_payload, timeout=fix_timeout,
|
||||
)
|
||||
if status == CANCELLED:
|
||||
return CANCELLED, None
|
||||
if status == FAILED:
|
||||
_log(ctx.topic, f"{name}-Fix ungültig — Original bleibt")
|
||||
if on_fix_invalid:
|
||||
on_fix_invalid()
|
||||
return OK, None
|
||||
return OK, fixed
|
||||
|
||||
Reference in New Issue
Block a user