This commit is contained in:
Team3
2026-06-07 09:00:20 +02:00
parent fce82fbd16
commit 8d6d1bf089
22 changed files with 876 additions and 274 deletions

View File

@@ -501,9 +501,12 @@ async def generate_bausteine(topic: str, instructions: str = "", provider: str =
_bausteine_cancelled.discard(topic)
# --- Guide-Generierung: Bausteine → (Plan) → Writer → JSON ---
# --- Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg) ---
# Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ.
# Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort.
GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung")
# Parallele Writer pro Format (OnePager hat einen eigenen Weg).
# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt).
# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden
# eines fehlgeschlagenen Writers.
@@ -511,6 +514,80 @@ WRITER_SECTIONS = 30
WRITER_MAX = 20
def _guide_files(content_path: Path) -> dict:
d, stem = content_path.parent, content_path.stem
return {
"auswahl": d / f"{stem}.auswahl.json",
"auswahl_check": d / f"{stem}.auswahl-check.json",
"gliederung": d / f"{stem}.gliederung.json",
"gliederung_check": d / f"{stem}.gliederung-check.json",
# chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw.
}
def guide_slot_dateien(content_path: Path) -> list[Path]:
"""Alle Schritt-Dateien eines Guides (für den Frischstart)."""
return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path]
async def _set_step(guide_id: str, step: int, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, step=step, progress=progress, updated_at=now)
def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None:
"""{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
idx = _titel_index(entries)
nums: list[int] = []
seen: set[int] = set()
total = unknown = 0
for t in data["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
seen.add(num)
nums.append(num)
if total == 0 or (total - unknown) / total < 0.85:
return None
if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max:
return None
return nums
def _probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = [str(x).strip() for x in p if str(x).strip()]
return out or None
def _lese_probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = []
for x in p:
if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str):
return None
out.append({"section": x["section"].strip(), "problem": x["problem"].strip()})
return out or None
def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None:
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}].
@@ -605,40 +682,38 @@ def _parse_fragment(text: str) -> list[dict]:
async def _generate_onepager(
guide_id: str, topic: str, instructions: str, provider: str,
project: Path | None, content_path: Path, fragment_paths: list[Path],
project: Path | None, content_path: Path,
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
PFLICHT_KARTEN = ("was ist", "welches problem", "wann nehmen", "einordnung", "so sieht", "fakten", "erste schritte")
# 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil)
KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet")
def karten_schema(data):
"""{"karten": {key: {titel, md}}} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return "ok"
karten = data.get("karten")
if not isinstance(karten, list) or not karten:
if not isinstance(karten, dict):
return None
out = []
for k in karten:
if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("merksatz"), str):
for key in KARTEN_KEYS:
k = karten.get(key)
if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str):
return None
titel, merksatz = k["titel"].strip(), k["merksatz"].strip()
if len(merksatz) < 5: # abgebrochene/leere Karten ("Per") sind ungültig
return None
out.append({"titel": titel, "merksatz": merksatz})
vorhanden = [k["titel"].lower() for k in out]
for pflicht in PFLICHT_KARTEN:
if not any(t.startswith(pflicht) for t in vorhanden):
titel, md = k["titel"].strip(), k["md"].strip()
if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig
return None
out.append({"key": key, "titel": titel, "md": md})
return out
# Schritt 1: Recherche — eigene Faktenbasis, unabhängig von den Bausteinen
await _set_progress(guide_id, "Recherchiere…")
recherche_path = content_path.parent / f"{content_path.stem}.recherche.md"
fragment_paths.append(recherche_path)
recherche_path.unlink(missing_ok=True)
d, stem = content_path.parent, content_path.stem
recherche_path = d / f"{stem}.recherche.md"
recherche_check_path = d / f"{stem}.recherche-check.json"
karten_path = d / f"{stem}.karten.json"
check_path = d / f"{stem}.onepager-check.json"
# Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen
# (Version, Lizenz, Alternativen) laufen dort ins Leere.
if project:
@@ -647,62 +722,135 @@ async def _generate_onepager(
else:
source = _prompt("OnePager-Quelle-Thema", topic=topic)
recherche_template = "OnePager-Recherche"
slots = [{
"key": f"{guide_id}-recherche",
"prompt": _prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
"role": "quick", "capabilities": "files" if project else "full",
"payload": (lambda result: recherche_path.read_text(encoding="utf-8") if recherche_path.exists() else None),
}]
res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
return None
recherche = res[0]
# Schritt 2: Bauen — Karten nur aus der Faktenbasis (JSON)
await _set_progress(guide_id, "Baue OnePager…")
karten_path = content_path.parent / f"{content_path.stem}.karten.json"
fragment_paths.append(karten_path)
karten_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-bauen",
"prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
"role": "fast", "capabilities": "files",
"payload": (lambda result: (k if isinstance(k := karten_schema(_json_datei(karten_path)), list) else None)),
}]
res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
return None
karten = res[0]
def recherche_payload(result=None):
if not recherche_path.exists():
return None
text = recherche_path.read_text(encoding="utf-8").strip()
return text or None
# Schritt 3: Verifizieren — {"ok": true} oder vollständig korrigierte Liste (nicht fatal)
await _set_progress(guide_id, "Verifiziere OnePager…")
check_path = content_path.parent / f"{content_path.stem}.onepager-check.json"
fragment_paths.append(check_path)
check_path.unlink(missing_ok=True)
karten_block = "\n".join(f"- {k['titel']}{k['merksatz']}" for k in karten)
slots = [{
"key": f"{guide_id}-verify",
"prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block, out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(check_path))),
}]
res = await _race(topic, "OnePager-Verifikation", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "OnePager-Verifikation fehlgeschlagen — ungeprüfte Version wird verwendet")
elif isinstance(res[0], list):
_log(topic, "OnePager-Verifikation hat Korrekturen geliefert")
# Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume)
recherche = recherche_payload()
if recherche is None:
await _set_step(guide_id, 0, "Recherchiere…")
slots = [{
"key": f"{guide_id}-recherche",
"prompt": _prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
"role": "quick", "capabilities": "files" if project else "full",
"payload": recherche_payload,
}]
res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
return None
recherche = res[0]
# Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent
if not recherche_check_path.exists():
await _set_step(guide_id, 1, "Prüfe Recherche…")
slots = [{
"key": f"{guide_id}-recherche-check",
"prompt": _prompt("OnePager-Recherche-Check", topic=topic, recherche=recherche, out_path=recherche_check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(recherche_check_path))),
}]
res = await _race(topic, "Recherche-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Recherche-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 1, "Passe Recherche an…")
slots = [{
"key": f"{guide_id}-recherche-fix",
"prompt": _prompt(
"OnePager-Recherche-Fix",
topic=topic, source=source, recherche=recherche,
probleme="\n".join(f"- {p}" for p in probleme),
out_path=recherche_path, extra=_extra(instructions),
),
"role": "quick", "capabilities": "files" if project else "full",
"payload": recherche_payload,
}]
res = await _race(topic, "Recherche-Fix", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Recherche-Fix ungültig — ursprüngliche Recherche bleibt")
else:
recherche = res[0]
# Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen)
karten = karten_schema(_json_datei(karten_path))
if karten is None:
await _set_step(guide_id, 2, "Baue OnePager…")
karten_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-bauen",
"prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(karten_path))),
}]
res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
return None
karten = res[0]
def karten_block() -> str:
return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten)
# Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent
if not check_path.exists():
await _set_step(guide_id, 3, "Prüfe OnePager…")
slots = [{
"key": f"{guide_id}-verify",
"prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(check_path))),
}]
res = await _race(topic, "OnePager-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"OnePager-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 3, "Passe OnePager an…")
slots = [{
"key": f"{guide_id}-karten-fix",
"prompt": _prompt(
"OnePager-Fix",
topic=topic, recherche=recherche, karten=karten_block(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=karten_path, extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(karten_path))),
}]
res = await _race(topic, "OnePager-Fix", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "OnePager-Fix ungültig — ursprüngliche Karten bleiben")
karten_path.write_text(
json.dumps({"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, ensure_ascii=False),
encoding="utf-8",
)
else:
karten = res[0]
sections = [
{"num": i, "title": k["titel"], "md": k["merksatz"]}
{"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]}
for i, k in enumerate(karten, 1)
]
return [{"title": topic, "sections": sections}]
@@ -711,12 +859,13 @@ async def _generate_onepager(
async def _generate_sections(
guide_id: str, topic: str, format_name: str, entries: dict[int, str],
facts: str, instructions: str, provider: str,
content_path: Path, fragment_paths: list[Path],
content_path: Path,
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
files = _guide_files(content_path)
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
n = len(entries)
anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name]
@@ -727,76 +876,289 @@ async def _generate_sections(
"Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist."
)
await _set_progress(guide_id, "Wähle Bausteine & plane Gliederung…")
plan_path = content_path.parent / f"{content_path.stem}.gliederung.json"
fragment_paths.append(plan_path)
plan_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-plan",
"prompt": _prompt(
"Guide-Plan",
topic=topic, format_name=format_name, bausteine=bausteine_liste,
auswahl_auftrag=auswahl_auftrag, out_path=plan_path, extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(plan_path), entries, k_min, k_max)),
}]
res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", n), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederung fehlgeschlagen")
return None
plan = res[0]
# Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume)
auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)
if auswahl is None:
await _set_step(guide_id, 0, "Wähle Bausteine…")
files["auswahl"].unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-auswahl",
"prompt": _prompt(
"Guide-Auswahl",
topic=topic, format_name=format_name, bausteine=bausteine_liste,
auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
}]
res = await _race(topic, "Guide-Auswahl", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Auswahl fehlgeschlagen")
return None
auswahl = res[0]
def auswahl_titel() -> str:
return "\n".join(f"- {_titel(entries[num])}" for num in auswahl)
def auswahl_json() -> str:
return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False)
# Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent
if not files["auswahl_check"].exists():
await _set_step(guide_id, 1, "Prüfe Auswahl…")
slots = [{
"key": f"{guide_id}-auswahl-check",
"prompt": _prompt(
"Guide-Auswahl-Check",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
out_path=files["auswahl_check"], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(files["auswahl_check"]))),
}]
res = await _race(topic, "Auswahl-Prüfung", slots, 1, _timeout("guide_check", len(auswahl)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Auswahl-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 1, "Passe Auswahl an…")
slots = [{
"key": f"{guide_id}-auswahl-fix",
"prompt": _prompt(
"Guide-Auswahl-Fix",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["auswahl"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
}]
res = await _race(topic, "Auswahl-Fix", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Auswahl-Fix ungültig — ursprüngliche Auswahl bleibt")
files["auswahl"].write_text(auswahl_json(), encoding="utf-8")
else:
auswahl = res[0]
sel_entries = {num: entries[num] for num in auswahl}
soll = len(sel_entries)
sel_liste = "\n".join(f"- {t}" for t in sel_entries.values())
# Schritt 3: Gliederung der festen Auswahl
plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)
if plan is None:
await _set_step(guide_id, 2, "Plane Gliederung…")
files["gliederung"].unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-gliederung",
"prompt": _prompt(
"Guide-Gliederung",
topic=topic, format_name=format_name, bausteine=sel_liste,
out_path=files["gliederung"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
}]
res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederung fehlgeschlagen")
return None
plan = res[0]
def gliederung_text() -> str:
return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan)
def gliederung_json() -> str:
return json.dumps(
{"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]},
ensure_ascii=False,
)
# Schritt 4: Gliederungs-Prüfung
if not files["gliederung_check"].exists():
await _set_step(guide_id, 3, "Prüfe Gliederung…")
slots = [{
"key": f"{guide_id}-gliederung-check",
"prompt": _prompt(
"Guide-Gliederung-Check",
topic=topic, format_name=format_name, zweck=zweck,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
out_path=files["gliederung_check"], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(files["gliederung_check"]))),
}]
res = await _race(topic, "Gliederungs-Prüfung", slots, 1, _timeout("guide_check", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Gliederungs-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 3, "Passe Gliederung an…")
slots = [{
"key": f"{guide_id}-gliederung-fix",
"prompt": _prompt(
"Guide-Gliederung-Fix",
topic=topic, format_name=format_name,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["gliederung"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
}]
res = await _race(topic, "Gliederungs-Fix", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Gliederungs-Fix ungültig — ursprüngliche Gliederung bleibt")
files["gliederung"].write_text(gliederung_json(), encoding="utf-8")
else:
plan = res[0]
# Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume)
total_sections = sum(len(c["nums"]) for c in plan)
chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS))))
zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
writer_count = len(zuteilungen)
await _set_progress(guide_id, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
fragment_paths.extend(paths)
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-w{i}",
_prompt(
"Guide-Writer",
topic=topic, format_name=format_name, zuteilung=zuteilung,
facts=facts, spec=spec, out_path=path, extra=_extra(instructions),
),
_timeout("writer", size), provider=provider, role="guide", capabilities="full",
)
for i, (zuteilung, path, size) in enumerate(zip(zuteilungen, paths, chunk_sizes), 1)
], return_exceptions=True)
if is_cancelled():
return None
for i, (r, p) in enumerate(zip(results, paths), 1):
if isinstance(r, BaseException):
_log(topic, f"Writer {i}: {type(r).__name__}: {r}")
elif r[0] != 0:
_log(topic, f"Writer {i}: {_claude_error('Fehler', *r)}")
elif not p.exists():
_log(topic, f"Writer {i}: keine Ausgabedatei erstellt")
fragments: list[dict] = []
for p in paths:
if p.exists():
fragments.extend(_parse_fragment(p.read_text(encoding="utf-8")))
if not fragments:
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
return None
offen = [i for i, p in enumerate(paths) if not p.exists()]
if offen:
await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-w{i + 1}",
_prompt(
"Guide-Writer",
topic=topic, format_name=format_name, zuteilung=zuteilungen[i],
facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions),
),
_timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full",
)
for i in offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(offen, results):
if isinstance(r, BaseException):
_log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}")
elif r[0] != 0:
_log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}")
elif not paths[i].exists():
_log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt")
if not any(p.exists() for p in paths):
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
return None
await _set_progress(guide_id, "Setze zusammen…")
idx = _titel_index(entries)
by_num: dict[int, dict] = {}
for sec in fragments:
num = _titel_aufloesen(idx, sec["titel"])
if num is None:
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
for p in paths:
if not p.exists():
continue
if num not in by_num:
by_num[num] = sec
for sec in _parse_fragment(p.read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num is None:
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
elif num not in by_num:
by_num[num] = sec
if not by_num:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
# Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections
chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks]
check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)]
offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]]
if offen_checks:
await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…")
def sections_text(nums: list[int]) -> str:
return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums)
slots = [{
"key": f"{guide_id}-lese-check-{i + 1}",
"prompt": _prompt(
"Guide-Lese-Check",
topic=topic, format_name=format_name, spec=spec,
sections=sections_text(chunk_nums[i]),
out_path=check_paths[i], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))),
} for i in offen_checks]
res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Lese-Prüfung fehlgeschlagen")
return None
probleme_by_num: dict[int, str] = {}
for p in check_paths:
for item in (_lese_probleme_schema(_json_datei(p)) or []):
num = _titel_aufloesen(idx, item["section"])
if num in by_num and num not in probleme_by_num:
probleme_by_num[num] = item["problem"]
if probleme_by_num:
_log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet")
await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…")
fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums]
fix_offen = [i for i, nums in enumerate(fix_chunks) if nums]
fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)]
def auftraege_text(nums: list[int]) -> str:
return "\n\n".join(
f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}"
for num in nums
)
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-fix-w{i + 1}",
_prompt(
"Guide-Sections-Fix",
topic=topic, format_name=format_name, facts=facts, spec=spec,
auftraege=auftraege_text(fix_chunks[i]),
out_path=fix_paths[i], extra=_extra(instructions),
),
_timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full",
)
for i in fix_offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(fix_offen, results):
if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0):
_log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt")
ersetzt = 0
for i in fix_offen:
if not fix_paths[i].exists():
continue
for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num in probleme_by_num and sec["md"].strip():
by_num[num] = sec
ersetzt += 1
_log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet")
await _set_progress(guide_id, "Setze zusammen…")
chapters: list[dict] = []
for ch in plan:
sections = [
@@ -823,14 +1185,19 @@ async def generate_guide(guide_id: str, topic: str, format_name: str, instructio
content_path = guide_content_path(topic, format_name)
content_path.parent.mkdir(parents=True, exist_ok=True)
project = project_dir(topic) if project_dir(topic).is_dir() else None
fragment_paths: list[Path] = []
try:
if guide_id in _cancelled:
return
# „Neu erstellen": fertiger Guide → kompletter Frischstart.
# Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume.
if content_path.exists():
for p_alt in guide_slot_dateien(content_path):
p_alt.unlink(missing_ok=True)
if format_name == "OnePager":
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path, fragment_paths)
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path)
else:
alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
if not alle:
@@ -840,7 +1207,7 @@ async def generate_guide(guide_id: str, topic: str, format_name: str, instructio
facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
chapters = await _generate_sections(
guide_id, topic, format_name, entries,
facts, instructions, provider, content_path, fragment_paths,
facts, instructions, provider, content_path,
)
if chapters is None or guide_id in _cancelled:
return
@@ -851,7 +1218,7 @@ async def generate_guide(guide_id: str, topic: str, format_name: str, instructio
)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="done", progress=None, updated_at=now)
await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now)
except asyncio.TimeoutError:
await _fail(guide_id, "Timeout bei der Generierung")
@@ -861,8 +1228,6 @@ async def generate_guide(guide_id: str, topic: str, format_name: str, instructio
await _fail(guide_id, str(e)[:2000])
finally:
_cancelled.discard(guide_id)
for p in fragment_paths:
p.unlink(missing_ok=True)
# --- Tutor-Chat ---