"""Agent-driven generation of Bausteine (building-block lists) and guides, plus the tutor chat.

The module orchestrates external agent runs (``run_agent``) and keeps the
in-memory progress/cancel state for the Bausteine pipeline; guide state is
persisted through ``update_guide``.
"""

import asyncio
import json
import re
import uuid
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

from agents import run_agent, kill_process
from config import (
    DEFAULT_PROVIDER,
    TEMPLATES_DIR,
    TIMEOUTS,
    MAX_CONCURRENT_GENERATIONS,
)
from database import update_guide
from paths import bausteine_path, guide_content_path, project_dir

# Shared by generate_bausteine() and generate_guide(): caps concurrent generations.
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
# Guide ids for which cancel_guide() was called; consumed by generate_guide().
_cancelled: set[str] = set()


async def cancel_guide(guide_id: str) -> bool:
    """Flag a guide as cancelled, kill its agent process and persist the error status.

    Always returns True.
    """
    _cancelled.add(guide_id)
    kill_process(guide_id)
    now = datetime.now(timezone.utc).isoformat()
    await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen", updated_at=now)
    return True


async def _set_progress(guide_id: str, progress: str) -> None:
    """Persist a new progress message for the guide."""
    now = datetime.now(timezone.utc).isoformat()
    await update_guide(guide_id, progress=progress, updated_at=now)


def _prompt(name: str, **kwargs) -> str:
    """Load ``TEMPLATES_DIR/Prompt/<name>.md`` and fill it via ``str.format(**kwargs)``."""
    template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
    return template.format(**kwargs)


def _extra(instructions: str) -> str:
    """Return the user-instructions prompt suffix, or "" when there are none."""
    return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""


def _log(topic: str, msg: str) -> None:
    """Print a flushed log line prefixed with ``[generator] <topic>``."""
    print(f"[generator] {topic}: {msg}", flush=True)


def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
    """Build an error message for a failed agent run.

    Prefers stderr (first 1000 chars), falls back to the last 500 chars of
    stdout, and finally to a message carrying only the exit code.
    """
    stderr = (stderr or "").strip()
    if stderr:
        return f"{label}: {stderr[:1000]}"
    tail = (stdout or "").strip()[-500:]
    if tail:
        return f"{label} (exit {returncode}, stderr leer): …{tail}"
    return f"{label} (exit {returncode}, ohne Ausgabe)"


def _gather_error(label: str, results: list) -> str:
    """Describe the first failure in ``asyncio.gather(..., return_exceptions=True)`` results.

    Each item is either an exception or a ``(returncode, stdout, stderr)`` tuple.
    """
    for r in results:
        if isinstance(r, BaseException):
            return f"{label}: {type(r).__name__}: {r}"
        returncode, stdout, stderr = r
        if returncode != 0:
            return _claude_error(label, returncode, stdout, stderr)
    return f"{label}: kein verwertbares Ergebnis"


async def _fail(guide_id: str, msg: str) -> None:
    """Persist an error status with ``msg`` for the guide."""
    now = datetime.now(timezone.utc).isoformat()
    await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)


def _timeout(step: str, n: int = 0) -> int:
    """Return the timeout for ``step``: ``base + per * n`` from ``TIMEOUTS[step]``."""
    base, per = TIMEOUTS[step]
    return base + per * n


_MAX_RESTARTS = 2


async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int,
                provider: str, on_update=None, cancelled=None) -> list | None:
    """Start all slots in parallel and collect ``quorum`` valid results.

    Slot spec: {key, prompt, role, capabilities, payload}. ``payload(result)``
    checks validity and returns the slot result or None.
    Error/timeout/invalid → slot restart (at most ``_MAX_RESTARTS``). As soon as
    the quorum is reached, the remaining agents are killed. None = quorum missed.
    ``cancelled()`` → True aborts (no restarts, returns None).
    ``on_update``, if given, is called with the number of valid results so far.
    """
    attempts = {i: 0 for i in range(len(slots))}
    tasks: dict[asyncio.Task, int] = {}

    def spawn(i: int) -> None:
        slot = slots[i]
        task = asyncio.create_task(run_agent(
            slot["key"], slot["prompt"], timeout, provider=provider,
            role=slot["role"], capabilities=slot["capabilities"],
        ))
        tasks[task] = i

    for i in range(len(slots)):
        spawn(i)

    results: list = []
    try:
        while tasks:
            if cancelled and cancelled():
                return None
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                i = tasks.pop(task)
                payload, err = None, None
                try:
                    result = task.result()
                    if result[0] != 0:
                        err = _claude_error("Fehler", *result)
                    else:
                        payload = slots[i]["payload"](result)
                        if payload is None:
                            err = "Ergebnis ungültig/nicht parsebar"
                except asyncio.TimeoutError:
                    err = f"Timeout nach {timeout}s"
                except Exception as e:
                    err = f"{type(e).__name__}: {e}"
                if payload is not None:
                    results.append(payload)
                    if on_update:
                        on_update(len(results))
                    if len(results) >= quorum:
                        return results
                    continue
                _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
                attempts[i] += 1
                if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
                    spawn(i)
        _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
        return None
    finally:
        # Whatever is still running (quorum reached, cancelled, or error) is stopped
        # and awaited so no task outlives this call.
        for task, i in tasks.items():
            kill_process(slots[i]["key"])
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks.keys(), return_exceptions=True)


# --- Bausteine pipeline: 4x research (3 needed) → 2x selection (1) → 4x classification (3) → 2x final (1) ---

