diff --git a/backend/bausteine.py b/backend/bausteine.py new file mode 100644 index 0000000..7945128 --- /dev/null +++ b/backend/bausteine.py @@ -0,0 +1,367 @@ +"""Bausteine-Pipeline: 4x Recherche (3 nötig) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert.""" + +import asyncio +import logging +import shutil +import subprocess +from pathlib import Path + +from agents import kill_process +from config import DEFAULT_PROVIDER +from fsutil import atomic_write_text +from jsonio import read_json_file as _json_datei +from paths import arbeit_dir, bausteine_path, project_dir +from pipeline import ( + CANCELLED, FAILED, GenContext, _extra, _log, _prompt, _race, + _semaphore, _timeout, run_single_slot, +) +from textkit import _eindeutige_titel, _parse_auswahl, _titel, _titel_aufloesen, _titel_index + +log = logging.getLogger("creator.bausteine") + +_bausteine_progress: dict[str, str] = {} +_bausteine_errors: dict[str, str] = {} +_bausteine_cancelled: set[str] = set() +_bausteine_step: dict[str, int] = {} + +BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung") + + +def _bausteine_steps(topic: str) -> tuple: + """Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche.""" + if project_dir(topic).is_dir(): + return BAUSTEINE_STEPS + ("Ergänzung",) + return BAUSTEINE_STEPS + + +def _bausteine_files(topic: str) -> dict: + arbeit = arbeit_dir(topic) + return { + "final": bausteine_path(topic), + "arbeit": arbeit, + "recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)], + "auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)], + "auswahl_check": arbeit / "auswahl-check.json", + "ergaenzung": arbeit / "ergaenzung.json", + } + + +def _alle_slot_dateien(files: dict) -> list[Path]: + return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]] + + +def cancel_bausteine(topic: str) -> bool: + if topic not in _bausteine_progress: + return False + _bausteine_cancelled.add(topic) + kill_process(f"bausteine-{topic}-") + return True + + +def _resume_step(topic: str) -> int: + """Erster noch offener Schritt anhand der persistierten Zwischendateien.""" + files = _bausteine_files(topic) + if sum(p.exists() for p in files["recherche"]) < 3: + return 0 + if not any(p.exists() for p in files["auswahl"]): + return 1 + if not files["auswahl_check"].exists(): + return 2 + if project_dir(topic).is_dir() and not files["ergaenzung"].exists(): + return 3 + return len(_bausteine_steps(topic)) + + +def bausteine_status(topic: str) -> dict: + steps = _bausteine_steps(topic) + ready = bausteine_path(topic).exists() + generating = topic in _bausteine_progress + partial = False + if generating: + current = _bausteine_step.get(topic) + states = [ + "pending" if current is None else "done" if i < current else "active" if i == current else "pending" + for i in range(len(steps)) + ] + elif ready: + states = ["done"] * len(steps) + else: + nxt = _resume_step(topic) + partial = nxt > 0 + states = ["done" if i < nxt else "pending" for i in range(len(steps))] + return { + "ready": ready, + "generating": generating, + "progress": _bausteine_progress.get(topic), + "error": _bausteine_errors.get(topic), + "partial": partial, + "steps": [{"label": label, "state": s} for label, s in zip(steps, states)], + } + + +def active_bausteine() -> list[dict]: + return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()] + + +def reset_bausteine(topic: str) -> None: + files = _bausteine_files(topic) + files["final"].unlink(missing_ok=True) + shutil.rmtree(files["arbeit"], ignore_errors=True) + _bausteine_errors.pop(topic, None) + + +def _ergaenzung_schema(data): + """{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None.""" + if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): + return None + out = [] + for b in data["bausteine"]: + if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str): + return None + titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip() + if not titel: + return None + out.append((titel, beschreibung)) + return out + + +def _pdfs_konvertieren(project: Path) -> None: + """PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern. + + Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die + .txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet. + Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt + unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten). + """ + pdfs = list(project.rglob("*.pdf")) + if not pdfs: + return + if shutil.which("pdftotext") is None: + raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden") + for pdf in pdfs: + txt = pdf.with_suffix(".txt") + if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime: + continue + try: + subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120) + _log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}") + except Exception as e: + raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e + + +def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str: + if project: + source = _prompt("Bausteine-Quelle-Projekt", project=project) + else: + source = _prompt("Bausteine-Quelle-Thema", topic=topic) + return _prompt( + "Bausteine-Recherche", + topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions), + ) + + +def _file_payload(path: Path): + """Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält.""" + if not path.exists(): + return None + text = path.read_text(encoding="utf-8") + return text if _parse_auswahl(text) else None + + +def _auswahl_payload(path: Path): + if not path.exists(): + return None + text = path.read_text(encoding="utf-8") + entries = _parse_auswahl(text) + return (text, entries) if entries else None + + +def _auswahl_check_schema(data): + """{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß.""" + if not isinstance(data, dict): + return None + nach = data.get("nachtraege", []) + streich = data.get("streichen", []) + if not isinstance(nach, list) or not isinstance(streich, list): + return None + if not all(isinstance(x, str) for x in [*nach, *streich]): + return None + return {"nachtraege": nach, "streichen": streich} + + +async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: + if topic in _bausteine_progress: + return + _bausteine_progress[topic] = "Wartend…" + _bausteine_errors.pop(topic, None) + + files = _bausteine_files(topic) + final_path = files["final"] + project = project_dir(topic) if project_dir(topic).is_dir() else None + + def set_p(msg: str, step: int | None = None) -> None: + _bausteine_progress[topic] = msg + if step is not None: + _bausteine_step[topic] = step + + def is_cancelled() -> bool: + return topic in _bausteine_cancelled + + def abgebrochen() -> None: + _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten" + + ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled) + + try: + async with _semaphore: + files["arbeit"].mkdir(parents=True, exist_ok=True) + if project: + await asyncio.to_thread(_pdfs_konvertieren, project) + # „Neu erstellen": fertige Bausteine → kompletter Frischstart. + # Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume. + if final_path.exists(): + for p_alt in _alle_slot_dateien(files): + p_alt.unlink(missing_ok=True) + + # Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen + recherchen: list[str] = [] + offen = [] + for i, path in enumerate(files["recherche"], 1): + text = _file_payload(path) + if text is not None and len(recherchen) < 3: + recherchen.append(text) + else: + offen.append((i, path)) + vorhanden = len(recherchen) + set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0) + if vorhanden < 3: + caps = "files" if project else "full" + slots = [ + { + "key": f"bausteine-{topic}-recherche-{i}", + "prompt": _build_recherche_prompt(topic, path, instructions, project), + "role": "quick", "capabilities": caps, + "payload": (lambda result, p=path: _file_payload(p)), + } + for i, path in offen + ] + neue = await _race( + topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider, + on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"), + cancelled=is_cancelled, + ) + if is_cancelled(): + abgebrochen() + return + if neue is None: + _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)" + return + recherchen += neue + + # Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen + n_est = max(len(_parse_auswahl(t)) for t in recherchen) + bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None) + if bestehende is not None: + flat, entries = bestehende + else: + set_p("Konsolidiere Recherche…", step=1) + results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1)) + slots = [ + { + "key": f"bausteine-{topic}-auswahl-{i}", + "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path), + "role": "fast", "capabilities": "files", + "payload": (lambda result, p=path: _auswahl_payload(p)), + } + for i, path in enumerate(files["auswahl"], 1) + ] + auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled) + if is_cancelled(): + abgebrochen() + return + if auswahl is None: + _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)" + return + flat, entries = auswahl[0] + + # Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal) + set_p("Prüfe Auswahl…", step=2) + check_path = files["auswahl_check"] + patch = _auswahl_check_schema(_json_datei(check_path)) + if patch is None: + check_path.unlink(missing_ok=True) + titel_listen = "\n\n".join( + f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values()) + for i, text in enumerate(recherchen, 1) + ) + status, check = await run_single_slot( + ctx, "Auswahl-Check", + key=f"bausteine-{topic}-auswahlcheck-1", + prompt=_prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path), + role="fast", capabilities="files", + payload=lambda result: _auswahl_check_schema(_json_datei(check_path)), + timeout=_timeout("auswahl_check", len(entries)), + ) + if status == CANCELLED: + abgebrochen() + return + if status == FAILED: + _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort") + else: + patch = check + if patch is not None and (patch["streichen"] or patch["nachtraege"]): + idx = _titel_index(entries) + weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None} + if weg: + _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}") + entries = {n: t for n, t in entries.items() if n not in weg} + if patch["nachtraege"]: + _log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine") + texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"]) + entries = {i: t for i, t in enumerate(texts, 1)} + + # Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt, + # ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung]. + if project: + set_p("Ergänze Themenfeld…", step=3) + erg_path = files["ergaenzung"] + ergaenzungen = _ergaenzung_schema(_json_datei(erg_path)) + if ergaenzungen is None: + erg_path.unlink(missing_ok=True) + status, ergaenzungen = await run_single_slot( + ctx, "Ergänzung", + key=f"bausteine-{topic}-ergaenzung-1", + prompt=_prompt( + "Bausteine-Ergaenzung", + topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()), + out_path=erg_path, extra=_extra(instructions), + ), + role="quick", capabilities="full", + payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)), + timeout=_timeout("ergaenzung"), + ) + if status == CANCELLED: + abgebrochen() + return + if status == FAILED: + _bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)" + return + idx = _titel_index(entries) + neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None] + if neu: + _log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt") + start = max(entries, default=0) + 1 + for off, (t, b) in enumerate(neu): + entries[start + off] = f"{t} — {b} [Ergänzung]" + + # Titel eindeutig machen und unsortiertes Inventar schreiben + entries = _eindeutige_titel(entries) + atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n") + except Exception as e: + log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic) + _bausteine_errors[topic] = str(e)[:2000] + finally: + # Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit. + _bausteine_progress.pop(topic, None) + _bausteine_step.pop(topic, None) + _bausteine_cancelled.discard(topic) diff --git a/backend/elements.py b/backend/elements.py new file mode 100644 index 0000000..a408fb3 --- /dev/null +++ b/backend/elements.py @@ -0,0 +1,258 @@ +"""Elemente (persönliche Zusammenfassung) und Tutor-Chat zum Guide.""" + +import json +import logging +import uuid + +from agents import run_agent +from config import DEFAULT_PROVIDER +from jsonio import parse_json_text as _parse_json_text, read_json_file as _json_datei +from paths import bausteine_path, guide_content_path +from pipeline import _prompt + +log = logging.getLogger("creator.elements") + + +# --- Tutor-Chat --- + +def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str: + transcript = "\n".join( + f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" + for m in messages + ) + return _prompt( + "Chat", + topic=topic, format_name=format_name, + outline_block=outline.strip() or "(keine)", + section_block=section.strip() or "(kein Abschnitt erkannt)", + transcript=transcript, + ) + + +async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str: + try: + prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages) + returncode, stdout, stderr = await run_agent( + "chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." + reply = stdout.strip() + return reply or "Entschuldigung, ich habe keine Antwort erhalten." + except Exception: + log.warning("[%s] Guide-Chat fehlgeschlagen", topic, exc_info=True) + return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." + + +# --- Elemente --- + +def _element_fields(data: dict) -> dict | None: + """Validiert KI-Element-JSON und normalisiert auf die DB-Felder.""" + if not isinstance(data, dict): + return None + title = str(data.get("title", "")).strip() + if not title: + return None + listen = {} + for key in ("examples", "hints"): + raw = data.get(key, []) + listen[key] = [str(e).strip() for e in raw if str(e).strip()] if isinstance(raw, list) else [] + return { + "title": title[:200], + "description": str(data.get("description", "")).strip(), + "examples": listen["examples"], + "hints": listen["hints"], + } + + +def _topic_context(topic: str, limit: int = 12000) -> str: + """Bausteine + Guide-Inhalte des Themas als Kontext-Text (gekürzt).""" + parts: list[str] = [] + bp = bausteine_path(topic) + if bp.exists(): + parts.append(bp.read_text(encoding="utf-8")) + for fmt in ("FullGuide", "Guide", "MiniGuide", "OnePager"): + content = _json_datei(guide_content_path(topic, fmt)) + if content: + for ch in content.get("chapters", []): + for sec in ch.get("sections", []): + parts.append(sec if isinstance(sec, str) else json.dumps(sec, ensure_ascii=False)) + break # bester verfügbarer Guide reicht + text = "\n\n".join(parts).strip() + return text[:limit] if text else "(kein Material vorhanden)" + + +async def generate_element(topic: str, hint: str, provider: str = DEFAULT_PROVIDER) -> dict: + """Erstellt Element-Felder per KI. Fallback: nur Titel aus dem Stichwort.""" + fallback = {"title": hint.strip() or "Neues Element", "description": "", "examples": [], "hints": []} + try: + prompt = _prompt( + "Element-Create", + topic=topic, hint=hint.strip() or "(keins — wähle selbst ein Kernkonzept)", + context=_topic_context(topic), + ) + returncode, stdout, _ = await run_agent( + "element-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return fallback + return _element_fields(_parse_json_text(stdout)) or fallback + except Exception: + log.warning("[%s] Element-Erstellung fehlgeschlagen", topic, exc_info=True) + return fallback + + +def _parse_suggestions(stdout: str) -> list[dict] | None: + """Validiert Vorschlags-JSON aus KI-Output. None bei ungültigem JSON.""" + data = _parse_json_text(stdout) + if not isinstance(data, dict): + return None + suggestions = [] + for s in data.get("suggestions", []): + if not isinstance(s, dict): + continue + text = str(s.get("text", "")).strip() + target = s.get("target") + content = str(s.get("content", "")).strip() + if text and content and target in ("description", "examples", "hints"): + suggestions.append({"text": text, "target": target, "content": content}) + return suggestions + + +async def check_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None: + """Zweischrittige Prüfung auf fehlende Infos: Recherche → Verifizieren. None bei Fehler.""" + try: + element_json = json.dumps( + {k: element[k] for k in ("title", "description", "examples", "hints")}, + ensure_ascii=False, indent=1, + ) + context = _topic_context(element["topic"]) + + # Schritt 1: Recherche — breit Kandidaten sammeln + prompt = _prompt("Element-Check", topic=element["topic"], element_json=element_json, context=context) + returncode, stdout, _ = await run_agent( + "element-check-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return None + candidates = _parse_suggestions(stdout) + if candidates is None: + return None + if not candidates: + return [] + + # Schritt 2: Verifizieren — nur Wichtiges, nicht Redundantes durchlassen + prompt = _prompt( + "Element-Verify", + topic=element["topic"], element_json=element_json, + candidates_json=json.dumps({"suggestions": candidates}, ensure_ascii=False, indent=1), + context=context, + ) + returncode, stdout, _ = await run_agent( + "element-verify-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return None + return _parse_suggestions(stdout) + except Exception: + log.warning("[%s] Element-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True) + return None + + +def _element_json(element: dict) -> str: + return json.dumps( + {k: element[k] for k in ("title", "description", "examples", "hints")}, + ensure_ascii=False, indent=1, + ) + + +def _validate_change(c, element: dict) -> dict | None: + """Validiert einen Änderungs-Vorschlag aus KI-Output gegen das Element.""" + if not isinstance(c, dict): + return None + text = str(c.get("text", "")).strip() + action = c.get("action") + target = c.get("target") + index = c.get("index") + content = str(c.get("content", "")).strip() + if not text or action not in ("entfernen", "anpassen", "hinzufuegen"): + return None + if target not in ("title", "description", "examples", "hints"): + return None + if action in ("anpassen", "hinzufuegen") and not content: + return None + if action == "entfernen" and target not in ("examples", "hints"): + return None + # Index nur für anpassen/entfernen in Listen-Feldern; muss existieren + if target in ("examples", "hints") and action in ("anpassen", "entfernen"): + if not isinstance(index, int) or not (0 <= index < len(element[target])): + return None + else: + index = None + return {"text": text, "action": action, "target": target, "index": index, "content": content} + + +async def chat_with_element(element: dict, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> tuple[str, list[dict]]: + """Chat zum Element. Gibt (Antwort, Änderungs-Vorschläge) zurück — ändert nichts direkt.""" + fehler = "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." + try: + transcript = "\n".join( + f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" + for m in messages + ) + prompt = _prompt("Element-Chat", topic=element["topic"], element_json=_element_json(element), transcript=transcript) + returncode, stdout, _ = await run_agent( + "element-chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return fehler, [] + data = _parse_json_text(stdout) + if not isinstance(data, dict): + return fehler, [] + changes = [v for c in data.get("changes", []) if (v := _validate_change(c, element))] + reply = str(data.get("reply", "")).strip() or ("Vorschläge erstellt." if changes else fehler) + return reply, changes + except Exception: + log.warning("[%s] Element-Chat fehlgeschlagen", element.get("topic", "?"), exc_info=True) + return fehler, [] + + +async def style_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None: + """Prüft ein Element auf die Stil-Regeln und schlägt Änderungen vor. None bei Fehler.""" + try: + prompt = _prompt("Element-Stil", topic=element["topic"], element_json=_element_json(element)) + returncode, stdout, _ = await run_agent( + "element-stil-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return None + data = _parse_json_text(stdout) + if not isinstance(data, dict): + return None + return [v for c in data.get("changes", []) if (v := _validate_change(c, element))] + except Exception: + log.warning("[%s] Stil-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True) + return None + + +async def refine_suggestion(element: dict, suggestion: dict, instruction: str, provider: str = DEFAULT_PROVIDER) -> dict | None: + """Überarbeitet einen einzelnen Vorschlag nach Nutzer-Anweisung. None bei Fehler.""" + try: + prompt = _prompt( + "Element-Refine", + topic=element["topic"], element_json=_element_json(element), + suggestion_json=json.dumps(suggestion, ensure_ascii=False, indent=1), + instruction=instruction, + ) + returncode, stdout, _ = await run_agent( + "element-refine-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" + ) + if returncode != 0: + return None + data = _parse_json_text(stdout) + if not isinstance(data, dict): + return None + return _validate_change(data.get("change"), element) + except Exception: + log.warning("[%s] Vorschlags-Überarbeitung fehlgeschlagen", element.get("topic", "?"), exc_info=True) + return None diff --git a/backend/generator.py b/backend/generator.py deleted file mode 100644 index fe06d28..0000000 --- a/backend/generator.py +++ /dev/null @@ -1,1593 +0,0 @@ -import asyncio -import json -import logging -import math -import shutil -import subprocess -import re -import uuid -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path -from typing import Callable - -from agents import run_agent, kill_process -from config import ( - DEFAULT_PROVIDER, - FORMAT_ANTEIL, - TEMPLATES_DIR, - TIMEOUTS, - MAX_CONCURRENT_GENERATIONS, -) -from database import list_guides, update_guide -from fsutil import atomic_write_json, atomic_write_text -from jsonio import parse_json_text as _parse_json_text, read_json_file as _json_datei -from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir - -_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS) -_cancelled: set[str] = set() - - -async def cancel_guide(guide_id: str) -> bool: - _cancelled.add(guide_id) - kill_process(guide_id) - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now) - return True - - -async def _set_progress(guide_id: str, progress: str) -> None: - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, progress=progress, updated_at=now) - - -def _prompt(name: str, **kwargs) -> str: - template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8") - return template.format(**kwargs) - - -def _extra(instructions: str) -> str: - return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else "" - - -log = logging.getLogger("creator.generator") - - -def _log(topic: str, msg: str) -> None: - log.info("[%s] %s", topic, msg) - - -def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str: - stderr = (stderr or "").strip() - if stderr: - return f"{label}: {stderr[:1000]}" - tail = (stdout or "").strip()[-500:] - if tail: - return f"{label} (exit {returncode}, stderr leer): …{tail}" - return f"{label} (exit {returncode}, ohne Ausgabe)" - - -def _gather_error(label: str, results: list) -> str: - for r in results: - if isinstance(r, BaseException): - return f"{label}: {type(r).__name__}: {r}" - returncode, stdout, stderr = r - if returncode != 0: - return _claude_error(label, returncode, stdout, stderr) - return f"{label}: kein verwertbares Ergebnis" - - -async def _fail(guide_id: str, msg: str) -> None: - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now) - - -def _norm_titel(s: str) -> str: - """Normalisiert einen Titel für den Schlüssel-Vergleich.""" - s = re.sub(r"[`'\"<>]", "", s) - return re.sub(r"\s+", " ", s).strip().lower() - - -def _titel(entry: str) -> str: - return entry.split(" — ")[0].strip() or entry - - -def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]: - """Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen.""" - seen: dict[str, int] = {} - out: dict[int, str] = {} - for num, text in entries.items(): - titel = _titel(text) - key = _norm_titel(titel) - seen[key] = seen.get(key, 0) + 1 - if seen[key] > 1: - rest = text.split(" — ", 1) - text = f"{titel} ({seen[key]})" + (f" — {rest[1]}" if len(rest) == 2 else "") - # zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht - out[num] = text - return out - - -def _titel_index(entries: dict[int, str]) -> dict[str, int]: - return {_norm_titel(_titel(text)): num for num, text in entries.items()} - - -def _timeout(step: str, n: int = 0) -> int: - base, per = TIMEOUTS[step] - return base + per * n - - -_MAX_RESTARTS = 2 - - -async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None: - """Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse. - - Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)` - prüft die Gültigkeit und liefert das Slot-Ergebnis oder None. - Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das - Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt. - `cancelled()` → True bricht ab (keine Restarts, Rückgabe None). - """ - attempts = {i: 0 for i in range(len(slots))} - tasks: dict[asyncio.Task, int] = {} - - def spawn(i: int) -> None: - slot = slots[i] - task = asyncio.create_task(run_agent( - slot["key"], slot["prompt"], timeout, - provider=provider, role=slot["role"], capabilities=slot["capabilities"], - )) - tasks[task] = i - - for i in range(len(slots)): - spawn(i) - - results: list = [] - try: - while tasks: - if cancelled and cancelled(): - return None - done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED) - for task in done: - i = tasks.pop(task) - payload, err = None, None - try: - result = task.result() - if result[0] != 0: - err = _claude_error("Fehler", *result) - else: - payload = slots[i]["payload"](result) - if payload is None: - err = "Ergebnis ungültig/nicht parsebar" - except asyncio.TimeoutError: - err = f"Timeout nach {timeout}s" - except Exception as e: - err = f"{type(e).__name__}: {e}" - - if payload is not None: - results.append(payload) - if on_update: - on_update(len(results)) - if len(results) >= quorum: - return results - continue - - _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}") - attempts[i] += 1 - if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()): - spawn(i) - _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)") - return None - finally: - for task, i in tasks.items(): - kill_process(slots[i]["key"]) - task.cancel() - if tasks: - await asyncio.gather(*tasks.keys(), return_exceptions=True) - - -@dataclass -class GenContext: - """Durchgereichte Pipeline-Parameter — erspart lange Argument-Signaturen.""" - topic: str - provider: str - is_cancelled: Callable[[], bool] - guide_id: str | None = None - - -# Ergebnis-Status von run_single_slot/_check_then_fix -OK, CANCELLED, FAILED = "ok", "cancelled", "failed" - - -async def run_single_slot( - ctx: GenContext, label: str, *, - key: str, prompt: str, role: str, capabilities: str, payload, timeout: int, -) -> tuple[str, object]: - """Ein Agent, ein gültiges Ergebnis (Race mit Quorum 1). - - → (OK, wert) | (CANCELLED, None) | (FAILED, None) - """ - slots = [{"key": key, "prompt": prompt, "role": role, "capabilities": capabilities, "payload": payload}] - res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider, cancelled=ctx.is_cancelled) - if ctx.is_cancelled(): - return CANCELLED, None - if res is None: - return FAILED, None - return OK, res[0] - - -async def _check_then_fix( - ctx: GenContext, *, name: str, step: int, - check_key: str, check_prompt: str, check_path: Path, check_timeout: int, - fix_key: str, build_fix_prompt, fix_payload, fix_timeout: int, - fix_role: str = "fast", fix_caps: str = "files", - on_fix_invalid=None, -) -> tuple[str, object]: - """Check→Fix-Muster: Prüf-Agent notiert Probleme (JSON), Fix-Agent behebt sie. - - Resume: existierende Check-Datei überspringt den ganzen Schritt. - Check ist fatal (FAILED), Fix nicht — Original bleibt; on_fix_invalid kann - das kanonische Original zurückschreiben, falls der Fix-Agent die - Artefakt-Datei zerschrieben hat. - Lese-Check (Multi-Slot, Section-genau) und Bausteine-Auswahl-Check - (Patch-Semantik) passen bewusst NICHT in dieses Muster. - → (OK, neues_artefakt | None=unverändert) | (CANCELLED, None) | (FAILED, None) - """ - if check_path.exists(): - return OK, None - await _set_step(ctx.guide_id, step, f"Prüfe {name}…") - status, probleme = await run_single_slot( - ctx, f"{name}-Prüfung", key=check_key, prompt=check_prompt, - role="fast", capabilities="files", - payload=lambda result: _probleme_schema(_json_datei(check_path)), - timeout=check_timeout, - ) - if status != OK: - return status, None - if not probleme: - return OK, None - _log(ctx.topic, f"{name}-Prüfung: {len(probleme)} Problem(e) notiert") - await _set_step(ctx.guide_id, step, f"Passe {name} an…") - status, fixed = await run_single_slot( - ctx, f"{name}-Fix", key=fix_key, prompt=build_fix_prompt(probleme), - role=fix_role, capabilities=fix_caps, payload=fix_payload, timeout=fix_timeout, - ) - if status == CANCELLED: - return CANCELLED, None - if status == FAILED: - _log(ctx.topic, f"{name}-Fix ungültig — Original bleibt") - if on_fix_invalid: - on_fix_invalid() - return OK, None - return OK, fixed - - -# --- Bausteine-Pipeline: 4x Recherche (3) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert --- - -_bausteine_progress: dict[str, str] = {} -_bausteine_errors: dict[str, str] = {} -_bausteine_cancelled: set[str] = set() -_bausteine_step: dict[str, int] = {} - -BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung") -_CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader - - -def _bausteine_steps(topic: str) -> tuple: - """Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche.""" - if project_dir(topic).is_dir(): - return BAUSTEINE_STEPS + ("Ergänzung",) - return BAUSTEINE_STEPS - - -def _bausteine_files(topic: str) -> dict: - arbeit = arbeit_dir(topic) - return { - "final": bausteine_path(topic), - "arbeit": arbeit, - "recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)], - "auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)], - "auswahl_check": arbeit / "auswahl-check.json", - "ergaenzung": arbeit / "ergaenzung.json", - } - - -def _alle_slot_dateien(files: dict) -> list[Path]: - return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]] - - -def cancel_bausteine(topic: str) -> bool: - if topic not in _bausteine_progress: - return False - _bausteine_cancelled.add(topic) - kill_process(f"bausteine-{topic}-") - return True - - -def _resume_step(topic: str) -> int: - """Erster noch offener Schritt anhand der persistierten Zwischendateien.""" - files = _bausteine_files(topic) - if sum(p.exists() for p in files["recherche"]) < 3: - return 0 - if not any(p.exists() for p in files["auswahl"]): - return 1 - if not files["auswahl_check"].exists(): - return 2 - if project_dir(topic).is_dir() and not files["ergaenzung"].exists(): - return 3 - return len(_bausteine_steps(topic)) - - -def bausteine_status(topic: str) -> dict: - steps = _bausteine_steps(topic) - ready = bausteine_path(topic).exists() - generating = topic in _bausteine_progress - partial = False - if generating: - current = _bausteine_step.get(topic) - states = [ - "pending" if current is None else "done" if i < current else "active" if i == current else "pending" - for i in range(len(steps)) - ] - elif ready: - states = ["done"] * len(steps) - else: - nxt = _resume_step(topic) - partial = nxt > 0 - states = ["done" if i < nxt else "pending" for i in range(len(steps))] - return { - "ready": ready, - "generating": generating, - "progress": _bausteine_progress.get(topic), - "error": _bausteine_errors.get(topic), - "partial": partial, - "steps": [{"label": label, "state": s} for label, s in zip(steps, states)], - } - - -def active_bausteine() -> list[dict]: - return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()] - - -def reset_bausteine(topic: str) -> None: - files = _bausteine_files(topic) - files["final"].unlink(missing_ok=True) - shutil.rmtree(files["arbeit"], ignore_errors=True) - _bausteine_errors.pop(topic, None) - - -def _ergaenzung_schema(data): - """{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None.""" - if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): - return None - out = [] - for b in data["bausteine"]: - if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str): - return None - titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip() - if not titel: - return None - out.append((titel, beschreibung)) - return out - - -def _pdfs_konvertieren(project: Path) -> None: - """PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern. - - Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die - .txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet. - Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt - unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten). - """ - pdfs = list(project.rglob("*.pdf")) - if not pdfs: - return - if shutil.which("pdftotext") is None: - raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden") - for pdf in pdfs: - txt = pdf.with_suffix(".txt") - if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime: - continue - try: - subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120) - _log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}") - except Exception as e: - raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e - - -def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str: - if project: - source = _prompt("Bausteine-Quelle-Projekt", project=project) - else: - source = _prompt("Bausteine-Quelle-Thema", topic=topic) - return _prompt( - "Bausteine-Recherche", - topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions), - ) - - -def _parse_auswahl(text: str) -> dict[int, str]: - """Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile.""" - entries: dict[int, str] = {} - last = None - for line in text.splitlines(): - m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line) - if m: - last = int(m.group(1)) - entries[last] = m.group(2) - elif last is not None and line.strip(): - entries[last] += " " + line.strip() - return entries - - -def _parse_kategorien(text: str) -> dict[str, list[str]]: - """Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten.""" - cats: dict[str, list[str]] = {} - current = None - for line in text.splitlines(): - s = line.strip() - m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE) - if m: - current = m.group(1).upper() - cats.setdefault(current, []) - continue - m = re.match(r"(\d+)[.)]\s+(.*\S)", s) - if m and current: - cats[current].append(m.group(2)) - return cats - - -def _lade_bausteine(text: str) -> dict[int, str]: - """Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat).""" - if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE): - cats = _parse_kategorien(text) - texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])] - return {i: t for i, t in enumerate(texts, 1)} - return _parse_auswahl(text) - - -def _file_payload(path: Path): - """Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält.""" - if not path.exists(): - return None - text = path.read_text(encoding="utf-8") - return text if _parse_auswahl(text) else None - - -def _auswahl_payload(path: Path): - if not path.exists(): - return None - text = path.read_text(encoding="utf-8") - entries = _parse_auswahl(text) - return (text, entries) if entries else None - - -def _auswahl_check_schema(data): - """{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß.""" - if not isinstance(data, dict): - return None - nach = data.get("nachtraege", []) - streich = data.get("streichen", []) - if not isinstance(nach, list) or not isinstance(streich, list): - return None - if not all(isinstance(x, str) for x in [*nach, *streich]): - return None - return {"nachtraege": nach, "streichen": streich} - - -def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None: - """Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …").""" - if not isinstance(t, str): - return None - return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t))) - - -async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: - if topic in _bausteine_progress: - return - _bausteine_progress[topic] = "Wartend…" - _bausteine_errors.pop(topic, None) - - files = _bausteine_files(topic) - final_path = files["final"] - project = project_dir(topic) if project_dir(topic).is_dir() else None - - def set_p(msg: str, step: int | None = None) -> None: - _bausteine_progress[topic] = msg - if step is not None: - _bausteine_step[topic] = step - - def is_cancelled() -> bool: - return topic in _bausteine_cancelled - - def abgebrochen() -> None: - _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten" - - ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled) - - try: - async with _semaphore: - files["arbeit"].mkdir(parents=True, exist_ok=True) - if project: - await asyncio.to_thread(_pdfs_konvertieren, project) - # „Neu erstellen": fertige Bausteine → kompletter Frischstart. - # Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume. - if final_path.exists(): - for p_alt in _alle_slot_dateien(files): - p_alt.unlink(missing_ok=True) - - # Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen - recherchen: list[str] = [] - offen = [] - for i, path in enumerate(files["recherche"], 1): - text = _file_payload(path) - if text is not None and len(recherchen) < 3: - recherchen.append(text) - else: - offen.append((i, path)) - vorhanden = len(recherchen) - set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0) - if vorhanden < 3: - caps = "files" if project else "full" - slots = [ - { - "key": f"bausteine-{topic}-recherche-{i}", - "prompt": _build_recherche_prompt(topic, path, instructions, project), - "role": "quick", "capabilities": caps, - "payload": (lambda result, p=path: _file_payload(p)), - } - for i, path in offen - ] - neue = await _race( - topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider, - on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"), - cancelled=is_cancelled, - ) - if is_cancelled(): - abgebrochen() - return - if neue is None: - _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)" - return - recherchen += neue - - # Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen - n_est = max(len(_parse_auswahl(t)) for t in recherchen) - bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None) - if bestehende is not None: - flat, entries = bestehende - else: - set_p("Konsolidiere Recherche…", step=1) - results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1)) - slots = [ - { - "key": f"bausteine-{topic}-auswahl-{i}", - "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path), - "role": "fast", "capabilities": "files", - "payload": (lambda result, p=path: _auswahl_payload(p)), - } - for i, path in enumerate(files["auswahl"], 1) - ] - auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled) - if is_cancelled(): - abgebrochen() - return - if auswahl is None: - _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)" - return - flat, entries = auswahl[0] - - # Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal) - set_p("Prüfe Auswahl…", step=2) - check_path = files["auswahl_check"] - patch = _auswahl_check_schema(_json_datei(check_path)) - if patch is None: - check_path.unlink(missing_ok=True) - titel_listen = "\n\n".join( - f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values()) - for i, text in enumerate(recherchen, 1) - ) - status, check = await run_single_slot( - ctx, "Auswahl-Check", - key=f"bausteine-{topic}-auswahlcheck-1", - prompt=_prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path), - role="fast", capabilities="files", - payload=lambda result: _auswahl_check_schema(_json_datei(check_path)), - timeout=_timeout("auswahl_check", len(entries)), - ) - if status == CANCELLED: - abgebrochen() - return - if status == FAILED: - _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort") - else: - patch = check - if patch is not None and (patch["streichen"] or patch["nachtraege"]): - idx = _titel_index(entries) - weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None} - if weg: - _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}") - entries = {n: t for n, t in entries.items() if n not in weg} - if patch["nachtraege"]: - _log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine") - texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"]) - entries = {i: t for i, t in enumerate(texts, 1)} - - # Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt, - # ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung]. - if project: - set_p("Ergänze Themenfeld…", step=3) - erg_path = files["ergaenzung"] - ergaenzungen = _ergaenzung_schema(_json_datei(erg_path)) - if ergaenzungen is None: - erg_path.unlink(missing_ok=True) - status, ergaenzungen = await run_single_slot( - ctx, "Ergänzung", - key=f"bausteine-{topic}-ergaenzung-1", - prompt=_prompt( - "Bausteine-Ergaenzung", - topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()), - out_path=erg_path, extra=_extra(instructions), - ), - role="quick", capabilities="full", - payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)), - timeout=_timeout("ergaenzung"), - ) - if status == CANCELLED: - abgebrochen() - return - if status == FAILED: - _bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)" - return - idx = _titel_index(entries) - neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None] - if neu: - _log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt") - start = max(entries, default=0) + 1 - for off, (t, b) in enumerate(neu): - entries[start + off] = f"{t} — {b} [Ergänzung]" - - # Titel eindeutig machen und unsortiertes Inventar schreiben - entries = _eindeutige_titel(entries) - atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n") - except Exception as e: - log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic) - _bausteine_errors[topic] = str(e)[:2000] - finally: - # Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit. - _bausteine_progress.pop(topic, None) - _bausteine_step.pop(topic, None) - _bausteine_cancelled.discard(topic) - - -# --- Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg) --- -# Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ. -# Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort. - -GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung") - -# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt). -# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden -# eines fehlgeschlagenen Writers. -WRITER_SECTIONS = 30 -WRITER_MAX = 20 - - -def _guide_files(content_path: Path) -> dict: - d, stem = content_path.parent, content_path.stem - return { - "auswahl": d / f"{stem}.auswahl.json", - "auswahl_check": d / f"{stem}.auswahl-check.json", - "gliederung": d / f"{stem}.gliederung.json", - "gliederung_check": d / f"{stem}.gliederung-check.json", - # chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw. - } - - -def guide_slot_dateien(content_path: Path) -> list[Path]: - """Alle Schritt-Dateien eines Guides (für den Frischstart).""" - return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path] - - -async def _set_step(guide_id: str, step: int, progress: str) -> None: - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, step=step, progress=progress, updated_at=now) - - -def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None: - """{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang.""" - if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): - return None - idx = _titel_index(entries) - nums: list[int] = [] - seen: set[int] = set() - total = unknown = 0 - for t in data["bausteine"]: - total += 1 - num = _titel_aufloesen(idx, t) if isinstance(t, str) else None - if num is None: - unknown += 1 - elif num not in seen: - seen.add(num) - nums.append(num) - if total == 0 or (total - unknown) / total < 0.85: - return None - if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max: - return None - return nums - - -def _probleme_schema(data): - """{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None.""" - if not isinstance(data, dict): - return None - if data.get("ok") is True: - return [] - p = data.get("probleme") - if not isinstance(p, list) or not p: - return None - out = [str(x).strip() for x in p if str(x).strip()] - return out or None - - -def _lese_probleme_schema(data): - """{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None.""" - if not isinstance(data, dict): - return None - if data.get("ok") is True: - return [] - p = data.get("probleme") - if not isinstance(p, list) or not p: - return None - out = [] - for x in p: - if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str): - return None - out.append({"section": x["section"].strip(), "problem": x["problem"].strip()}) - return out or None - - -def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None: - """{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}]. - - `soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz). - """ - if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list): - return None - idx = _titel_index(entries) - chapters: list[dict] = [] - seen: set[int] = set() - total = unknown = 0 - for ch in data["kapitel"]: - if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list): - return None - nums = [] - for t in ch["bausteine"]: - total += 1 - num = _titel_aufloesen(idx, t) if isinstance(t, str) else None - if num is None: - unknown += 1 - elif num not in seen: - nums.append(num) - seen.add(num) - if nums: - chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums}) - if not chapters or total == 0: - return None - if (total - unknown) / total < 0.85: - return None - if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max: - return None - return chapters - - -def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]: - """Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl.""" - n = max(1, min(n, len(chapters))) - chunks: list[list[dict]] = [] - current: list[dict] = [] - count = 0 - remaining_total = sum(len(c["nums"]) for c in chapters) - remaining_chunks = n - for ch in chapters: - current.append(ch) - count += len(ch["nums"]) - if remaining_chunks > 1 and count >= remaining_total / remaining_chunks: - chunks.append(current) - remaining_total -= count - remaining_chunks -= 1 - current = [] - count = 0 - if current: - chunks.append(current) - return chunks - - -def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str: - lines = [] - for ch in chunk: - lines.append(f"KAPITEL: {ch['title']}") - lines.extend(f"- {entries[num]}" for num in ch["nums"]) - return "\n".join(lines) - - -_FRAGMENT_KAPITEL_RE = re.compile(r"", re.IGNORECASE) -_FRAGMENT_SECTION_RE = re.compile(r"", re.IGNORECASE) - - -def _parse_fragment(text: str) -> list[dict]: - """Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge.""" - sections: list[dict] = [] - kapitel = None - current = None - for line in text.splitlines(): - s = line.strip() - m = _FRAGMENT_KAPITEL_RE.match(s) - if m: - kapitel = m.group(1) - current = None - continue - m = _FRAGMENT_SECTION_RE.match(s) - if m: - current = {"kapitel": kapitel, "titel": m.group(1), "md": []} - sections.append(current) - continue - if current is not None: - current["md"].append(line) - for sec in sections: - sec["md"] = "\n".join(sec["md"]).strip() - return sections - - -async def _generate_onepager( - guide_id: str, topic: str, instructions: str, provider: str, - project: Path | None, content_path: Path, -) -> list[dict] | None: - ctx = GenContext(topic=topic, provider=provider, is_cancelled=lambda: guide_id in _cancelled, guide_id=guide_id) - - # 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil) - KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet") - - def karten_schema(data): - """{"karten": {key: {titel, md}}} → Liste · sonst None.""" - if not isinstance(data, dict): - return None - karten = data.get("karten") - if not isinstance(karten, dict): - return None - out = [] - for key in KARTEN_KEYS: - k = karten.get(key) - if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str): - return None - titel, md = k["titel"].strip(), k["md"].strip() - if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig - return None - out.append({"key": key, "titel": titel, "md": md}) - return out - - d, stem = content_path.parent, content_path.stem - recherche_path = d / f"{stem}.recherche.md" - recherche_check_path = d / f"{stem}.recherche-check.json" - karten_path = d / f"{stem}.karten.json" - check_path = d / f"{stem}.onepager-check.json" - - # Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen - # (Version, Lizenz, Alternativen) laufen dort ins Leere. - if project: - source = _prompt("OnePager-Quelle-Projekt", project=project) - recherche_template = "OnePager-Recherche-Projekt" - recherche_check_template = "OnePager-Recherche-Check-Projekt" - else: - source = _prompt("OnePager-Quelle-Thema", topic=topic) - recherche_template = "OnePager-Recherche" - recherche_check_template = "OnePager-Recherche-Check" - - def recherche_payload(result=None): - if not recherche_path.exists(): - return None - text = recherche_path.read_text(encoding="utf-8").strip() - return text or None - - # Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume) - recherche = recherche_payload() - if recherche is None: - await _set_step(guide_id, 0, "Recherchiere…") - status, recherche = await run_single_slot( - ctx, "OnePager-Recherche", - key=f"{guide_id}-recherche", - prompt=_prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)), - role="quick", capabilities="files" if project else "full", - payload=recherche_payload, timeout=_timeout("onepager_recherche"), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "OnePager-Recherche fehlgeschlagen") - return None - - # Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent - status, fixed = await _check_then_fix( - ctx, name="Recherche", step=1, - check_key=f"{guide_id}-recherche-check", - check_prompt=_prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path), - check_path=recherche_check_path, check_timeout=_timeout("onepager_verify"), - fix_key=f"{guide_id}-recherche-fix", - build_fix_prompt=lambda probleme: _prompt( - "OnePager-Recherche-Fix", - topic=topic, source=source, recherche=recherche, - probleme="\n".join(f"- {p}" for p in probleme), - out_path=recherche_path, extra=_extra(instructions), - ), - fix_payload=recherche_payload, fix_timeout=_timeout("onepager_recherche"), - fix_role="quick", fix_caps="files" if project else "full", - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen") - return None - if fixed is not None: - recherche = fixed - - # Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen) - karten = karten_schema(_json_datei(karten_path)) - if karten is None: - await _set_step(guide_id, 2, "Baue OnePager…") - karten_path.unlink(missing_ok=True) - status, karten = await run_single_slot( - ctx, "OnePager-Bauen", - key=f"{guide_id}-bauen", - prompt=_prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)), - role="fast", capabilities="files", - payload=lambda result: karten_schema(_json_datei(karten_path)), - timeout=_timeout("onepager_bauen"), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "OnePager-Bau fehlgeschlagen") - return None - - def karten_block() -> str: - return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten) - - # Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent - status, fixed = await _check_then_fix( - ctx, name="OnePager", step=3, - check_key=f"{guide_id}-verify", - check_prompt=_prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path), - check_path=check_path, check_timeout=_timeout("onepager_verify"), - fix_key=f"{guide_id}-karten-fix", - build_fix_prompt=lambda probleme: _prompt( - "OnePager-Fix", - topic=topic, recherche=recherche, karten=karten_block(), - probleme="\n".join(f"- {p}" for p in probleme), - out_path=karten_path, extra=_extra(instructions), - ), - fix_payload=lambda result: karten_schema(_json_datei(karten_path)), - fix_timeout=_timeout("onepager_bauen"), - on_fix_invalid=lambda: atomic_write_json( - karten_path, {"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, - ), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen") - return None - if fixed is not None: - karten = fixed - - sections = [ - {"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]} - for i, k in enumerate(karten, 1) - ] - return [{"title": topic, "sections": sections}] - - -async def _generate_sections( - guide_id: str, topic: str, format_name: str, entries: dict[int, str], - facts: str, instructions: str, provider: str, - content_path: Path, -) -> list[dict] | None: - def is_cancelled() -> bool: - return guide_id in _cancelled - - ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled, guide_id=guide_id) - spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8") - files = _guide_files(content_path) - bausteine_liste = "\n".join(f"- {t}" for t in entries.values()) - n = len(entries) - anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name] - k_min = min(n, max(minimum, math.ceil(anteil_min * n))) - k_max = min(n, max(k_min, math.floor(anteil_max * n))) - auswahl_auftrag = ( - f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. " - "Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist." - ) - - # Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume) - auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max) - if auswahl is None: - await _set_step(guide_id, 0, "Wähle Bausteine…") - files["auswahl"].unlink(missing_ok=True) - status, auswahl = await run_single_slot( - ctx, "Guide-Auswahl", - key=f"{guide_id}-auswahl", - prompt=_prompt( - "Guide-Auswahl", - topic=topic, format_name=format_name, bausteine=bausteine_liste, - auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions), - ), - role="guide", capabilities="files", - payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max), - timeout=_timeout("guide_auswahl", n), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "Auswahl fehlgeschlagen") - return None - - def auswahl_titel() -> str: - return "\n".join(f"- {_titel(entries[num])}" for num in auswahl) - - def auswahl_json() -> str: - return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False) - - # Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent - status, fixed = await _check_then_fix( - ctx, name="Auswahl", step=1, - check_key=f"{guide_id}-auswahl-check", - check_prompt=_prompt( - "Guide-Auswahl-Check", - topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, - bausteine=bausteine_liste, auswahl=auswahl_titel(), - out_path=files["auswahl_check"], extra=_extra(instructions), - ), - check_path=files["auswahl_check"], check_timeout=_timeout("guide_check", len(auswahl)), - fix_key=f"{guide_id}-auswahl-fix", - build_fix_prompt=lambda probleme: _prompt( - "Guide-Auswahl-Fix", - topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, - bausteine=bausteine_liste, auswahl=auswahl_titel(), - probleme="\n".join(f"- {p}" for p in probleme), - out_path=files["auswahl"], extra=_extra(instructions), - ), - fix_payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max), - fix_timeout=_timeout("guide_auswahl", n), fix_role="guide", - on_fix_invalid=lambda: atomic_write_text(files["auswahl"], auswahl_json()), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen") - return None - if fixed is not None: - auswahl = fixed - - sel_entries = {num: entries[num] for num in auswahl} - soll = len(sel_entries) - sel_liste = "\n".join(f"- {t}" for t in sel_entries.values()) - - # Schritt 3: Gliederung der festen Auswahl - plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll) - if plan is None: - await _set_step(guide_id, 2, "Plane Gliederung…") - files["gliederung"].unlink(missing_ok=True) - status, plan = await run_single_slot( - ctx, "Gliederung", - key=f"{guide_id}-gliederung", - prompt=_prompt( - "Guide-Gliederung", - topic=topic, format_name=format_name, bausteine=sel_liste, - out_path=files["gliederung"], extra=_extra(instructions), - ), - role="guide", capabilities="files", - payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll), - timeout=_timeout("plan", soll), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "Gliederung fehlgeschlagen") - return None - - def gliederung_text() -> str: - return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan) - - def gliederung_json() -> str: - return json.dumps( - {"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]}, - ensure_ascii=False, - ) - - # Schritt 4: Gliederungs-Prüfung - status, fixed = await _check_then_fix( - ctx, name="Gliederung", step=3, - check_key=f"{guide_id}-gliederung-check", - check_prompt=_prompt( - "Guide-Gliederung-Check", - topic=topic, format_name=format_name, zweck=zweck, - auswahl=auswahl_titel(), gliederung=gliederung_text(), - out_path=files["gliederung_check"], extra=_extra(instructions), - ), - check_path=files["gliederung_check"], check_timeout=_timeout("guide_check", soll), - fix_key=f"{guide_id}-gliederung-fix", - build_fix_prompt=lambda probleme: _prompt( - "Guide-Gliederung-Fix", - topic=topic, format_name=format_name, - auswahl=auswahl_titel(), gliederung=gliederung_text(), - probleme="\n".join(f"- {p}" for p in probleme), - out_path=files["gliederung"], extra=_extra(instructions), - ), - fix_payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll), - fix_timeout=_timeout("plan", soll), fix_role="guide", - on_fix_invalid=lambda: atomic_write_text(files["gliederung"], gliederung_json()), - ) - if status == CANCELLED: - return None - if status == FAILED: - await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen") - return None - if fixed is not None: - plan = fixed - - # Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume) - total_sections = sum(len(c["nums"]) for c in plan) - chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS)))) - zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks] - chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks] - writer_count = len(zuteilungen) - paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)] - offen = [i for i, p in enumerate(paths) if not p.exists()] - if offen: - await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…") - results = await asyncio.gather(*[ - run_agent( - f"{guide_id}-w{i + 1}", - _prompt( - "Guide-Writer", - topic=topic, format_name=format_name, zuteilung=zuteilungen[i], - facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions), - ), - _timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full", - ) - for i in offen - ], return_exceptions=True) - if is_cancelled(): - return None - for i, r in zip(offen, results): - if isinstance(r, BaseException): - _log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}") - elif r[0] != 0: - _log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}") - elif not paths[i].exists(): - _log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt") - if not any(p.exists() for p in paths): - await _fail(guide_id, _gather_error("Writer-Fehler", list(results))) - return None - - idx = _titel_index(entries) - by_num: dict[int, dict] = {} - for p in paths: - if not p.exists(): - continue - for sec in _parse_fragment(p.read_text(encoding="utf-8")): - num = _titel_aufloesen(idx, sec["titel"]) - if num is None: - _log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)") - elif num not in by_num: - by_num[num] = sec - if not by_num: - await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") - return None - - # Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections - chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks] - check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)] - offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]] - if offen_checks: - await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…") - - def sections_text(nums: list[int]) -> str: - return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums) - - slots = [{ - "key": f"{guide_id}-lese-check-{i + 1}", - "prompt": _prompt( - "Guide-Lese-Check", - topic=topic, format_name=format_name, spec=spec, - sections=sections_text(chunk_nums[i]), - out_path=check_paths[i], extra=_extra(instructions), - ), - "role": "fast", "capabilities": "files", - "payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))), - } for i in offen_checks] - res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled) - if is_cancelled(): - return None - if res is None: - await _fail(guide_id, "Lese-Prüfung fehlgeschlagen") - return None - - probleme_by_num: dict[int, str] = {} - for p in check_paths: - for item in (_lese_probleme_schema(_json_datei(p)) or []): - num = _titel_aufloesen(idx, item["section"]) - if num in by_num and num not in probleme_by_num: - probleme_by_num[num] = item["problem"] - - if probleme_by_num: - _log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet") - await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…") - fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums] - fix_offen = [i for i, nums in enumerate(fix_chunks) if nums] - fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)] - - def auftraege_text(nums: list[int]) -> str: - return "\n\n".join( - f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}" - for num in nums - ) - - results = await asyncio.gather(*[ - run_agent( - f"{guide_id}-fix-w{i + 1}", - _prompt( - "Guide-Sections-Fix", - topic=topic, format_name=format_name, facts=facts, spec=spec, - auftraege=auftraege_text(fix_chunks[i]), - out_path=fix_paths[i], extra=_extra(instructions), - ), - _timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full", - ) - for i in fix_offen - ], return_exceptions=True) - if is_cancelled(): - return None - for i, r in zip(fix_offen, results): - if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0): - _log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt") - ersetzt = 0 - for i in fix_offen: - if not fix_paths[i].exists(): - continue - for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")): - num = _titel_aufloesen(idx, sec["titel"]) - if num in probleme_by_num and sec["md"].strip(): - by_num[num] = sec - ersetzt += 1 - _log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet") - - await _set_progress(guide_id, "Setze zusammen…") - chapters: list[dict] = [] - for ch in plan: - sections = [ - {"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]} - for num in ch["nums"] if num in by_num - ] - if sections: - chapters.append({"title": ch["title"], "sections": sections}) - geplant = {num for ch in plan for num in ch["nums"]} - missing = sorted(geplant - set(by_num)) - if missing: - _log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}") - if not chapters: - await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") - return None - return chapters - - -async def reconcile_guides() -> None: - """DB↔Dateisystem abgleichen: status=done ohne Content-Datei → error. - - Läuft beim Server-Start (nach init_db) — fängt Crashes zwischen - Datei-Write und Status-Update ab. - """ - for g in await list_guides(): - if g["status"] == "done" and not guide_content_path(g["topic"], g["format"]).exists(): - log.warning("[%s] Guide %s: done ohne Content-Datei — auf error gesetzt", g["topic"], g["id"]) - now = datetime.now(timezone.utc).isoformat() - await update_guide(g["id"], status="error", error_msg="Inhalt fehlt — neu generieren", updated_at=now) - - -async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: - async with _semaphore: - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now) - - content_path = guide_content_path(topic, format_name) - content_path.parent.mkdir(parents=True, exist_ok=True) - project = project_dir(topic) if project_dir(topic).is_dir() else None - - try: - if guide_id in _cancelled: - return - - if project: - await asyncio.to_thread(_pdfs_konvertieren, project) - - # „Neu erstellen": fertiger Guide → kompletter Frischstart. - # Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume. - if content_path.exists(): - for p_alt in guide_slot_dateien(content_path): - p_alt.unlink(missing_ok=True) - - if format_name == "OnePager": - chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path) - else: - alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8")) - if not alle: - await _fail(guide_id, "Keine Bausteine gefunden") - return - entries = _eindeutige_titel(alle) - facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema") - chapters = await _generate_sections( - guide_id, topic, format_name, entries, - facts, instructions, provider, content_path, - ) - if chapters is None or guide_id in _cancelled: - return - - atomic_write_json(content_path, {"topic": topic, "format": format_name, "chapters": chapters}, indent=1) - - now = datetime.now(timezone.utc).isoformat() - await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now) - - except asyncio.TimeoutError: - await _fail(guide_id, "Timeout bei der Generierung") - except FileNotFoundError: - await _fail(guide_id, "Bausteine fehlen") - except Exception as e: - log.exception("[%s] Guide-Generierung fehlgeschlagen (%s)", topic, guide_id) - await _fail(guide_id, str(e)[:2000]) - finally: - _cancelled.discard(guide_id) - - -# --- Tutor-Chat --- - -def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str: - transcript = "\n".join( - f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" - for m in messages - ) - return _prompt( - "Chat", - topic=topic, format_name=format_name, - outline_block=outline.strip() or "(keine)", - section_block=section.strip() or "(kein Abschnitt erkannt)", - transcript=transcript, - ) - - -async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str: - try: - prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages) - returncode, stdout, stderr = await run_agent( - "chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." - reply = stdout.strip() - return reply or "Entschuldigung, ich habe keine Antwort erhalten." - except Exception: - log.warning("[%s] Guide-Chat fehlgeschlagen", topic, exc_info=True) - return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." - - -# --- Elemente (persönliche Zusammenfassung) --- - -def _element_fields(data: dict) -> dict | None: - """Validiert KI-Element-JSON und normalisiert auf die DB-Felder.""" - if not isinstance(data, dict): - return None - title = str(data.get("title", "")).strip() - if not title: - return None - listen = {} - for key in ("examples", "hints"): - raw = data.get(key, []) - listen[key] = [str(e).strip() for e in raw if str(e).strip()] if isinstance(raw, list) else [] - return { - "title": title[:200], - "description": str(data.get("description", "")).strip(), - "examples": listen["examples"], - "hints": listen["hints"], - } - - -def _topic_context(topic: str, limit: int = 12000) -> str: - """Bausteine + Guide-Inhalte des Themas als Kontext-Text (gekürzt).""" - parts: list[str] = [] - bp = bausteine_path(topic) - if bp.exists(): - parts.append(bp.read_text(encoding="utf-8")) - for fmt in ("FullGuide", "Guide", "MiniGuide", "OnePager"): - content = _json_datei(guide_content_path(topic, fmt)) - if content: - for ch in content.get("chapters", []): - for sec in ch.get("sections", []): - parts.append(sec if isinstance(sec, str) else json.dumps(sec, ensure_ascii=False)) - break # bester verfügbarer Guide reicht - text = "\n\n".join(parts).strip() - return text[:limit] if text else "(kein Material vorhanden)" - - -async def generate_element(topic: str, hint: str, provider: str = DEFAULT_PROVIDER) -> dict: - """Erstellt Element-Felder per KI. Fallback: nur Titel aus dem Stichwort.""" - fallback = {"title": hint.strip() or "Neues Element", "description": "", "examples": [], "hints": []} - try: - prompt = _prompt( - "Element-Create", - topic=topic, hint=hint.strip() or "(keins — wähle selbst ein Kernkonzept)", - context=_topic_context(topic), - ) - returncode, stdout, _ = await run_agent( - "element-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return fallback - return _element_fields(_parse_json_text(stdout)) or fallback - except Exception: - log.warning("[%s] Element-Erstellung fehlgeschlagen", topic, exc_info=True) - return fallback - - -def _parse_suggestions(stdout: str) -> list[dict] | None: - """Validiert Vorschlags-JSON aus KI-Output. None bei ungültigem JSON.""" - data = _parse_json_text(stdout) - if not isinstance(data, dict): - return None - suggestions = [] - for s in data.get("suggestions", []): - if not isinstance(s, dict): - continue - text = str(s.get("text", "")).strip() - target = s.get("target") - content = str(s.get("content", "")).strip() - if text and content and target in ("description", "examples", "hints"): - suggestions.append({"text": text, "target": target, "content": content}) - return suggestions - - -async def check_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None: - """Zweischrittige Prüfung auf fehlende Infos: Recherche → Verifizieren. None bei Fehler.""" - try: - element_json = json.dumps( - {k: element[k] for k in ("title", "description", "examples", "hints")}, - ensure_ascii=False, indent=1, - ) - context = _topic_context(element["topic"]) - - # Schritt 1: Recherche — breit Kandidaten sammeln - prompt = _prompt("Element-Check", topic=element["topic"], element_json=element_json, context=context) - returncode, stdout, _ = await run_agent( - "element-check-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return None - candidates = _parse_suggestions(stdout) - if candidates is None: - return None - if not candidates: - return [] - - # Schritt 2: Verifizieren — nur Wichtiges, nicht Redundantes durchlassen - prompt = _prompt( - "Element-Verify", - topic=element["topic"], element_json=element_json, - candidates_json=json.dumps({"suggestions": candidates}, ensure_ascii=False, indent=1), - context=context, - ) - returncode, stdout, _ = await run_agent( - "element-verify-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return None - return _parse_suggestions(stdout) - except Exception: - log.warning("[%s] Element-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True) - return None - - -def _element_json(element: dict) -> str: - return json.dumps( - {k: element[k] for k in ("title", "description", "examples", "hints")}, - ensure_ascii=False, indent=1, - ) - - -def _validate_change(c, element: dict) -> dict | None: - """Validiert einen Änderungs-Vorschlag aus KI-Output gegen das Element.""" - if not isinstance(c, dict): - return None - text = str(c.get("text", "")).strip() - action = c.get("action") - target = c.get("target") - index = c.get("index") - content = str(c.get("content", "")).strip() - if not text or action not in ("entfernen", "anpassen", "hinzufuegen"): - return None - if target not in ("title", "description", "examples", "hints"): - return None - if action in ("anpassen", "hinzufuegen") and not content: - return None - if action == "entfernen" and target not in ("examples", "hints"): - return None - # Index nur für anpassen/entfernen in Listen-Feldern; muss existieren - if target in ("examples", "hints") and action in ("anpassen", "entfernen"): - if not isinstance(index, int) or not (0 <= index < len(element[target])): - return None - else: - index = None - return {"text": text, "action": action, "target": target, "index": index, "content": content} - - -async def chat_with_element(element: dict, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> tuple[str, list[dict]]: - """Chat zum Element. Gibt (Antwort, Änderungs-Vorschläge) zurück — ändert nichts direkt.""" - fehler = "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." - try: - transcript = "\n".join( - f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" - for m in messages - ) - prompt = _prompt("Element-Chat", topic=element["topic"], element_json=_element_json(element), transcript=transcript) - returncode, stdout, _ = await run_agent( - "element-chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return fehler, [] - data = _parse_json_text(stdout) - if not isinstance(data, dict): - return fehler, [] - changes = [v for c in data.get("changes", []) if (v := _validate_change(c, element))] - reply = str(data.get("reply", "")).strip() or ("Vorschläge erstellt." if changes else fehler) - return reply, changes - except Exception: - log.warning("[%s] Element-Chat fehlgeschlagen", element.get("topic", "?"), exc_info=True) - return fehler, [] - - -async def style_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None: - """Prüft ein Element auf die Stil-Regeln und schlägt Änderungen vor. None bei Fehler.""" - try: - prompt = _prompt("Element-Stil", topic=element["topic"], element_json=_element_json(element)) - returncode, stdout, _ = await run_agent( - "element-stil-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return None - data = _parse_json_text(stdout) - if not isinstance(data, dict): - return None - return [v for c in data.get("changes", []) if (v := _validate_change(c, element))] - except Exception: - log.warning("[%s] Stil-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True) - return None - - -async def refine_suggestion(element: dict, suggestion: dict, instruction: str, provider: str = DEFAULT_PROVIDER) -> dict | None: - """Überarbeitet einen einzelnen Vorschlag nach Nutzer-Anweisung. None bei Fehler.""" - try: - prompt = _prompt( - "Element-Refine", - topic=element["topic"], element_json=_element_json(element), - suggestion_json=json.dumps(suggestion, ensure_ascii=False, indent=1), - instruction=instruction, - ) - returncode, stdout, _ = await run_agent( - "element-refine-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none", lane="interactive" - ) - if returncode != 0: - return None - data = _parse_json_text(stdout) - if not isinstance(data, dict): - return None - return _validate_change(data.get("change"), element) - except Exception: - log.warning("[%s] Vorschlags-Überarbeitung fehlgeschlagen", element.get("topic", "?"), exc_info=True) - return None diff --git a/backend/guide.py b/backend/guide.py new file mode 100644 index 0000000..2d42d80 --- /dev/null +++ b/backend/guide.py @@ -0,0 +1,492 @@ +"""Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg). + +Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ. +Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort. +""" + +import asyncio +import json +import logging +import math +from datetime import datetime, timezone +from pathlib import Path + +from agents import run_agent +from bausteine import _pdfs_konvertieren +from config import DEFAULT_PROVIDER, FORMAT_ANTEIL, TEMPLATES_DIR +from database import list_guides, update_guide +from fsutil import atomic_write_json, atomic_write_text +from jsonio import read_json_file as _json_datei +from onepager import _generate_onepager +from paths import bausteine_path, guide_content_path, project_dir +from pipeline import ( + CANCELLED, FAILED, GenContext, _check_then_fix, _claude_error, _extra, + _fail, _gather_error, _log, _prompt, _race, _semaphore, _set_progress, + _set_step, _timeout, clear_guide_cancelled, is_guide_cancelled, + run_single_slot, +) +from textkit import ( + _eindeutige_titel, _lade_bausteine, _parse_fragment, _split_chunks, + _titel, _titel_aufloesen, _titel_index, _zuteilung_text, +) + +log = logging.getLogger("creator.guide") + +GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung") + +# Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt). +# Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden +# eines fehlgeschlagenen Writers. +WRITER_SECTIONS = 30 +WRITER_MAX = 20 + + +def _guide_files(content_path: Path) -> dict: + d, stem = content_path.parent, content_path.stem + return { + "auswahl": d / f"{stem}.auswahl.json", + "auswahl_check": d / f"{stem}.auswahl-check.json", + "gliederung": d / f"{stem}.gliederung.json", + "gliederung_check": d / f"{stem}.gliederung-check.json", + # chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw. + } + + +def guide_slot_dateien(content_path: Path) -> list[Path]: + """Alle Schritt-Dateien eines Guides (für den Frischstart).""" + return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path] + + +def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None: + """{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang.""" + if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): + return None + idx = _titel_index(entries) + nums: list[int] = [] + seen: set[int] = set() + total = unknown = 0 + for t in data["bausteine"]: + total += 1 + num = _titel_aufloesen(idx, t) if isinstance(t, str) else None + if num is None: + unknown += 1 + elif num not in seen: + seen.add(num) + nums.append(num) + if total == 0 or (total - unknown) / total < 0.85: + return None + if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max: + return None + return nums + + +def _lese_probleme_schema(data): + """{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None.""" + if not isinstance(data, dict): + return None + if data.get("ok") is True: + return [] + p = data.get("probleme") + if not isinstance(p, list) or not p: + return None + out = [] + for x in p: + if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str): + return None + out.append({"section": x["section"].strip(), "problem": x["problem"].strip()}) + return out or None + + +def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None: + """{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}]. + + `soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz). + """ + if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list): + return None + idx = _titel_index(entries) + chapters: list[dict] = [] + seen: set[int] = set() + total = unknown = 0 + for ch in data["kapitel"]: + if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list): + return None + nums = [] + for t in ch["bausteine"]: + total += 1 + num = _titel_aufloesen(idx, t) if isinstance(t, str) else None + if num is None: + unknown += 1 + elif num not in seen: + nums.append(num) + seen.add(num) + if nums: + chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums}) + if not chapters or total == 0: + return None + if (total - unknown) / total < 0.85: + return None + if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max: + return None + return chapters + + +async def _generate_sections( + guide_id: str, topic: str, format_name: str, entries: dict[int, str], + facts: str, instructions: str, provider: str, + content_path: Path, +) -> list[dict] | None: + def is_cancelled() -> bool: + return is_guide_cancelled(guide_id) + + ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled, guide_id=guide_id) + spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8") + files = _guide_files(content_path) + bausteine_liste = "\n".join(f"- {t}" for t in entries.values()) + n = len(entries) + anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name] + k_min = min(n, max(minimum, math.ceil(anteil_min * n))) + k_max = min(n, max(k_min, math.floor(anteil_max * n))) + auswahl_auftrag = ( + f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. " + "Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist." + ) + + # Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume) + auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max) + if auswahl is None: + await _set_step(guide_id, 0, "Wähle Bausteine…") + files["auswahl"].unlink(missing_ok=True) + status, auswahl = await run_single_slot( + ctx, "Guide-Auswahl", + key=f"{guide_id}-auswahl", + prompt=_prompt( + "Guide-Auswahl", + topic=topic, format_name=format_name, bausteine=bausteine_liste, + auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions), + ), + role="guide", capabilities="files", + payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max), + timeout=_timeout("guide_auswahl", n), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "Auswahl fehlgeschlagen") + return None + + def auswahl_titel() -> str: + return "\n".join(f"- {_titel(entries[num])}" for num in auswahl) + + def auswahl_json() -> str: + return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False) + + # Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent + status, fixed = await _check_then_fix( + ctx, name="Auswahl", step=1, + check_key=f"{guide_id}-auswahl-check", + check_prompt=_prompt( + "Guide-Auswahl-Check", + topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, + bausteine=bausteine_liste, auswahl=auswahl_titel(), + out_path=files["auswahl_check"], extra=_extra(instructions), + ), + check_path=files["auswahl_check"], check_timeout=_timeout("guide_check", len(auswahl)), + fix_key=f"{guide_id}-auswahl-fix", + build_fix_prompt=lambda probleme: _prompt( + "Guide-Auswahl-Fix", + topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, + bausteine=bausteine_liste, auswahl=auswahl_titel(), + probleme="\n".join(f"- {p}" for p in probleme), + out_path=files["auswahl"], extra=_extra(instructions), + ), + fix_payload=lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max), + fix_timeout=_timeout("guide_auswahl", n), fix_role="guide", + on_fix_invalid=lambda: atomic_write_text(files["auswahl"], auswahl_json()), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen") + return None + if fixed is not None: + auswahl = fixed + + sel_entries = {num: entries[num] for num in auswahl} + soll = len(sel_entries) + sel_liste = "\n".join(f"- {t}" for t in sel_entries.values()) + + # Schritt 3: Gliederung der festen Auswahl + plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll) + if plan is None: + await _set_step(guide_id, 2, "Plane Gliederung…") + files["gliederung"].unlink(missing_ok=True) + status, plan = await run_single_slot( + ctx, "Gliederung", + key=f"{guide_id}-gliederung", + prompt=_prompt( + "Guide-Gliederung", + topic=topic, format_name=format_name, bausteine=sel_liste, + out_path=files["gliederung"], extra=_extra(instructions), + ), + role="guide", capabilities="files", + payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll), + timeout=_timeout("plan", soll), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "Gliederung fehlgeschlagen") + return None + + def gliederung_text() -> str: + return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan) + + def gliederung_json() -> str: + return json.dumps( + {"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]}, + ensure_ascii=False, + ) + + # Schritt 4: Gliederungs-Prüfung + status, fixed = await _check_then_fix( + ctx, name="Gliederung", step=3, + check_key=f"{guide_id}-gliederung-check", + check_prompt=_prompt( + "Guide-Gliederung-Check", + topic=topic, format_name=format_name, zweck=zweck, + auswahl=auswahl_titel(), gliederung=gliederung_text(), + out_path=files["gliederung_check"], extra=_extra(instructions), + ), + check_path=files["gliederung_check"], check_timeout=_timeout("guide_check", soll), + fix_key=f"{guide_id}-gliederung-fix", + build_fix_prompt=lambda probleme: _prompt( + "Guide-Gliederung-Fix", + topic=topic, format_name=format_name, + auswahl=auswahl_titel(), gliederung=gliederung_text(), + probleme="\n".join(f"- {p}" for p in probleme), + out_path=files["gliederung"], extra=_extra(instructions), + ), + fix_payload=lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll), + fix_timeout=_timeout("plan", soll), fix_role="guide", + on_fix_invalid=lambda: atomic_write_text(files["gliederung"], gliederung_json()), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen") + return None + if fixed is not None: + plan = fixed + + # Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume) + total_sections = sum(len(c["nums"]) for c in plan) + chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS)))) + zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks] + chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks] + writer_count = len(zuteilungen) + paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)] + offen = [i for i, p in enumerate(paths) if not p.exists()] + if offen: + await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…") + results = await asyncio.gather(*[ + run_agent( + f"{guide_id}-w{i + 1}", + _prompt( + "Guide-Writer", + topic=topic, format_name=format_name, zuteilung=zuteilungen[i], + facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions), + ), + _timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full", + ) + for i in offen + ], return_exceptions=True) + if is_cancelled(): + return None + for i, r in zip(offen, results): + if isinstance(r, BaseException): + _log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}") + elif r[0] != 0: + _log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}") + elif not paths[i].exists(): + _log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt") + if not any(p.exists() for p in paths): + await _fail(guide_id, _gather_error("Writer-Fehler", list(results))) + return None + + idx = _titel_index(entries) + by_num: dict[int, dict] = {} + for p in paths: + if not p.exists(): + continue + for sec in _parse_fragment(p.read_text(encoding="utf-8")): + num = _titel_aufloesen(idx, sec["titel"]) + if num is None: + _log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)") + elif num not in by_num: + by_num[num] = sec + if not by_num: + await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") + return None + + # Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections + chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks] + check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)] + offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]] + if offen_checks: + await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…") + + def sections_text(nums: list[int]) -> str: + return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums) + + slots = [{ + "key": f"{guide_id}-lese-check-{i + 1}", + "prompt": _prompt( + "Guide-Lese-Check", + topic=topic, format_name=format_name, spec=spec, + sections=sections_text(chunk_nums[i]), + out_path=check_paths[i], extra=_extra(instructions), + ), + "role": "fast", "capabilities": "files", + "payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))), + } for i in offen_checks] + res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled) + if is_cancelled(): + return None + if res is None: + await _fail(guide_id, "Lese-Prüfung fehlgeschlagen") + return None + + probleme_by_num: dict[int, str] = {} + for p in check_paths: + for item in (_lese_probleme_schema(_json_datei(p)) or []): + num = _titel_aufloesen(idx, item["section"]) + if num in by_num and num not in probleme_by_num: + probleme_by_num[num] = item["problem"] + + if probleme_by_num: + _log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet") + await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…") + fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums] + fix_offen = [i for i, nums in enumerate(fix_chunks) if nums] + fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)] + + def auftraege_text(nums: list[int]) -> str: + return "\n\n".join( + f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}" + for num in nums + ) + + results = await asyncio.gather(*[ + run_agent( + f"{guide_id}-fix-w{i + 1}", + _prompt( + "Guide-Sections-Fix", + topic=topic, format_name=format_name, facts=facts, spec=spec, + auftraege=auftraege_text(fix_chunks[i]), + out_path=fix_paths[i], extra=_extra(instructions), + ), + _timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full", + ) + for i in fix_offen + ], return_exceptions=True) + if is_cancelled(): + return None + for i, r in zip(fix_offen, results): + if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0): + _log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt") + ersetzt = 0 + for i in fix_offen: + if not fix_paths[i].exists(): + continue + for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")): + num = _titel_aufloesen(idx, sec["titel"]) + if num in probleme_by_num and sec["md"].strip(): + by_num[num] = sec + ersetzt += 1 + _log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet") + + await _set_progress(guide_id, "Setze zusammen…") + chapters: list[dict] = [] + for ch in plan: + sections = [ + {"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]} + for num in ch["nums"] if num in by_num + ] + if sections: + chapters.append({"title": ch["title"], "sections": sections}) + geplant = {num for ch in plan for num in ch["nums"]} + missing = sorted(geplant - set(by_num)) + if missing: + _log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}") + if not chapters: + await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") + return None + return chapters + + +async def reconcile_guides() -> None: + """DB↔Dateisystem abgleichen: status=done ohne Content-Datei → error. + + Läuft beim Server-Start (nach init_db) — fängt Crashes zwischen + Datei-Write und Status-Update ab. + """ + for g in await list_guides(): + if g["status"] == "done" and not guide_content_path(g["topic"], g["format"]).exists(): + log.warning("[%s] Guide %s: done ohne Content-Datei — auf error gesetzt", g["topic"], g["id"]) + now = datetime.now(timezone.utc).isoformat() + await update_guide(g["id"], status="error", error_msg="Inhalt fehlt — neu generieren", updated_at=now) + + +async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: + async with _semaphore: + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now) + + content_path = guide_content_path(topic, format_name) + content_path.parent.mkdir(parents=True, exist_ok=True) + project = project_dir(topic) if project_dir(topic).is_dir() else None + + try: + if is_guide_cancelled(guide_id): + return + + if project: + await asyncio.to_thread(_pdfs_konvertieren, project) + + # „Neu erstellen": fertiger Guide → kompletter Frischstart. + # Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume. + if content_path.exists(): + for p_alt in guide_slot_dateien(content_path): + p_alt.unlink(missing_ok=True) + + if format_name == "OnePager": + chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path) + else: + alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8")) + if not alle: + await _fail(guide_id, "Keine Bausteine gefunden") + return + entries = _eindeutige_titel(alle) + facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema") + chapters = await _generate_sections( + guide_id, topic, format_name, entries, + facts, instructions, provider, content_path, + ) + if chapters is None or is_guide_cancelled(guide_id): + return + + atomic_write_json(content_path, {"topic": topic, "format": format_name, "chapters": chapters}, indent=1) + + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now) + + except asyncio.TimeoutError: + await _fail(guide_id, "Timeout bei der Generierung") + except FileNotFoundError: + await _fail(guide_id, "Bausteine fehlen") + except Exception as e: + log.exception("[%s] Guide-Generierung fehlgeschlagen (%s)", topic, guide_id) + await _fail(guide_id, str(e)[:2000]) + finally: + clear_guide_cancelled(guide_id) diff --git a/backend/main.py b/backend/main.py index 1d9f4f6..d6be158 100644 --- a/backend/main.py +++ b/backend/main.py @@ -9,7 +9,7 @@ setup_logging() from config import FRONTEND_DIST, STORAGE_DIR from database import init_db, close_db -from generator import reconcile_guides +from guide import reconcile_guides from routes import router diff --git a/backend/onepager.py b/backend/onepager.py new file mode 100644 index 0000000..bcd82f8 --- /dev/null +++ b/backend/onepager.py @@ -0,0 +1,157 @@ +"""OnePager-Pipeline: Recherche → Recherche-Prüfung → Bauen → Prüfung (7 Karten im 3×3-Raster).""" + +from pathlib import Path + +from fsutil import atomic_write_json +from jsonio import read_json_file as _json_datei +from pipeline import ( + CANCELLED, FAILED, GenContext, _check_then_fix, _extra, _fail, + _prompt, _set_step, _timeout, is_guide_cancelled, run_single_slot, +) + + +async def _generate_onepager( + guide_id: str, topic: str, instructions: str, provider: str, + project: Path | None, content_path: Path, +) -> list[dict] | None: + ctx = GenContext(topic=topic, provider=provider, is_cancelled=lambda: is_guide_cancelled(guide_id), guide_id=guide_id) + + # 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil) + KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet") + + def karten_schema(data): + """{"karten": {key: {titel, md}}} → Liste · sonst None.""" + if not isinstance(data, dict): + return None + karten = data.get("karten") + if not isinstance(karten, dict): + return None + out = [] + for key in KARTEN_KEYS: + k = karten.get(key) + if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str): + return None + titel, md = k["titel"].strip(), k["md"].strip() + if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig + return None + out.append({"key": key, "titel": titel, "md": md}) + return out + + d, stem = content_path.parent, content_path.stem + recherche_path = d / f"{stem}.recherche.md" + recherche_check_path = d / f"{stem}.recherche-check.json" + karten_path = d / f"{stem}.karten.json" + check_path = d / f"{stem}.onepager-check.json" + + # Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen + # (Version, Lizenz, Alternativen) laufen dort ins Leere. + if project: + source = _prompt("OnePager-Quelle-Projekt", project=project) + recherche_template = "OnePager-Recherche-Projekt" + recherche_check_template = "OnePager-Recherche-Check-Projekt" + else: + source = _prompt("OnePager-Quelle-Thema", topic=topic) + recherche_template = "OnePager-Recherche" + recherche_check_template = "OnePager-Recherche-Check" + + def recherche_payload(result=None): + if not recherche_path.exists(): + return None + text = recherche_path.read_text(encoding="utf-8").strip() + return text or None + + # Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume) + recherche = recherche_payload() + if recherche is None: + await _set_step(guide_id, 0, "Recherchiere…") + status, recherche = await run_single_slot( + ctx, "OnePager-Recherche", + key=f"{guide_id}-recherche", + prompt=_prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)), + role="quick", capabilities="files" if project else "full", + payload=recherche_payload, timeout=_timeout("onepager_recherche"), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "OnePager-Recherche fehlgeschlagen") + return None + + # Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent + status, fixed = await _check_then_fix( + ctx, name="Recherche", step=1, + check_key=f"{guide_id}-recherche-check", + check_prompt=_prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path), + check_path=recherche_check_path, check_timeout=_timeout("onepager_verify"), + fix_key=f"{guide_id}-recherche-fix", + build_fix_prompt=lambda probleme: _prompt( + "OnePager-Recherche-Fix", + topic=topic, source=source, recherche=recherche, + probleme="\n".join(f"- {p}" for p in probleme), + out_path=recherche_path, extra=_extra(instructions), + ), + fix_payload=recherche_payload, fix_timeout=_timeout("onepager_recherche"), + fix_role="quick", fix_caps="files" if project else "full", + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen") + return None + if fixed is not None: + recherche = fixed + + # Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen) + karten = karten_schema(_json_datei(karten_path)) + if karten is None: + await _set_step(guide_id, 2, "Baue OnePager…") + karten_path.unlink(missing_ok=True) + status, karten = await run_single_slot( + ctx, "OnePager-Bauen", + key=f"{guide_id}-bauen", + prompt=_prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)), + role="fast", capabilities="files", + payload=lambda result: karten_schema(_json_datei(karten_path)), + timeout=_timeout("onepager_bauen"), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "OnePager-Bau fehlgeschlagen") + return None + + def karten_block() -> str: + return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten) + + # Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent + status, fixed = await _check_then_fix( + ctx, name="OnePager", step=3, + check_key=f"{guide_id}-verify", + check_prompt=_prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path), + check_path=check_path, check_timeout=_timeout("onepager_verify"), + fix_key=f"{guide_id}-karten-fix", + build_fix_prompt=lambda probleme: _prompt( + "OnePager-Fix", + topic=topic, recherche=recherche, karten=karten_block(), + probleme="\n".join(f"- {p}" for p in probleme), + out_path=karten_path, extra=_extra(instructions), + ), + fix_payload=lambda result: karten_schema(_json_datei(karten_path)), + fix_timeout=_timeout("onepager_bauen"), + on_fix_invalid=lambda: atomic_write_json( + karten_path, {"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, + ), + ) + if status == CANCELLED: + return None + if status == FAILED: + await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen") + return None + if fixed is not None: + karten = fixed + + sections = [ + {"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]} + for i, k in enumerate(karten, 1) + ] + return [{"title": topic, "sections": sections}] diff --git a/backend/pipeline.py b/backend/pipeline.py new file mode 100644 index 0000000..120cf92 --- /dev/null +++ b/backend/pipeline.py @@ -0,0 +1,251 @@ +"""Pipeline-Grundbausteine: Agent-Races, Single-Slot, Check→Fix, Prompts, Guide-Status. + +Hält den mutablen Pipeline-Zustand (Generierungs-Semaphore, Cancel-Set). +Zugriff auf das Cancel-Set NUR über die Funktionen hier — kopierte Referenzen +in anderen Modulen würden bei einem Re-Assign auseinanderlaufen. +""" + +import asyncio +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Callable + +from agents import run_agent, kill_process +from config import MAX_CONCURRENT_GENERATIONS, TEMPLATES_DIR, TIMEOUTS +from database import update_guide +from jsonio import read_json_file as _json_datei + +log = logging.getLogger("creator.pipeline") + +_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS) +_cancelled: set[str] = set() + + +async def cancel_guide(guide_id: str) -> bool: + _cancelled.add(guide_id) + kill_process(guide_id) + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now) + return True + + +def is_guide_cancelled(guide_id: str) -> bool: + return guide_id in _cancelled + + +def clear_guide_cancelled(guide_id: str) -> None: + _cancelled.discard(guide_id) + + +async def _set_progress(guide_id: str, progress: str) -> None: + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, progress=progress, updated_at=now) + + +async def _set_step(guide_id: str, step: int, progress: str) -> None: + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, step=step, progress=progress, updated_at=now) + + +async def _fail(guide_id: str, msg: str) -> None: + now = datetime.now(timezone.utc).isoformat() + await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now) + + +def _prompt(name: str, **kwargs) -> str: + template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8") + return template.format(**kwargs) + + +def _extra(instructions: str) -> str: + return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else "" + + +def _log(topic: str, msg: str) -> None: + log.info("[%s] %s", topic, msg) + + +def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str: + stderr = (stderr or "").strip() + if stderr: + return f"{label}: {stderr[:1000]}" + tail = (stdout or "").strip()[-500:] + if tail: + return f"{label} (exit {returncode}, stderr leer): …{tail}" + return f"{label} (exit {returncode}, ohne Ausgabe)" + + +def _gather_error(label: str, results: list) -> str: + for r in results: + if isinstance(r, BaseException): + return f"{label}: {type(r).__name__}: {r}" + returncode, stdout, stderr = r + if returncode != 0: + return _claude_error(label, returncode, stdout, stderr) + return f"{label}: kein verwertbares Ergebnis" + + +def _timeout(step: str, n: int = 0) -> int: + base, per = TIMEOUTS[step] + return base + per * n + + +def _probleme_schema(data): + """{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None.""" + if not isinstance(data, dict): + return None + if data.get("ok") is True: + return [] + p = data.get("probleme") + if not isinstance(p, list) or not p: + return None + out = [str(x).strip() for x in p if str(x).strip()] + return out or None + + +_MAX_RESTARTS = 2 + + +async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None: + """Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse. + + Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)` + prüft die Gültigkeit und liefert das Slot-Ergebnis oder None. + Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das + Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt. + `cancelled()` → True bricht ab (keine Restarts, Rückgabe None). + """ + attempts = {i: 0 for i in range(len(slots))} + tasks: dict[asyncio.Task, int] = {} + + def spawn(i: int) -> None: + slot = slots[i] + task = asyncio.create_task(run_agent( + slot["key"], slot["prompt"], timeout, + provider=provider, role=slot["role"], capabilities=slot["capabilities"], + )) + tasks[task] = i + + for i in range(len(slots)): + spawn(i) + + results: list = [] + try: + while tasks: + if cancelled and cancelled(): + return None + done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED) + for task in done: + i = tasks.pop(task) + payload, err = None, None + try: + result = task.result() + if result[0] != 0: + err = _claude_error("Fehler", *result) + else: + payload = slots[i]["payload"](result) + if payload is None: + err = "Ergebnis ungültig/nicht parsebar" + except asyncio.TimeoutError: + err = f"Timeout nach {timeout}s" + except Exception as e: + err = f"{type(e).__name__}: {e}" + + if payload is not None: + results.append(payload) + if on_update: + on_update(len(results)) + if len(results) >= quorum: + return results + continue + + _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}") + attempts[i] += 1 + if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()): + spawn(i) + _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)") + return None + finally: + for task, i in tasks.items(): + kill_process(slots[i]["key"]) + task.cancel() + if tasks: + await asyncio.gather(*tasks.keys(), return_exceptions=True) + + +@dataclass +class GenContext: + """Durchgereichte Pipeline-Parameter — erspart lange Argument-Signaturen.""" + topic: str + provider: str + is_cancelled: Callable[[], bool] + guide_id: str | None = None + + +# Ergebnis-Status von run_single_slot/_check_then_fix +OK, CANCELLED, FAILED = "ok", "cancelled", "failed" + + +async def run_single_slot( + ctx: GenContext, label: str, *, + key: str, prompt: str, role: str, capabilities: str, payload, timeout: int, +) -> tuple[str, object]: + """Ein Agent, ein gültiges Ergebnis (Race mit Quorum 1). + + → (OK, wert) | (CANCELLED, None) | (FAILED, None) + """ + slots = [{"key": key, "prompt": prompt, "role": role, "capabilities": capabilities, "payload": payload}] + res = await _race(ctx.topic, label, slots, 1, timeout, ctx.provider, cancelled=ctx.is_cancelled) + if ctx.is_cancelled(): + return CANCELLED, None + if res is None: + return FAILED, None + return OK, res[0] + + +async def _check_then_fix( + ctx: GenContext, *, name: str, step: int, + check_key: str, check_prompt: str, check_path: Path, check_timeout: int, + fix_key: str, build_fix_prompt, fix_payload, fix_timeout: int, + fix_role: str = "fast", fix_caps: str = "files", + on_fix_invalid=None, +) -> tuple[str, object]: + """Check→Fix-Muster: Prüf-Agent notiert Probleme (JSON), Fix-Agent behebt sie. + + Resume: existierende Check-Datei überspringt den ganzen Schritt. + Check ist fatal (FAILED), Fix nicht — Original bleibt; on_fix_invalid kann + das kanonische Original zurückschreiben, falls der Fix-Agent die + Artefakt-Datei zerschrieben hat. + Lese-Check (Multi-Slot, Section-genau) und Bausteine-Auswahl-Check + (Patch-Semantik) passen bewusst NICHT in dieses Muster. + → (OK, neues_artefakt | None=unverändert) | (CANCELLED, None) | (FAILED, None) + """ + if check_path.exists(): + return OK, None + await _set_step(ctx.guide_id, step, f"Prüfe {name}…") + status, probleme = await run_single_slot( + ctx, f"{name}-Prüfung", key=check_key, prompt=check_prompt, + role="fast", capabilities="files", + payload=lambda result: _probleme_schema(_json_datei(check_path)), + timeout=check_timeout, + ) + if status != OK: + return status, None + if not probleme: + return OK, None + _log(ctx.topic, f"{name}-Prüfung: {len(probleme)} Problem(e) notiert") + await _set_step(ctx.guide_id, step, f"Passe {name} an…") + status, fixed = await run_single_slot( + ctx, f"{name}-Fix", key=fix_key, prompt=build_fix_prompt(probleme), + role=fix_role, capabilities=fix_caps, payload=fix_payload, timeout=fix_timeout, + ) + if status == CANCELLED: + return CANCELLED, None + if status == FAILED: + _log(ctx.topic, f"{name}-Fix ungültig — Original bleibt") + if on_fix_invalid: + on_fix_invalid() + return OK, None + return OK, fixed diff --git a/backend/routes.py b/backend/routes.py index 31a4af9..c49f7b5 100644 --- a/backend/routes.py +++ b/backend/routes.py @@ -15,11 +15,10 @@ from database import ( list_progress, set_progress, delete_progress, create_element, list_elements, get_element, update_element, delete_element, ) -from generator import ( - generate_guide, cancel_guide, chat_with_guide, guide_slot_dateien, - generate_bausteine, cancel_bausteine, bausteine_status, active_bausteine, reset_bausteine, - generate_element, chat_with_element, check_element, style_element, refine_suggestion, -) +from bausteine import generate_bausteine, cancel_bausteine, bausteine_status, active_bausteine, reset_bausteine +from elements import generate_element, chat_with_guide, chat_with_element, check_element, style_element, refine_suggestion +from guide import generate_guide, guide_slot_dateien +from pipeline import cancel_guide from models import ( GuideCreateRequest, GuideResponse, TopicCreateRequest, diff --git a/backend/textkit.py b/backend/textkit.py new file mode 100644 index 0000000..68a3ac2 --- /dev/null +++ b/backend/textkit.py @@ -0,0 +1,143 @@ +"""Reine Text-Helfer: Titel-Normalisierung, Listen-Parser, Chunk-Aufteilung. + +Kein Zustand, keine IO — überall gefahrlos importierbar. +""" + +import re + +_CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader + + +def _norm_titel(s: str) -> str: + """Normalisiert einen Titel für den Schlüssel-Vergleich.""" + s = re.sub(r"[`'\"<>]", "", s) + return re.sub(r"\s+", " ", s).strip().lower() + + +def _titel(entry: str) -> str: + return entry.split(" — ")[0].strip() or entry + + +def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]: + """Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen.""" + seen: dict[str, int] = {} + out: dict[int, str] = {} + for num, text in entries.items(): + titel = _titel(text) + key = _norm_titel(titel) + seen[key] = seen.get(key, 0) + 1 + if seen[key] > 1: + rest = text.split(" — ", 1) + text = f"{titel} ({seen[key]})" + (f" — {rest[1]}" if len(rest) == 2 else "") + # zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht + out[num] = text + return out + + +def _titel_index(entries: dict[int, str]) -> dict[str, int]: + return {_norm_titel(_titel(text)): num for num, text in entries.items()} + + +def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None: + """Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …").""" + if not isinstance(t, str): + return None + return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t))) + + +def _parse_auswahl(text: str) -> dict[int, str]: + """Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile.""" + entries: dict[int, str] = {} + last = None + for line in text.splitlines(): + m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line) + if m: + last = int(m.group(1)) + entries[last] = m.group(2) + elif last is not None and line.strip(): + entries[last] += " " + line.strip() + return entries + + +def _parse_kategorien(text: str) -> dict[str, list[str]]: + """Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten.""" + cats: dict[str, list[str]] = {} + current = None + for line in text.splitlines(): + s = line.strip() + m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE) + if m: + current = m.group(1).upper() + cats.setdefault(current, []) + continue + m = re.match(r"(\d+)[.)]\s+(.*\S)", s) + if m and current: + cats[current].append(m.group(2)) + return cats + + +def _lade_bausteine(text: str) -> dict[int, str]: + """Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat).""" + if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE): + cats = _parse_kategorien(text) + texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])] + return {i: t for i, t in enumerate(texts, 1)} + return _parse_auswahl(text) + + +_FRAGMENT_KAPITEL_RE = re.compile(r"", re.IGNORECASE) +_FRAGMENT_SECTION_RE = re.compile(r"", re.IGNORECASE) + + +def _parse_fragment(text: str) -> list[dict]: + """Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge.""" + sections: list[dict] = [] + kapitel = None + current = None + for line in text.splitlines(): + s = line.strip() + m = _FRAGMENT_KAPITEL_RE.match(s) + if m: + kapitel = m.group(1) + current = None + continue + m = _FRAGMENT_SECTION_RE.match(s) + if m: + current = {"kapitel": kapitel, "titel": m.group(1), "md": []} + sections.append(current) + continue + if current is not None: + current["md"].append(line) + for sec in sections: + sec["md"] = "\n".join(sec["md"]).strip() + return sections + + +def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]: + """Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl.""" + n = max(1, min(n, len(chapters))) + chunks: list[list[dict]] = [] + current: list[dict] = [] + count = 0 + remaining_total = sum(len(c["nums"]) for c in chapters) + remaining_chunks = n + for ch in chapters: + current.append(ch) + count += len(ch["nums"]) + if remaining_chunks > 1 and count >= remaining_total / remaining_chunks: + chunks.append(current) + remaining_total -= count + remaining_chunks -= 1 + current = [] + count = 0 + if current: + chunks.append(current) + return chunks + + +def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str: + lines = [] + for ch in chunk: + lines.append(f"KAPITEL: {ch['title']}") + lines.extend(f"- {entries[num]}" for num in ch["nums"]) + return "\n".join(lines)