import asyncio import json import re import uuid from datetime import datetime, timezone from pathlib import Path from agents import run_agent, kill_process from config import ( AGENT_TIMEOUT, DEFAULT_PROVIDER, TEMPLATES_DIR, MAX_CONCURRENT_GENERATIONS, ) from database import update_guide from paths import bausteine_path, guide_content_path, project_dir _semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS) _cancelled: set[str] = set() async def cancel_guide(guide_id: str) -> bool: _cancelled.add(guide_id) kill_process(guide_id) now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen", updated_at=now) return True async def _set_progress(guide_id: str, progress: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, progress=progress, updated_at=now) def _prompt(name: str, **kwargs) -> str: template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8") return template.format(**kwargs) def _extra(instructions: str) -> str: return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else "" def _log(topic: str, msg: str) -> None: print(f"[generator] {topic}: {msg}", flush=True) def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str: stderr = (stderr or "").strip() if stderr: return f"{label}: {stderr[:1000]}" tail = (stdout or "").strip()[-500:] if tail: return f"{label} (exit {returncode}, stderr leer): …{tail}" return f"{label} (exit {returncode}, ohne Ausgabe)" def _gather_error(label: str, results: list) -> str: for r in results: if isinstance(r, BaseException): return f"{label}: {type(r).__name__}: {r}" returncode, stdout, stderr = r if returncode != 0: return _claude_error(label, returncode, stdout, stderr) return f"{label}: kein verwertbares Ergebnis" async def _fail(guide_id: str, msg: str) -> None: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now) # --- Bausteine-Pipeline: 3x Recherche → Auswahl → 2x Einordnung → finale Einordnung --- _bausteine_progress: dict[str, str] = {} _bausteine_errors: dict[str, str] = {} _CATEGORIES = ("KERN", "WICHTIG", "REST") def bausteine_status(topic: str) -> dict: return { "ready": bausteine_path(topic).exists(), "generating": topic in _bausteine_progress, "progress": _bausteine_progress.get(topic), "error": _bausteine_errors.get(topic), } def active_bausteine() -> list[dict]: return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()] def reset_bausteine(topic: str) -> None: bausteine_path(topic).unlink(missing_ok=True) _bausteine_errors.pop(topic, None) def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str: if project: source = _prompt("Bausteine-Quelle-Projekt", project=project) else: source = _prompt("Bausteine-Quelle-Thema", topic=topic) return _prompt( "Bausteine-Recherche", topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions), ) def _parse_auswahl(text: str) -> dict[int, str]: """Parst die konsolidierte Liste: `N. Titel — Kurzbeschreibung` pro Zeile.""" entries: dict[int, str] = {} last = None for line in text.splitlines(): m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line) if m: last = int(m.group(1)) entries[last] = m.group(2) elif last is not None and line.strip(): entries[last] += " " + line.strip() return entries def _parse_einordnung(text: str) -> dict[int, str]: """Parst eine Einordnung (`KERN:` gefolgt von `N Titel`-Zeilen) zu Nummer→Kategorie.""" mapping: dict[int, str] = {} current = None for line in text.splitlines(): s = line.strip().lstrip("-*# ").strip() if not s: continue m = re.match(r"(KERN|WICHTIG|REST)\b[:\s]*(.*)$", s, re.IGNORECASE) if m: current = m.group(1).upper() for num in re.findall(r"\b\d+\b", m.group(2)): mapping.setdefault(int(num), current) continue if current: m = re.match(r"(\d+)\b", s) if m: mapping.setdefault(int(m.group(1)), current) return mapping def _build_final_bausteine(topic: str, entries: dict[int, str], mapping: dict[int, str]) -> str: """Baut die finale Baustein-Datei aus konsolidierter Liste + finaler Zuordnung.""" grouped: dict[str, list[str]] = {c: [] for c in _CATEGORIES} for num in sorted(entries): cat = mapping.get(num) if cat is None: _log(topic, f"Baustein {num} fehlt in finaler Einordnung → REST") cat = "REST" grouped[cat].append(entries[num]) unknown = sorted(set(mapping) - set(entries)) if unknown: _log(topic, f"finale Einordnung enthält unbekannte Nummern (ignoriert): {unknown}") parts = [] for cat in _CATEGORIES: lines = "\n".join(f"{i}. {text}" for i, text in enumerate(grouped[cat], 1)) parts.append(f"## {cat}\n{lines}") return "\n\n".join(parts) + "\n" async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: if topic in _bausteine_progress: return _bausteine_progress[topic] = "Wartend…" _bausteine_errors.pop(topic, None) final_path = bausteine_path(topic) project = project_dir(topic) if project_dir(topic).is_dir() else None stem = final_path.stem recherche_paths = [final_path.parent / f"{stem}.recherche-{i}.md" for i in (1, 2, 3)] auswahl_path = final_path.parent / f"{stem}.auswahl.md" try: async with _semaphore: # Schritt 1: 3 Recherche-Agenten parallel (Thema: Websuche, Projekt: Dateien lesen) _bausteine_progress[topic] = "Recherche läuft (3 Agenten)…" caps = "files" if project else "full" results = await asyncio.gather(*[ run_agent( f"bausteine-{topic}-recherche-{i}", _build_recherche_prompt(topic, path, instructions, project), AGENT_TIMEOUT, provider=provider, role="fast", capabilities=caps, ) for i, path in enumerate(recherche_paths, 1) ], return_exceptions=True) for i, (r, p) in enumerate(zip(results, recherche_paths), 1): if isinstance(r, BaseException): _log(topic, f"Recherche {i}: {type(r).__name__}: {r}") elif r[0] != 0: _log(topic, f"Recherche {i}: {_claude_error('Fehler', *r)}") elif not p.exists(): _log(topic, f"Recherche {i}: keine Ausgabedatei erstellt") recherchen = [p.read_text(encoding="utf-8") for p in recherche_paths if p.exists()] if not recherchen: _bausteine_errors[topic] = _gather_error("Recherche-Fehler", results) return # Schritt 2: Auswahl-Agent konsolidiert die Ergebnisse (ohne Quellen) _bausteine_progress[topic] = f"Konsolidiere Recherche ({len(recherchen)}/3 erfolgreich)…" results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1)) returncode, stdout, stderr = await run_agent( f"bausteine-{topic}-auswahl", _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=auswahl_path), AGENT_TIMEOUT, provider=provider, role="fast", capabilities="files", ) if returncode != 0 or not auswahl_path.exists(): _bausteine_errors[topic] = _claude_error("Auswahl-Fehler", returncode, stdout, stderr) return flat = auswahl_path.read_text(encoding="utf-8") entries = _parse_auswahl(flat) if not entries: _bausteine_errors[topic] = "Auswahl-Liste nicht parsebar" return # Schritt 3: 2 Einordnungs-Agenten parallel (antworten nur mit Nummer+Titel je Kategorie) _bausteine_progress[topic] = "Einordnung läuft (2 Agenten)…" results = await asyncio.gather(*[ run_agent( f"bausteine-{topic}-einordnung-{i}", _prompt("Bausteine-Einordnung", topic=topic, bausteine=flat), AGENT_TIMEOUT, provider=provider, role="fast", capabilities="none", ) for i in (1, 2) ], return_exceptions=True) einordnungen = [] for i, r in enumerate(results, 1): if isinstance(r, BaseException): _log(topic, f"Einordnung {i}: {type(r).__name__}: {r}") elif r[0] != 0: _log(topic, f"Einordnung {i}: {_claude_error('Fehler', *r)}") elif not _parse_einordnung(r[1]): _log(topic, f"Einordnung {i}: Antwort nicht parsebar") else: einordnungen.append(r[1].strip()) if not einordnungen: _bausteine_errors[topic] = _gather_error("Einordnungs-Fehler", results) return # Schritt 4: finale Einordnung — Python validiert und baut die Datei _bausteine_progress[topic] = f"Finale Einordnung ({len(einordnungen)}/2 erfolgreich)…" returncode, stdout, stderr = await run_agent( f"bausteine-{topic}-final", _prompt( "Bausteine-Einordnung-Final", topic=topic, bausteine=flat, einordnung_1=einordnungen[0], einordnung_2=einordnungen[-1], ), AGENT_TIMEOUT, provider=provider, role="fast", capabilities="none", ) if returncode != 0: _bausteine_errors[topic] = _claude_error("Finale-Einordnungs-Fehler", returncode, stdout, stderr) return mapping = _parse_einordnung(stdout) if not mapping: _bausteine_errors[topic] = "Finale Einordnung nicht parsebar" return final_path.write_text(_build_final_bausteine(topic, entries, mapping), encoding="utf-8") except Exception as e: _bausteine_errors[topic] = str(e)[:2000] finally: _bausteine_progress.pop(topic, None) for p in [*recherche_paths, auswahl_path]: p.unlink(missing_ok=True) # --- Guide-Generierung: Bausteine → (Plan) → Writer → JSON --- # Welche Baustein-Kategorien jedes Format abdeckt. FORMAT_COVERAGE = { "OnePager": ("KERN",), "MiniGuide": ("KERN",), "Guide": ("KERN", "WICHTIG"), "FullGuide": ("KERN", "WICHTIG", "REST"), } # Parallele Writer pro Format (OnePager hat einen eigenen Weg). WRITER_COUNT = {"MiniGuide": 1, "Guide": 2, "FullGuide": 4} def _parse_kategorien(text: str) -> dict[str, list[str]]: """Parst die finale Baustein-Datei (## KERN/WICHTIG/REST mit nummerierten Einträgen).""" cats: dict[str, list[str]] = {} current = None for line in text.splitlines(): s = line.strip() m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE) if m: current = m.group(1).upper() cats.setdefault(current, []) continue m = re.match(r"(\d+)[.)]\s+(.*\S)", s) if m and current: cats[current].append(m.group(2)) return cats def _titel(entry: str) -> str: return entry.split(" — ")[0].strip() or entry def _parse_gliederung(text: str, valid: set[int], topic: str) -> list[dict]: """Parst die Gliederung (`KAPITEL: Titel` + `N Titel`-Zeilen) → [{"title", "nums"}].""" chapters: list[dict] = [] seen: set[int] = set() for line in text.splitlines(): s = line.strip().lstrip("-*# ").strip() if not s: continue m = re.match(r"KAPITEL\s*:\s*(.+)", s, re.IGNORECASE) if m: chapters.append({"title": m.group(1).strip(), "nums": []}) continue m = re.match(r"(\d+)\b", s) if m and chapters: num = int(m.group(1)) if num in valid and num not in seen: chapters[-1]["nums"].append(num) seen.add(num) missing = sorted(valid - seen) if missing: _log(topic, f"Gliederung: Bausteine {missing} fehlen → Kapitel 'Weitere Themen'") chapters.append({"title": "Weitere Themen", "nums": missing}) return [c for c in chapters if c["nums"]] def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]: """Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl.""" n = max(1, min(n, len(chapters))) chunks: list[list[dict]] = [] current: list[dict] = [] count = 0 remaining_total = sum(len(c["nums"]) for c in chapters) remaining_chunks = n for ch in chapters: current.append(ch) count += len(ch["nums"]) if remaining_chunks > 1 and count >= remaining_total / remaining_chunks: chunks.append(current) remaining_total -= count remaining_chunks -= 1 current = [] count = 0 if current: chunks.append(current) return chunks def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str: lines = [] for ch in chunk: lines.append(f"KAPITEL: {ch['title']}") lines.extend(f"{num} {entries[num]}" for num in ch["nums"]) return "\n".join(lines) _FRAGMENT_KAPITEL_RE = re.compile(r"", re.IGNORECASE) _FRAGMENT_SECTION_RE = re.compile(r"", re.IGNORECASE) def _parse_fragment(text: str) -> list[dict]: """Parst eine Writer-Datei → [{"kapitel", "num", "title", "md"}] in Datei-Reihenfolge.""" sections: list[dict] = [] kapitel = None current = None for line in text.splitlines(): s = line.strip() m = _FRAGMENT_KAPITEL_RE.match(s) if m: kapitel = m.group(1) current = None continue m = _FRAGMENT_SECTION_RE.match(s) if m: current = {"kapitel": kapitel, "num": int(m.group(1)), "title": (m.group(2) or "").strip(), "md": []} sections.append(current) continue if current is not None: current["md"].append(line) for sec in sections: sec["md"] = "\n".join(sec["md"]).strip() return sections def _section_json(sec: dict, entries: dict[int, str]) -> dict: return {"num": sec["num"], "title": sec["title"] or _titel(entries[sec["num"]]), "md": sec["md"]} async def _generate_onepager(guide_id: str, topic: str, entries: dict[int, str], instructions: str, provider: str) -> list[dict] | None: await _set_progress(guide_id, "Generiere OnePager…") bausteine_block = "\n".join(f"{i}. {t}" for i, t in entries.items()) returncode, stdout, stderr = await run_agent( f"{guide_id}-onepager", _prompt("OnePager", topic=topic, bausteine=bausteine_block, extra=_extra(instructions)), AGENT_TIMEOUT, provider=provider, role="fast", capabilities="none", ) if guide_id in _cancelled: return None if returncode != 0: await _fail(guide_id, _claude_error("OnePager-Fehler", returncode, stdout, stderr)) return None merksaetze: dict[int, str] = {} for line in stdout.splitlines(): m = re.match(r"\s*(\d+)\s*[:.\-–—]\s*(.*\S)", line) if m: merksaetze.setdefault(int(m.group(1)), m.group(2)) sections = [] for num, entry in entries.items(): md = merksaetze.get(num) if md is None: _log(topic, f"OnePager: Merksatz für Baustein {num} fehlt") continue sections.append({"num": num, "title": _titel(entry), "md": md}) if not sections: await _fail(guide_id, "OnePager-Antwort nicht parsebar") return None return [{"title": topic, "sections": sections}] async def _generate_sections( guide_id: str, topic: str, format_name: str, entries: dict[int, str], facts: str, instructions: str, provider: str, content_path: Path, fragment_paths: list[Path], ) -> list[dict] | None: spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8") bausteine_block = "\n".join(f"{i}. {t}" for i, t in entries.items()) if format_name == "MiniGuide": # Ein Writer, gliedert selbst in Kapitel plan = None zuteilungen = [bausteine_block] else: await _set_progress(guide_id, "Plane Gliederung…") returncode, stdout, stderr = await run_agent( f"{guide_id}-plan", _prompt("Guide-Plan", topic=topic, format_name=format_name, bausteine=bausteine_block, extra=_extra(instructions)), AGENT_TIMEOUT, provider=provider, role="fast", capabilities="none", ) if guide_id in _cancelled: return None if returncode != 0: await _fail(guide_id, _claude_error("Plan-Fehler", returncode, stdout, stderr)) return None plan = _parse_gliederung(stdout, set(entries), topic) if not plan: await _fail(guide_id, "Gliederung nicht parsebar") return None chunks = _split_chunks(plan, WRITER_COUNT[format_name]) zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks] writer_count = len(zuteilungen) await _set_progress(guide_id, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…") paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)] fragment_paths.extend(paths) results = await asyncio.gather(*[ run_agent( f"{guide_id}-w{i}", _prompt( "Guide-Writer", topic=topic, format_name=format_name, zuteilung=zuteilung, facts=facts, spec=spec, out_path=path, extra=_extra(instructions), ), AGENT_TIMEOUT, provider=provider, role="guide", capabilities="full", ) for i, (zuteilung, path) in enumerate(zip(zuteilungen, paths), 1) ], return_exceptions=True) if guide_id in _cancelled: return None for i, (r, p) in enumerate(zip(results, paths), 1): if isinstance(r, BaseException): _log(topic, f"Writer {i}: {type(r).__name__}: {r}") elif r[0] != 0: _log(topic, f"Writer {i}: {_claude_error('Fehler', *r)}") elif not p.exists(): _log(topic, f"Writer {i}: keine Ausgabedatei erstellt") fragments: list[dict] = [] for p in paths: if p.exists(): fragments.extend(_parse_fragment(p.read_text(encoding="utf-8"))) if not fragments: await _fail(guide_id, _gather_error("Writer-Fehler", list(results))) return None await _set_progress(guide_id, "Setze zusammen…") chapters: list[dict] = [] if plan is None: # MiniGuide: Kapitel aus den Fragment-Markern in Datei-Reihenfolge seen: set[int] = set() for sec in fragments: if sec["num"] not in entries or sec["num"] in seen: continue seen.add(sec["num"]) title = sec["kapitel"] or topic if not chapters or chapters[-1]["title"] != title: chapters.append({"title": title, "sections": []}) chapters[-1]["sections"].append(_section_json(sec, entries)) missing = sorted(set(entries) - seen) else: by_num = {sec["num"]: sec for sec in fragments if sec["num"] in entries} for ch in plan: sections = [_section_json(by_num[num], entries) for num in ch["nums"] if num in by_num] if sections: chapters.append({"title": ch["title"], "sections": sections}) missing = sorted(set(entries) - set(by_num)) if missing: _log(topic, f"Sections fehlen in der Writer-Ausgabe: {missing}") if not chapters: await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden") return None return chapters async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: async with _semaphore: now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="generating", progress="Lese Bausteine…", updated_at=now) content_path = guide_content_path(topic, format_name) project = project_dir(topic) if project_dir(topic).is_dir() else None fragment_paths: list[Path] = [] try: if guide_id in _cancelled: return cats = _parse_kategorien(bausteine_path(topic).read_text(encoding="utf-8")) selected: list[str] = [] for cat in FORMAT_COVERAGE[format_name]: selected.extend(cats.get(cat, [])) if not selected: await _fail(guide_id, "Keine passenden Bausteine gefunden") return entries = {i: text for i, text in enumerate(selected, 1)} if format_name == "OnePager": chapters = await _generate_onepager(guide_id, topic, entries, instructions, provider) else: facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema") chapters = await _generate_sections( guide_id, topic, format_name, entries, facts, instructions, provider, content_path, fragment_paths, ) if chapters is None or guide_id in _cancelled: return content_path.write_text( json.dumps({"topic": topic, "format": format_name, "chapters": chapters}, ensure_ascii=False, indent=1), encoding="utf-8", ) now = datetime.now(timezone.utc).isoformat() await update_guide(guide_id, status="done", progress=None, updated_at=now) except asyncio.TimeoutError: await _fail(guide_id, f"Timeout bei Generierung nach {AGENT_TIMEOUT}s") except FileNotFoundError: await _fail(guide_id, "Bausteine fehlen") except Exception as e: await _fail(guide_id, str(e)[:2000]) finally: _cancelled.discard(guide_id) for p in fragment_paths: p.unlink(missing_ok=True) # --- Tutor-Chat --- def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str: transcript = "\n".join( f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}" for m in messages ) return _prompt( "Chat", topic=topic, format_name=format_name, outline_block=outline.strip() or "(keine)", section_block=section.strip() or "(kein Abschnitt erkannt)", transcript=transcript, ) async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str: try: prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages) returncode, stdout, stderr = await run_agent( "chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none" ) if returncode != 0: return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut." reply = stdout.strip() return reply or "Entschuldigung, ich habe keine Antwort erhalten." except Exception: return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."