# In-memory state per topic; a topic is "generating" while it has a progress entry.
_bausteine_progress: dict[str, str] = {}
_bausteine_errors: dict[str, str] = {}
_bausteine_cancelled: set[str] = set()
_bausteine_step: dict[str, int] = {}

BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung", "Einordnung", "Verifikation", "Sortierung")


def cancel_bausteine(topic: str) -> bool:
    """Request cancellation of a running Bausteine generation.

    Returns False when no generation is registered for ``topic``.
    """
    if topic not in _bausteine_progress:
        return False
    _bausteine_cancelled.add(topic)
    # presumably kill_process treats this as a key prefix covering all slot keys — verify in agents
    kill_process(f"bausteine-{topic}-")
    return True


_CATEGORIES = ("KERN", "WICHTIG", "REST")


def _resume_step(topic: str) -> int:
    """Return the first still-open step index, based on the persisted intermediate files."""
    final_path = bausteine_path(topic)
    stem, parent = final_path.stem, final_path.parent
    if sum((parent / f"{stem}.recherche-{i}.md").exists() for i in (1, 2, 3, 4)) < 3:
        return 0
    if not any((parent / f"{stem}.auswahl-{i}.md").exists() for i in (1, 2)):
        return 1
    if not (parent / f"{stem}.auswahl-check.md").exists():
        return 2
    if sum((parent / f"{stem}.einordnung-{i}.md").exists() for i in (1, 2, 3)) < 3:
        return 3
    if not (parent / f"{stem}.final-check.md").exists():
        return 4
    return 5


def _sortierung_path(topic: str) -> Path:
    """Path of the sort marker file next to the final Bausteine file."""
    final_path = bausteine_path(topic)
    return final_path.parent / f"{final_path.stem}.sortierung.md"


def bausteine_status(topic: str) -> dict:
    """Return the status dict for a topic: ready/generating/progress/error/partial/steps.

    Step states come from the live step index while generating, from the
    sort marker when the final file exists, and from the intermediate files
    otherwise.
    """
    ready = bausteine_path(topic).exists()
    generating = topic in _bausteine_progress
    partial = False
    if generating:
        current = _bausteine_step.get(topic)
        states = [
            "pending" if current is None
            else "done" if i < current
            else "active" if i == current
            else "pending"
            for i in range(len(BAUSTEINE_STEPS))
        ]
    elif ready:
        states = ["done"] * len(BAUSTEINE_STEPS)
        if not _sortierung_path(topic).exists():
            states[-1] = "pending"
    else:
        nxt = _resume_step(topic)
        partial = nxt > 0
        states = ["done" if i < nxt else "pending" for i in range(len(BAUSTEINE_STEPS))]
    return {
        "ready": ready,
        "generating": generating,
        "progress": _bausteine_progress.get(topic),
        "error": _bausteine_errors.get(topic),
        "partial": partial,
        "steps": [{"label": label, "state": s} for label, s in zip(BAUSTEINE_STEPS, states)],
    }


def active_bausteine() -> list[dict]:
    """List all topics that currently have a progress entry."""
    return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]


def reset_bausteine(topic: str) -> None:
    """Delete the final file, all intermediate files and the stored error for ``topic``."""
    final_path = bausteine_path(topic)
    final_path.unlink(missing_ok=True)
    for i in (1, 2, 3, 4):
        (final_path.parent / f"{final_path.stem}.recherche-{i}.md").unlink(missing_ok=True)
        (final_path.parent / f"{final_path.stem}.einordnung-{i}.md").unlink(missing_ok=True)
    for i in (1, 2):
        (final_path.parent / f"{final_path.stem}.auswahl-{i}.md").unlink(missing_ok=True)
    (final_path.parent / f"{final_path.stem}.auswahl-check.md").unlink(missing_ok=True)
    (final_path.parent / f"{final_path.stem}.final-check.md").unlink(missing_ok=True)
    (final_path.parent / f"{final_path.stem}.sortierung.md").unlink(missing_ok=True)
    _bausteine_errors.pop(topic, None)


def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "",
                            project: Path | None = None) -> str:
    """Build the research prompt; the source block depends on whether a project dir is given."""
    if project:
        source = _prompt("Bausteine-Quelle-Projekt", project=project)
    else:
        source = _prompt("Bausteine-Quelle-Thema", topic=topic)
    return _prompt(
        "Bausteine-Recherche", topic=topic, source=source,
        bausteine_path=out_path, extra=_extra(instructions),
    )


def _parse_auswahl(text: str) -> dict[int, str]:
    """Parse the consolidated list: one ``N. Title — short description`` per line.

    Non-empty lines that do not start a new entry are appended to the
    previous entry (continuation lines).
    """
    entries: dict[int, str] = {}
    last = None
    for line in text.splitlines():
        m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line)
        if m:
            last = int(m.group(1))
            entries[last] = m.group(2)
        elif last is not None and line.strip():
            entries[last] += " " + line.strip()
    return entries


def _parse_einordnung(text: str) -> dict[int, str]:
    """Parse a classification (``KERN:`` followed by ``N Title`` lines) into number → category.

    Numbers on the category header line itself are accepted too. The first
    category seen for a number wins.
    """
    mapping: dict[int, str] = {}
    current = None
    for line in text.splitlines():
        s = line.strip().lstrip("-*# ").strip()
        if not s:
            continue
        m = re.match(r"(KERN|WICHTIG|REST)\b[:\s]*(.*)$", s, re.IGNORECASE)
        if m:
            current = m.group(1).upper()
            for num in re.findall(r"\b\d+\b", m.group(2)):
                mapping.setdefault(int(num), current)
            continue
        if current:
            m = re.match(r"(\d+)\b", s)
            if m:
                mapping.setdefault(int(m.group(1)), current)
    return mapping


