"""Generation pipeline for guides and Bausteine.

Drives the Claude CLI as a subprocess to write/review/fix guide HTML,
renders the HTML to PDF with WeasyPrint, and stores progress and results
through the ``database`` helpers.
"""

import asyncio
import json
import logging
import re
import shutil
import subprocess
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path

from config import (
    AGENT_TIMEOUT,
    CLAUDE_CLI,
    TEMPLATES_DIR,
    MAX_CONCURRENT_GENERATIONS,
    MODEL_GUIDE,
    MODEL_BAUSTEIN_GEN,
    MODEL_BAUSTEIN_REWORK,
    STORAGE_DIR,
)
from database import (
    update_guide,
    create_baustein,
    create_suggestions,
    delete_pending_suggestions,
    list_bausteine,
    update_baustein,
)
from paths import final_paths, temp_paths

logger = logging.getLogger(__name__)

# Seconds WeasyPrint may take to render one PDF.
_PDF_TIMEOUT = 120
# Seconds the Claude CLI may take to propose Baustein suggestions.
_SUGGESTIONS_TIMEOUT = 1800
# Seconds the Claude CLI may take to generate or rework a single Baustein.
_BAUSTEIN_TIMEOUT = 60
# Upper bound on stored suggestions; also the number requested in the prompt.
_MAX_SUGGESTIONS = 40

# Limits how many generate/rework runs execute at the same time.
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
# Currently running Claude CLI process per job key (guide id or prefixed key).
_active_processes: dict[str, asyncio.subprocess.Process] = {}
# Guide ids for which cancellation was requested and not yet consumed.
_cancelled: set[str] = set()


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _read_template(kind: str, name: str) -> str:
    """Read ``TEMPLATES_DIR/<kind>/<name>.md`` as UTF-8 text."""
    return (TEMPLATES_DIR / kind / f"{name}.md").read_text(encoding="utf-8")


def _kill_quietly(process: asyncio.subprocess.Process) -> None:
    """Kill *process* unless it has already exited."""
    if process.returncode is not None:
        return
    try:
        process.kill()
    except ProcessLookupError:
        # The process exited between the returncode check and the kill.
        pass


async def cancel_guide(guide_id: str) -> bool:
    """Request cancellation of a guide run and mark the guide as aborted.

    Flags the guide as cancelled, kills its running Claude CLI process (if
    any) and writes status ``"error"`` with message ``"Abgebrochen"``.

    Returns:
        Always ``True``.
    """
    # NOTE(review): the id stays in `_cancelled` until a generate/rework run
    # for it reaches its `finally`. If no run is queued or active, the next
    # run for this id aborts immediately — confirm callers only cancel
    # in-flight guides.
    _cancelled.add(guide_id)
    process = _active_processes.get(guide_id)
    if process is not None:
        _kill_quietly(process)
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg="Abgebrochen",
        updated_at=_now(),
    )
    return True


async def _set_progress(guide_id: str, progress: str) -> None:
    """Store a new progress label for the guide."""
    await update_guide(guide_id, progress=progress, updated_at=_now())


