import asyncio import json import math import shutil import subprocess import re import uuid from datetime import datetime, timezone from pathlib import Path from agents import run_agent, kill_process from config import ( DEFAULT_PROVIDER, FORMAT_ANTEIL, TEMPLATES_DIR, TIMEOUTS, MAX_CONCURRENT_GENERATIONS, ) from database import update_guide from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir _semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS) _cancelled: set[str] = set() async def cancel_guide(guide_id: str) -> bool: _cancelled.add(guide_id) kill_process(guide_id) now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen — Fortschritt bleibt erhalten", updated_at=now) return True async def _set_progress(guide_id: str, progress: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, progress=progress, updated_at=now) def _prompt(name: str, **kwargs) -> str: template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8") return template.format(**kwargs) def _extra(instructions: str) -> str: return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else "" def _log(topic: str, msg: str) -> None: print(f"[generator] {topic}: {msg}", flush=True) def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str: stderr = (stderr or "").strip() if stderr: return f"{label}: {stderr[:1000]}" tail = (stdout or "").strip()[-500:] if tail: return f"{label} (exit {returncode}, stderr leer): …{tail}" return f"{label} (exit {returncode}, ohne Ausgabe)" def _gather_error(label: str, results: list) -> str: for r in results: if isinstance(r, BaseException): return f"{label}: {type(r).__name__}: {r}" returncode, stdout, stderr = r if returncode != 0: return _claude_error(label, returncode, stdout, stderr) return f"{label}: kein verwertbares Ergebnis" async def _fail(guide_id: str, msg: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now) def _norm_titel(s: str) -> str: """Normalisiert einen Titel für den Schlüssel-Vergleich.""" s = re.sub(r"[`'\"<>]", "", s) return re.sub(r"\s+", " ", s).strip().lower() def _titel(entry: str) -> str: return entry.split(" — ")[0].strip() or entry def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]: """Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen.""" seen: dict[str, int] = {} out: dict[int, str] = {} for num, text in entries.items(): titel = _titel(text) key = _norm_titel(titel) seen[key] = seen.get(key, 0) + 1 if seen[key] > 1: rest = text.split(" — ", 1) text = f"{titel} ({seen[key]})" + (f" — {rest[1]}" if len(rest) == 2 else "") # zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht out[num] = text return out def _titel_index(entries: dict[int, str]) -> dict[str, int]: return {_norm_titel(_titel(text)): num for num, text in entries.items()} def _json_datei(path: Path): """Liest eine JSON-Datei (Code-Fences tolerant); None bei fehlend/ungültig.""" if not path.exists(): return None try: text = path.read_text(encoding="utf-8").strip() text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text) return json.loads(text) except Exception: return None def _timeout(step: str, n: int = 0) -> int: base, per = TIMEOUTS[step] return base + per * n _MAX_RESTARTS = 2 async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None: """Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse. Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)` prüft die Gültigkeit und liefert das Slot-Ergebnis oder None. Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt. `cancelled()` → True bricht ab (keine Restarts, Rückgabe None). """ attempts = {i: 0 for i in range(len(slots))} tasks: dict[asyncio.Task, int] = {} def spawn(i: int) -> None: slot = slots[i] task = asyncio.create_task(run_agent( slot["key"], slot["prompt"], timeout, provider=provider, role=slot["role"], capabilities=slot["capabilities"], )) tasks[task] = i for i in range(len(slots)): spawn(i) results: list = [] try: while tasks: if cancelled and cancelled(): return None done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED) for task in done: i = tasks.pop(task) payload, err = None, None try: result = task.result() if result[0] != 0: err = _claude_error("Fehler", *result) else: payload = slots[i]["payload"](result) if payload is None: err = "Ergebnis ungültig/nicht parsebar" except asyncio.TimeoutError: err = f"Timeout nach {timeout}s" except Exception as e: err = f"{type(e).__name__}: {e}" if payload is not None: results.append(payload) if on_update: on_update(len(results)) if len(results) >= quorum: return results continue _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}") attempts[i] += 1 if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()): spawn(i) _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)") return None finally: for task, i in tasks.items(): kill_process(slots[i]["key"]) task.cancel() if tasks: await asyncio.gather(*tasks.keys(), return_exceptions=True) # --- Bausteine-Pipeline: 4x Recherche (3) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert --- _bausteine_progress: dict[str, str] = {} _bausteine_errors: dict[str, str] = {} _bausteine_cancelled: set[str] = set() _bausteine_step: dict[str, int] = {} BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung") _CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader def _bausteine_steps(topic: str) -> tuple: """Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche.""" if project_dir(topic).is_dir(): return BAUSTEINE_STEPS + ("Ergänzung",) return BAUSTEINE_STEPS def _bausteine_files(topic: str) -> dict: arbeit = arbeit_dir(topic) return { "final": bausteine_path(topic), "arbeit": arbeit, "recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)], "auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)], "auswahl_check": arbeit / "auswahl-check.json", "ergaenzung": arbeit / "ergaenzung.json", } def _alle_slot_dateien(files: dict) -> list[Path]: return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]] def cancel_bausteine(topic: str) -> bool: if topic not in _bausteine_progress: return False _bausteine_cancelled.add(topic) kill_process(f"bausteine-{topic}-") return True def _resume_step(topic: str) -> int: """Erster noch offener Schritt anhand der persistierten Zwischendateien.""" files = _bausteine_files(topic) if sum(p.exists() for p in files["recherche"]) < 3: return 0 if not any(p.exists() for p in files["auswahl"]): return 1 if not files["auswahl_check"].exists(): return 2 if project_dir(topic).is_dir() and not files["ergaenzung"].exists(): return 3 return len(_bausteine_steps(topic)) def bausteine_status(topic: str) -> dict: steps = _bausteine_steps(topic) ready = bausteine_path(topic).exists() generating = topic in _bausteine_progress partial = False if generating: current = _bausteine_step.get(topic) states = [ "pending" if current is None else "done" if i < current else "active" if i == current else "pending" for i in range(len(steps)) ] elif ready: states = ["done"] * len(steps) else: nxt = _resume_step(topic) partial = nxt > 0 states = ["done" if i < nxt else "pending" for i in range(len(steps))] return { "ready": ready, "generating": generating, "progress": _bausteine_progress.get(topic), "error": _bausteine_errors.get(topic), "partial": partial, "steps": [{"label": label, "state": s} for label, s in zip(steps, states)], } def active_bausteine() -> list[dict]: return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()] def reset_bausteine(topic: str) -> None: files = _bausteine_files(topic) files["final"].unlink(missing_ok=True) shutil.rmtree(files["arbeit"], ignore_errors=True) _bausteine_errors.pop(topic, None) def _ergaenzung_schema(data): """{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None.""" if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): return None out = [] for b in data["bausteine"]: if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str): return None titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip() if not titel: return None out.append((titel, beschreibung)) return out def _pdfs_konvertieren(project: Path) -> None: """PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern. Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die .txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet. Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten). """ pdfs = list(project.rglob("*.pdf")) if not pdfs: return if shutil.which("pdftotext") is None: raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden") for pdf in pdfs: txt = pdf.with_suffix(".txt") if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime: continue try: subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120) _log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}") except Exception as e: raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str: if project: source = _prompt("Bausteine-Quelle-Projekt", project=project) else: source = _prompt("Bausteine-Quelle-Thema", topic=topic) return _prompt( "Bausteine-Recherche", topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions), ) def _parse_auswahl(text: str) -> dict[int, str]: """Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile.""" entries: dict[int, str] = {} last = None for line in text.splitlines(): m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line) if m: last = int(m.group(1)) entries[last] = m.group(2) elif last is not None and line.strip(): entries[last] += " " + line.strip() return entries def _parse_kategorien(text: str) -> dict[str, list[str]]: """Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten.""" cats: dict[str, list[str]] = {} current = None for line in text.splitlines(): s = line.strip() m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE) if m: current = m.group(1).upper() cats.setdefault(current, []) continue m = re.match(r"(\d+)[.)]\s+(.*\S)", s) if m and current: cats[current].append(m.group(2)) return cats def _lade_bausteine(text: str) -> dict[int, str]: """Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat).""" if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE): cats = _parse_kategorien(text) texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])] return {i: t for i, t in enumerate(texts, 1)} return _parse_auswahl(text) def _file_payload(path: Path): """Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält.""" if not path.exists(): return None text = path.read_text(encoding="utf-8") return text if _parse_auswahl(text) else None def _auswahl_payload(path: Path): if not path.exists(): return None text = path.read_text(encoding="utf-8") entries = _parse_auswahl(text) return (text, entries) if entries else None def _auswahl_check_schema(data): """{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß.""" if not isinstance(data, dict): return None nach = data.get("nachtraege", []) streich = data.get("streichen", []) if not isinstance(nach, list) or not isinstance(streich, list): return None if not all(isinstance(x, str) for x in [*nach, *streich]): return None return {"nachtraege": nach, "streichen": streich} def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None: """Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …").""" if not isinstance(t, str): return None return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t))) async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: if topic in _bausteine_progress: return _bausteine_progress[topic] = "Wartend…" _bausteine_errors.pop(topic, None) files = _bausteine_files(topic) final_path = files["final"] project = project_dir(topic) if project_dir(topic).is_dir() else None def set_p(msg: str, step: int | None = None) -> None: _bausteine_progress[topic] = msg if step is not None: _bausteine_step[topic] = step def is_cancelled() -> bool: return topic in _bausteine_cancelled def abgebrochen() -> None: _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten" try: async with _semaphore: files["arbeit"].mkdir(parents=True, exist_ok=True) if project: await asyncio.to_thread(_pdfs_konvertieren, project) # „Neu erstellen": fertige Bausteine → kompletter Frischstart. # Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume. if final_path.exists(): for p_alt in _alle_slot_dateien(files): p_alt.unlink(missing_ok=True) # Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen recherchen: list[str] = [] offen = [] for i, path in enumerate(files["recherche"], 1): text = _file_payload(path) if text is not None and len(recherchen) < 3: recherchen.append(text) else: offen.append((i, path)) vorhanden = len(recherchen) set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0) if vorhanden < 3: caps = "files" if project else "full" slots = [ { "key": f"bausteine-{topic}-recherche-{i}", "prompt": _build_recherche_prompt(topic, path, instructions, project), "role": "quick", "capabilities": caps, "payload": (lambda result, p=path: _file_payload(p)), } for i, path in offen ] neue = await _race( topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider, on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"), cancelled=is_cancelled, ) if is_cancelled(): abgebrochen() return if neue is None: _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)" return recherchen += neue # Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen n_est = max(len(_parse_auswahl(t)) for t in recherchen) bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None) if bestehende is not None: flat, entries = bestehende else: set_p("Konsolidiere Recherche…", step=1) results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1)) slots = [ { "key": f"bausteine-{topic}-auswahl-{i}", "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path), "role": "fast", "capabilities": "files", "payload": (lambda result, p=path: _auswahl_payload(p)), } for i, path in enumerate(files["auswahl"], 1) ] auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled) if is_cancelled(): abgebrochen() return if auswahl is None: _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)" return flat, entries = auswahl[0] # Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal) set_p("Prüfe Auswahl…", step=2) check_path = files["auswahl_check"] patch = _auswahl_check_schema(_json_datei(check_path)) if patch is None: check_path.unlink(missing_ok=True) titel_listen = "\n\n".join( f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values()) for i, text in enumerate(recherchen, 1) ) slots = [{ "key": f"bausteine-{topic}-auswahlcheck-1", "prompt": _prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path), "role": "fast", "capabilities": "files", "payload": (lambda result: _auswahl_check_schema(_json_datei(check_path))), }] checks = await _race(topic, "Auswahl-Check", slots, 1, _timeout("auswahl_check", len(entries)), provider, cancelled=is_cancelled) if is_cancelled(): abgebrochen() return if checks is None: _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort") else: patch = checks[0] if patch is not None and (patch["streichen"] or patch["nachtraege"]): idx = _titel_index(entries) weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None} if weg: _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}") entries = {n: t for n, t in entries.items() if n not in weg} if patch["nachtraege"]: _log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine") texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"]) entries = {i: t for i, t in enumerate(texts, 1)} # Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt, # ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung]. if project: set_p("Ergänze Themenfeld…", step=3) erg_path = files["ergaenzung"] ergaenzungen = _ergaenzung_schema(_json_datei(erg_path)) if ergaenzungen is None: erg_path.unlink(missing_ok=True) slots = [{ "key": f"bausteine-{topic}-ergaenzung-1", "prompt": _prompt( "Bausteine-Ergaenzung", topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()), out_path=erg_path, extra=_extra(instructions), ), "role": "quick", "capabilities": "full", "payload": (lambda result: _ergaenzung_schema(_json_datei(erg_path))), }] res = await _race(topic, "Ergänzung", slots, 1, _timeout("ergaenzung"), provider, cancelled=is_cancelled) if is_cancelled(): abgebrochen() return if res is None: _bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)" return ergaenzungen = res[0] idx = _titel_index(entries) neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None] if neu: _log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt") start = max(entries, default=0) + 1 for off, (t, b) in enumerate(neu): entries[start + off] = f"{t} — {b} [Ergänzung]" # Titel eindeutig machen und unsortiertes Inventar schreiben entries = _eindeutige_titel(entries) final_path.write_text( "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n", encoding="utf-8", ) except Exception as e: _bausteine_errors[topic] = str(e)[:2000] finally: # Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit. _bausteine_progress.pop(topic, None) _bausteine_step.pop(topic, None) _bausteine_cancelled.discard(topic) # --- Guide-Generierung: 6 Schritte mit Prüfung nach jeder Phase (OnePager hat einen eigenen Weg) --- # Prüf-Agenten notieren nur Probleme; das Anpassen übernimmt der jeweilige Erzeuger-Typ. # Schritt-Dateien bleiben liegen → Abbruch erhält Fortschritt, ▶ setzt am offenen Schritt fort. GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung") # Writer skalieren mit der Section-Zahl: 1 Writer je ~30 Sections (gedeckelt). # Kleine Pakete vermeiden Lazy-Output bei langen Listen und begrenzen den Schaden # eines fehlgeschlagenen Writers. WRITER_SECTIONS = 30 WRITER_MAX = 20 def _guide_files(content_path: Path) -> dict: d, stem = content_path.parent, content_path.stem return { "auswahl": d / f"{stem}.auswahl.json", "auswahl_check": d / f"{stem}.auswahl-check.json", "gliederung": d / f"{stem}.gliederung.json", "gliederung_check": d / f"{stem}.gliederung-check.json", # chunk-/lese-check-/fix-Dateien sind dynamisch: {stem}.chunk-i.md usw. } def guide_slot_dateien(content_path: Path) -> list[Path]: """Alle Schritt-Dateien eines Guides (für den Frischstart).""" return [p for p in content_path.parent.glob(f"{content_path.stem}.*") if p != content_path] async def _set_step(guide_id: str, step: int, progress: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, step=step, progress=progress, updated_at=now) def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None: """{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang.""" if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): return None idx = _titel_index(entries) nums: list[int] = [] seen: set[int] = set() total = unknown = 0 for t in data["bausteine"]: total += 1 num = _titel_aufloesen(idx, t) if isinstance(t, str) else None if num is None: unknown += 1 elif num not in seen: seen.add(num) nums.append(num) if total == 0 or (total - unknown) / total < 0.85: return None if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max: return None return nums def _probleme_schema(data): """{"ok": true} → [] · {"probleme": [str]} → Liste · sonst None.""" if not isinstance(data, dict): return None if data.get("ok") is True: return [] p = data.get("probleme") if not isinstance(p, list) or not p: return None out = [str(x).strip() for x in p if str(x).strip()] return out or None def _lese_probleme_schema(data): """{"ok": true} → [] · {"probleme": [{"section", "problem"}]} → Liste · sonst None.""" if not isinstance(data, dict): return None if data.get("ok") is True: return [] p = data.get("probleme") if not isinstance(p, list) or not p: return None out = [] for x in p: if not isinstance(x, dict) or not isinstance(x.get("section"), str) or not isinstance(x.get("problem"), str): return None out.append({"section": x["section"].strip(), "problem": x["problem"].strip()}) return out or None def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None: """{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}]. `soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz). """ if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list): return None idx = _titel_index(entries) chapters: list[dict] = [] seen: set[int] = set() total = unknown = 0 for ch in data["kapitel"]: if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list): return None nums = [] for t in ch["bausteine"]: total += 1 num = _titel_aufloesen(idx, t) if isinstance(t, str) else None if num is None: unknown += 1 elif num not in seen: nums.append(num) seen.add(num) if nums: chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums}) if not chapters or total == 0: return None if (total - unknown) / total < 0.85: return None if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max: return None return chapters def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]: """Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl.""" n = max(1, min(n, len(chapters))) chunks: list[list[dict]] = [] current: list[dict] = [] count = 0 remaining_total = sum(len(c["nums"]) for c in chapters) remaining_chunks = n for ch in chapters: current.append(ch) count += len(ch["nums"]) if remaining_chunks > 1 and count >= remaining_total / remaining_chunks: chunks.append(current) remaining_total -= count remaining_chunks -= 1 current = [] count = 0 if current: chunks.append(current) return chunks def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str: lines = [] for ch in chunk: lines.append(f"KAPITEL: {ch['title']}") lines.extend(f"- {entries[num]}" for num in ch["nums"]) return "\n".join(lines) _FRAGMENT_KAPITEL_RE = re.compile(r"", re.IGNORECASE) _FRAGMENT_SECTION_RE = re.compile(r"", re.IGNORECASE) def _parse_fragment(text: str) -> list[dict]: """Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge.""" sections: list[dict] = [] kapitel = None current = None for line in text.splitlines(): s = line.strip() m = _FRAGMENT_KAPITEL_RE.match(s) if m: kapitel = m.group(1) current = None continue m = _FRAGMENT_SECTION_RE.match(s) if m: current = {"kapitel": kapitel, "titel": m.group(1), "md": []} sections.append(current) continue if current is not None: current["md"].append(line) for sec in sections: sec["md"] = "\n".join(sec["md"]).strip() return sections async def _generate_onepager( guide_id: str, topic: str, instructions: str, provider: str, project: Path | None, content_path: Path, ) -> list[dict] | None: def is_cancelled() -> bool: return guide_id in _cancelled # 3×3-Raster: 7 Karten mit festen Schlüsseln (Reihenfolge = Lesereihenfolge mobil) KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet") def karten_schema(data): """{"karten": {key: {titel, md}}} → Liste · sonst None.""" if not isinstance(data, dict): return None karten = data.get("karten") if not isinstance(karten, dict): return None out = [] for key in KARTEN_KEYS: k = karten.get(key) if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str): return None titel, md = k["titel"].strip(), k["md"].strip() if not titel or len(md) < 5: # abgebrochene/leere Karten sind ungültig return None out.append({"key": key, "titel": titel, "md": md}) return out d, stem = content_path.parent, content_path.stem recherche_path = d / f"{stem}.recherche.md" recherche_check_path = d / f"{stem}.recherche-check.json" karten_path = d / f"{stem}.karten.json" check_path = d / f"{stem}.onepager-check.json" # Projekte bekommen eigene Recherche-Dimensionen — Produkt-Fragen # (Version, Lizenz, Alternativen) laufen dort ins Leere. if project: source = _prompt("OnePager-Quelle-Projekt", project=project) recherche_template = "OnePager-Recherche-Projekt" recherche_check_template = "OnePager-Recherche-Check-Projekt" else: source = _prompt("OnePager-Quelle-Thema", topic=topic) recherche_template = "OnePager-Recherche" recherche_check_template = "OnePager-Recherche-Check" def recherche_payload(result=None): if not recherche_path.exists(): return None text = recherche_path.read_text(encoding="utf-8").strip() return text or None # Schritt 1: Recherche — vorhandene Datei wird übernommen (Resume) recherche = recherche_payload() if recherche is None: await _set_step(guide_id, 0, "Recherchiere…") slots = [{ "key": f"{guide_id}-recherche", "prompt": _prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)), "role": "quick", "capabilities": "files" if project else "full", "payload": recherche_payload, }] res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "OnePager-Recherche fehlgeschlagen") return None recherche = res[0] # Schritt 2: Recherche-Prüfung — notiert Probleme; Anpassung macht ein Recherche-Agent if not recherche_check_path.exists(): await _set_step(guide_id, 1, "Prüfe Recherche…") slots = [{ "key": f"{guide_id}-recherche-check", "prompt": _prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path), "role": "fast", "capabilities": "files", "payload": (lambda result: _probleme_schema(_json_datei(recherche_check_path))), }] res = await _race(topic, "Recherche-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen") return None probleme = res[0] if probleme: _log(topic, f"Recherche-Prüfung: {len(probleme)} Problem(e) notiert") await _set_step(guide_id, 1, "Passe Recherche an…") slots = [{ "key": f"{guide_id}-recherche-fix", "prompt": _prompt( "OnePager-Recherche-Fix", topic=topic, source=source, recherche=recherche, probleme="\n".join(f"- {p}" for p in probleme), out_path=recherche_path, extra=_extra(instructions), ), "role": "quick", "capabilities": "files" if project else "full", "payload": recherche_payload, }] res = await _race(topic, "Recherche-Fix", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: _log(topic, "Recherche-Fix ungültig — ursprüngliche Recherche bleibt") else: recherche = res[0] # Schritt 3: Bauen — Karten nur aus der Faktenbasis (Resume: gültige Datei wird übernommen) karten = karten_schema(_json_datei(karten_path)) if karten is None: await _set_step(guide_id, 2, "Baue OnePager…") karten_path.unlink(missing_ok=True) slots = [{ "key": f"{guide_id}-bauen", "prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)), "role": "fast", "capabilities": "files", "payload": (lambda result: karten_schema(_json_datei(karten_path))), }] res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "OnePager-Bau fehlgeschlagen") return None karten = res[0] def karten_block() -> str: return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten) # Schritt 4: Prüfung — notiert Probleme; Anpassung macht ein Bauen-Agent if not check_path.exists(): await _set_step(guide_id, 3, "Prüfe OnePager…") slots = [{ "key": f"{guide_id}-verify", "prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path), "role": "fast", "capabilities": "files", "payload": (lambda result: _probleme_schema(_json_datei(check_path))), }] res = await _race(topic, "OnePager-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen") return None probleme = res[0] if probleme: _log(topic, f"OnePager-Prüfung: {len(probleme)} Problem(e) notiert") await _set_step(guide_id, 3, "Passe OnePager an…") slots = [{ "key": f"{guide_id}-karten-fix", "prompt": _prompt( "OnePager-Fix", topic=topic, recherche=recherche, karten=karten_block(), probleme="\n".join(f"- {p}" for p in probleme), out_path=karten_path, extra=_extra(instructions), ), "role": "fast", "capabilities": "files", "payload": (lambda result: karten_schema(_json_datei(karten_path))), }] res = await _race(topic, "OnePager-Fix", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: _log(topic, "OnePager-Fix ungültig — ursprüngliche Karten bleiben") karten_path.write_text( json.dumps({"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, ensure_ascii=False), encoding="utf-8", ) else: karten = res[0] sections = [ {"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]} for i, k in enumerate(karten, 1) ] return [{"title": topic, "sections": sections}] async def _generate_sections( guide_id: str, topic: str, format_name: str, entries: dict[int, str], facts: str, instructions: str, provider: str, content_path: Path, ) -> list[dict] | None: def is_cancelled() -> bool: return guide_id in _cancelled spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8") files = _guide_files(content_path) bausteine_liste = "\n".join(f"- {t}" for t in entries.values()) n = len(entries) anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name] k_min = min(n, max(minimum, math.ceil(anteil_min * n))) k_max = min(n, max(k_min, math.floor(anteil_max * n))) auswahl_auftrag = ( f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. " "Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist." ) # Schritt 1: Auswahl — vorhandene gültige Datei wird übernommen (Resume) auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max) if auswahl is None: await _set_step(guide_id, 0, "Wähle Bausteine…") files["auswahl"].unlink(missing_ok=True) slots = [{ "key": f"{guide_id}-auswahl", "prompt": _prompt( "Guide-Auswahl", topic=topic, format_name=format_name, bausteine=bausteine_liste, auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions), ), "role": "guide", "capabilities": "files", "payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)), }] res = await _race(topic, "Guide-Auswahl", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Auswahl fehlgeschlagen") return None auswahl = res[0] def auswahl_titel() -> str: return "\n".join(f"- {_titel(entries[num])}" for num in auswahl) def auswahl_json() -> str: return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False) # Schritt 2: Auswahl-Prüfung — notiert Probleme; Anpassung macht ein Auswahl-Agent if not files["auswahl_check"].exists(): await _set_step(guide_id, 1, "Prüfe Auswahl…") slots = [{ "key": f"{guide_id}-auswahl-check", "prompt": _prompt( "Guide-Auswahl-Check", topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, bausteine=bausteine_liste, auswahl=auswahl_titel(), out_path=files["auswahl_check"], extra=_extra(instructions), ), "role": "fast", "capabilities": "files", "payload": (lambda result: _probleme_schema(_json_datei(files["auswahl_check"]))), }] res = await _race(topic, "Auswahl-Prüfung", slots, 1, _timeout("guide_check", len(auswahl)), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen") return None probleme = res[0] if probleme: _log(topic, f"Auswahl-Prüfung: {len(probleme)} Problem(e) notiert") await _set_step(guide_id, 1, "Passe Auswahl an…") slots = [{ "key": f"{guide_id}-auswahl-fix", "prompt": _prompt( "Guide-Auswahl-Fix", topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag, bausteine=bausteine_liste, auswahl=auswahl_titel(), probleme="\n".join(f"- {p}" for p in probleme), out_path=files["auswahl"], extra=_extra(instructions), ), "role": "guide", "capabilities": "files", "payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)), }] res = await _race(topic, "Auswahl-Fix", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: _log(topic, "Auswahl-Fix ungültig — ursprüngliche Auswahl bleibt") files["auswahl"].write_text(auswahl_json(), encoding="utf-8") else: auswahl = res[0] sel_entries = {num: entries[num] for num in auswahl} soll = len(sel_entries) sel_liste = "\n".join(f"- {t}" for t in sel_entries.values()) # Schritt 3: Gliederung der festen Auswahl plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll) if plan is None: await _set_step(guide_id, 2, "Plane Gliederung…") files["gliederung"].unlink(missing_ok=True) slots = [{ "key": f"{guide_id}-gliederung", "prompt": _prompt( "Guide-Gliederung", topic=topic, format_name=format_name, bausteine=sel_liste, out_path=files["gliederung"], extra=_extra(instructions), ), "role": "guide", "capabilities": "files", "payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)), }] res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Gliederung fehlgeschlagen") return None plan = res[0] def gliederung_text() -> str: return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan) def gliederung_json() -> str: return json.dumps( {"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]}, ensure_ascii=False, ) # Schritt 4: Gliederungs-Prüfung if not files["gliederung_check"].exists(): await _set_step(guide_id, 3, "Prüfe Gliederung…") slots = [{ "key": f"{guide_id}-gliederung-check", "prompt": _prompt( "Guide-Gliederung-Check", topic=topic, format_name=format_name, zweck=zweck, auswahl=auswahl_titel(), gliederung=gliederung_text(), out_path=files["gliederung_check"], extra=_extra(instructions), ), "role": "fast", "capabilities": "files", "payload": (lambda result: _probleme_schema(_json_datei(files["gliederung_check"]))), }] res = await _race(topic, "Gliederungs-Prüfung", slots, 1, _timeout("guide_check", soll), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen") return None probleme = res[0] if probleme: _log(topic, f"Gliederungs-Prüfung: {len(probleme)} Problem(e) notiert") await _set_step(guide_id, 3, "Passe Gliederung an…") slots = [{ "key": f"{guide_id}-gliederung-fix", "prompt": _prompt( "Guide-Gliederung-Fix", topic=topic, format_name=format_name, auswahl=auswahl_titel(), gliederung=gliederung_text(), probleme="\n".join(f"- {p}" for p in probleme), out_path=files["gliederung"], extra=_extra(instructions), ), "role": "guide", "capabilities": "files", "payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)), }] res = await _race(topic, "Gliederungs-Fix", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: _log(topic, "Gliederungs-Fix ungültig — ursprüngliche Gliederung bleibt") files["gliederung"].write_text(gliederung_json(), encoding="utf-8") else: plan = res[0] # Schritt 5: Schreiben — vorhandene Chunk-Dateien werden übernommen (Resume) total_sections = sum(len(c["nums"]) for c in plan) chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS)))) zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks] chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks] writer_count = len(zuteilungen) paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)] offen = [i for i, p in enumerate(paths) if not p.exists()] if offen: await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…") results = await asyncio.gather(*[ run_agent( f"{guide_id}-w{i + 1}", _prompt( "Guide-Writer", topic=topic, format_name=format_name, zuteilung=zuteilungen[i], facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions), ), _timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full", ) for i in offen ], return_exceptions=True) if is_cancelled(): return None for i, r in zip(offen, results): if isinstance(r, BaseException): _log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}") elif r[0] != 0: _log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}") elif not paths[i].exists(): _log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt") if not any(p.exists() for p in paths): await _fail(guide_id, _gather_error("Writer-Fehler", list(results))) return None idx = _titel_index(entries) by_num: dict[int, dict] = {} for p in paths: if not p.exists(): continue for sec in _parse_fragment(p.read_text(encoding="utf-8")): num = _titel_aufloesen(idx, sec["titel"]) if num is None: _log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)") elif num not in by_num: by_num[num] = sec if not by_num: await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") return None # Schritt 6: Lese-Prüfung pro Writer-Paket — Fix beauftragt Writer nur mit beanstandeten Sections chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks] check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)] offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]] if offen_checks: await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…") def sections_text(nums: list[int]) -> str: return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums) slots = [{ "key": f"{guide_id}-lese-check-{i + 1}", "prompt": _prompt( "Guide-Lese-Check", topic=topic, format_name=format_name, spec=spec, sections=sections_text(chunk_nums[i]), out_path=check_paths[i], extra=_extra(instructions), ), "role": "fast", "capabilities": "files", "payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))), } for i in offen_checks] res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled) if is_cancelled(): return None if res is None: await _fail(guide_id, "Lese-Prüfung fehlgeschlagen") return None probleme_by_num: dict[int, str] = {} for p in check_paths: for item in (_lese_probleme_schema(_json_datei(p)) or []): num = _titel_aufloesen(idx, item["section"]) if num in by_num and num not in probleme_by_num: probleme_by_num[num] = item["problem"] if probleme_by_num: _log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet") await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…") fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums] fix_offen = [i for i, nums in enumerate(fix_chunks) if nums] fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)] def auftraege_text(nums: list[int]) -> str: return "\n\n".join( f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}" for num in nums ) results = await asyncio.gather(*[ run_agent( f"{guide_id}-fix-w{i + 1}", _prompt( "Guide-Sections-Fix", topic=topic, format_name=format_name, facts=facts, spec=spec, auftraege=auftraege_text(fix_chunks[i]), out_path=fix_paths[i], extra=_extra(instructions), ), _timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full", ) for i in fix_offen ], return_exceptions=True) if is_cancelled(): return None for i, r in zip(fix_offen, results): if isinstance(r, BaseException) or (not isinstance(r, BaseException) and r[0] != 0): _log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt") ersetzt = 0 for i in fix_offen: if not fix_paths[i].exists(): continue for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")): num = _titel_aufloesen(idx, sec["titel"]) if num in probleme_by_num and sec["md"].strip(): by_num[num] = sec ersetzt += 1 _log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet") await _set_progress(guide_id, "Setze zusammen…") chapters: list[dict] = [] for ch in plan: sections = [ {"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]} for num in ch["nums"] if num in by_num ] if sections: chapters.append({"title": ch["title"], "sections": sections}) geplant = {num for ch in plan for num in ch["nums"]} missing = sorted(geplant - set(by_num)) if missing: _log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}") if not chapters: await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") return None return chapters async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: async with _semaphore: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now) content_path = guide_content_path(topic, format_name) content_path.parent.mkdir(parents=True, exist_ok=True) project = project_dir(topic) if project_dir(topic).is_dir() else None try: if guide_id in _cancelled: return if project: await asyncio.to_thread(_pdfs_konvertieren, project) # „Neu erstellen": fertiger Guide → kompletter Frischstart. # Sonst sind Schritt-Dateien Reste eines Abbruchs/Fehlers → Resume. if content_path.exists(): for p_alt in guide_slot_dateien(content_path): p_alt.unlink(missing_ok=True) if format_name == "OnePager": chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path) else: alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8")) if not alle: await _fail(guide_id, "Keine Bausteine gefunden") return entries = _eindeutige_titel(alle) facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema") chapters = await _generate_sections( guide_id, topic, format_name, entries, facts, instructions, provider, content_path, ) if chapters is None or guide_id in _cancelled: return content_path.write_text( json.dumps({"topic": topic, "format": format_name, "chapters": chapters}, ensure_ascii=False, indent=1), encoding="utf-8", ) now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now) except asyncio.TimeoutError: await _fail(guide_id, "Timeout bei der Generierung") except FileNotFoundError: await _fail(guide_id, "Bausteine fehlen") except Exception as e: await _fail(guide_id, str(e)[:2000]) finally: _cancelled.discard(guide_id) # --- Tutor-Chat --- def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str: transcript = "\n".join( f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" for m in messages ) return _prompt( "Chat", topic=topic, format_name=format_name, outline_block=outline.strip() or "(keine)", section_block=section.strip() or "(kein Abschnitt erkannt)", transcript=transcript, ) async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str: try: prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages) returncode, stdout, stderr = await run_agent( "chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none" ) if returncode != 0: return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." reply = stdout.strip() return reply or "Entschuldigung, ich habe keine Antwort erhalten." except Exception: return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."