def _build_final_bausteine(topic: str, entries: dict[int, str], mapping: dict[int, str],
                           order: dict[str, list[int]] | None = None) -> str:
    """Build the final Bausteine file from the consolidated list plus the final mapping.

    ``order`` (category → numbers in learning order) sorts within the categories;
    numbers not listed there are appended in original order. Entries missing
    from ``mapping`` fall back to REST; mapping numbers without an entry are
    logged and ignored.
    """
    grouped: dict[str, list[int]] = {c: [] for c in _CATEGORIES}
    for num in sorted(entries):
        cat = mapping.get(num)
        if cat is None:
            _log(topic, f"Baustein {num} fehlt in finaler Einordnung → REST")
            cat = "REST"
        grouped[cat].append(num)
    unknown = sorted(set(mapping) - set(entries))
    if unknown:
        _log(topic, f"finale Einordnung enthält unbekannte Nummern (ignoriert): {unknown}")
    if order:
        for cat in _CATEGORIES:
            wanted = set(grouped[cat])
            seq = [n for n in order.get(cat, []) if n in wanted]
            grouped[cat] = seq + [n for n in grouped[cat] if n not in seq]
    parts = []
    for cat in _CATEGORIES:
        # Entries are renumbered from 1 within each category.
        lines = "\n".join(f"{i}. {entries[num]}" for i, num in enumerate(grouped[cat], 1))
        parts.append(f"## {cat}\n{lines}")
    return "\n\n".join(parts) + "\n"


def _file_payload(path: Path) -> str | None:
    """Valid if the slot file exists and contains numbered entries; returns its text."""
    if not path.exists():
        return None
    text = path.read_text(encoding="utf-8")
    return text if _parse_auswahl(text) else None


def _auswahl_payload(path: Path) -> tuple[str, dict[int, str]] | None:
    """Return ``(text, parsed entries)`` for a selection file, or None if missing/empty."""
    if not path.exists():
        return None
    text = path.read_text(encoding="utf-8")
    entries = _parse_auswahl(text)
    return (text, entries) if entries else None


def _parse_auswahl_check(text: str) -> dict | None:
    """Parse the selection check: NACHTRÄGE (new entries) + STREICHEN (numbers).

    Returns ``{"add": [...], "remove": {...}}``, or None when none of the
    markers (NACHTR…, STREICH…, OK) appears in the text.
    """
    additions: list[str] = []
    removals: set[int] = set()
    mode = None
    seen_marker = False
    for line in text.splitlines():
        s = line.strip().lstrip("-*# ").strip()
        if not s:
            continue
        u = s.upper().rstrip(":")
        if u.startswith("NACHTR"):
            mode = "add"
            seen_marker = True
            continue
        if u.startswith("STREICH"):
            mode = "del"
            seen_marker = True
            continue
        if u == "OK":
            seen_marker = True
            continue
        if mode == "add":
            additions.append(s)
        elif mode == "del":
            m = re.match(r"(\d+)\b", s)
            if m:
                removals.add(int(m.group(1)))
    if not seen_marker:
        return None  # the answer did not follow the expected format
    return {"add": additions, "remove": removals}


def _majority(mappings: list[dict[int, str]], entries: dict[int, str]) -> tuple[dict[int, str], list[int]]:
    """Majority vote over the classifications; without a majority → dispute.

    A category needs at least two votes. Returns ``(mapping, disputes)``.
    """
    mapping: dict[int, str] = {}
    disputes: list[int] = []
    for num in entries:
        votes = [m[num] for m in mappings if num in m]
        if not votes:
            disputes.append(num)
            continue
        cat, count = Counter(votes).most_common(1)[0]
        if count >= 2:
            mapping[num] = cat
        else:
            disputes.append(num)
    return mapping, disputes


def _einordnung_block(mapping: dict[int, str], entries: dict[int, str]) -> str:
    """Render the mapping as ``CATEGORY:`` headers followed by ``N Title`` lines."""
    parts = []
    for cat in _CATEGORIES:
        nums = [n for n in sorted(entries) if mapping.get(n) == cat]
        lines = "\n".join(f"{n} {_titel(entries[n])}" for n in nums)
        parts.append(f"{cat}:\n{lines}" if lines else f"{cat}:")
    return "\n".join(parts)


async def _run_sortierung(topic: str, entries: dict[int, str], mapping: dict[int, str],
                          provider: str, cancelled) -> dict[str, list[int]] | None:
    """Sort within the categories; on success write the marker file and return the order.

    Returns None when the agent race produced no valid result.
    """
    slots = [{
        "key": f"bausteine-{topic}-sortierung-1",
        "prompt": _prompt("Bausteine-Sortierung", topic=topic, einordnung=_einordnung_block(mapping, entries)),
        "role": "quick",
        "capabilities": "none",
        "payload": (lambda result: (result[1].strip(), _parse_einordnung(result[1]))
                    if _parse_einordnung(result[1]) else None),
    }]
    res = await _race(topic, "Sortierung", slots, 1, _timeout("sortierung", len(entries)), provider,
                      cancelled=cancelled)
    if res is None:
        return None
    raw, sort_mapping = res[0]
    _sortierung_path(topic).write_text(raw, encoding="utf-8")
    # dict insertion order of sort_mapping is the order in which the agent listed the numbers
    return {cat: [num for num, c in sort_mapping.items() if c == cat] for cat in _CATEGORIES}


