Files
creator/backend/generator.py
2026-06-07 11:42:12 +02:00

1353 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import math
import shutil
import subprocess
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from agents import run_agent, kill_process
from config import (
DEFAULT_PROVIDER,
FORMAT_ANTEIL,
TEMPLATES_DIR,
TIMEOUTS,
MAX_CONCURRENT_GENERATIONS,
)
from database import update_guide
from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
_cancelled.add(guide_id)
kill_process(guide_id)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now)
return True
async def _set_progress(guide_id: str, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, progress=progress, updated_at=now)
def _prompt(name: str, **kwargs) -> str:
template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
return template.format(**kwargs)
def _extra(instructions: str) -> str:
return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
def _log(topic: str, msg: str) -> None:
print(f"[generator] {topic}: {msg}", flush=True)
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
stderr = (stderr or "").strip()
if stderr:
return f"{label}: {stderr[:1000]}"
tail = (stdout or "").strip()[-500:]
if tail:
return f"{label} (exit {returncode}, stderr leer): …{tail}"
return f"{label} (exit {returncode}, ohne Ausgabe)"
def _gather_error(label: str, results: list) -> str:
for r in results:
if isinstance(r, BaseException):
return f"{label}: {type(r).__name__}: {r}"
returncode, stdout, stderr = r
if returncode != 0:
return _claude_error(label, returncode, stdout, stderr)
return f"{label}: kein verwertbares Ergebnis"
async def _fail(guide_id: str, msg: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
def _norm_titel(s: str) -> str:
"""Normalisiert einen Titel für den Schlüssel-Vergleich."""
s = re.sub(r"[`'\"<>]", "", s)
return re.sub(r"\s+", " ", s).strip().lower()
def _titel(entry: str) -> str:
return entry.split("")[0].strip() or entry
def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]:
"""Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen."""
seen: dict[str, int] = {}
out: dict[int, str] = {}
for num, text in entries.items():
titel = _titel(text)
key = _norm_titel(titel)
seen[key] = seen.get(key, 0) + 1
if seen[key] > 1:
rest = text.split("", 1)
text = f"{titel} ({seen[key]})" + (f"{rest[1]}" if len(rest) == 2 else "")
# zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht
out[num] = text
return out
def _titel_index(entries: dict[int, str]) -> dict[str, int]:
return {_norm_titel(_titel(text)): num for num, text in entries.items()}
def _json_datei(path: Path):
"""Liest eine JSON-Datei (Code-Fences tolerant); None bei fehlend/ungültig."""
if not path.exists():
return None
try:
text = path.read_text(encoding="utf-8").strip()
text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text)
return json.loads(text)
except Exception:
return None
def _timeout(step: str, n: int = 0) -> int:
base, per = TIMEOUTS[step]
return base + per * n
_MAX_RESTARTS = 2
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None:
"""Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse.
Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)`
prüft die Gültigkeit und liefert das Slot-Ergebnis oder None.
Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das
Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt.
`cancelled()` → True bricht ab (keine Restarts, Rückgabe None).
"""
attempts = {i: 0 for i in range(len(slots))}
tasks: dict[asyncio.Task, int] = {}
def spawn(i: int) -> None:
slot = slots[i]
task = asyncio.create_task(run_agent(
slot["key"], slot["prompt"], timeout,
provider=provider, role=slot["role"], capabilities=slot["capabilities"],
))
tasks[task] = i
for i in range(len(slots)):
spawn(i)
results: list = []
try:
while tasks:
if cancelled and cancelled():
return None
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
i = tasks.pop(task)
payload, err = None, None
try:
result = task.result()
if result[0] != 0:
err = _claude_error("Fehler", *result)
else:
payload = slots[i]["payload"](result)
if payload is None:
err = "Ergebnis ungültig/nicht parsebar"
except asyncio.TimeoutError:
err = f"Timeout nach {timeout}s"
except Exception as e:
err = f"{type(e).__name__}: {e}"
if payload is not None:
results.append(payload)
if on_update:
on_update(len(results))
if len(results) >= quorum:
return results
continue
_log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
attempts[i] += 1
if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
spawn(i)
_log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
return None
finally:
for task, i in tasks.items():
kill_process(slots[i]["key"])
task.cancel()
if tasks:
await asyncio.gather(*tasks.keys(), return_exceptions=True)
# --- Bausteine-Pipeline: 4x Recherche (3) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert ---
_bausteine_progress: dict[str, str] = {}
_bausteine_errors: dict[str, str] = {}
_bausteine_cancelled: set[str] = set()
_bausteine_step: dict[str, int] = {}
BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung")
_CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader
def _bausteine_steps(topic: str) -> tuple:
"""Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche."""
if project_dir(topic).is_dir():
return BAUSTEINE_STEPS + ("Ergänzung",)
return BAUSTEINE_STEPS
def _bausteine_files(topic: str) -> dict:
arbeit = arbeit_dir(topic)
return {
"final": bausteine_path(topic),
"arbeit": arbeit,
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)],
"auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)],
"auswahl_check": arbeit / "auswahl-check.json",
"ergaenzung": arbeit / "ergaenzung.json",
}
def _alle_slot_dateien(files: dict) -> list[Path]:
return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]]
def cancel_bausteine(topic: str) -> bool:
if topic not in _bausteine_progress:
return False
_bausteine_cancelled.add(topic)
kill_process(f"bausteine-{topic}-")
return True
def _resume_step(topic: str) -> int:
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
files = _bausteine_files(topic)
if sum(p.exists() for p in files["recherche"]) < 3:
return 0
if not any(p.exists() for p in files["auswahl"]):
return 1
if not files["auswahl_check"].exists():
return 2
if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
return 3
return len(_bausteine_steps(topic))
def bausteine_status(topic: str) -> dict:
steps = _bausteine_steps(topic)
ready = bausteine_path(topic).exists()
generating = topic in _bausteine_progress
partial = False
if generating:
current = _bausteine_step.get(topic)
states = [
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
for i in range(len(steps))
]
elif ready:
states = ["done"] * len(steps)
else:
nxt = _resume_step(topic)
partial = nxt > 0
states = ["done" if i < nxt else "pending" for i in range(len(steps))]
return {
"ready": ready,
"generating": generating,
"progress": _bausteine_progress.get(topic),
"error": _bausteine_errors.get(topic),
"partial": partial,
"steps": [{"label": label, "state": s} for label, s in zip(steps, states)],
}
def active_bausteine() -> list[dict]:
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
def reset_bausteine(topic: str) -> None:
files = _bausteine_files(topic)
files["final"].unlink(missing_ok=True)
shutil.rmtree(files["arbeit"], ignore_errors=True)
_bausteine_errors.pop(topic, None)
def _ergaenzung_schema(data):
"""{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
out = []
for b in data["bausteine"]:
if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str):
return None
titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip()
if not titel:
return None
out.append((titel, beschreibung))
return out
def _pdfs_konvertieren(project: Path) -> None:
"""PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern.
Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die
.txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet.
Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt
unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten).
"""
pdfs = list(project.rglob("*.pdf"))
if not pdfs:
return
if shutil.which("pdftotext") is None:
raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
for pdf in pdfs:
txt = pdf.with_suffix(".txt")
if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
continue
try:
subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
_log(project.name, f"PDF konvertiert: {pdf.name}{txt.name}")
except Exception as e:
raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
if project:
source = _prompt("Bausteine-Quelle-Projekt", project=project)
else:
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
return _prompt(
"Bausteine-Recherche",
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
)
def _parse_auswahl(text: str) -> dict[int, str]:
"""Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile."""
entries: dict[int, str] = {}
last = None
for line in text.splitlines():
m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line)
if m:
last = int(m.group(1))
entries[last] = m.group(2)
elif last is not None and line.strip():
entries[last] += " " + line.strip()
return entries
def _parse_kategorien(text: str) -> dict[str, list[str]]:
"""Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten."""
cats: dict[str, list[str]] = {}
current = None
for line in text.splitlines():
s = line.strip()
m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE)
if m:
current = m.group(1).upper()
cats.setdefault(current, [])
continue
m = re.match(r"(\d+)[.)]\s+(.*\S)", s)
if m and current:
cats[current].append(m.group(2))
return cats
def _lade_bausteine(text: str) -> dict[int, str]:
"""Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat)."""
if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE):
cats = _parse_kategorien(text)
texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])]
return {i: t for i, t in enumerate(texts, 1)}
return _parse_auswahl(text)
def _file_payload(path: Path):
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
return text if _parse_auswahl(text) else None
def _auswahl_payload(path: Path):
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
entries = _parse_auswahl(text)
return (text, entries) if entries else None
def _auswahl_check_schema(data):
"""{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß."""
if not isinstance(data, dict):
return None
nach = data.get("nachtraege", [])
streich = data.get("streichen", [])
if not isinstance(nach, list) or not isinstance(streich, list):
return None
if not all(isinstance(x, str) for x in [*nach, *streich]):
return None
return {"nachtraege": nach, "streichen": streich}
def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None:
"""Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …")."""
if not isinstance(t, str):
return None
return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t)))
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
if topic in _bausteine_progress:
return
_bausteine_progress[topic] = "Wartend…"
_bausteine_errors.pop(topic, None)
files = _bausteine_files(topic)
final_path = files["final"]
project = project_dir(topic) if project_dir(topic).is_dir() else None
def set_p(msg: str, step: int | None = None) -> None:
_bausteine_progress[topic] = msg
if step is not None:
_bausteine_step[topic] = step
def is_cancelled() -> bool:
return topic in _bausteine_cancelled
def abgebrochen() -> None:
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
try:
async with _semaphore:
files["arbeit"].mkdir(parents=True, exist_ok=True)
if project:
await asyncio.to_thread(_pdfs_konvertieren, project)
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
if final_path.exists():
for p_alt in _alle_slot_dateien(files):
p_alt.unlink(missing_ok=True)
# Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen
recherchen: list[str] = []
offen = []
for i, path in enumerate(files["recherche"], 1):
text = _file_payload(path)
if text is not None and len(recherchen) < 3:
recherchen.append(text)
else:
offen.append((i, path))
vorhanden = len(recherchen)
set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
if vorhanden < 3:
caps = "files" if project else "full"
slots = [
{
"key": f"bausteine-{topic}-recherche-{i}",
"prompt": _build_recherche_prompt(topic, path, instructions, project),
"role": "quick", "capabilities": caps,
"payload": (lambda result, p=path: _file_payload(p)),
}
for i, path in offen
]
neue = await _race(
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
cancelled=is_cancelled,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
return
recherchen += neue
# Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen
n_est = max(len(_parse_auswahl(t)) for t in recherchen)
bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None)
if bestehende is not None:
flat, entries = bestehende
else:
set_p("Konsolidiere Recherche…", step=1)
results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
slots = [
{
"key": f"bausteine-{topic}-auswahl-{i}",
"prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=path: _auswahl_payload(p)),
}
for i, path in enumerate(files["auswahl"], 1)
]
auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if auswahl is None:
_bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
return
flat, entries = auswahl[0]
# Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal)
set_p("Prüfe Auswahl…", step=2)
check_path = files["auswahl_check"]
patch = _auswahl_check_schema(_json_datei(check_path))
if patch is None:
check_path.unlink(missing_ok=True)
titel_listen = "\n\n".join(
f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values())
for i, text in enumerate(recherchen, 1)
)
slots = [{
"key": f"bausteine-{topic}-auswahlcheck-1",
"prompt": _prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _auswahl_check_schema(_json_datei(check_path))),
}]
checks = await _race(topic, "Auswahl-Check", slots, 1, _timeout("auswahl_check", len(entries)), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if checks is None:
_log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
else:
patch = checks[0]
if patch is not None and (patch["streichen"] or patch["nachtraege"]):
idx = _titel_index(entries)
weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None}
if weg:
_log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}")
entries = {n: t for n, t in entries.items() if n not in weg}
if patch["nachtraege"]:
_log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine")
texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"])
entries = {i: t for i, t in enumerate(texts, 1)}
# Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt,
# ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung].
if project:
set_p("Ergänze Themenfeld…", step=3)
erg_path = files["ergaenzung"]
ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
if ergaenzungen is None:
erg_path.unlink(missing_ok=True)
slots = [{
"key": f"bausteine-{topic}-ergaenzung-1",
"prompt": _prompt(
"Bausteine-Ergaenzung",
topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
out_path=erg_path, extra=_extra(instructions),
),
"role": "quick", "capabilities": "full",
"payload": (lambda result: _ergaenzung_schema(_json_datei(erg_path))),
}]
res = await _race(topic, "Ergänzung", slots, 1, _timeout("ergaenzung"), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if res is None:
_bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
return
ergaenzungen = res[0]
idx = _titel_index(entries)
neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
if neu:
_log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
start = max(entries, default=0) + 1
for off, (t, b) in enumerate(neu):
entries[start + off] = f"{t}{b} [Ergänzung]"
# Titel eindeutig machen und unsortiertes Inventar schreiben
entries = _eindeutige_titel(entries)
final_path.write_text(
"\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n",
encoding="utf-8",
)
except Exception as e:
_bausteine_errors[topic] = str(e)[:2000]
finally:
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
_bausteine_progress.pop(topic, None)
_bausteine_step.pop(topic, None)
_bausteine_cancelled.discard(topic)
# --- Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg) ---
# Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ.
# Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort.
GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung")
# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt).
# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden
# eines fehlgeschlagenen Writers.
WRITER_SECTIONS = 30
WRITER_MAX = 20
def _guide_files(content_path: Path) -> dict:
d, stem = content_path.parent, content_path.stem
return {
"auswahl": d / f"{stem}.auswahl.json",
"auswahl_check": d / f"{stem}.auswahl-check.json",
"gliederung": d / f"{stem}.gliederung.json",
"gliederung_check": d / f"{stem}.gliederung-check.json",
# chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw.
}
def guide_slot_dateien(content_path: Path) -> list[Path]:
"""Alle Schritt-Dateien eines Guides (für den Frischstart)."""
return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path]
async def _set_step(guide_id: str, step: int, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, step=step, progress=progress, updated_at=now)
def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None:
"""{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
idx = _titel_index(entries)
nums: list[int] = []
seen: set[int] = set()
total = unknown = 0
for t in data["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
seen.add(num)
nums.append(num)
if total == 0 or (total - unknown) / total < 0.85:
return None
if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max:
return None
return nums
def _probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = [str(x).strip() for x in p if str(x).strip()]
return out or None
def _lese_probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = []
for x in p:
if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str):
return None
out.append({"section": x["section"].strip(), "problem": x["problem"].strip()})
return out or None
def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None:
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}].
`soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz).
"""
if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list):
return None
idx = _titel_index(entries)
chapters: list[dict] = []
seen: set[int] = set()
total = unknown = 0
for ch in data["kapitel"]:
if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list):
return None
nums = []
for t in ch["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
nums.append(num)
seen.add(num)
if nums:
chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums})
if not chapters or total == 0:
return None
if (total - unknown) / total < 0.85:
return None
if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max:
return None
return chapters
def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]:
"""Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl."""
n = max(1, min(n, len(chapters)))
chunks: list[list[dict]] = []
current: list[dict] = []
count = 0
remaining_total = sum(len(c["nums"]) for c in chapters)
remaining_chunks = n
for ch in chapters:
current.append(ch)
count += len(ch["nums"])
if remaining_chunks > 1 and count >= remaining_total / remaining_chunks:
chunks.append(current)
remaining_total -= count
remaining_chunks -= 1
current = []
count = 0
if current:
chunks.append(current)
return chunks
def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str:
lines = []
for ch in chunk:
lines.append(f"KAPITEL: {ch['title']}")
lines.extend(f"- {entries[num]}" for num in ch["nums"])
return "\n".join(lines)
_FRAGMENT_KAPITEL_RE = re.compile(r"<!--\s*kapitel\s*:\s*(.*?)\s*-->", re.IGNORECASE)
_FRAGMENT_SECTION_RE = re.compile(r"<!--\s*section\s*:\s*(.*?)\s*-->", re.IGNORECASE)
def _parse_fragment(text: str) -> list[dict]:
"""Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge."""
sections: list[dict] = []
kapitel = None
current = None
for line in text.splitlines():
s = line.strip()
m = _FRAGMENT_KAPITEL_RE.match(s)
if m:
kapitel = m.group(1)
current = None
continue
m = _FRAGMENT_SECTION_RE.match(s)
if m:
current = {"kapitel": kapitel, "titel": m.group(1), "md": []}
sections.append(current)
continue
if current is not None:
current["md"].append(line)
for sec in sections:
sec["md"] = "\n".join(sec["md"]).strip()
return sections
async def _generate_onepager(
guide_id: str, topic: str, instructions: str, provider: str,
project: Path | None, content_path: Path,
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
# 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil)
KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet")
def karten_schema(data):
"""{"karten": {key: {titel, md}}} → Liste · sonst None."""
if not isinstance(data, dict):
return None
karten = data.get("karten")
if not isinstance(karten, dict):
return None
out = []
for key in KARTEN_KEYS:
k = karten.get(key)
if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str):
return None
titel, md = k["titel"].strip(), k["md"].strip()
if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig
return None
out.append({"key": key, "titel": titel, "md": md})
return out
d, stem = content_path.parent, content_path.stem
recherche_path = d / f"{stem}.recherche.md"
recherche_check_path = d / f"{stem}.recherche-check.json"
karten_path = d / f"{stem}.karten.json"
check_path = d / f"{stem}.onepager-check.json"
# Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen
# (Version, Lizenz, Alternativen) laufen dort ins Leere.
if project:
source = _prompt("OnePager-Quelle-Projekt", project=project)
recherche_template = "OnePager-Recherche-Projekt"
recherche_check_template = "OnePager-Recherche-Check-Projekt"
else:
source = _prompt("OnePager-Quelle-Thema", topic=topic)
recherche_template = "OnePager-Recherche"
recherche_check_template = "OnePager-Recherche-Check"
def recherche_payload(result=None):
if not recherche_path.exists():
return None
text = recherche_path.read_text(encoding="utf-8").strip()
return text or None
# Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume)
recherche = recherche_payload()
if recherche is None:
await _set_step(guide_id, 0, "Recherchiere…")
slots = [{
"key": f"{guide_id}-recherche",
"prompt": _prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
"role": "quick", "capabilities": "files" if project else "full",
"payload": recherche_payload,
}]
res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
return None
recherche = res[0]
# Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent
if not recherche_check_path.exists():
await _set_step(guide_id, 1, "Prüfe Recherche…")
slots = [{
"key": f"{guide_id}-recherche-check",
"prompt": _prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(recherche_check_path))),
}]
res = await _race(topic, "Recherche-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Recherche-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 1, "Passe Recherche an…")
slots = [{
"key": f"{guide_id}-recherche-fix",
"prompt": _prompt(
"OnePager-Recherche-Fix",
topic=topic, source=source, recherche=recherche,
probleme="\n".join(f"- {p}" for p in probleme),
out_path=recherche_path, extra=_extra(instructions),
),
"role": "quick", "capabilities": "files" if project else "full",
"payload": recherche_payload,
}]
res = await _race(topic, "Recherche-Fix", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Recherche-Fix ungültig — ursprüngliche Recherche bleibt")
else:
recherche = res[0]
# Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen)
karten = karten_schema(_json_datei(karten_path))
if karten is None:
await _set_step(guide_id, 2, "Baue OnePager…")
karten_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-bauen",
"prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(karten_path))),
}]
res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
return None
karten = res[0]
def karten_block() -> str:
return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten)
# Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent
if not check_path.exists():
await _set_step(guide_id, 3, "Prüfe OnePager…")
slots = [{
"key": f"{guide_id}-verify",
"prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(check_path))),
}]
res = await _race(topic, "OnePager-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"OnePager-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 3, "Passe OnePager an…")
slots = [{
"key": f"{guide_id}-karten-fix",
"prompt": _prompt(
"OnePager-Fix",
topic=topic, recherche=recherche, karten=karten_block(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=karten_path, extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(karten_path))),
}]
res = await _race(topic, "OnePager-Fix", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "OnePager-Fix ungültig — ursprüngliche Karten bleiben")
karten_path.write_text(
json.dumps({"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, ensure_ascii=False),
encoding="utf-8",
)
else:
karten = res[0]
sections = [
{"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]}
for i, k in enumerate(karten, 1)
]
return [{"title": topic, "sections": sections}]
async def _generate_sections(
guide_id: str, topic: str, format_name: str, entries: dict[int, str],
facts: str, instructions: str, provider: str,
content_path: Path,
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
files = _guide_files(content_path)
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
n = len(entries)
anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name]
k_min = min(n, max(minimum, math.ceil(anteil_min * n)))
k_max = min(n, max(k_min, math.floor(anteil_max * n)))
auswahl_auftrag = (
f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. "
"Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist."
)
# Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume)
auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)
if auswahl is None:
await _set_step(guide_id, 0, "Wähle Bausteine…")
files["auswahl"].unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-auswahl",
"prompt": _prompt(
"Guide-Auswahl",
topic=topic, format_name=format_name, bausteine=bausteine_liste,
auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
}]
res = await _race(topic, "Guide-Auswahl", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Auswahl fehlgeschlagen")
return None
auswahl = res[0]
def auswahl_titel() -> str:
return "\n".join(f"- {_titel(entries[num])}" for num in auswahl)
def auswahl_json() -> str:
return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False)
# Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent
if not files["auswahl_check"].exists():
await _set_step(guide_id, 1, "Prüfe Auswahl…")
slots = [{
"key": f"{guide_id}-auswahl-check",
"prompt": _prompt(
"Guide-Auswahl-Check",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
out_path=files["auswahl_check"], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(files["auswahl_check"]))),
}]
res = await _race(topic, "Auswahl-Prüfung", slots, 1, _timeout("guide_check", len(auswahl)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Auswahl-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 1, "Passe Auswahl an…")
slots = [{
"key": f"{guide_id}-auswahl-fix",
"prompt": _prompt(
"Guide-Auswahl-Fix",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["auswahl"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
}]
res = await _race(topic, "Auswahl-Fix", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Auswahl-Fix ungültig — ursprüngliche Auswahl bleibt")
files["auswahl"].write_text(auswahl_json(), encoding="utf-8")
else:
auswahl = res[0]
sel_entries = {num: entries[num] for num in auswahl}
soll = len(sel_entries)
sel_liste = "\n".join(f"- {t}" for t in sel_entries.values())
# Schritt 3: Gliederung der festen Auswahl
plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)
if plan is None:
await _set_step(guide_id, 2, "Plane Gliederung…")
files["gliederung"].unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-gliederung",
"prompt": _prompt(
"Guide-Gliederung",
topic=topic, format_name=format_name, bausteine=sel_liste,
out_path=files["gliederung"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
}]
res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederung fehlgeschlagen")
return None
plan = res[0]
def gliederung_text() -> str:
return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan)
def gliederung_json() -> str:
return json.dumps(
{"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]},
ensure_ascii=False,
)
# Schritt 4: Gliederungs-Prüfung
if not files["gliederung_check"].exists():
await _set_step(guide_id, 3, "Prüfe Gliederung…")
slots = [{
"key": f"{guide_id}-gliederung-check",
"prompt": _prompt(
"Guide-Gliederung-Check",
topic=topic, format_name=format_name, zweck=zweck,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
out_path=files["gliederung_check"], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _probleme_schema(_json_datei(files["gliederung_check"]))),
}]
res = await _race(topic, "Gliederungs-Prüfung", slots, 1, _timeout("guide_check", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen")
return None
probleme = res[0]
if probleme:
_log(topic, f"Gliederungs-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(guide_id, 3, "Passe Gliederung an…")
slots = [{
"key": f"{guide_id}-gliederung-fix",
"prompt": _prompt(
"Guide-Gliederung-Fix",
topic=topic, format_name=format_name,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["gliederung"], extra=_extra(instructions),
),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
}]
res = await _race(topic, "Gliederungs-Fix", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "Gliederungs-Fix ungültig — ursprüngliche Gliederung bleibt")
files["gliederung"].write_text(gliederung_json(), encoding="utf-8")
else:
plan = res[0]
# Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume)
total_sections = sum(len(c["nums"]) for c in plan)
chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS))))
zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
writer_count = len(zuteilungen)
paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
offen = [i for i, p in enumerate(paths) if not p.exists()]
if offen:
await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-w{i + 1}",
_prompt(
"Guide-Writer",
topic=topic, format_name=format_name, zuteilung=zuteilungen[i],
facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions),
),
_timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full",
)
for i in offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(offen, results):
if isinstance(r, BaseException):
_log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}")
elif r[0] != 0:
_log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}")
elif not paths[i].exists():
_log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt")
if not any(p.exists() for p in paths):
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
return None
idx = _titel_index(entries)
by_num: dict[int, dict] = {}
for p in paths:
if not p.exists():
continue
for sec in _parse_fragment(p.read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num is None:
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
elif num not in by_num:
by_num[num] = sec
if not by_num:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
# Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections
chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks]
check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)]
offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]]
if offen_checks:
await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…")
def sections_text(nums: list[int]) -> str:
return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums)
slots = [{
"key": f"{guide_id}-lese-check-{i + 1}",
"prompt": _prompt(
"Guide-Lese-Check",
topic=topic, format_name=format_name, spec=spec,
sections=sections_text(chunk_nums[i]),
out_path=check_paths[i], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))),
} for i in offen_checks]
res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Lese-Prüfung fehlgeschlagen")
return None
probleme_by_num: dict[int, str] = {}
for p in check_paths:
for item in (_lese_probleme_schema(_json_datei(p)) or []):
num = _titel_aufloesen(idx, item["section"])
if num in by_num and num not in probleme_by_num:
probleme_by_num[num] = item["problem"]
if probleme_by_num:
_log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet")
await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…")
fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums]
fix_offen = [i for i, nums in enumerate(fix_chunks) if nums]
fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)]
def auftraege_text(nums: list[int]) -> str:
return "\n\n".join(
f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}"
for num in nums
)
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-fix-w{i + 1}",
_prompt(
"Guide-Sections-Fix",
topic=topic, format_name=format_name, facts=facts, spec=spec,
auftraege=auftraege_text(fix_chunks[i]),
out_path=fix_paths[i], extra=_extra(instructions),
),
_timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full",
)
for i in fix_offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(fix_offen, results):
if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0):
_log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt")
ersetzt = 0
for i in fix_offen:
if not fix_paths[i].exists():
continue
for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num in probleme_by_num and sec["md"].strip():
by_num[num] = sec
ersetzt += 1
_log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet")
await _set_progress(guide_id, "Setze zusammen…")
chapters: list[dict] = []
for ch in plan:
sections = [
{"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]}
for num in ch["nums"] if num in by_num
]
if sections:
chapters.append({"title": ch["title"], "sections": sections})
geplant = {num for ch in plan for num in ch["nums"]}
missing = sorted(geplant - set(by_num))
if missing:
_log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}")
if not chapters:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
return chapters
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
async with _semaphore:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)
content_path = guide_content_path(topic, format_name)
content_path.parent.mkdir(parents=True, exist_ok=True)
project = project_dir(topic) if project_dir(topic).is_dir() else None
try:
if guide_id in _cancelled:
return
if project:
await asyncio.to_thread(_pdfs_konvertieren, project)
# „Neu erstellen": fertiger Guide → kompletter Frischstart.
# Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume.
if content_path.exists():
for p_alt in guide_slot_dateien(content_path):
p_alt.unlink(missing_ok=True)
if format_name == "OnePager":
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path)
else:
alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
if not alle:
await _fail(guide_id, "Keine Bausteine gefunden")
return
entries = _eindeutige_titel(alle)
facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
chapters = await _generate_sections(
guide_id, topic, format_name, entries,
facts, instructions, provider, content_path,
)
if chapters is None or guide_id in _cancelled:
return
content_path.write_text(
json.dumps({"topic": topic, "format": format_name, "chapters": chapters}, ensure_ascii=False, indent=1),
encoding="utf-8",
)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now)
except asyncio.TimeoutError:
await _fail(guide_id, "Timeout bei der Generierung")
except FileNotFoundError:
await _fail(guide_id, "Bausteine fehlen")
except Exception as e:
await _fail(guide_id, str(e)[:2000])
finally:
_cancelled.discard(guide_id)
# --- Tutor-Chat ---
def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str:
transcript = "\n".join(
f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
for m in messages
)
return _prompt(
"Chat",
topic=topic, format_name=format_name,
outline_block=outline.strip() or "(keine)",
section_block=section.strip() or "(kein Abschnitt erkannt)",
transcript=transcript,
)
async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
try:
prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
returncode, stdout, stderr = await run_agent(
"chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
)
if returncode != 0:
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
reply = stdout.strip()
return reply or "Entschuldigung, ich habe keine Antwort erhalten."
except Exception:
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."