async def _run_claude(
    guide_id: str,
    prompt: str,
    timeout: int,
    tools: str | None = "Write,Bash,Read,WebSearch,WebFetch",
    model: str | None = None,
) -> tuple[int, str, str]:
    """Run the Claude CLI once, feeding *prompt* on stdin.

    Args:
        guide_id: Key under which the process is registered in
            ``_active_processes`` while it runs (so it can be killed).
        prompt: Text sent to the CLI's stdin, encoded as UTF-8.
        timeout: Seconds to wait for the CLI to finish.
        tools: Value for ``--allowedTools``; omitted when falsy.
        model: Value for ``--model``; omitted when falsy.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8
        (undecodable bytes replaced).

    Raises:
        asyncio.TimeoutError: The CLI did not finish in time; the process is
            killed before the exception is re-raised.
    """
    cmd = [CLAUDE_CLI, "-p"]
    if model:
        cmd += ["--model", model]
    if tools:
        cmd += ["--allowedTools", tools]
    cmd += ["--dangerously-skip-permissions"]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_processes[guide_id] = process
    try:
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=prompt.encode("utf-8")),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            _kill_quietly(process)
            # Give the killed process a moment to be reaped, but never hang.
            try:
                await asyncio.wait_for(process.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
            raise
        return (
            process.returncode,
            stdout.decode("utf-8", errors="replace"),
            stderr.decode("utf-8", errors="replace"),
        )
    finally:
        _active_processes.pop(guide_id, None)


async def _render_pdf(html_path: Path, pdf_path: Path) -> tuple[bool, str]:
    """Render *html_path* to *pdf_path* with the ``weasyprint`` executable.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, message)`` where
        *message* is the first 1000 characters of stderr or a timeout note.
    """
    proc = await asyncio.create_subprocess_exec(
        "weasyprint",
        str(html_path),
        str(pdf_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _, stderr = await asyncio.wait_for(proc.communicate(), timeout=_PDF_TIMEOUT)
    except asyncio.TimeoutError:
        # Report the timeout as a render failure instead of letting it bubble
        # up as if the preceding Claude step had timed out, and make sure the
        # renderer does not keep running in the background.
        _kill_quietly(proc)
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except asyncio.TimeoutError:
            pass
        return False, f"Timeout nach {_PDF_TIMEOUT}s"
    if proc.returncode != 0:
        return False, stderr.decode("utf-8", errors="replace")[:1000]
    return True, ""


def _build_generator_prompt(topic: str, format_name: str, html_path: Path, instructions: str = "") -> str:
    """Build the prompt that asks the agent to write the guide HTML."""
    spec = _read_template("Format", format_name)
    reference = _read_template("Referenz", format_name)
    extra = f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""

    return f"""Erstelle einen Lern-Guide zum Thema "{topic}" im Format "{format_name}".

Recherchiere zuerst die aktuelle Version und aktuelle Fakten zu "{topic}" per Websuche,
damit Versionsnummern und Angaben stimmen.

Schreibe die HTML-Datei nach: {html_path}

Schreibe NUR die HTML-Datei. Führe KEIN weasyprint aus, erzeuge KEINE PDF.
Das übernimmt ein anderer Prozess.

FORMAT-SPEZIFIKATION:
{spec}

REFERENZ-IMPLEMENTIERUNG (Stil-Vorlage, adaptiere für "{topic}"):
{reference}
{extra}"""


def _build_rework_prompt(topic: str, format_name: str, html_path: Path, instructions: str) -> str:
    """Build the prompt that asks the agent to rework an existing guide HTML."""
    spec = _read_template("Format", format_name)

    return f"""Überarbeite die bestehende HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".

Lies zuerst die aktuelle HTML-Datei mit dem Read-Tool.

ANWEISUNGEN VOM NUTZER:
{instructions}

FORMAT-SPEZIFIKATION (muss weiterhin eingehalten werden):
{spec}

Schreibe die überarbeitete Version in dieselbe Datei: {html_path}
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
"""


def _build_fix_prompt(topic: str, format_name: str, html_path: Path, feedback: str) -> str:
    """Build the prompt that asks the agent to fix the reviewer's findings."""
    return f"""Die HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}" hat Probleme.

FEEDBACK VOM PRÜFER:
{feedback}

Behebe die Probleme in der HTML-Datei {html_path}.
Schreibe die korrigierte Version in dieselbe Datei.
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
"""


def _build_content_review_prompt(topic: str, format_name: str, html_path: Path) -> str:
    """Build the review prompt; the agent must answer ``PASS`` or ``FAIL`` + list."""
    spec = _read_template("Format", format_name)

    return f"""Prüfe den Inhalt der HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".

SCHRITT 1 — HTML-Datei lesen:
Öffne die Datei {html_path} mit dem Read-Tool.

SCHRITT 2 — Fakten per Websuche prüfen:
Recherchiere mit WebSearch, ob Versionsnummern, Jahreszahlen und zentrale Fakten zu "{topic}" aktuell und korrekt sind.

SCHRITT 3 — Vollständigkeit prüfen anhand dieser Spezifikation:
{spec}

Prüfkriterien:
- Sind alle Pflicht-Kapitel/Sektionen vorhanden?
- Stimmen Versionsnummern und Fakten?
- Ist der Inhalt fachlich korrekt und aktuell?
- Entspricht der Schwierigkeitsgrad dem Format?
- Sind Pflicht-Elemente vorhanden (Cover, TOC, Recall-Boxen, Callouts, Code-Beispiele)?

SCHRITT 4 — Antworte mit GENAU EINEM der folgenden Formate:

Bei Bestehen:
PASS

Bei Nicht-Bestehen:
FAIL
- Problem 1
- Problem 2
- ...
"""


async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "") -> None:
    """Generate a guide: write HTML, review it once, fix if needed, render PDF.

    Runs under the global concurrency semaphore. Progress, success and
    failure are written via ``update_guide``; nothing is returned or raised
    for pipeline failures. A cancellation requested through ``cancel_guide``
    is honoured before the run starts and after every step, so the
    "Abgebrochen" status written there is never overwritten.
    """
    async with _semaphore:
        current_step = "Generierung"
        try:
            # Check before touching the status: a guide cancelled while it
            # was still queued must not be flipped back to "generating".
            if guide_id in _cancelled:
                return
            await update_guide(guide_id, status="generating", progress="Recherche…", updated_at=_now())
            html_path, pdf_path = final_paths(topic, format_name)

            # Step 1: generator agent writes the HTML.
            await _set_progress(guide_id, "Generiere HTML…")
            gen_prompt = _build_generator_prompt(topic, format_name, html_path, instructions)
            returncode, _, stderr = await _run_claude(guide_id, gen_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Generator-Fehler: {stderr[:1000]}")
                return
            if not html_path.exists():
                await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
                return

            # Step 2: content review (once, no loop).
            await _set_progress(guide_id, "Prüfe Inhalt…")
            current_step = "Inhalts-Review"
            content_prompt = _build_content_review_prompt(topic, format_name, html_path)
            returncode, review_out, review_err = await _run_claude(
                guide_id, content_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE
            )
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Inhalts-Review-Fehler: {review_err[:1000]}")
                return

            review_text = review_out.strip()
            if not review_text.startswith("PASS"):
                # Drop only the leading verdict marker; the word may also
                # appear legitimately inside the listed problems.
                feedback = review_text.removeprefix("FAIL").strip()
                await _set_progress(guide_id, "Korrigiere Inhalt…")
                current_step = "Inhalts-Korrektur"
                fix_prompt = _build_fix_prompt(topic, format_name, html_path, feedback)
                returncode, _, fix_err = await _run_claude(guide_id, fix_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
                if guide_id in _cancelled:
                    return
                if returncode != 0:
                    await _fail(guide_id, f"Fix-Fehler: {fix_err[:1000]}")
                    return

            # Step 3: render the PDF.
            await _set_progress(guide_id, "Rendere PDF…")
            ok, err = await _render_pdf(html_path, pdf_path)
            if guide_id in _cancelled:
                return
            if not ok:
                await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
                return

            await update_guide(guide_id, status="done", progress=None, updated_at=_now())

        except asyncio.TimeoutError:
            await _fail(guide_id, f"Timeout bei {current_step} nach {AGENT_TIMEOUT}s")
        except Exception as e:
            await _fail(guide_id, str(e)[:2000])
        finally:
            _active_processes.pop(guide_id, None)
            _cancelled.discard(guide_id)


async def rework_guide(guide_id: str, topic: str, format_name: str, instructions: str) -> None:
    """Rework an existing guide according to user instructions.

    Works on temp copies of the HTML/PDF and only replaces the final files
    after both the agent run and the PDF render succeeded and the run was
    not cancelled. Temp files are always removed afterwards.
    """
    async with _semaphore:
        final_html, final_pdf = final_paths(topic, format_name)
        tmp_html, tmp_pdf = temp_paths(guide_id)
        current_step = "Überarbeitung"
        try:
            # Check before touching the status: a guide cancelled while it
            # was still queued must not be flipped back to "generating".
            if guide_id in _cancelled:
                return
            await update_guide(guide_id, status="generating", progress="Überarbeite…", updated_at=_now())

            if not final_html.exists():
                await _fail(guide_id, "Original-HTML nicht gefunden")
                return
            shutil.copy2(final_html, tmp_html)

            rework_prompt = _build_rework_prompt(topic, format_name, tmp_html, instructions)
            returncode, _, stderr = await _run_claude(guide_id, rework_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Rework-Fehler: {stderr[:1000]}")
                return
            if not tmp_html.exists():
                await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
                return

            await _set_progress(guide_id, "Rendere PDF…")
            ok, err = await _render_pdf(tmp_html, tmp_pdf)
            if guide_id in _cancelled:
                # Cancelled during rendering: keep the previous final files.
                return
            if not ok:
                await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
                return

            # Promote the temp files over the final ones (one rename each).
            tmp_html.replace(final_html)
            tmp_pdf.replace(final_pdf)

            await update_guide(guide_id, status="done", progress=None, updated_at=_now())

        except asyncio.TimeoutError:
            await _fail(guide_id, f"Timeout bei {current_step} nach {AGENT_TIMEOUT}s")
        except Exception as e:
            await _fail(guide_id, str(e)[:2000])
        finally:
            _active_processes.pop(guide_id, None)
            _cancelled.discard(guide_id)
            tmp_html.unlink(missing_ok=True)
            tmp_pdf.unlink(missing_ok=True)


async def _fail(guide_id: str, msg: str) -> None:
    """Mark the guide as failed with *msg* and clear its progress label."""
    await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=_now())


# --- Bausteine ---

# Topics for which a suggestion run is currently in progress.
_suggestions_generating: set[str] = set()


def is_suggestions_generating(topic: str) -> bool:
    """Return whether a suggestion run for *topic* is currently in progress."""
    return topic in _suggestions_generating


def _parse_json(text: str):
    """Parse JSON from model output, tolerating a surrounding Markdown fence.

    Raises:
        json.JSONDecodeError: The remaining text is not valid JSON.
    """
    text = text.strip()
    text = re.sub(r"^```(?:json)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    return json.loads(text)


def _build_suggestions_prompt(topic: str, html_paths: list[Path], existing_titles: list[str]) -> str:
    """Build the prompt asking for a JSON array of Baustein suggestions."""
    spec = _read_template("Format", "Baustein")
    reference = _read_template("Referenz", "Baustein")
    existing_list = "\n".join(f"- {t}" for t in existing_titles) if existing_titles else "(keine)"

    if html_paths:
        read_instructions = "\n".join(f"- Lies: {p}" for p in html_paths)
        guides_section = f"""SCHRITT 1 — Guides lesen:
{read_instructions}

"""
    else:
        guides_section = ""

    return f"""Schlage fundamentale Bausteine (Kernkonzepte) zum Thema "{topic}" vor.

{guides_section}Bereits vorhandene Bausteine (NICHT erneut vorschlagen):
{existing_list}

FORMAT-SPEZIFIKATION:
{spec}

REFERENZ-BEISPIEL:
{reference}

Schlage {_MAX_SUGGESTIONS} Bausteine vor. Antworte AUSSCHLIESSLICH mit einem JSON-Array.
Jedes Element hat:
- "title"
- "description"
- "purpose"
- "examples": Array mit 1 Objekt {{"label": "...", "code": "..."}}

Orientiere dich an der Spezifikation und Referenz. NUR das JSON-Array, kein weiterer Text.
"""


def _build_baustein_detail_prompt(topic: str, title: str, instructions: str = "") -> str:
    """Build the prompt asking for the details of one Baustein as a JSON object."""
    spec = _read_template("Format", "Baustein")
    reference = _read_template("Referenz", "Baustein")
    extra = f"\n\nZUSÄTZLICHE INFOS VOM NUTZER:\n{instructions}\n" if instructions else ""

    return f"""Generiere Details für den Baustein "{title}" im Kontext des Themas "{topic}".

FORMAT-SPEZIFIKATION:
{spec}

REFERENZ-BEISPIEL:
{reference}
{extra}
Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "description", "purpose", "examples".
"examples" ist ein Array mit 1 Objekt {{"label": "...", "code": "..."}}.

Orientiere dich an der Spezifikation und Referenz. Kein weiterer Text, nur das JSON.
"""


async def generate_suggestions(topic: str, html_paths: list[Path]) -> None:
    """Generate and store Baustein suggestions for *topic* (best effort).

    Existing pending suggestions for the topic are deleted first. Failures
    are logged and otherwise ignored; the function never raises.
    """
    _suggestions_generating.add(topic)
    try:
        existing = await list_bausteine(topic)
        existing_titles = [b["title"] for b in existing]
        await delete_pending_suggestions(topic)

        prompt = _build_suggestions_prompt(topic, html_paths, existing_titles)
        # The agent only needs file access when there are guides to read.
        tools = "Read" if html_paths else None
        returncode, stdout, stderr = await _run_claude(
            "suggestions-" + topic, prompt, _SUGGESTIONS_TIMEOUT, tools=tools, model=MODEL_BAUSTEIN_GEN
        )
        if returncode != 0:
            logger.warning(
                "Suggestion run for topic %r exited with code %s: %s", topic, returncode, stderr[:1000]
            )
            return

        items = _parse_json(stdout)
        if not isinstance(items, list):
            logger.warning("Suggestion run for topic %r did not return a JSON array", topic)
            return

        now = _now()
        # Skip malformed (non-object) entries instead of losing the whole batch.
        valid_items = [item for item in items if isinstance(item, dict)]
        suggestions = [
            {
                "id": str(uuid.uuid4()),
                "topic": topic,
                "title": item.get("title", ""),
                "description": item.get("description", ""),
                "purpose": item.get("purpose", ""),
                "example": json.dumps(item.get("examples", []), ensure_ascii=False),
                "status": "pending",
                "created_at": now,
            }
            for item in valid_items[:_MAX_SUGGESTIONS]
        ]
        if suggestions:
            await create_suggestions(suggestions)
    except Exception:
        # Best effort: callers run this in the background, so only log.
        logger.exception("Generating suggestions for topic %r failed", topic)
    finally:
        _suggestions_generating.discard(topic)


async def generate_baustein_detail(baustein_id: str, topic: str, title: str, instructions: str = "") -> None:
    """Generate description, purpose and examples for one Baustein (best effort).

    Failures are logged and otherwise ignored; the function never raises.
    """
    try:
        prompt = _build_baustein_detail_prompt(topic, title, instructions)
        returncode, stdout, stderr = await _run_claude(
            "baustein-" + baustein_id, prompt, _BAUSTEIN_TIMEOUT, tools=None, model=MODEL_BAUSTEIN_GEN
        )
        if returncode != 0:
            logger.warning(
                "Detail run for Baustein %s exited with code %s: %s", baustein_id, returncode, stderr[:1000]
            )
            return

        data = _parse_json(stdout)
        if not isinstance(data, dict):
            logger.warning("Detail run for Baustein %s did not return a JSON object", baustein_id)
            return

        await update_baustein(
            baustein_id,
            description=data.get("description", ""),
            purpose=data.get("purpose", ""),
            example=json.dumps(data.get("examples", []), ensure_ascii=False),
            updated_at=_now(),
        )
    except Exception:
        # Best effort: callers run this in the background, so only log.
        logger.exception("Generating details for Baustein %s failed", baustein_id)


async def rework_baustein(baustein_id: str, topic: str, title: str, current: dict, instructions: str) -> None:
    """Rework one Baustein according to user instructions (best effort).

    Failures are logged and otherwise ignored; the function never raises.
    """
    try:
        prompt = _build_baustein_rework_prompt(topic, title, current, instructions)
        returncode, stdout, stderr = await _run_claude(
            "baustein-" + baustein_id, prompt, _BAUSTEIN_TIMEOUT, tools=None, model=MODEL_BAUSTEIN_REWORK
        )
        if returncode != 0:
            logger.warning(
                "Rework run for Baustein %s exited with code %s: %s", baustein_id, returncode, stderr[:1000]
            )
            return

        data = _parse_json(stdout)
        if not isinstance(data, dict):
            logger.warning("Rework run for Baustein %s did not return a JSON object", baustein_id)
            return

        await update_baustein(
            baustein_id,
            title=data.get("title", title),
            description=data.get("description", ""),
            purpose=data.get("purpose", ""),
            example=json.dumps(data.get("examples", []), ensure_ascii=False),
            updated_at=_now(),
        )
    except Exception:
        # Best effort: callers run this in the background, so only log.
        logger.exception("Reworking Baustein %s failed", baustein_id)


def _build_baustein_rework_prompt(topic: str, title: str, current: dict, instructions: str) -> str:
    """Build the prompt asking to rework a Baustein, embedding its current state."""
    current_json = json.dumps(
        {
            "title": title,
            "description": current.get("description", ""),
            "purpose": current.get("purpose", ""),
            "examples": current.get("examples", []),
        },
        ensure_ascii=False,
        indent=2,
    )

    return f"""Überarbeite den Baustein "{title}" zum Thema "{topic}" gemäß den Anweisungen.

AKTUELLER STAND:
{current_json}

ANWEISUNGEN VOM NUTZER:
{instructions}

Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "title", "description", "purpose", "examples".
"examples" ist ein Array mit Objekten {{"label": "...", "code": "..."}}.
Kein weiterer Text, nur das JSON.
"""