async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
    """Run (or resume) the Bausteine pipeline for ``topic`` and write the final file.

    Does nothing when a generation for the topic is already registered.
    Errors are stored in ``_bausteine_errors`` instead of being raised;
    intermediate slot files are kept so an aborted run can be resumed.
    """
    if topic in _bausteine_progress:
        return
    _bausteine_progress[topic] = "Wartend…"
    _bausteine_errors.pop(topic, None)
    final_path = bausteine_path(topic)
    project = project_dir(topic) if project_dir(topic).is_dir() else None
    stem = final_path.stem
    recherche_paths = [final_path.parent / f"{stem}.recherche-{i}.md" for i in (1, 2, 3, 4)]
    auswahl_paths = [final_path.parent / f"{stem}.auswahl-{i}.md" for i in (1, 2)]
    einordnung_paths = [final_path.parent / f"{stem}.einordnung-{i}.md" for i in (1, 2, 3)]
    auswahl_check_path = final_path.parent / f"{stem}.auswahl-check.md"
    final_check_path = final_path.parent / f"{stem}.final-check.md"
    sortierung_path = _sortierung_path(topic)
    slot_files = [*recherche_paths, *auswahl_paths, *einordnung_paths,
                  auswahl_check_path, final_check_path, sortierung_path]

    def set_p(msg: str, step: int | None = None) -> None:
        _bausteine_progress[topic] = msg
        if step is not None:
            _bausteine_step[topic] = step

    def is_cancelled() -> bool:
        return topic in _bausteine_cancelled

    def abgebrochen() -> None:
        _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"

    try:
        async with _semaphore:
            # Finished but without the sort marker (older pipeline version): only catch up on sorting.
            if final_path.exists() and not sortierung_path.exists():
                cats = _parse_kategorien(final_path.read_text(encoding="utf-8"))
                entries, mapping = {}, {}
                i = 0
                for cat in _CATEGORIES:
                    for text in cats.get(cat, []):
                        i += 1
                        entries[i] = text
                        mapping[i] = cat
                if entries:
                    set_p("Sortiere Bausteine…", step=5)
                    order = await _run_sortierung(topic, entries, mapping, provider, is_cancelled)
                    if is_cancelled():
                        abgebrochen()
                        return
                    if order is None:
                        _bausteine_errors[topic] = "Sortierung fehlgeschlagen"
                        return
                    final_path.write_text(_build_final_bausteine(topic, entries, mapping, order), encoding="utf-8")
                    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
                    # copy; this return is assumed to belong to `if entries:` (an unparsable
                    # final file then falls through to a fresh start) — confirm.
                    return

            # "Recreate": finished (sorted) Bausteine → complete fresh start.
            # Otherwise slot files are leftovers of an abort/error → resume, skip finished steps.
            if final_path.exists():
                for p_alt in slot_files:
                    p_alt.unlink(missing_ok=True)

            # Step 1: 4 research agents, 3 valid results needed — existing slot files count
            recherchen = []
            offen = []
            for i, path in enumerate(recherche_paths, 1):
                text = _file_payload(path)
                if text is not None and len(recherchen) < 3:
                    recherchen.append(text)
                else:
                    offen.append((i, path))
            vorhanden = len(recherchen)
            set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
            if vorhanden < 3:
                caps = "files" if project else "full"
                slots = [
                    {
                        "key": f"bausteine-{topic}-recherche-{i}",
                        "prompt": _build_recherche_prompt(topic, path, instructions, project),
                        "role": "quick",
                        "capabilities": caps,
                        # p=path binds the loop variable per slot (avoids late binding)
                        "payload": (lambda result, p=path: _file_payload(p)),
                    }
                    for i, path in offen
                ]
                neue = await _race(
                    topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
                    on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
                    cancelled=is_cancelled,
                )
                if is_cancelled():
                    abgebrochen()
                    return
                if neue is None:
                    _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
                    return
                recherchen += neue

            # Step 2: 2 selection agents, the first one wins — an existing valid file is reused
            n_est = max(len(_parse_auswahl(t)) for t in recherchen)
            results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
            bestehende = next((res for p in auswahl_paths if (res := _auswahl_payload(p)) is not None), None)
            if bestehende is not None:
                flat, entries = bestehende
            else:
                set_p("Konsolidiere Recherche…", step=1)
                slots = [
                    {
                        "key": f"bausteine-{topic}-auswahl-{i}",
                        "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
                        "role": "fast",
                        "capabilities": "files",
                        "payload": (lambda result, p=path: _auswahl_payload(p)),
                    }
                    for i, path in enumerate(auswahl_paths, 1)
                ]
                auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider,
                                      cancelled=is_cancelled)
                if is_cancelled():
                    abgebrochen()
                    return
                if auswahl is None:
                    _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
                    return
                flat, entries = auswahl[0]

            # Step 2b: selection check (non-fatal) — a stored answer is applied again
            set_p("Prüfe Auswahl…", step=2)
            raw_check = auswahl_check_path.read_text(encoding="utf-8") if auswahl_check_path.exists() else None
            patch = _parse_auswahl_check(raw_check) if raw_check is not None else None
            if patch is None:
                slots = [{
                    "key": f"bausteine-{topic}-auswahlcheck-1",
                    "prompt": _prompt("Bausteine-Auswahl-Check", topic=topic, results=results_block, auswahl=flat),
                    "role": "fast",
                    "capabilities": "none",
                    "payload": (lambda result: (result[1].strip(), _parse_auswahl_check(result[1]))
                                if _parse_auswahl_check(result[1]) is not None else None),
                }]
                checks = await _race(topic, "Auswahl-Check", slots, 1, _timeout("auswahl_check", len(entries)),
                                     provider, cancelled=is_cancelled)
                if is_cancelled():
                    abgebrochen()
                    return
                if checks is None:
                    _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
                else:
                    raw_check, patch = checks[0]
                    auswahl_check_path.write_text(raw_check, encoding="utf-8")
            if patch is not None:
                if patch["remove"]:
                    _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(patch['remove'])}")
                    entries = {n: t for n, t in entries.items() if n not in patch["remove"]}
                if patch["add"]:
                    _log(topic, f"Auswahl-Check ergänzt {len(patch['add'])} Bausteine")
                if patch["remove"] or patch["add"]:
                    # Renumber contiguously from 1 so later steps see a gap-free list.
                    texts = [t for _, t in sorted(entries.items())] + patch["add"]
                    entries = {i: t for i, t in enumerate(texts, 1)}
                    flat = "\n".join(f"{i}. {t}" for i, t in entries.items())

            # Step 3: 4 classification agents, 3 valid results needed — read stored votes
            n = len(entries)
            einordnungen = []
            for path in einordnung_paths:
                if path.exists():
                    text = path.read_text(encoding="utf-8")
                    parsed = _parse_einordnung(text)
                    if parsed:
                        einordnungen.append((text, parsed))
            einordnungen = einordnungen[:3]
            vorhanden = len(einordnungen)
            set_p(f"Einordnung läuft ({vorhanden}/3 gültig)…", step=3)
            if vorhanden < 3:
                slots = [
                    {
                        "key": f"bausteine-{topic}-einordnung-{i}",
                        "prompt": _prompt("Bausteine-Einordnung", topic=topic, bausteine=flat),
                        "role": "quick",
                        "capabilities": "none",
                        "payload": (lambda result: (result[1].strip(), _parse_einordnung(result[1]))
                                    if _parse_einordnung(result[1]) else None),
                    }
                    for i in range(vorhanden + 1, 5)
                ]
                neue = await _race(
                    topic, "Einordnung", slots, 3 - vorhanden, _timeout("einordnung", n), provider,
                    on_update=lambda c: set_p(f"Einordnung läuft ({vorhanden + c}/3 gültig)…"),
                    cancelled=is_cancelled,
                )
                if is_cancelled():
                    abgebrochen()
                    return
                if neue is None:
                    _bausteine_errors[topic] = "Einordnung fehlgeschlagen (Quorum nicht erreicht)"
                    return
                # Persist the new votes into the free vote files so a later resume can reuse them.
                for path, (text, _) in zip(einordnung_paths[vorhanden:], neue):
                    path.write_text(text, encoding="utf-8")
                einordnungen += neue

            # Step 4: Python majority vote + verification agent — a stored answer is applied again
            set_p("Verifiziere Einordnung…", step=4)
            mapping, disputes = _majority([m for _, m in einordnungen], entries)
            if disputes:
                _log(topic, f"Keine Mehrheit bei: {disputes}")
            raw_final = final_check_path.read_text(encoding="utf-8") if final_check_path.exists() else None
            if raw_final is not None and not (_parse_einordnung(raw_final) or "OK" in raw_final.upper()):
                raw_final = None
            if raw_final is None:
                streit_block = "\n".join(f"{num} {entries[num]}" for num in disputes) or "(keine)"
                final_prompt = _prompt(
                    "Bausteine-Einordnung-Final", topic=topic,
                    einordnung=_einordnung_block(mapping, entries), streitfaelle=streit_block,
                )
                slots = [
                    {
                        "key": f"bausteine-{topic}-final-{i}",
                        "prompt": final_prompt,
                        "role": "fast",
                        "capabilities": "none",
                        "payload": (lambda result: result[1].strip()
                                    if (_parse_einordnung(result[1]) or "OK" in result[1].upper()) else None),
                    }
                    for i in (1, 2)
                ]
                finals = await _race(topic, "Final", slots, 1, _timeout("final", n), provider,
                                     cancelled=is_cancelled)
                if is_cancelled():
                    abgebrochen()
                    return
                if finals is None:
                    _log(topic, "Final-Verifikation fehlgeschlagen — Mehrheitsentscheid bleibt unverändert")
                else:
                    raw_final = finals[0]
                    final_check_path.write_text(raw_final, encoding="utf-8")
            if raw_final is not None:
                overrides = {num: cat for num, cat in _parse_einordnung(raw_final).items() if num in entries}
                korrekturen = {num: cat for num, cat in overrides.items()
                               if mapping.get(num) != cat and num not in disputes}
                if korrekturen:
                    _log(topic, f"Final-Verifikation korrigiert: {korrekturen}")
                # NOTE(review): placed outside `if korrekturen:` so that resolved disputes
                # (excluded from `korrekturen`) are applied too — indentation reconstructed, confirm.
                mapping.update(overrides)
            for num in disputes:
                if num not in mapping:
                    _log(topic, f"Streitfall {num} unentschieden → WICHTIG")
                    mapping[num] = "WICHTIG"

            # Step 5: sorting within the categories (simple → complex, non-fatal)
            set_p("Sortiere Bausteine…", step=5)
            order = await _run_sortierung(topic, entries, mapping, provider, is_cancelled)
            if is_cancelled():
                abgebrochen()
                return
            if order is None:
                _log(topic, "Sortierung fehlgeschlagen — Originalreihenfolge bleibt (Nachholen über ▶)")
            final_path.write_text(_build_final_bausteine(topic, entries, mapping, order), encoding="utf-8")
    except Exception as e:
        _bausteine_errors[topic] = str(e)[:2000]
    finally:
        # No file cleanup: intermediate files stay for resume and traceability.
        # Cleanup only happens explicitly via reset_bausteine().
        _bausteine_progress.pop(topic, None)
        _bausteine_step.pop(topic, None)
        _bausteine_cancelled.discard(topic)


