Files
creator/backend/generator.py
2026-06-12 07:58:52 +02:00

1594 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import logging
import math
import shutil
import subprocess
import re
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from agents import run_agent, kill_process
from config import (
DEFAULT_PROVIDER,
FORMAT_ANTEIL,
TEMPLATES_DIR,
TIMEOUTS,
MAX_CONCURRENT_GENERATIONS,
)
from database import list_guides, update_guide
from fsutil import atomic_write_json, atomic_write_text
from jsonio import parse_json_text as _parse_json_text, read_json_file as _json_datei
from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
_cancelled.add(guide_id)
kill_process(guide_id)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now)
return True
async def _set_progress(guide_id: str, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, progress=progress, updated_at=now)
def _prompt(name: str, **kwargs) -> str:
template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
return template.format(**kwargs)
def _extra(instructions: str) -> str:
return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
log = logging.getLogger("creator.generator")
def _log(topic: str, msg: str) -> None:
log.info("[%s] %s", topic, msg)
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
stderr = (stderr or "").strip()
if stderr:
return f"{label}: {stderr[:1000]}"
tail = (stdout or "").strip()[-500:]
if tail:
return f"{label} (exit {returncode}, stderr leer): …{tail}"
return f"{label} (exit {returncode}, ohne Ausgabe)"
def _gather_error(label: str, results: list) -> str:
for r in results:
if isinstance(r, BaseException):
return f"{label}: {type(r).__name__}: {r}"
returncode, stdout, stderr = r
if returncode != 0:
return _claude_error(label, returncode, stdout, stderr)
return f"{label}: kein verwertbares Ergebnis"
async def _fail(guide_id: str, msg: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
def _norm_titel(s: str) -> str:
"""Normalisiert einen Titel für den Schlüssel-Vergleich."""
s = re.sub(r"[`'\"<>]", "", s)
return re.sub(r"\s+", " ", s).strip().lower()
def _titel(entry: str) -> str:
return entry.split("")[0].strip() or entry
def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]:
"""Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen."""
seen: dict[str, int] = {}
out: dict[int, str] = {}
for num, text in entries.items():
titel = _titel(text)
key = _norm_titel(titel)
seen[key] = seen.get(key, 0) + 1
if seen[key] > 1:
rest = text.split("", 1)
text = f"{titel} ({seen[key]})" + (f"{rest[1]}" if len(rest) == 2 else "")
# zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht
out[num] = text
return out
def _titel_index(entries: dict[int, str]) -> dict[str, int]:
return {_norm_titel(_titel(text)): num for num, text in entries.items()}
def _timeout(step: str, n: int = 0) -> int:
base, per = TIMEOUTS[step]
return base + per * n
_MAX_RESTARTS = 2
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None:
"""Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse.
Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)`
prüft die Gültigkeit und liefert das Slot-Ergebnis oder None.
Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das
Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt.
`cancelled()` → True bricht ab (keine Restarts, Rückgabe None).
"""
attempts = {i: 0 for i in range(len(slots))}
tasks: dict[asyncio.Task, int] = {}
def spawn(i: int) -> None:
slot = slots[i]
task = asyncio.create_task(run_agent(
slot["key"], slot["prompt"], timeout,
provider=provider, role=slot["role"], capabilities=slot["capabilities"],
))
tasks[task] = i
for i in range(len(slots)):
spawn(i)
results: list = []
try:
while tasks:
if cancelled and cancelled():
return None
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
i = tasks.pop(task)
payload, err = None, None
try:
result = task.result()
if result[0] != 0:
err = _claude_error("Fehler", *result)
else:
payload = slots[i]["payload"](result)
if payload is None:
err = "Ergebnis ungültig/nicht parsebar"
except asyncio.TimeoutError:
err = f"Timeout nach {timeout}s"
except Exception as e:
err = f"{type(e).__name__}: {e}"
if payload is not None:
results.append(payload)
if on_update:
on_update(len(results))
if len(results) >= quorum:
return results
continue
_log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
attempts[i] += 1
if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
spawn(i)
_log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
return None
finally:
for task, i in tasks.items():
kill_process(slots[i]["key"])
task.cancel()
if tasks:
await asyncio.gather(*tasks.keys(), return_exceptions=True)
@dataclass
class GenContext:
"""Durchgereichte Pipeline-Parameter — erspart lange Argument-Signaturen."""
topic: str
provider: str
is_cancelled: Callable[[], bool]
guide_id: str | None = None
# Ergebnis-Status von run_single_slot/_check_then_fix
OK, CANCELLED, FAILED = "ok", "cancelled", "failed"
async def run_single_slot(
ctx: GenContext, label: str, *,
key: str, prompt: str, role: str, capabilities: str, payload, timeout: int,
) -> tuple[str, object]:
"""Ein Agent, ein gültiges Ergebnis (Race mit Quorum 1).
→ (OK, wert) | (CANCELLED, None) | (FAILED, None)
"""
slots = [{"key": key, "prompt": prompt, "role": role, "capabilities": capabilities, "payload": payload}]
res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider, cancelled=ctx.is_cancelled)
if ctx.is_cancelled():
return CANCELLED, None
if res is None:
return FAILED, None
return OK, res[0]
async def _check_then_fix(
ctx: GenContext, *, name: str, step: int,
check_key: str, check_prompt: str, check_path: Path, check_timeout: int,
fix_key: str, build_fix_prompt, fix_payload, fix_timeout: int,
fix_role: str = "fast", fix_caps: str = "files",
on_fix_invalid=None,
) -> tuple[str, object]:
"""Check→Fix-Muster: Prüf-Agent notiert Probleme (JSON), Fix-Agent behebt sie.
Resume: existierende Check-Datei überspringt den ganzen Schritt.
Check ist fatal (FAILED), Fix nicht — Original bleibt; on_fix_invalid kann
das kanonische Original zurückschreiben, falls der Fix-Agent die
Artefakt-Datei zerschrieben hat.
Lese-Check (Multi-Slot, Section-genau) und Bausteine-Auswahl-Check
(Patch-Semantik) passen bewusst NICHT in dieses Muster.
→ (OK, neues_artefakt | None=unverändert) | (CANCELLED, None) | (FAILED, None)
"""
if check_path.exists():
return OK, None
await _set_step(ctx.guide_id, step, f"Prüfe {name}")
status, probleme = await run_single_slot(
ctx, f"{name}-Prüfung", key=check_key, prompt=check_prompt,
role="fast", capabilities="files",
payload=lambda result: _probleme_schema(_json_datei(check_path)),
timeout=check_timeout,
)
if status != OK:
return status, None
if not probleme:
return OK, None
_log(ctx.topic, f"{name}-Prüfung: {len(probleme)} Problem(e) notiert")
await _set_step(ctx.guide_id, step, f"Passe {name} an…")
status, fixed = await run_single_slot(
ctx, f"{name}-Fix", key=fix_key, prompt=build_fix_prompt(probleme),
role=fix_role, capabilities=fix_caps, payload=fix_payload, timeout=fix_timeout,
)
if status == CANCELLED:
return CANCELLED, None
if status == FAILED:
_log(ctx.topic, f"{name}-Fix ungültig — Original bleibt")
if on_fix_invalid:
on_fix_invalid()
return OK, None
return OK, fixed
# --- Bausteine-Pipeline: 4x Recherche (3) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert ---
_bausteine_progress: dict[str, str] = {}
_bausteine_errors: dict[str, str] = {}
_bausteine_cancelled: set[str] = set()
_bausteine_step: dict[str, int] = {}
BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung")
_CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader
def _bausteine_steps(topic: str) -> tuple:
"""Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche."""
if project_dir(topic).is_dir():
return BAUSTEINE_STEPS + ("Ergänzung",)
return BAUSTEINE_STEPS
def _bausteine_files(topic: str) -> dict:
arbeit = arbeit_dir(topic)
return {
"final": bausteine_path(topic),
"arbeit": arbeit,
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)],
"auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)],
"auswahl_check": arbeit / "auswahl-check.json",
"ergaenzung": arbeit / "ergaenzung.json",
}
def _alle_slot_dateien(files: dict) -> list[Path]:
return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]]
def cancel_bausteine(topic: str) -> bool:
if topic not in _bausteine_progress:
return False
_bausteine_cancelled.add(topic)
kill_process(f"bausteine-{topic}-")
return True
def _resume_step(topic: str) -> int:
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
files = _bausteine_files(topic)
if sum(p.exists() for p in files["recherche"]) < 3:
return 0
if not any(p.exists() for p in files["auswahl"]):
return 1
if not files["auswahl_check"].exists():
return 2
if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
return 3
return len(_bausteine_steps(topic))
def bausteine_status(topic: str) -> dict:
steps = _bausteine_steps(topic)
ready = bausteine_path(topic).exists()
generating = topic in _bausteine_progress
partial = False
if generating:
current = _bausteine_step.get(topic)
states = [
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
for i in range(len(steps))
]
elif ready:
states = ["done"] * len(steps)
else:
nxt = _resume_step(topic)
partial = nxt > 0
states = ["done" if i < nxt else "pending" for i in range(len(steps))]
return {
"ready": ready,
"generating": generating,
"progress": _bausteine_progress.get(topic),
"error": _bausteine_errors.get(topic),
"partial": partial,
"steps": [{"label": label, "state": s} for label, s in zip(steps, states)],
}
def active_bausteine() -> list[dict]:
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
def reset_bausteine(topic: str) -> None:
files = _bausteine_files(topic)
files["final"].unlink(missing_ok=True)
shutil.rmtree(files["arbeit"], ignore_errors=True)
_bausteine_errors.pop(topic, None)
def _ergaenzung_schema(data):
"""{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
out = []
for b in data["bausteine"]:
if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str):
return None
titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip()
if not titel:
return None
out.append((titel, beschreibung))
return out
def _pdfs_konvertieren(project: Path) -> None:
"""PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern.
Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die
.txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet.
Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt
unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten).
"""
pdfs = list(project.rglob("*.pdf"))
if not pdfs:
return
if shutil.which("pdftotext") is None:
raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
for pdf in pdfs:
txt = pdf.with_suffix(".txt")
if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
continue
try:
subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
_log(project.name, f"PDF konvertiert: {pdf.name}{txt.name}")
except Exception as e:
raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
if project:
source = _prompt("Bausteine-Quelle-Projekt", project=project)
else:
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
return _prompt(
"Bausteine-Recherche",
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
)
def _parse_auswahl(text: str) -> dict[int, str]:
"""Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile."""
entries: dict[int, str] = {}
last = None
for line in text.splitlines():
m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line)
if m:
last = int(m.group(1))
entries[last] = m.group(2)
elif last is not None and line.strip():
entries[last] += " " + line.strip()
return entries
def _parse_kategorien(text: str) -> dict[str, list[str]]:
"""Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten."""
cats: dict[str, list[str]] = {}
current = None
for line in text.splitlines():
s = line.strip()
m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE)
if m:
current = m.group(1).upper()
cats.setdefault(current, [])
continue
m = re.match(r"(\d+)[.)]\s+(.*\S)", s)
if m and current:
cats[current].append(m.group(2))
return cats
def _lade_bausteine(text: str) -> dict[int, str]:
"""Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat)."""
if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE):
cats = _parse_kategorien(text)
texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])]
return {i: t for i, t in enumerate(texts, 1)}
return _parse_auswahl(text)
def _file_payload(path: Path):
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
return text if _parse_auswahl(text) else None
def _auswahl_payload(path: Path):
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
entries = _parse_auswahl(text)
return (text, entries) if entries else None
def _auswahl_check_schema(data):
"""{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß."""
if not isinstance(data, dict):
return None
nach = data.get("nachtraege", [])
streich = data.get("streichen", [])
if not isinstance(nach, list) or not isinstance(streich, list):
return None
if not all(isinstance(x, str) for x in [*nach, *streich]):
return None
return {"nachtraege": nach, "streichen": streich}
def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None:
"""Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …")."""
if not isinstance(t, str):
return None
return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t)))
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
if topic in _bausteine_progress:
return
_bausteine_progress[topic] = "Wartend…"
_bausteine_errors.pop(topic, None)
files = _bausteine_files(topic)
final_path = files["final"]
project = project_dir(topic) if project_dir(topic).is_dir() else None
def set_p(msg: str, step: int | None = None) -> None:
_bausteine_progress[topic] = msg
if step is not None:
_bausteine_step[topic] = step
def is_cancelled() -> bool:
return topic in _bausteine_cancelled
def abgebrochen() -> None:
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled)
try:
async with _semaphore:
files["arbeit"].mkdir(parents=True, exist_ok=True)
if project:
await asyncio.to_thread(_pdfs_konvertieren, project)
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
if final_path.exists():
for p_alt in _alle_slot_dateien(files):
p_alt.unlink(missing_ok=True)
# Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen
recherchen: list[str] = []
offen = []
for i, path in enumerate(files["recherche"], 1):
text = _file_payload(path)
if text is not None and len(recherchen) < 3:
recherchen.append(text)
else:
offen.append((i, path))
vorhanden = len(recherchen)
set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
if vorhanden < 3:
caps = "files" if project else "full"
slots = [
{
"key": f"bausteine-{topic}-recherche-{i}",
"prompt": _build_recherche_prompt(topic, path, instructions, project),
"role": "quick", "capabilities": caps,
"payload": (lambda result, p=path: _file_payload(p)),
}
for i, path in offen
]
neue = await _race(
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
cancelled=is_cancelled,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
return
recherchen += neue
# Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen
n_est = max(len(_parse_auswahl(t)) for t in recherchen)
bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None)
if bestehende is not None:
flat, entries = bestehende
else:
set_p("Konsolidiere Recherche…", step=1)
results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
slots = [
{
"key": f"bausteine-{topic}-auswahl-{i}",
"prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=path: _auswahl_payload(p)),
}
for i, path in enumerate(files["auswahl"], 1)
]
auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if auswahl is None:
_bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
return
flat, entries = auswahl[0]
# Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal)
set_p("Prüfe Auswahl…", step=2)
check_path = files["auswahl_check"]
patch = _auswahl_check_schema(_json_datei(check_path))
if patch is None:
check_path.unlink(missing_ok=True)
titel_listen = "\n\n".join(
f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values())
for i, text in enumerate(recherchen, 1)
)
status, check = await run_single_slot(
ctx, "Auswahl-Check",
key=f"bausteine-{topic}-auswahlcheck-1",
prompt=_prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path),
role="fast", capabilities="files",
payload=lambda result: _auswahl_check_schema(_json_datei(check_path)),
timeout=_timeout("auswahl_check", len(entries)),
)
if status == CANCELLED:
abgebrochen()
return
if status == FAILED:
_log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
else:
patch = check
if patch is not None and (patch["streichen"] or patch["nachtraege"]):
idx = _titel_index(entries)
weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None}
if weg:
_log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}")
entries = {n: t for n, t in entries.items() if n not in weg}
if patch["nachtraege"]:
_log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine")
texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"])
entries = {i: t for i, t in enumerate(texts, 1)}
# Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt,
# ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung].
if project:
set_p("Ergänze Themenfeld…", step=3)
erg_path = files["ergaenzung"]
ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
if ergaenzungen is None:
erg_path.unlink(missing_ok=True)
status, ergaenzungen = await run_single_slot(
ctx, "Ergänzung",
key=f"bausteine-{topic}-ergaenzung-1",
prompt=_prompt(
"Bausteine-Ergaenzung",
topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
out_path=erg_path, extra=_extra(instructions),
),
role="quick", capabilities="full",
payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)),
timeout=_timeout("ergaenzung"),
)
if status == CANCELLED:
abgebrochen()
return
if status == FAILED:
_bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
return
idx = _titel_index(entries)
neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
if neu:
_log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
start = max(entries, default=0) + 1
for off, (t, b) in enumerate(neu):
entries[start + off] = f"{t}{b} [Ergänzung]"
# Titel eindeutig machen und unsortiertes Inventar schreiben
entries = _eindeutige_titel(entries)
atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n")
except Exception as e:
log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic)
_bausteine_errors[topic] = str(e)[:2000]
finally:
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
_bausteine_progress.pop(topic, None)
_bausteine_step.pop(topic, None)
_bausteine_cancelled.discard(topic)
# --- Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg) ---
# Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ.
# Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort.
GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung")
# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt).
# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden
# eines fehlgeschlagenen Writers.
WRITER_SECTIONS = 30
WRITER_MAX = 20
def _guide_files(content_path: Path) -> dict:
d, stem = content_path.parent, content_path.stem
return {
"auswahl": d / f"{stem}.auswahl.json",
"auswahl_check": d / f"{stem}.auswahl-check.json",
"gliederung": d / f"{stem}.gliederung.json",
"gliederung_check": d / f"{stem}.gliederung-check.json",
# chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw.
}
def guide_slot_dateien(content_path: Path) -> list[Path]:
"""Alle Schritt-Dateien eines Guides (für den Frischstart)."""
return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path]
async def _set_step(guide_id: str, step: int, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, step=step, progress=progress, updated_at=now)
def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None:
"""{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
idx = _titel_index(entries)
nums: list[int] = []
seen: set[int] = set()
total = unknown = 0
for t in data["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
seen.add(num)
nums.append(num)
if total == 0 or (total - unknown) / total < 0.85:
return None
if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max:
return None
return nums
def _probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = [str(x).strip() for x in p if str(x).strip()]
return out or None
def _lese_probleme_schema(data):
"""{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None."""
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return []
p = data.get("probleme")
if not isinstance(p, list) or not p:
return None
out = []
for x in p:
if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str):
return None
out.append({"section": x["section"].strip(), "problem": x["problem"].strip()})
return out or None
def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None:
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}].
`soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz).
"""
if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list):
return None
idx = _titel_index(entries)
chapters: list[dict] = []
seen: set[int] = set()
total = unknown = 0
for ch in data["kapitel"]:
if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list):
return None
nums = []
for t in ch["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
nums.append(num)
seen.add(num)
if nums:
chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums})
if not chapters or total == 0:
return None
if (total - unknown) / total < 0.85:
return None
if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max:
return None
return chapters
def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]:
"""Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl."""
n = max(1, min(n, len(chapters)))
chunks: list[list[dict]] = []
current: list[dict] = []
count = 0
remaining_total = sum(len(c["nums"]) for c in chapters)
remaining_chunks = n
for ch in chapters:
current.append(ch)
count += len(ch["nums"])
if remaining_chunks > 1 and count >= remaining_total / remaining_chunks:
chunks.append(current)
remaining_total -= count
remaining_chunks -= 1
current = []
count = 0
if current:
chunks.append(current)
return chunks
def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str:
lines = []
for ch in chunk:
lines.append(f"KAPITEL: {ch['title']}")
lines.extend(f"- {entries[num]}" for num in ch["nums"])
return "\n".join(lines)
_FRAGMENT_KAPITEL_RE = re.compile(r"<!--\s*kapitel\s*:\s*(.*?)\s*-->", re.IGNORECASE)
_FRAGMENT_SECTION_RE = re.compile(r"<!--\s*section\s*:\s*(.*?)\s*-->", re.IGNORECASE)
def _parse_fragment(text: str) -> list[dict]:
"""Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge."""
sections: list[dict] = []
kapitel = None
current = None
for line in text.splitlines():
s = line.strip()
m = _FRAGMENT_KAPITEL_RE.match(s)
if m:
kapitel = m.group(1)
current = None
continue
m = _FRAGMENT_SECTION_RE.match(s)
if m:
current = {"kapitel": kapitel, "titel": m.group(1), "md": []}
sections.append(current)
continue
if current is not None:
current["md"].append(line)
for sec in sections:
sec["md"] = "\n".join(sec["md"]).strip()
return sections
async def _generate_onepager(
guide_id: str, topic: str, instructions: str, provider: str,
project: Path | None, content_path: Path,
) -> list[dict] | None:
ctx = GenContext(topic=topic, provider=provider, is_cancelled=lambda: guide_id in _cancelled, guide_id=guide_id)
# 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil)
KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet")
def karten_schema(data):
"""{"karten": {key: {titel, md}}} → Liste · sonst None."""
if not isinstance(data, dict):
return None
karten = data.get("karten")
if not isinstance(karten, dict):
return None
out = []
for key in KARTEN_KEYS:
k = karten.get(key)
if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str):
return None
titel, md = k["titel"].strip(), k["md"].strip()
if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig
return None
out.append({"key": key, "titel": titel, "md": md})
return out
d, stem = content_path.parent, content_path.stem
recherche_path = d / f"{stem}.recherche.md"
recherche_check_path = d / f"{stem}.recherche-check.json"
karten_path = d / f"{stem}.karten.json"
check_path = d / f"{stem}.onepager-check.json"
# Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen
# (Version, Lizenz, Alternativen) laufen dort ins Leere.
if project:
source = _prompt("OnePager-Quelle-Projekt", project=project)
recherche_template = "OnePager-Recherche-Projekt"
recherche_check_template = "OnePager-Recherche-Check-Projekt"
else:
source = _prompt("OnePager-Quelle-Thema", topic=topic)
recherche_template = "OnePager-Recherche"
recherche_check_template = "OnePager-Recherche-Check"
def recherche_payload(result=None):
if not recherche_path.exists():
return None
text = recherche_path.read_text(encoding="utf-8").strip()
return text or None
# Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume)
recherche = recherche_payload()
if recherche is None:
await _set_step(guide_id, 0, "Recherchiere…")
status, recherche = await run_single_slot(
ctx, "OnePager-Recherche",
key=f"{guide_id}-recherche",
prompt=_prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
role="quick", capabilities="files" if project else "full",
payload=recherche_payload, timeout=_timeout("onepager_recherche"),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
return None
# Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent
status, fixed = await _check_then_fix(
ctx, name="Recherche", step=1,
check_key=f"{guide_id}-recherche-check",
check_prompt=_prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path),
check_path=recherche_check_path, check_timeout=_timeout("onepager_verify"),
fix_key=f"{guide_id}-recherche-fix",
build_fix_prompt=lambda probleme: _prompt(
"OnePager-Recherche-Fix",
topic=topic, source=source, recherche=recherche,
probleme="\n".join(f"- {p}" for p in probleme),
out_path=recherche_path, extra=_extra(instructions),
),
fix_payload=recherche_payload, fix_timeout=_timeout("onepager_recherche"),
fix_role="quick", fix_caps="files" if project else "full",
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen")
return None
if fixed is not None:
recherche = fixed
# Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen)
karten = karten_schema(_json_datei(karten_path))
if karten is None:
await _set_step(guide_id, 2, "Baue OnePager…")
karten_path.unlink(missing_ok=True)
status, karten = await run_single_slot(
ctx, "OnePager-Bauen",
key=f"{guide_id}-bauen",
prompt=_prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
role="fast", capabilities="files",
payload=lambda result: karten_schema(_json_datei(karten_path)),
timeout=_timeout("onepager_bauen"),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
return None
def karten_block() -> str:
return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten)
# Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent
status, fixed = await _check_then_fix(
ctx, name="OnePager", step=3,
check_key=f"{guide_id}-verify",
check_prompt=_prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path),
check_path=check_path, check_timeout=_timeout("onepager_verify"),
fix_key=f"{guide_id}-karten-fix",
build_fix_prompt=lambda probleme: _prompt(
"OnePager-Fix",
topic=topic, recherche=recherche, karten=karten_block(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=karten_path, extra=_extra(instructions),
),
fix_payload=lambda result: karten_schema(_json_datei(karten_path)),
fix_timeout=_timeout("onepager_bauen"),
on_fix_invalid=lambda: atomic_write_json(
karten_path, {"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}},
),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen")
return None
if fixed is not None:
karten = fixed
sections = [
{"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]}
for i, k in enumerate(karten, 1)
]
return [{"title": topic, "sections": sections}]
async def _generate_sections(
guide_id: str, topic: str, format_name: str, entries: dict[int, str],
facts: str, instructions: str, provider: str,
content_path: Path,
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled, guide_id=guide_id)
spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
files = _guide_files(content_path)
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
n = len(entries)
anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name]
k_min = min(n, max(minimum, math.ceil(anteil_min * n)))
k_max = min(n, max(k_min, math.floor(anteil_max * n)))
auswahl_auftrag = (
f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. "
"Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist."
)
# Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume)
auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)
if auswahl is None:
await _set_step(guide_id, 0, "Wähle Bausteine…")
files["auswahl"].unlink(missing_ok=True)
status, auswahl = await run_single_slot(
ctx, "Guide-Auswahl",
key=f"{guide_id}-auswahl",
prompt=_prompt(
"Guide-Auswahl",
topic=topic, format_name=format_name, bausteine=bausteine_liste,
auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions),
),
role="guide", capabilities="files",
payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max),
timeout=_timeout("guide_auswahl", n),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "Auswahl fehlgeschlagen")
return None
def auswahl_titel() -> str:
return "\n".join(f"- {_titel(entries[num])}" for num in auswahl)
def auswahl_json() -> str:
return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False)
# Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent
status, fixed = await _check_then_fix(
ctx, name="Auswahl", step=1,
check_key=f"{guide_id}-auswahl-check",
check_prompt=_prompt(
"Guide-Auswahl-Check",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
out_path=files["auswahl_check"], extra=_extra(instructions),
),
check_path=files["auswahl_check"], check_timeout=_timeout("guide_check", len(auswahl)),
fix_key=f"{guide_id}-auswahl-fix",
build_fix_prompt=lambda probleme: _prompt(
"Guide-Auswahl-Fix",
topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
bausteine=bausteine_liste, auswahl=auswahl_titel(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["auswahl"], extra=_extra(instructions),
),
fix_payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max),
fix_timeout=_timeout("guide_auswahl", n), fix_role="guide",
on_fix_invalid=lambda: atomic_write_text(files["auswahl"], auswahl_json()),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen")
return None
if fixed is not None:
auswahl = fixed
sel_entries = {num: entries[num] for num in auswahl}
soll = len(sel_entries)
sel_liste = "\n".join(f"- {t}" for t in sel_entries.values())
# Schritt 3: Gliederung der festen Auswahl
plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)
if plan is None:
await _set_step(guide_id, 2, "Plane Gliederung…")
files["gliederung"].unlink(missing_ok=True)
status, plan = await run_single_slot(
ctx, "Gliederung",
key=f"{guide_id}-gliederung",
prompt=_prompt(
"Guide-Gliederung",
topic=topic, format_name=format_name, bausteine=sel_liste,
out_path=files["gliederung"], extra=_extra(instructions),
),
role="guide", capabilities="files",
payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll),
timeout=_timeout("plan", soll),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "Gliederung fehlgeschlagen")
return None
def gliederung_text() -> str:
return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan)
def gliederung_json() -> str:
return json.dumps(
{"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]},
ensure_ascii=False,
)
# Schritt 4: Gliederungs-Prüfung
status, fixed = await _check_then_fix(
ctx, name="Gliederung", step=3,
check_key=f"{guide_id}-gliederung-check",
check_prompt=_prompt(
"Guide-Gliederung-Check",
topic=topic, format_name=format_name, zweck=zweck,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
out_path=files["gliederung_check"], extra=_extra(instructions),
),
check_path=files["gliederung_check"], check_timeout=_timeout("guide_check", soll),
fix_key=f"{guide_id}-gliederung-fix",
build_fix_prompt=lambda probleme: _prompt(
"Guide-Gliederung-Fix",
topic=topic, format_name=format_name,
auswahl=auswahl_titel(), gliederung=gliederung_text(),
probleme="\n".join(f"- {p}" for p in probleme),
out_path=files["gliederung"], extra=_extra(instructions),
),
fix_payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll),
fix_timeout=_timeout("plan", soll), fix_role="guide",
on_fix_invalid=lambda: atomic_write_text(files["gliederung"], gliederung_json()),
)
if status == CANCELLED:
return None
if status == FAILED:
await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen")
return None
if fixed is not None:
plan = fixed
# Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume)
total_sections = sum(len(c["nums"]) for c in plan)
chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS))))
zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
writer_count = len(zuteilungen)
paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
offen = [i for i, p in enumerate(paths) if not p.exists()]
if offen:
await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-w{i + 1}",
_prompt(
"Guide-Writer",
topic=topic, format_name=format_name, zuteilung=zuteilungen[i],
facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions),
),
_timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full",
)
for i in offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(offen, results):
if isinstance(r, BaseException):
_log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}")
elif r[0] != 0:
_log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}")
elif not paths[i].exists():
_log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt")
if not any(p.exists() for p in paths):
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
return None
idx = _titel_index(entries)
by_num: dict[int, dict] = {}
for p in paths:
if not p.exists():
continue
for sec in _parse_fragment(p.read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num is None:
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
elif num not in by_num:
by_num[num] = sec
if not by_num:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
# Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections
chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks]
check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)]
offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]]
if offen_checks:
await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…")
def sections_text(nums: list[int]) -> str:
return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums)
slots = [{
"key": f"{guide_id}-lese-check-{i + 1}",
"prompt": _prompt(
"Guide-Lese-Check",
topic=topic, format_name=format_name, spec=spec,
sections=sections_text(chunk_nums[i]),
out_path=check_paths[i], extra=_extra(instructions),
),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))),
} for i in offen_checks]
res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Lese-Prüfung fehlgeschlagen")
return None
probleme_by_num: dict[int, str] = {}
for p in check_paths:
for item in (_lese_probleme_schema(_json_datei(p)) or []):
num = _titel_aufloesen(idx, item["section"])
if num in by_num and num not in probleme_by_num:
probleme_by_num[num] = item["problem"]
if probleme_by_num:
_log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet")
await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…")
fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums]
fix_offen = [i for i, nums in enumerate(fix_chunks) if nums]
fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)]
def auftraege_text(nums: list[int]) -> str:
return "\n\n".join(
f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}"
for num in nums
)
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-fix-w{i + 1}",
_prompt(
"Guide-Sections-Fix",
topic=topic, format_name=format_name, facts=facts, spec=spec,
auftraege=auftraege_text(fix_chunks[i]),
out_path=fix_paths[i], extra=_extra(instructions),
),
_timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full",
)
for i in fix_offen
], return_exceptions=True)
if is_cancelled():
return None
for i, r in zip(fix_offen, results):
if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0):
_log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt")
ersetzt = 0
for i in fix_offen:
if not fix_paths[i].exists():
continue
for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")):
num = _titel_aufloesen(idx, sec["titel"])
if num in probleme_by_num and sec["md"].strip():
by_num[num] = sec
ersetzt += 1
_log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet")
await _set_progress(guide_id, "Setze zusammen…")
chapters: list[dict] = []
for ch in plan:
sections = [
{"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]}
for num in ch["nums"] if num in by_num
]
if sections:
chapters.append({"title": ch["title"], "sections": sections})
geplant = {num for ch in plan for num in ch["nums"]}
missing = sorted(geplant - set(by_num))
if missing:
_log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}")
if not chapters:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
return chapters
async def reconcile_guides() -> None:
"""DB↔Dateisystem abgleichen: status=done ohne Content-Datei → error.
Läuft beim Server-Start (nach init_db) — fängt Crashes zwischen
Datei-Write und Status-Update ab.
"""
for g in await list_guides():
if g["status"] == "done" and not guide_content_path(g["topic"], g["format"]).exists():
log.warning("[%s] Guide %s: done ohne Content-Datei — auf error gesetzt", g["topic"], g["id"])
now = datetime.now(timezone.utc).isoformat()
await update_guide(g["id"], status="error", error_msg="Inhalt fehlt — neu generieren", updated_at=now)
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
async with _semaphore:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)
content_path = guide_content_path(topic, format_name)
content_path.parent.mkdir(parents=True, exist_ok=True)
project = project_dir(topic) if project_dir(topic).is_dir() else None
try:
if guide_id in _cancelled:
return
if project:
await asyncio.to_thread(_pdfs_konvertieren, project)
# „Neu erstellen": fertiger Guide → kompletter Frischstart.
# Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume.
if content_path.exists():
for p_alt in guide_slot_dateien(content_path):
p_alt.unlink(missing_ok=True)
if format_name == "OnePager":
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path)
else:
alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
if not alle:
await _fail(guide_id, "Keine Bausteine gefunden")
return
entries = _eindeutige_titel(alle)
facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
chapters = await _generate_sections(
guide_id, topic, format_name, entries,
facts, instructions, provider, content_path,
)
if chapters is None or guide_id in _cancelled:
return
atomic_write_json(content_path, {"topic": topic, "format": format_name, "chapters": chapters}, indent=1)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now)
except asyncio.TimeoutError:
await _fail(guide_id, "Timeout bei der Generierung")
except FileNotFoundError:
await _fail(guide_id, "Bausteine fehlen")
except Exception as e:
log.exception("[%s] Guide-Generierung fehlgeschlagen (%s)", topic, guide_id)
await _fail(guide_id, str(e)[:2000])
finally:
_cancelled.discard(guide_id)
# --- Tutor-Chat ---
def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str:
transcript = "\n".join(
f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
for m in messages
)
return _prompt(
"Chat",
topic=topic, format_name=format_name,
outline_block=outline.strip() or "(keine)",
section_block=section.strip() or "(kein Abschnitt erkannt)",
transcript=transcript,
)
async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
try:
prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
returncode, stdout, stderr = await run_agent(
"chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
reply = stdout.strip()
return reply or "Entschuldigung, ich habe keine Antwort erhalten."
except Exception:
log.warning("[%s] Guide-Chat fehlgeschlagen", topic, exc_info=True)
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
# --- Elemente (persönliche Zusammenfassung) ---
def _element_fields(data: dict) -> dict | None:
"""Validiert KI-Element-JSON und normalisiert auf die DB-Felder."""
if not isinstance(data, dict):
return None
title = str(data.get("title", "")).strip()
if not title:
return None
listen = {}
for key in ("examples", "hints"):
raw = data.get(key, [])
listen[key] = [str(e).strip() for e in raw if str(e).strip()] if isinstance(raw, list) else []
return {
"title": title[:200],
"description": str(data.get("description", "")).strip(),
"examples": listen["examples"],
"hints": listen["hints"],
}
def _topic_context(topic: str, limit: int = 12000) -> str:
"""Bausteine + Guide-Inhalte des Themas als Kontext-Text (gekürzt)."""
parts: list[str] = []
bp = bausteine_path(topic)
if bp.exists():
parts.append(bp.read_text(encoding="utf-8"))
for fmt in ("FullGuide", "Guide", "MiniGuide", "OnePager"):
content = _json_datei(guide_content_path(topic, fmt))
if content:
for ch in content.get("chapters", []):
for sec in ch.get("sections", []):
parts.append(sec if isinstance(sec, str) else json.dumps(sec, ensure_ascii=False))
break # bester verfügbarer Guide reicht
text = "\n\n".join(parts).strip()
return text[:limit] if text else "(kein Material vorhanden)"
async def generate_element(topic: str, hint: str, provider: str = DEFAULT_PROVIDER) -> dict:
"""Erstellt Element-Felder per KI. Fallback: nur Titel aus dem Stichwort."""
fallback = {"title": hint.strip() or "Neues Element", "description": "", "examples": [], "hints": []}
try:
prompt = _prompt(
"Element-Create",
topic=topic, hint=hint.strip() or "(keins — wähle selbst ein Kernkonzept)",
context=_topic_context(topic),
)
returncode, stdout, _ = await run_agent(
"element-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return fallback
return _element_fields(_parse_json_text(stdout)) or fallback
except Exception:
log.warning("[%s] Element-Erstellung fehlgeschlagen", topic, exc_info=True)
return fallback
def _parse_suggestions(stdout: str) -> list[dict] | None:
"""Validiert Vorschlags-JSON aus KI-Output. None bei ungültigem JSON."""
data = _parse_json_text(stdout)
if not isinstance(data, dict):
return None
suggestions = []
for s in data.get("suggestions", []):
if not isinstance(s, dict):
continue
text = str(s.get("text", "")).strip()
target = s.get("target")
content = str(s.get("content", "")).strip()
if text and content and target in ("description", "examples", "hints"):
suggestions.append({"text": text, "target": target, "content": content})
return suggestions
async def check_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None:
"""Zweischrittige Prüfung auf fehlende Infos: Recherche → Verifizieren. None bei Fehler."""
try:
element_json = json.dumps(
{k: element[k] for k in ("title", "description", "examples", "hints")},
ensure_ascii=False, indent=1,
)
context = _topic_context(element["topic"])
# Schritt 1: Recherche — breit Kandidaten sammeln
prompt = _prompt("Element-Check", topic=element["topic"], element_json=element_json, context=context)
returncode, stdout, _ = await run_agent(
"element-check-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return None
candidates = _parse_suggestions(stdout)
if candidates is None:
return None
if not candidates:
return []
# Schritt 2: Verifizieren — nur Wichtiges, nicht Redundantes durchlassen
prompt = _prompt(
"Element-Verify",
topic=element["topic"], element_json=element_json,
candidates_json=json.dumps({"suggestions": candidates}, ensure_ascii=False, indent=1),
context=context,
)
returncode, stdout, _ = await run_agent(
"element-verify-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return None
return _parse_suggestions(stdout)
except Exception:
log.warning("[%s] Element-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
return None
def _element_json(element: dict) -> str:
return json.dumps(
{k: element[k] for k in ("title", "description", "examples", "hints")},
ensure_ascii=False, indent=1,
)
def _validate_change(c, element: dict) -> dict | None:
"""Validiert einen Änderungs-Vorschlag aus KI-Output gegen das Element."""
if not isinstance(c, dict):
return None
text = str(c.get("text", "")).strip()
action = c.get("action")
target = c.get("target")
index = c.get("index")
content = str(c.get("content", "")).strip()
if not text or action not in ("entfernen", "anpassen", "hinzufuegen"):
return None
if target not in ("title", "description", "examples", "hints"):
return None
if action in ("anpassen", "hinzufuegen") and not content:
return None
if action == "entfernen" and target not in ("examples", "hints"):
return None
# Index nur für anpassen/entfernen in Listen-Feldern; muss existieren
if target in ("examples", "hints") and action in ("anpassen", "entfernen"):
if not isinstance(index, int) or not (0 <= index < len(element[target])):
return None
else:
index = None
return {"text": text, "action": action, "target": target, "index": index, "content": content}
async def chat_with_element(element: dict, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> tuple[str, list[dict]]:
"""Chat zum Element. Gibt (Antwort, Änderungs-Vorschläge) zurück — ändert nichts direkt."""
fehler = "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
try:
transcript = "\n".join(
f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
for m in messages
)
prompt = _prompt("Element-Chat", topic=element["topic"], element_json=_element_json(element), transcript=transcript)
returncode, stdout, _ = await run_agent(
"element-chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return fehler, []
data = _parse_json_text(stdout)
if not isinstance(data, dict):
return fehler, []
changes = [v for c in data.get("changes", []) if (v := _validate_change(c, element))]
reply = str(data.get("reply", "")).strip() or ("Vorschläge erstellt." if changes else fehler)
return reply, changes
except Exception:
log.warning("[%s] Element-Chat fehlgeschlagen", element.get("topic", "?"), exc_info=True)
return fehler, []
async def style_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None:
"""Prüft ein Element auf die Stil-Regeln und schlägt Änderungen vor. None bei Fehler."""
try:
prompt = _prompt("Element-Stil", topic=element["topic"], element_json=_element_json(element))
returncode, stdout, _ = await run_agent(
"element-stil-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return None
data = _parse_json_text(stdout)
if not isinstance(data, dict):
return None
return [v for c in data.get("changes", []) if (v := _validate_change(c, element))]
except Exception:
log.warning("[%s] Stil-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
return None
async def refine_suggestion(element: dict, suggestion: dict, instruction: str, provider: str = DEFAULT_PROVIDER) -> dict | None:
"""Überarbeitet einen einzelnen Vorschlag nach Nutzer-Anweisung. None bei Fehler."""
try:
prompt = _prompt(
"Element-Refine",
topic=element["topic"], element_json=_element_json(element),
suggestion_json=json.dumps(suggestion, ensure_ascii=False, indent=1),
instruction=instruction,
)
returncode, stdout, _ = await run_agent(
"element-refine-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive"
)
if returncode != 0:
return None
data = _parse_json_text(stdout)
if not isinstance(data, dict):
return None
return _validate_change(data.get("change"), element)
except Exception:
log.warning("[%s] Vorschlags-Überarbeitung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
return None