Files
guides/backend/generator.py
2026-05-28 15:20:25 +00:00

445 lines
15 KiB
Python

import asyncio
import json
import re
import subprocess
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import (
AGENT_TIMEOUT,
CLAUDE_CLI,
TEMPLATES_DIR,
MAX_CONCURRENT_GENERATIONS,
STORAGE_DIR,
)
from database import (
update_guide,
create_baustein,
create_suggestions,
delete_pending_suggestions,
list_bausteine,
update_baustein,
)
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
_active_processes: dict[str, asyncio.subprocess.Process] = {}
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
_cancelled.add(guide_id)
process = _active_processes.get(guide_id)
if process and process.returncode is None:
process.kill()
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen", updated_at=now)
return True
async def _set_progress(guide_id: str, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, progress=progress, updated_at=now)
async def _run_claude(guide_id: str, prompt: str, timeout: int, tools: str | None = "Write,Bash,Read,WebSearch,WebFetch") -> tuple[int, str, str]:
cmd = [CLAUDE_CLI, "-p"]
if tools:
cmd += ["--allowedTools", tools]
cmd += ["--dangerously-skip-permissions"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_active_processes[guide_id] = process
try:
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(input=prompt.encode("utf-8")),
timeout=timeout,
)
except asyncio.TimeoutError:
process.kill()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
pass
raise
return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
finally:
_active_processes.pop(guide_id, None)
async def _render_pdf(html_path: Path, pdf_path: Path) -> tuple[bool, str]:
proc = await asyncio.create_subprocess_exec(
"weasyprint", str(html_path), str(pdf_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
if proc.returncode != 0:
return False, stderr.decode("utf-8", errors="replace")[:1000]
return True, ""
def _build_generator_prompt(topic: str, format_name: str, html_path: Path, instructions: str = "") -> str:
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
reference = (TEMPLATES_DIR / "Referenz" / f"{format_name}.md").read_text(encoding="utf-8")
extra = f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
return f"""Erstelle einen Lern-Guide zum Thema "{topic}" im Format "{format_name}".
Recherchiere zuerst die aktuelle Version und aktuelle Fakten zu "{topic}" per Websuche, damit Versionsnummern und Angaben stimmen.
Schreibe die HTML-Datei nach: {html_path}
Schreibe NUR die HTML-Datei. Führe KEIN weasyprint aus, erzeuge KEINE PDF. Das übernimmt ein anderer Prozess.
FORMAT-SPEZIFIKATION:
{spec}
REFERENZ-IMPLEMENTIERUNG (Stil-Vorlage, adaptiere für "{topic}"):
{reference}
{extra}"""
def _build_rework_prompt(topic: str, format_name: str, html_path: Path, instructions: str) -> str:
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
return f"""Überarbeite die bestehende HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".
Lies zuerst die aktuelle HTML-Datei mit dem Read-Tool.
ANWEISUNGEN VOM NUTZER:
{instructions}
FORMAT-SPEZIFIKATION (muss weiterhin eingehalten werden):
{spec}
Schreibe die überarbeitete Version in dieselbe Datei: {html_path}
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
"""
def _build_fix_prompt(topic: str, format_name: str, html_path: Path, feedback: str) -> str:
return f"""Die HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}" hat Probleme.
FEEDBACK VOM PRÜFER:
{feedback}
Behebe die Probleme in der HTML-Datei {html_path}. Schreibe die korrigierte Version in dieselbe Datei.
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
"""
def _build_content_review_prompt(topic: str, format_name: str, html_path: Path) -> str:
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
return f"""Prüfe den Inhalt der HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".
SCHRITT 1 — HTML-Datei lesen:
Öffne die Datei {html_path} mit dem Read-Tool.
SCHRITT 2 — Fakten per Websuche prüfen:
Recherchiere mit WebSearch, ob Versionsnummern, Jahreszahlen und zentrale Fakten zu "{topic}" aktuell und korrekt sind.
SCHRITT 3 — Vollständigkeit prüfen anhand dieser Spezifikation:
{spec}
Prüfkriterien:
- Sind alle Pflicht-Kapitel/Sektionen vorhanden?
- Stimmen Versionsnummern und Fakten?
- Ist der Inhalt fachlich korrekt und aktuell?
- Entspricht der Schwierigkeitsgrad dem Format?
- Sind Pflicht-Elemente vorhanden (Cover, TOC, Recall-Boxen, Callouts, Code-Beispiele)?
SCHRITT 4 — Antworte mit GENAU EINEM der folgenden Formate:
Bei Bestehen:
PASS
Bei Nicht-Bestehen:
FAIL
- Problem 1
- Problem 2
- ...
"""
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "") -> None:
async with _semaphore:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="generating", progress="Recherche…", updated_at=now)
html_path = STORAGE_DIR / "html" / f"{guide_id}.html"
pdf_path = STORAGE_DIR / "pdf" / f"{guide_id}.pdf"
try:
if guide_id in _cancelled:
return
current_step = "Generierung"
current_timeout = AGENT_TIMEOUT
# Step 1: Generator-Agent erstellt HTML
await _set_progress(guide_id, "Generiere HTML…")
gen_prompt = _build_generator_prompt(topic, format_name, html_path, instructions)
returncode, stdout, stderr = await _run_claude(guide_id, gen_prompt, AGENT_TIMEOUT)
if guide_id in _cancelled:
return
if returncode != 0:
await _fail(guide_id, f"Generator-Fehler: {stderr[:1000]}")
return
if not html_path.exists():
await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
return
# Step 2: Inhalts-Review (1x, kein Loop)
if guide_id in _cancelled:
return
await _set_progress(guide_id, "Prüfe Inhalt…")
current_step = "Inhalts-Review"
current_timeout = AGENT_TIMEOUT
content_prompt = _build_content_review_prompt(topic, format_name, html_path)
returncode, review_out, review_err = await _run_claude(guide_id, content_prompt, AGENT_TIMEOUT)
if returncode != 0:
await _fail(guide_id, f"Inhalts-Review-Fehler: {review_err[:1000]}")
return
review_text = review_out.strip()
if not review_text.startswith("PASS"):
if guide_id in _cancelled:
return
feedback = review_text.replace("FAIL", "").strip()
await _set_progress(guide_id, "Korrigiere Inhalt…")
current_step = "Inhalts-Korrektur"
current_timeout = AGENT_TIMEOUT
fix_prompt = _build_fix_prompt(topic, format_name, html_path, feedback)
returncode, _, fix_err = await _run_claude(guide_id, fix_prompt, AGENT_TIMEOUT)
if returncode != 0:
await _fail(guide_id, f"Fix-Fehler: {fix_err[:1000]}")
return
# Step 3: PDF rendern
if guide_id in _cancelled:
return
await _set_progress(guide_id, "Rendere PDF…")
ok, err = await _render_pdf(html_path, pdf_path)
if not ok:
await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
return
now = datetime.now(timezone.utc).isoformat()
await update_guide(
guide_id,
status="done",
progress=None,
html_path=str(html_path),
pdf_path=str(pdf_path),
updated_at=now,
)
except asyncio.TimeoutError:
await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
except Exception as e:
await _fail(guide_id, str(e)[:2000])
finally:
_active_processes.pop(guide_id, None)
_cancelled.discard(guide_id)
async def rework_guide(guide_id: str, topic: str, format_name: str, instructions: str) -> None:
async with _semaphore:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="generating", progress="Überarbeite…", updated_at=now)
html_path = STORAGE_DIR / "html" / f"{guide_id}.html"
pdf_path = STORAGE_DIR / "pdf" / f"{guide_id}.pdf"
try:
if guide_id in _cancelled:
return
current_step = "Überarbeitung"
current_timeout = AGENT_TIMEOUT
rework_prompt = _build_rework_prompt(topic, format_name, html_path, instructions)
returncode, stdout, stderr = await _run_claude(guide_id, rework_prompt, AGENT_TIMEOUT)
if guide_id in _cancelled:
return
if returncode != 0:
await _fail(guide_id, f"Rework-Fehler: {stderr[:1000]}")
return
if not html_path.exists():
await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
return
await _set_progress(guide_id, "Rendere PDF…")
ok, err = await _render_pdf(html_path, pdf_path)
if not ok:
await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
return
now = datetime.now(timezone.utc).isoformat()
await update_guide(
guide_id, status="done", progress=None,
html_path=str(html_path), pdf_path=str(pdf_path), updated_at=now,
)
except asyncio.TimeoutError:
await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
except Exception as e:
await _fail(guide_id, str(e)[:2000])
finally:
_active_processes.pop(guide_id, None)
_cancelled.discard(guide_id)
async def _fail(guide_id: str, msg: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
# --- Bausteine ---
_suggestions_generating: set[str] = set()
def is_suggestions_generating(topic: str) -> bool:
return topic in _suggestions_generating
def _parse_json(text: str):
text = text.strip()
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
return json.loads(text)
def _build_suggestions_prompt(topic: str, html_paths: list[Path], existing_titles: list[str]) -> str:
spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")
existing_list = "\n".join(f"- {t}" for t in existing_titles) if existing_titles else "(keine)"
if html_paths:
read_instructions = "\n".join(f"- Lies: {p}" for p in html_paths)
guides_section = f"""SCHRITT 1 — Guides lesen:
{read_instructions}
"""
else:
guides_section = ""
return f"""Schlage fundamentale Bausteine (Kernkonzepte) zum Thema "{topic}" vor.
{guides_section}Bereits vorhandene Bausteine (NICHT erneut vorschlagen):
{existing_list}
FORMAT-SPEZIFIKATION:
{spec}
REFERENZ-BEISPIEL:
{reference}
Schlage bis zu 20 Bausteine vor. Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element hat:
- "title"
- "description"
- "purpose"
- "examples": Array mit 4 Objekten {{"label": "...", "code": "..."}}
Orientiere dich an der Spezifikation und Referenz. NUR das JSON-Array, kein weiterer Text.
"""
def _build_baustein_detail_prompt(topic: str, title: str) -> str:
spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")
return f"""Generiere Details für den Baustein "{title}" im Kontext des Themas "{topic}".
FORMAT-SPEZIFIKATION:
{spec}
REFERENZ-BEISPIEL:
{reference}
Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "description", "purpose", "examples".
"examples" ist ein Array mit 4 Objekten {{"label": "...", "code": "..."}}.
Orientiere dich an der Spezifikation und Referenz. Kein weiterer Text, nur das JSON.
"""
async def generate_suggestions(topic: str, html_paths: list[Path]) -> None:
_suggestions_generating.add(topic)
try:
existing = await list_bausteine(topic)
existing_titles = [b["title"] for b in existing]
await delete_pending_suggestions(topic)
prompt = _build_suggestions_prompt(topic, html_paths, existing_titles)
tools = "Read" if html_paths else None
returncode, stdout, stderr = await _run_claude("suggestions-" + topic, prompt, 180, tools=tools)
if returncode != 0:
return
items = _parse_json(stdout)
if not isinstance(items, list):
return
now = datetime.now(timezone.utc).isoformat()
suggestions = []
for item in items[:20]:
suggestions.append({
"id": str(uuid.uuid4()),
"topic": topic,
"title": item.get("title", ""),
"description": item.get("description", ""),
"purpose": item.get("purpose", ""),
"example": json.dumps(item.get("examples", []), ensure_ascii=False),
"status": "pending",
"created_at": now,
})
if suggestions:
await create_suggestions(suggestions)
except Exception:
pass
finally:
_suggestions_generating.discard(topic)
async def generate_baustein_detail(baustein_id: str, topic: str, title: str) -> None:
try:
prompt = _build_baustein_detail_prompt(topic, title)
returncode, stdout, stderr = await _run_claude("baustein-" + baustein_id, prompt, 60, tools=None)
if returncode != 0:
return
data = _parse_json(stdout)
if not isinstance(data, dict):
return
now = datetime.now(timezone.utc).isoformat()
await update_baustein(
baustein_id,
description=data.get("description", ""),
purpose=data.get("purpose", ""),
example=json.dumps(data.get("examples", []), ensure_ascii=False),
updated_at=now,
)
except Exception:
pass