# --- Guide generation: Bausteine → (plan) → writer → JSON ---

# Which Baustein categories each format covers.
FORMAT_COVERAGE = {
    "MiniGuide": ("KERN",),
    "Guide": ("KERN", "WICHTIG"),
    "FullGuide": ("KERN", "WICHTIG", "REST"),
}

# Parallel writers per format (OnePager has its own path).
WRITER_COUNT = {"MiniGuide": 1, "Guide": 2, "FullGuide": 4}


def _parse_kategorien(text: str) -> dict[str, list[str]]:
    """Parse the final Bausteine file (## KERN/WICHTIG/REST with numbered entries)."""
    cats: dict[str, list[str]] = {}
    current = None
    for line in text.splitlines():
        s = line.strip()
        m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE)
        if m:
            current = m.group(1).upper()
            cats.setdefault(current, [])
            continue
        m = re.match(r"(\d+)[.)]\s+(.*\S)", s)
        if m and current:
            cats[current].append(m.group(2))
    return cats


def _titel(entry: str) -> str:
    """Return the part before the first `` — `` separator, or the whole entry if that is empty."""
    return entry.split(" — ")[0].strip() or entry


def _parse_gliederung(text: str, valid: set[int], topic: str) -> list[dict]:
    """Parse the outline (``KAPITEL: Title`` + ``N Title`` lines) → [{"title", "nums"}].

    Unknown or repeated numbers are skipped; valid numbers the outline missed
    are collected in an extra chapter. Chapters without numbers are dropped.
    """
    chapters: list[dict] = []
    seen: set[int] = set()
    for line in text.splitlines():
        s = line.strip().lstrip("-*# ").strip()
        if not s:
            continue
        m = re.match(r"KAPITEL\s*:\s*(.+)", s, re.IGNORECASE)
        if m:
            chapters.append({"title": m.group(1).strip(), "nums": []})
            continue
        m = re.match(r"(\d+)\b", s)
        if m and chapters:
            num = int(m.group(1))
            if num in valid and num not in seen:
                chapters[-1]["nums"].append(num)
                seen.add(num)
    missing = sorted(valid - seen)
    if missing:
        _log(topic, f"Gliederung: Bausteine {missing} fehlen → Kapitel 'Weitere Themen'")
        chapters.append({"title": "Weitere Themen", "nums": missing})
    return [c for c in chapters if c["nums"]]


