683 lines
30 KiB
Python
683 lines
30 KiB
Python
"""Guide-Generierung als Konsens-Pipeline (OnePager hat einen eigenen Weg).
|
|
|
|
Auswahl: 5 Agenten (min. 3, Grace) → Code-Voting (Mehrheit = Konsens) →
|
|
Mapping-Agent sortiert Strittiges → Klärungs-Loop (max. KONSENS_MAX_RUNDEN).
|
|
Gliederung: 5 Vorschläge (min. 3, Grace) → ein Judge wählt und kombiniert.
|
|
Schreiben: Writer pro Chunk. Lese-Prüfung: Check→Fix-Loop (max. Runden-Cap),
|
|
Folgerunden prüfen nur ersetzte Sections; danach bleiben Beanstandungen stehen.
|
|
Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import math
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from agents import run_agent
|
|
from bausteine import _pdfs_konvertieren
|
|
from config import (
|
|
DEFAULT_PROVIDER, FORMAT_ANTEIL, KONSENS_GRACE, KONSENS_MAX_RUNDEN,
|
|
TEMPLATES_DIR,
|
|
)
|
|
from database import list_guides, update_guide
|
|
from fsutil import atomic_write_json
|
|
from jsonio import read_json_file as _json_datei
|
|
from onepager import _generate_onepager
|
|
from paths import bausteine_path, guide_content_path, project_dir
|
|
from pipeline import (
|
|
CANCELLED, FAILED, GenContext, _claude_error, _extra,
|
|
_fail, _gather_error, _log, _prompt, _race, _rest_schema, _runde_schema,
|
|
_semaphore, _set_progress, _set_step, _timeout, clear_guide_cancelled,
|
|
is_guide_cancelled, run_single_slot,
|
|
)
|
|
from textkit import (
|
|
_eindeutige_titel, _lade_bausteine, _parse_fragment, _split_chunks,
|
|
_titel, _titel_aufloesen, _titel_index, _zuteilung_text,
|
|
)
|
|
|
|
log = logging.getLogger("creator.guide")
|
|
|
|
GUIDE_STEPS = ("Auswahl", "Gliederung", "Schreiben", "Lese-Prüfung")
|
|
|
|
# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt).
|
|
# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden
|
|
# eines fehlgeschlagenen Writers.
|
|
WRITER_SECTIONS = 30
|
|
WRITER_MAX = 20
|
|
|
|
|
|
def _guide_files(content_path: Path) -> dict:
|
|
d, stem = content_path.parent, content_path.stem
|
|
runden = range(1, KONSENS_MAX_RUNDEN + 1)
|
|
return {
|
|
# Runde 1: 5 volle Auswahl-Vorschläge; Runden 2+: 3 Klärungs-Voten
|
|
"auswahl_slots": {
|
|
n: [d / f"{stem}.auswahl-r{n}-{i}.json" for i in range(1, (5 if n == 1 else 3) + 1)]
|
|
for n in runden
|
|
},
|
|
"auswahl_mapping": {n: d / f"{stem}.auswahl-mapping-r{n}.json" for n in runden},
|
|
"gliederung_slots": [d / f"{stem}.gliederung-{i}.json" for i in (1, 2, 3, 4, 5)],
|
|
"gliederung": d / f"{stem}.gliederung.json", # Judge-Ausgabe
|
|
# chunk-/lese-check-/fix-Dateien sind dynamisch:
|
|
# {stem}.chunk-i.md, {stem}.lese-check-r{n}-{i}.json, {stem}.fix-r{n}-{i}.md
|
|
}
|
|
|
|
|
|
def guide_slot_dateien(content_path: Path) -> list[Path]:
|
|
"""Alle Schritt-Dateien eines Guides (für den Frischstart)."""
|
|
return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path]
|
|
|
|
|
|
def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None:
|
|
"""{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang."""
|
|
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
|
|
return None
|
|
idx = _titel_index(entries)
|
|
nums: list[int] = []
|
|
seen: set[int] = set()
|
|
total = unknown = 0
|
|
for t in data["bausteine"]:
|
|
total += 1
|
|
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
|
|
if num is None:
|
|
unknown += 1
|
|
elif num not in seen:
|
|
seen.add(num)
|
|
nums.append(num)
|
|
if total == 0 or (total - unknown) / total < 0.85:
|
|
return None
|
|
if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max:
|
|
return None
|
|
return nums
|
|
|
|
|
|
def _lese_probleme_schema(data):
|
|
"""{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None."""
|
|
if not isinstance(data, dict):
|
|
return None
|
|
if data.get("ok") is True:
|
|
return []
|
|
p = data.get("probleme")
|
|
if not isinstance(p, list) or not p:
|
|
return None
|
|
out = []
|
|
for x in p:
|
|
if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str):
|
|
return None
|
|
out.append({"section": x["section"].strip(), "problem": x["problem"].strip()})
|
|
return out or None
|
|
|
|
|
|
def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None:
|
|
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}].
|
|
|
|
`soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz).
|
|
"""
|
|
if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list):
|
|
return None
|
|
idx = _titel_index(entries)
|
|
chapters: list[dict] = []
|
|
seen: set[int] = set()
|
|
total = unknown = 0
|
|
for ch in data["kapitel"]:
|
|
if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list):
|
|
return None
|
|
nums = []
|
|
for t in ch["bausteine"]:
|
|
total += 1
|
|
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
|
|
if num is None:
|
|
unknown += 1
|
|
elif num not in seen:
|
|
nums.append(num)
|
|
seen.add(num)
|
|
if nums:
|
|
chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums})
|
|
if not chapters or total == 0:
|
|
return None
|
|
if (total - unknown) / total < 0.85:
|
|
return None
|
|
if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max:
|
|
return None
|
|
return chapters
|
|
|
|
|
|
def _voting(stimmen: list[list[int]]) -> tuple[list[int], dict[int, int]]:
|
|
"""Mehrheit (> Hälfte der Stimmen) → Konsens; ≥1 Stimme → Rest mit Votenzahl."""
|
|
zaehler: dict[int, int] = {}
|
|
for stimme in stimmen:
|
|
for num in stimme:
|
|
zaehler[num] = zaehler.get(num, 0) + 1
|
|
konsens = sorted(num for num, v in zaehler.items() if v > len(stimmen) / 2)
|
|
rest = {num: v for num, v in sorted(zaehler.items()) if v <= len(stimmen) / 2}
|
|
return konsens, rest
|
|
|
|
|
|
def _resolve_uebernehmen(data, entries: dict[int, str]) -> list[int] | None:
|
|
"""{"uebernehmen": [Titel]} → Nummern; leer gültig; >15 % unauflösbar → None."""
|
|
titel = _rest_schema(data)
|
|
if titel is None:
|
|
return None
|
|
if not titel:
|
|
return []
|
|
idx = _titel_index(entries)
|
|
nums: list[int] = []
|
|
seen: set[int] = set()
|
|
unknown = 0
|
|
for t in titel:
|
|
num = _titel_aufloesen(idx, t)
|
|
if num is None:
|
|
unknown += 1
|
|
elif num not in seen:
|
|
seen.add(num)
|
|
nums.append(num)
|
|
if unknown / len(titel) > 0.15:
|
|
return None
|
|
return nums
|
|
|
|
|
|
def _resolve_runde(data, entries: dict[int, str], konsens: list[int], k_min: int, k_max: int, final: bool) -> tuple[list[int], list[int]] | None:
|
|
"""Auswahl-Mapping-Runde auflösen — erzwingt die Zielgrößen-Grenzen schema-seitig.
|
|
|
|
Immer: Konsens + Aufnehmen + Rest muss 0.9*k_min erreichen können (sonst
|
|
wäre die Mindestgröße in späteren Runden unerreichbar). Aufnehmen über
|
|
1.1*k_max hinaus ist ungültig; final erzwingt zusätzlich leeren Rest und
|
|
die Mindestgröße. Ein bereits zu großer Konsens allein ist kein Fehler —
|
|
der Agent kann dann nichts mehr aufnehmen.
|
|
"""
|
|
res = _runde_schema(data, final=final)
|
|
if res is None:
|
|
return None
|
|
idx = _titel_index(entries)
|
|
bekannt = set(konsens)
|
|
listen: list[list[int]] = []
|
|
for titel_liste in res:
|
|
nums: list[int] = []
|
|
unknown = 0
|
|
for t in titel_liste:
|
|
num = _titel_aufloesen(idx, t)
|
|
if num is None:
|
|
unknown += 1
|
|
elif num not in bekannt:
|
|
bekannt.add(num)
|
|
nums.append(num)
|
|
if titel_liste and unknown / len(titel_liste) > 0.15:
|
|
return None
|
|
listen.append(nums)
|
|
aufnehmen, rest = listen
|
|
gesamt = len(konsens) + len(aufnehmen)
|
|
if aufnehmen and gesamt > 1.1 * k_max:
|
|
return None
|
|
if gesamt + len(rest) < 0.9 * k_min:
|
|
return None
|
|
if (final or not rest) and gesamt < 0.9 * k_min:
|
|
return None
|
|
return aufnehmen, rest
|
|
|
|
|
|
async def _konsens_auswahl(
|
|
ctx: GenContext, files: dict, entries: dict[int, str],
|
|
k_min: int, k_max: int, auswahl_auftrag: str, format_name: str,
|
|
bausteine_liste: str, instructions: str,
|
|
) -> list[int] | None:
|
|
"""Schritt 0: 5 Auswahl-Agenten → Code-Voting → Mapping → Klärungs-Loop.
|
|
|
|
Rückgabe: finale Baustein-Nummern; None = Fehler/Abbruch (bereits gemeldet).
|
|
"""
|
|
guide_id, topic, provider = ctx.guide_id, ctx.topic, ctx.provider
|
|
is_cancelled = ctx.is_cancelled
|
|
n = len(entries)
|
|
|
|
def titel_liste(nums) -> str:
|
|
return "\n".join(f"- {_titel(entries[num])}" for num in nums)
|
|
|
|
konsens: list[int] = []
|
|
rest: list[int] = []
|
|
runde = 0
|
|
while True:
|
|
runde += 1
|
|
final_runde = runde == KONSENS_MAX_RUNDEN
|
|
|
|
# Voten der Runde einsammeln — Slot-Dateien zuerst (Resume), Rest per Race
|
|
if runde == 1:
|
|
await _set_step(guide_id, 0, "Wähle Bausteine (5 Vorschläge)…")
|
|
stimmen: list[list[int]] = []
|
|
offen = []
|
|
for i, path in enumerate(files["auswahl_slots"][1], 1):
|
|
res = _resolve_auswahl(_json_datei(path), entries, k_min, k_max)
|
|
if res is not None:
|
|
stimmen.append(res)
|
|
else:
|
|
offen.append((i, path))
|
|
if len(stimmen) < 3:
|
|
slots = [
|
|
{
|
|
"key": f"{guide_id}-auswahl-r1-{i}",
|
|
"prompt": _prompt(
|
|
"Guide-Auswahl",
|
|
topic=topic, format_name=format_name, bausteine=bausteine_liste,
|
|
auswahl_auftrag=auswahl_auftrag, out_path=path, extra=_extra(instructions),
|
|
),
|
|
"role": "guide", "capabilities": "files",
|
|
"payload": (lambda result, p=path: _resolve_auswahl(_json_datei(p), entries, k_min, k_max)),
|
|
}
|
|
for i, path in offen
|
|
]
|
|
neue = await _race(
|
|
topic, "Guide-Auswahl", slots, 3 - len(stimmen), _timeout("guide_auswahl", n),
|
|
provider, cancelled=is_cancelled, grace=KONSENS_GRACE,
|
|
)
|
|
if is_cancelled():
|
|
return None
|
|
if neue is None:
|
|
await _fail(guide_id, "Auswahl fehlgeschlagen (Minimum nicht erreicht)")
|
|
return None
|
|
stimmen += neue
|
|
konsens, voten = _voting(stimmen)
|
|
rest = list(voten)
|
|
stimmen_n = len(stimmen)
|
|
else:
|
|
await _set_step(guide_id, 0, f"Kläre strittige Bausteine (Runde {runde}/{KONSENS_MAX_RUNDEN})…")
|
|
entscheidungen: list[list[int]] = []
|
|
offen = []
|
|
for i, path in enumerate(files["auswahl_slots"][runde], 1):
|
|
res = _resolve_uebernehmen(_json_datei(path), entries)
|
|
if res is not None:
|
|
entscheidungen.append(res)
|
|
else:
|
|
offen.append((i, path))
|
|
if len(entscheidungen) < 2:
|
|
slots = [
|
|
{
|
|
"key": f"{guide_id}-auswahl-r{runde}-{i}",
|
|
"prompt": _prompt(
|
|
"Guide-Klaerung",
|
|
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
|
|
konsens=titel_liste(konsens) or "- (leer)", rest=titel_liste(rest),
|
|
out_path=path, extra=_extra(instructions),
|
|
),
|
|
"role": "fast", "capabilities": "files",
|
|
"payload": (lambda result, p=path: _resolve_uebernehmen(_json_datei(p), entries)),
|
|
}
|
|
for i, path in offen
|
|
]
|
|
neue = await _race(
|
|
topic, f"Guide-Klärung r{runde}", slots, 2 - len(entscheidungen),
|
|
_timeout("auswahl", len(rest)), provider, cancelled=is_cancelled, grace=KONSENS_GRACE,
|
|
)
|
|
if is_cancelled():
|
|
return None
|
|
if neue is None:
|
|
await _fail(guide_id, f"Auswahl fehlgeschlagen (Runde {runde}, Minimum nicht erreicht)")
|
|
return None
|
|
entscheidungen += neue
|
|
voten = {num: sum(1 for e in entscheidungen if num in e) for num in rest}
|
|
stimmen_n = len(entscheidungen)
|
|
|
|
# Mapping-Agent sortiert die strittigen Voten — gültige Datei = Resume
|
|
mapping_path = files["auswahl_mapping"][runde]
|
|
ergebnis = _resolve_runde(_json_datei(mapping_path), entries, konsens, k_min, k_max, final_runde)
|
|
if ergebnis is None:
|
|
mapping_path.unlink(missing_ok=True)
|
|
voten_block = "\n".join(
|
|
f"{i}. {_titel(entries[num])} (von {voten[num]}/{stimmen_n} Agenten gewählt)"
|
|
for i, num in enumerate(rest, 1)
|
|
) or "- (keine)"
|
|
final_zusatz = (
|
|
"\n- LETZTE RUNDE: Es gibt keine weitere Runde. `rest` MUSS leer sein"
|
|
" — entscheide JEDEN Eintrag selbst: aufnehmen oder verwerfen."
|
|
if final_runde else ""
|
|
)
|
|
status, ergebnis = await run_single_slot(
|
|
ctx, f"Auswahl-Mapping r{runde}",
|
|
key=f"{guide_id}-auswahl-mapping-r{runde}",
|
|
prompt=_prompt(
|
|
"Guide-Auswahl-Mapping",
|
|
topic=topic, format_name=format_name, n=stimmen_n,
|
|
auswahl_auftrag=auswahl_auftrag, konsens_n=len(konsens),
|
|
k_min=k_min, k_max=k_max,
|
|
konsens=titel_liste(konsens) or "- (leer)", rest=voten_block,
|
|
final=final_zusatz, out_path=mapping_path,
|
|
),
|
|
role="judge", capabilities="files",
|
|
payload=lambda result, p=mapping_path, k=tuple(konsens), f=final_runde:
|
|
_resolve_runde(_json_datei(p), entries, list(k), k_min, k_max, f),
|
|
timeout=_timeout("auswahl_mapping", len(konsens) + len(rest)),
|
|
)
|
|
if status == CANCELLED:
|
|
return None
|
|
if status == FAILED:
|
|
await _fail(guide_id, f"Auswahl-Mapping fehlgeschlagen (Runde {runde})")
|
|
return None
|
|
|
|
aufnehmen, rest = ergebnis
|
|
konsens = konsens + aufnehmen
|
|
_log(topic, f"Auswahl Runde {runde}: {len(aufnehmen)} aufgenommen, {len(rest)} strittig, Konsens {len(konsens)}")
|
|
if not rest or final_runde:
|
|
return konsens
|
|
|
|
|
|
async def _generate_sections(
|
|
guide_id: str, topic: str, format_name: str, entries: dict[int, str],
|
|
facts: str, instructions: str, provider: str,
|
|
content_path: Path,
|
|
) -> list[dict] | None:
|
|
def is_cancelled() -> bool:
|
|
return is_guide_cancelled(guide_id)
|
|
|
|
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled, guide_id=guide_id)
|
|
spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
|
|
files = _guide_files(content_path)
|
|
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
|
|
n = len(entries)
|
|
anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name]
|
|
k_min = min(n, max(minimum, math.ceil(anteil_min * n)))
|
|
k_max = min(n, max(k_min, math.floor(anteil_max * n)))
|
|
auswahl_auftrag = (
|
|
f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. "
|
|
"Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist."
|
|
)
|
|
|
|
# Schritt 0: Auswahl-Konsens (5 Agenten → Voting → Mapping → Klärungs-Loop)
|
|
auswahl = await _konsens_auswahl(
|
|
ctx, files, entries, k_min, k_max, auswahl_auftrag, format_name,
|
|
bausteine_liste, instructions,
|
|
)
|
|
if auswahl is None:
|
|
return None
|
|
|
|
sel_entries = {num: entries[num] for num in auswahl}
|
|
soll = len(sel_entries)
|
|
sel_liste = "\n".join(f"- {t}" for t in sel_entries.values())
|
|
|
|
# Schritt 1: Gliederung — 5 Vorschläge (min. 3, Grace), ein Judge wählt.
|
|
# Gültiges gliederung.json (auch aus Altläufen) überspringt den Schritt.
|
|
plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)
|
|
if plan is None:
|
|
await _set_step(guide_id, 1, "Gliederungs-Vorschläge (5 Agenten)…")
|
|
files["gliederung"].unlink(missing_ok=True)
|
|
vorschlaege: list[list[dict]] = []
|
|
offen = []
|
|
for i, path in enumerate(files["gliederung_slots"], 1):
|
|
res = _resolve_gliederung(_json_datei(path), sel_entries, soll, soll)
|
|
if res is not None:
|
|
vorschlaege.append(res)
|
|
else:
|
|
offen.append((i, path))
|
|
if len(vorschlaege) < 3:
|
|
slots = [
|
|
{
|
|
"key": f"{guide_id}-gliederung-{i}",
|
|
"prompt": _prompt(
|
|
"Guide-Gliederung",
|
|
topic=topic, format_name=format_name, bausteine=sel_liste,
|
|
out_path=path, extra=_extra(instructions),
|
|
),
|
|
"role": "guide", "capabilities": "files",
|
|
"payload": (lambda result, p=path: _resolve_gliederung(_json_datei(p), sel_entries, soll, soll)),
|
|
}
|
|
for i, path in offen
|
|
]
|
|
neue = await _race(
|
|
topic, "Gliederung", slots, 3 - len(vorschlaege), _timeout("plan", soll),
|
|
provider, cancelled=is_cancelled, grace=KONSENS_GRACE,
|
|
)
|
|
if is_cancelled():
|
|
return None
|
|
if neue is None:
|
|
await _fail(guide_id, "Gliederung fehlgeschlagen (Minimum nicht erreicht)")
|
|
return None
|
|
vorschlaege += neue
|
|
|
|
await _set_step(guide_id, 1, "Wähle beste Gliederung…")
|
|
bloecke = "\n\n".join(
|
|
f"### Vorschlag {i}\n"
|
|
+ "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in v)
|
|
for i, v in enumerate(vorschlaege, 1)
|
|
)
|
|
status, plan = await run_single_slot(
|
|
ctx, "Gliederungs-Judge",
|
|
key=f"{guide_id}-gliederung-judge",
|
|
prompt=_prompt(
|
|
"Guide-Gliederung-Judge",
|
|
topic=topic, format_name=format_name, zweck=zweck, n=len(vorschlaege),
|
|
bausteine=sel_liste, gliederungen=bloecke,
|
|
out_path=files["gliederung"], extra=_extra(instructions),
|
|
),
|
|
role="judge", capabilities="files",
|
|
payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll),
|
|
timeout=_timeout("plan_judge", soll),
|
|
)
|
|
if status == CANCELLED:
|
|
return None
|
|
if status == FAILED:
|
|
await _fail(guide_id, "Gliederung fehlgeschlagen (Judge ohne gültiges Ergebnis)")
|
|
return None
|
|
|
|
# Schritt 2: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume)
|
|
total_sections = sum(len(c["nums"]) for c in plan)
|
|
chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS))))
|
|
zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
|
|
chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
|
|
writer_count = len(zuteilungen)
|
|
paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
|
|
offen = [i for i, p in enumerate(paths) if not p.exists()]
|
|
if offen:
|
|
await _set_step(guide_id, 2, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
|
|
results = await asyncio.gather(*[
|
|
run_agent(
|
|
f"{guide_id}-w{i + 1}",
|
|
_prompt(
|
|
"Guide-Writer",
|
|
topic=topic, format_name=format_name, zuteilung=zuteilungen[i],
|
|
facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions),
|
|
),
|
|
_timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full",
|
|
)
|
|
for i in offen
|
|
], return_exceptions=True)
|
|
if is_cancelled():
|
|
return None
|
|
for i, r in zip(offen, results):
|
|
if isinstance(r, BaseException):
|
|
_log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}")
|
|
elif r[0] != 0:
|
|
_log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}")
|
|
elif not paths[i].exists():
|
|
_log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt")
|
|
if not any(p.exists() for p in paths):
|
|
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
|
|
return None
|
|
|
|
idx = _titel_index(entries)
|
|
by_num: dict[int, dict] = {}
|
|
for p in paths:
|
|
if not p.exists():
|
|
continue
|
|
for sec in _parse_fragment(p.read_text(encoding="utf-8")):
|
|
num = _titel_aufloesen(idx, sec["titel"])
|
|
if num is None:
|
|
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
|
|
elif num not in by_num:
|
|
by_num[num] = sec
|
|
if not by_num:
|
|
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
|
|
return None
|
|
|
|
# Schritt 3: Lese-Prüfungs-Loop — Check pro Writer-Paket, Fix nur für
|
|
# beanstandete Sections; Folgerunden prüfen NUR die ersetzten Sections.
|
|
# Nach dem Runden-Cap bleiben offene Beanstandungen stehen.
|
|
chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks]
|
|
|
|
def sections_text(nums: list[int]) -> str:
|
|
return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums)
|
|
|
|
def auftraege_text(nums: list[int], probleme: dict[int, str]) -> str:
|
|
return "\n\n".join(
|
|
f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}"
|
|
for num in nums
|
|
)
|
|
|
|
scope = chunk_nums
|
|
for runde in range(1, KONSENS_MAX_RUNDEN + 1):
|
|
check_paths = [content_path.parent / f"{content_path.stem}.lese-check-r{runde}-{i}.json" for i in range(1, writer_count + 1)]
|
|
offen_checks = [i for i, p in enumerate(check_paths) if scope[i] and _lese_probleme_schema(_json_datei(p)) is None]
|
|
if offen_checks:
|
|
await _set_step(guide_id, 3, f"Prüfe Lesbarkeit (Runde {runde}/{KONSENS_MAX_RUNDEN})…")
|
|
slots = [{
|
|
"key": f"{guide_id}-lese-check-r{runde}-{i + 1}",
|
|
"prompt": _prompt(
|
|
"Guide-Lese-Check",
|
|
topic=topic, format_name=format_name, spec=spec,
|
|
sections=sections_text(scope[i]),
|
|
out_path=check_paths[i], extra=_extra(instructions),
|
|
),
|
|
"role": "judge", "capabilities": "files",
|
|
"payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))),
|
|
} for i in offen_checks]
|
|
res = await _race(topic, f"Lese-Prüfung r{runde}", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled)
|
|
if is_cancelled():
|
|
return None
|
|
if res is None:
|
|
if runde == 1:
|
|
await _fail(guide_id, "Lese-Prüfung fehlgeschlagen")
|
|
return None
|
|
_log(topic, f"Lese-Prüfung Runde {runde} fehlgeschlagen — Stand bleibt")
|
|
break
|
|
|
|
probleme_by_num: dict[int, str] = {}
|
|
for i, p in enumerate(check_paths):
|
|
geltung = set(scope[i])
|
|
for item in (_lese_probleme_schema(_json_datei(p)) or []):
|
|
num = _titel_aufloesen(idx, item["section"])
|
|
if num in geltung and num in by_num and num not in probleme_by_num:
|
|
probleme_by_num[num] = item["problem"]
|
|
if not probleme_by_num:
|
|
break
|
|
|
|
_log(topic, f"Lese-Prüfung Runde {runde}: {len(probleme_by_num)} Section(s) beanstandet")
|
|
await _set_step(guide_id, 3, f"Überarbeite {len(probleme_by_num)} Section(s) (Runde {runde})…")
|
|
fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums]
|
|
fix_paths = [content_path.parent / f"{content_path.stem}.fix-r{runde}-{i + 1}.md" for i in range(writer_count)]
|
|
fix_offen = [i for i, nums in enumerate(fix_chunks) if nums and not fix_paths[i].exists()]
|
|
results = await asyncio.gather(*[
|
|
run_agent(
|
|
f"{guide_id}-fix-r{runde}-w{i + 1}",
|
|
_prompt(
|
|
"Guide-Sections-Fix",
|
|
topic=topic, format_name=format_name, facts=facts, spec=spec,
|
|
auftraege=auftraege_text(fix_chunks[i], probleme_by_num),
|
|
out_path=fix_paths[i], extra=_extra(instructions),
|
|
),
|
|
_timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full",
|
|
)
|
|
for i in fix_offen
|
|
], return_exceptions=True)
|
|
if is_cancelled():
|
|
return None
|
|
for i, r in zip(fix_offen, results):
|
|
if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0):
|
|
_log(topic, f"Sections-Fix {i + 1} (Runde {runde}) fehlgeschlagen — Original bleibt")
|
|
ersetzt: set[int] = set()
|
|
for p in fix_paths:
|
|
if not p.exists():
|
|
continue
|
|
for sec in _parse_fragment(p.read_text(encoding="utf-8")):
|
|
num = _titel_aufloesen(idx, sec["titel"])
|
|
if num in probleme_by_num and sec["md"].strip():
|
|
by_num[num] = sec
|
|
ersetzt.add(num)
|
|
_log(topic, f"Lese-Prüfung Runde {runde}: {len(ersetzt)} Section(s) überarbeitet")
|
|
if not ersetzt:
|
|
break
|
|
if runde == KONSENS_MAX_RUNDEN:
|
|
_log(topic, f"Lese-Prüfung: Cap erreicht — letzte Überarbeitung bleibt ungeprüft")
|
|
break
|
|
scope = [[num for num in nums if num in ersetzt] for nums in chunk_nums]
|
|
|
|
await _set_progress(guide_id, "Setze zusammen…")
|
|
chapters: list[dict] = []
|
|
for ch in plan:
|
|
sections = [
|
|
{"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]}
|
|
for num in ch["nums"] if num in by_num
|
|
]
|
|
if sections:
|
|
chapters.append({"title": ch["title"], "sections": sections})
|
|
geplant = {num for ch in plan for num in ch["nums"]}
|
|
missing = sorted(geplant - set(by_num))
|
|
if missing:
|
|
_log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}")
|
|
if not chapters:
|
|
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
|
|
return None
|
|
return chapters
|
|
|
|
|
|
async def reconcile_guides() -> None:
|
|
"""DB↔Dateisystem abgleichen: status=done ohne Content-Datei → error.
|
|
|
|
Läuft beim Server-Start (nach init_db) — fängt Crashes zwischen
|
|
Datei-Write und Status-Update ab.
|
|
"""
|
|
for g in await list_guides():
|
|
if g["status"] == "done" and not guide_content_path(g["topic"], g["format"]).exists():
|
|
log.warning("[%s] Guide %s: done ohne Content-Datei — auf error gesetzt", g["topic"], g["id"])
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
await update_guide(g["id"], status="error", error_msg="Inhalt fehlt — neu generieren", updated_at=now)
|
|
|
|
|
|
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
|
|
async with _semaphore:
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)
|
|
|
|
content_path = guide_content_path(topic, format_name)
|
|
content_path.parent.mkdir(parents=True, exist_ok=True)
|
|
project = project_dir(topic) if project_dir(topic).is_dir() else None
|
|
|
|
try:
|
|
if is_guide_cancelled(guide_id):
|
|
return
|
|
|
|
if project:
|
|
await asyncio.to_thread(_pdfs_konvertieren, project)
|
|
|
|
# „Neu erstellen": fertiger Guide → kompletter Frischstart.
|
|
# Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume.
|
|
if content_path.exists():
|
|
for p_alt in guide_slot_dateien(content_path):
|
|
p_alt.unlink(missing_ok=True)
|
|
|
|
if format_name == "OnePager":
|
|
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path)
|
|
else:
|
|
alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
|
|
if not alle:
|
|
await _fail(guide_id, "Keine Bausteine gefunden")
|
|
return
|
|
entries = _eindeutige_titel(alle)
|
|
facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
|
|
chapters = await _generate_sections(
|
|
guide_id, topic, format_name, entries,
|
|
facts, instructions, provider, content_path,
|
|
)
|
|
if chapters is None or is_guide_cancelled(guide_id):
|
|
return
|
|
|
|
atomic_write_json(content_path, {"topic": topic, "format": format_name, "chapters": chapters}, indent=1)
|
|
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now)
|
|
|
|
except asyncio.TimeoutError:
|
|
await _fail(guide_id, "Timeout bei der Generierung")
|
|
except FileNotFoundError:
|
|
await _fail(guide_id, "Bausteine fehlen")
|
|
except Exception as e:
|
|
log.exception("[%s] Guide-Generierung fehlgeschlagen (%s)", topic, guide_id)
|
|
await _fail(guide_id, str(e)[:2000])
|
|
finally:
|
|
clear_guide_cancelled(guide_id)
|