def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]:
    """Split chapters into up to ``n`` contiguous chunks, balanced by section count."""
    n = max(1, min(n, len(chapters)))
    chunks: list[list[dict]] = []
    current: list[dict] = []
    count = 0
    remaining_total = sum(len(c["nums"]) for c in chapters)
    remaining_chunks = n
    for ch in chapters:
        current.append(ch)
        count += len(ch["nums"])
        # Close the chunk once it holds its fair share of what is still unassigned.
        if remaining_chunks > 1 and count >= remaining_total / remaining_chunks:
            chunks.append(current)
            remaining_total -= count
            remaining_chunks -= 1
            current = []
            count = 0
    if current:
        chunks.append(current)
    return chunks


def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str:
    """Render a chunk as ``KAPITEL: title`` lines, each followed by its ``N entry`` lines."""
    lines = []
    for ch in chunk:
        lines.append(f"KAPITEL: {ch['title']}")
        lines.extend(f"{num} {entries[num]}" for num in ch["nums"])
    return "\n".join(lines)


# NOTE(review): both patterns are empty in this copy of the file. An empty pattern
# matches every line and defines no groups, so the m.group(1) calls in
# _parse_fragment() raise IndexError on the first line parsed. Presumably the real
# marker patterns were lost — restore them from version control. _parse_fragment()
# needs one group (chapter title) in the first pattern and a number group plus an
# optional title group in the second.
_FRAGMENT_KAPITEL_RE = re.compile(r"", re.IGNORECASE)
_FRAGMENT_SECTION_RE = re.compile(r"", re.IGNORECASE)


def _parse_fragment(text: str) -> list[dict]:
    """Parse a writer file → [{"kapitel", "num", "title", "md"}] in file order.

    Lines after a section marker are collected as that section's markdown
    until the next chapter or section marker.
    """
    sections: list[dict] = []
    kapitel = None
    current = None
    for line in text.splitlines():
        s = line.strip()
        m = _FRAGMENT_KAPITEL_RE.match(s)
        if m:
            kapitel = m.group(1)
            current = None
            continue
        m = _FRAGMENT_SECTION_RE.match(s)
        if m:
            current = {"kapitel": kapitel, "num": int(m.group(1)), "title": (m.group(2) or "").strip(), "md": []}
            sections.append(current)
            continue
        if current is not None:
            current["md"].append(line)
    for sec in sections:
        sec["md"] = "\n".join(sec["md"]).strip()
    return sections


def _section_json(sec: dict, entries: dict[int, str]) -> dict:
    """Convert a parsed fragment section to its JSON form; the title falls back to the entry title."""
    return {"num": sec["num"], "title": sec["title"] or _titel(entries[sec["num"]]), "md": sec["md"]}


async def _generate_onepager(
    guide_id: str, topic: str, instructions: str, provider: str,
    project: Path | None, content_path: Path, fragment_paths: list[Path],
) -> list[dict] | None:
    """Generate the OnePager chapters: research → build cards → verify.

    Returns a single-chapter list, or None on cancellation or failure (failures
    are persisted via ``_fail``). Temporary files are appended to
    ``fragment_paths`` for cleanup by the caller.
    """
    def is_cancelled() -> bool:
        return guide_id in _cancelled

    # Step 1: research — own fact base, independent of the Bausteine
    await _set_progress(guide_id, "Recherchiere…")
    recherche_path = content_path.parent / f"{content_path.stem}.recherche.md"
    fragment_paths.append(recherche_path)
    recherche_path.unlink(missing_ok=True)
    if project:
        source = _prompt("OnePager-Quelle-Projekt", project=project)
    else:
        source = _prompt("OnePager-Quelle-Thema", topic=topic)
    slots = [{
        "key": f"{guide_id}-recherche",
        "prompt": _prompt("OnePager-Recherche", topic=topic, source=source,
                          out_path=recherche_path, extra=_extra(instructions)),
        "role": "quick",
        "capabilities": "files" if project else "full",
        "payload": (lambda result: recherche_path.read_text(encoding="utf-8")
                    if recherche_path.exists() else None),
    }]
    res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider,
                      cancelled=is_cancelled)
    if is_cancelled():
        return None
    if res is None:
        await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
        return None
    recherche = res[0]

    # Step 2: build — cards only from the fact base
    await _set_progress(guide_id, "Baue OnePager…")
    slots = [{
        "key": f"{guide_id}-bauen",
        "prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, extra=_extra(instructions)),
        "role": "fast",
        "capabilities": "none",
        "payload": (lambda result: _parse_auswahl(result[1]) or None),
    }]
    res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider,
                      cancelled=is_cancelled)
    if is_cancelled():
        return None
    if res is None:
        await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
        return None
    cards = res[0]

    # Step 3: verify — OK or a fully corrected list (non-fatal)
    await _set_progress(guide_id, "Verifiziere OnePager…")
    karten_block = "\n".join(f"{i}. {t}" for i, t in cards.items())
    slots = [{
        "key": f"{guide_id}-verify",
        "prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block),
        "role": "fast",
        "capabilities": "none",
        "payload": (lambda result: result[1].strip()
                    if (_parse_auswahl(result[1]) or "OK" in result[1].upper()) else None),
    }]
    res = await _race(topic, "OnePager-Verifikation", slots, 1, _timeout("onepager_verify"), provider,
                      cancelled=is_cancelled)
    if is_cancelled():
        return None
    if res is None:
        _log(topic, "OnePager-Verifikation fehlgeschlagen — ungeprüfte Version wird verwendet")
    else:
        corrected = _parse_auswahl(res[0])
        if corrected:
            _log(topic, "OnePager-Verifikation hat Korrekturen geliefert")
            cards = corrected

    sections = [
        {"num": i, "title": _titel(text), "md": text.split(" — ", 1)[1].strip() if " — " in text else text}
        for i, text in cards.items()
    ]
    return [{"title": topic, "sections": sections}]


async def _generate_sections(
    guide_id: str, topic: str, format_name: str, entries: dict[int, str], facts: str,
    instructions: str, provider: str, content_path: Path, fragment_paths: list[Path],
) -> list[dict] | None:
    """Plan (except MiniGuide), run the writers in parallel and assemble the chapters.

    Returns the chapter list, or None on cancellation or failure (failures are
    persisted via ``_fail``). Writer output files are appended to
    ``fragment_paths`` for cleanup by the caller.
    """
    spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
    bausteine_block = "\n".join(f"{i}. {t}" for i, t in entries.items())

    if format_name == "MiniGuide":
        # A single writer that structures the chapters itself
        plan = None
        zuteilungen = [bausteine_block]
        chunk_sizes = [len(entries)]
    else:
        await _set_progress(guide_id, "Plane Gliederung…")
        returncode, stdout, stderr = await run_agent(
            f"{guide_id}-plan",
            _prompt("Guide-Plan", topic=topic, format_name=format_name,
                    bausteine=bausteine_block, extra=_extra(instructions)),
            _timeout("plan", len(entries)),
            provider=provider, role="guide", capabilities="none",
        )
        if guide_id in _cancelled:
            return None
        if returncode != 0:
            await _fail(guide_id, _claude_error("Plan-Fehler", returncode, stdout, stderr))
            return None
        plan = _parse_gliederung(stdout, set(entries), topic)
        if not plan:
            await _fail(guide_id, "Gliederung nicht parsebar")
            return None
        chunks = _split_chunks(plan, WRITER_COUNT[format_name])
        zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
        chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]

    writer_count = len(zuteilungen)
    await _set_progress(
        guide_id,
        f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…",
    )
    paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
    fragment_paths.extend(paths)

    results = await asyncio.gather(*[
        run_agent(
            f"{guide_id}-w{i}",
            _prompt(
                "Guide-Writer", topic=topic, format_name=format_name, zuteilung=zuteilung,
                facts=facts, spec=spec, out_path=path, extra=_extra(instructions),
            ),
            _timeout("writer", size),
            provider=provider, role="guide", capabilities="full",
        )
        for i, (zuteilung, path, size) in enumerate(zip(zuteilungen, paths, chunk_sizes), 1)
    ], return_exceptions=True)
    if guide_id in _cancelled:
        return None

    # Individual writer failures are only logged; the run fails only if no fragment was parsed.
    for i, (r, p) in enumerate(zip(results, paths), 1):
        if isinstance(r, BaseException):
            _log(topic, f"Writer {i}: {type(r).__name__}: {r}")
        elif r[0] != 0:
            _log(topic, f"Writer {i}: {_claude_error('Fehler', *r)}")
        elif not p.exists():
            _log(topic, f"Writer {i}: keine Ausgabedatei erstellt")

    fragments: list[dict] = []
    for p in paths:
        if p.exists():
            fragments.extend(_parse_fragment(p.read_text(encoding="utf-8")))
    if not fragments:
        await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
        return None

    await _set_progress(guide_id, "Setze zusammen…")
    chapters: list[dict] = []
    if plan is None:
        # MiniGuide: chapters come from the fragment markers in file order
        seen: set[int] = set()
        for sec in fragments:
            if sec["num"] not in entries or sec["num"] in seen:
                continue
            seen.add(sec["num"])
            title = sec["kapitel"] or topic
            if not chapters or chapters[-1]["title"] != title:
                chapters.append({"title": title, "sections": []})
            chapters[-1]["sections"].append(_section_json(sec, entries))
        missing = sorted(set(entries) - seen)
    else:
        by_num = {sec["num"]: sec for sec in fragments if sec["num"] in entries}
        for ch in plan:
            sections = [_section_json(by_num[num], entries) for num in ch["nums"] if num in by_num]
            if sections:
                chapters.append({"title": ch["title"], "sections": sections})
        missing = sorted(set(entries) - set(by_num))
    if missing:
        _log(topic, f"Sections fehlen in der Writer-Ausgabe: {missing}")
    if not chapters:
        await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
        return None
    return chapters


async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "",
                         provider: str = DEFAULT_PROVIDER) -> None:
    """Generate a guide in ``format_name`` for ``topic`` and write it as JSON.

    Status, progress and errors are persisted via ``update_guide``; nothing is
    returned. Temporary fragment files are removed when the run ends, and the
    cancellation flag for ``guide_id`` is cleared.
    """
    async with _semaphore:
        # Cancelled while still queued for the semaphore: cancel_guide() has already
        # persisted the final "error" status. Bail out before writing "generating",
        # otherwise that status would overwrite it and stay there forever.
        if guide_id in _cancelled:
            _cancelled.discard(guide_id)
            return

        now = datetime.now(timezone.utc).isoformat()
        await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)

        content_path = guide_content_path(topic, format_name)
        project = project_dir(topic) if project_dir(topic).is_dir() else None
        fragment_paths: list[Path] = []

        try:
            # Re-check: a cancel may have arrived during the status write above.
            if guide_id in _cancelled:
                return

            if format_name == "OnePager":
                chapters = await _generate_onepager(guide_id, topic, instructions, provider, project,
                                                    content_path, fragment_paths)
            else:
                cats = _parse_kategorien(bausteine_path(topic).read_text(encoding="utf-8"))
                selected: list[str] = []
                for cat in FORMAT_COVERAGE[format_name]:
                    selected.extend(cats.get(cat, []))
                if not selected:
                    await _fail(guide_id, "Keine passenden Bausteine gefunden")
                    return
                entries = {i: text for i, text in enumerate(selected, 1)}
                facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
                chapters = await _generate_sections(
                    guide_id, topic, format_name, entries, facts, instructions, provider,
                    content_path, fragment_paths,
                )

            if chapters is None or guide_id in _cancelled:
                return

            content_path.write_text(
                json.dumps({"topic": topic, "format": format_name, "chapters": chapters},
                           ensure_ascii=False, indent=1),
                encoding="utf-8",
            )
            now = datetime.now(timezone.utc).isoformat()
            await update_guide(guide_id, status="done", progress=None, updated_at=now)

        except asyncio.TimeoutError:
            await _fail(guide_id, "Timeout bei der Generierung")
        except FileNotFoundError:
            await _fail(guide_id, "Bausteine fehlen")
        except Exception as e:
            await _fail(guide_id, str(e)[:2000])
        finally:
            _cancelled.discard(guide_id)
            for p in fragment_paths:
                p.unlink(missing_ok=True)


# --- Tutor chat ---

def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str,
                             messages: list[dict]) -> str:
    """Render the chat prompt from outline, current section and the message transcript."""
    transcript = "\n".join(
        f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
        for m in messages
    )
    return _prompt(
        "Chat", topic=topic, format_name=format_name,
        outline_block=outline.strip() or "(keine)",
        section_block=section.strip() or "(kein Abschnitt erkannt)",
        transcript=transcript,
    )


async def chat_with_guide(topic: str, format_name: str, section: str, outline: str,
                          messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
    """Answer a tutor-chat turn; always returns a user-facing string, never raises."""
    try:
        prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
        returncode, stdout, stderr = await run_agent(
            "chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode != 0:
            return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
        reply = stdout.strip()
        return reply or "Entschuldigung, ich habe keine Antwort erhalten."
    except Exception:
        # Deliberate best-effort boundary: any failure becomes a generic apology for the user.
        return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."