Files
guides/backend/generator.py
2026-05-29 17:58:43 +02:00

560 lines
19 KiB
Python

import asyncio
import json
import re
import shutil
import subprocess
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import (
AGENT_TIMEOUT,
CLAUDE_CLI,
TEMPLATES_DIR,
MAX_CONCURRENT_GENERATIONS,
MODEL_GUIDE,
MODEL_BAUSTEIN_GEN,
MODEL_BAUSTEIN_REWORK,
STORAGE_DIR,
)
from database import (
update_guide,
create_baustein,
create_suggestions,
delete_pending_suggestions,
list_bausteine,
update_baustein,
update_baustein_sort_orders,
)
from paths import final_paths, temp_paths
# Caps how many guide runs execute at once: generate_guide and rework_guide
# both hold this semaphore for their entire run.
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
# CLI child processes currently running, keyed by the id handed to _run_claude,
# so that cancel_guide can look one up and kill it.
_active_processes: dict[str, asyncio.subprocess.Process] = {}
# Guide ids for which cancel_guide was called. The guide coroutines check this
# set between steps and discard their id again when they finish.
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
    """Request cancellation of a guide run and mark the guide as aborted.

    The id is added to the cancelled set, a still-running CLI process
    registered under that id is killed, and the guide record is set to
    status "error" with the message "Abgebrochen".

    Returns:
        Always True.
    """
    _cancelled.add(guide_id)
    running = _active_processes.get(guide_id)
    if running:
        # returncode stays None until the child has exited.
        if running.returncode is None:
            running.kill()
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg="Abgebrochen",
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
    return True
async def _set_progress(guide_id: str, progress: str) -> None:
    """Write a new progress label for the guide and refresh its updated_at (UTC, ISO 8601)."""
    await update_guide(
        guide_id,
        progress=progress,
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
async def _run_claude(guide_id: str, prompt: str, timeout: int, tools: str | None = "Write,Bash,Read,WebSearch,WebFetch", model: str | None = None) -> tuple[int, str, str]:
    """Run the Claude CLI once with `prompt` on stdin and collect its output.

    Args:
        guide_id: Key under which the child process is registered in
            _active_processes while it runs (callers also pass synthetic keys
            such as "sort-<topic>").
        prompt: Text written to the CLI's stdin, UTF-8 encoded.
        timeout: Seconds to wait for the CLI to finish.
        tools: Value for --allowedTools; the flag is omitted when falsy.
        model: Value for --model; the flag is omitted when falsy.

    Returns:
        (returncode, stdout, stderr) with both streams decoded as UTF-8,
        undecodable bytes replaced.

    Raises:
        asyncio.TimeoutError: the CLI did not finish within `timeout`.

    The child is killed whenever waiting for it ends abnormally — on timeout,
    on cancellation of the awaiting task, or on any other error — so no CLI
    process outlives this call.
    """
    cmd = [CLAUDE_CLI, "-p"]
    if model:
        cmd += ["--model", model]
    if tools:
        cmd += ["--allowedTools", tools]
    # NOTE(review): permission prompts are disabled while the prompt carries
    # user-supplied text and Bash may be among the allowed tools — confirm this
    # is only ever run in a sandboxed environment.
    cmd += ["--dangerously-skip-permissions"]
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_processes[guide_id] = process
    try:
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=prompt.encode("utf-8")),
                timeout=timeout,
            )
        except BaseException:
            # BaseException on purpose: CancelledError must also trigger the
            # cleanup. The exception is always re-raised unchanged.
            if process.returncode is None:
                process.kill()
                try:
                    # Give the killed child a moment to be reaped.
                    await asyncio.wait_for(process.wait(), timeout=5)
                except asyncio.TimeoutError:
                    pass
            raise
        return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
    finally:
        _active_processes.pop(guide_id, None)
async def _render_pdf(html_path: Path, pdf_path: Path, timeout: float = 120) -> tuple[bool, str]:
    """Render `html_path` to `pdf_path` by running the `weasyprint` executable.

    Args:
        html_path: HTML input file.
        pdf_path: PDF output file.
        timeout: Seconds to wait for weasyprint (default 120, the previously
            hard-coded value).

    Returns:
        (True, "") on exit code 0, otherwise (False, <first 1000 characters of
        stderr>).

    Raises:
        asyncio.TimeoutError: weasyprint did not finish within `timeout`.

    If waiting ends abnormally (timeout, task cancellation, other error) the
    weasyprint process is killed before the exception propagates, so it cannot
    keep running in the background.
    """
    proc = await asyncio.create_subprocess_exec(
        "weasyprint", str(html_path), str(pdf_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except BaseException:
        # BaseException so that CancelledError is covered too; always re-raised.
        if proc.returncode is None:
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
        raise
    if proc.returncode != 0:
        return False, stderr.decode("utf-8", errors="replace")[:1000]
    return True, ""
def _build_generator_prompt(topic: str, format_name: str, html_path: Path, instructions: str = "") -> str:
    """Assemble the prompt that asks the agent to write a new guide as HTML.

    The format specification and the reference implementation are read from
    TEMPLATES_DIR/Format/<format_name>.md and TEMPLATES_DIR/Referenz/<format_name>.md.
    Non-empty `instructions` are appended as an extra user section at the end.
    """
    template_name = f"{format_name}.md"
    format_spec = (TEMPLATES_DIR / "Format" / template_name).read_text(encoding="utf-8")
    style_reference = (TEMPLATES_DIR / "Referenz" / template_name).read_text(encoding="utf-8")
    if instructions:
        user_addendum = f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n"
    else:
        user_addendum = ""
    prompt_lines = [
        f'Erstelle einen Lern-Guide zum Thema "{topic}" im Format "{format_name}".',
        f'Recherchiere zuerst die aktuelle Version und aktuelle Fakten zu "{topic}" per Websuche, damit Versionsnummern und Angaben stimmen.',
        f"Schreibe die HTML-Datei nach: {html_path}",
        "Schreibe NUR die HTML-Datei. Führe KEIN weasyprint aus, erzeuge KEINE PDF. Das übernimmt ein anderer Prozess.",
        "FORMAT-SPEZIFIKATION:",
        f"{format_spec}",
        f'REFERENZ-IMPLEMENTIERUNG (Stil-Vorlage, adaptiere für "{topic}"):',
        f"{style_reference}",
        # The addendum is always the last line, even when it is empty.
        user_addendum,
    ]
    return "\n".join(prompt_lines)
def _build_rework_prompt(topic: str, format_name: str, html_path: Path, instructions: str) -> str:
    """Assemble the prompt that asks the agent to revise an existing HTML guide in place.

    The format specification is read from TEMPLATES_DIR/Format/<format_name>.md;
    the user's `instructions` are embedded verbatim.
    """
    format_spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
    prompt_lines = [
        f'Überarbeite die bestehende HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".',
        "Lies zuerst die aktuelle HTML-Datei mit dem Read-Tool.",
        "ANWEISUNGEN VOM NUTZER:",
        f"{instructions}",
        "FORMAT-SPEZIFIKATION (muss weiterhin eingehalten werden):",
        f"{format_spec}",
        f"Schreibe die überarbeitete Version in dieselbe Datei: {html_path}",
        "Führe KEIN weasyprint aus, erzeuge KEINE PDF.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
def _build_fix_prompt(topic: str, format_name: str, html_path: Path, feedback: str) -> str:
    """Assemble the prompt that asks the agent to repair the HTML file based on reviewer feedback."""
    prompt_lines = [
        f'Die HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}" hat Probleme.',
        "FEEDBACK VOM PRÜFER:",
        f"{feedback}",
        f"Behebe die Probleme in der HTML-Datei {html_path}. Schreibe die korrigierte Version in dieselbe Datei.",
        "Führe KEIN weasyprint aus, erzeuge KEINE PDF.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
def _build_content_review_prompt(topic: str, format_name: str, html_path: Path) -> str:
    """Assemble the reviewer prompt for an HTML guide.

    The reviewer is told to read the file, fact-check via web search, compare
    against the format specification (read from
    TEMPLATES_DIR/Format/<format_name>.md) and to answer with either PASS or
    FAIL followed by a bullet list of problems.
    """
    format_spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
    prompt_lines = [
        f'Prüfe den Inhalt der HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".',
        "SCHRITT 1 — HTML-Datei lesen:",
        f"Öffne die Datei {html_path} mit dem Read-Tool.",
        "SCHRITT 2 — Fakten per Websuche prüfen:",
        f'Recherchiere mit WebSearch, ob Versionsnummern, Jahreszahlen und zentrale Fakten zu "{topic}" aktuell und korrekt sind.',
        "SCHRITT 3 — Vollständigkeit prüfen anhand dieser Spezifikation:",
        f"{format_spec}",
        "Prüfkriterien:",
        "- Sind alle Pflicht-Kapitel/Sektionen vorhanden?",
        "- Stimmen Versionsnummern und Fakten?",
        "- Ist der Inhalt fachlich korrekt und aktuell?",
        "- Entspricht der Schwierigkeitsgrad dem Format?",
        "- Sind Pflicht-Elemente vorhanden (Cover, TOC, Recall-Boxen, Callouts, Code-Beispiele)?",
        "SCHRITT 4 — Antworte mit GENAU EINEM der folgenden Formate:",
        "Bei Bestehen:",
        "PASS",
        "Bei Nicht-Bestehen:",
        "FAIL",
        "- Problem 1",
        "- Problem 2",
        "- ...",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "") -> None:
    """Generate a guide: agent-written HTML, one content review, PDF rendering.

    Steps, all executed while holding the global concurrency semaphore:
      1. the generator agent writes the HTML file,
      2. a reviewer agent answers PASS or FAIL plus a problem list; anything
         other than PASS triggers exactly one fix run (no loop),
      3. the HTML is rendered to PDF.

    The result is reported only through the guide record: status "done" on
    success, status "error" with a message on failure or timeout. When
    cancel_guide was called for this id, the run stops at the next checkpoint
    without touching the record again, so the "Abgebrochen" state written by
    cancel_guide is preserved.

    Args:
        guide_id: Id of the guide record to update.
        topic: Guide topic; also used to derive the output paths.
        format_name: Name of the format template to follow.
        instructions: Optional extra user instructions for the generator.
    """
    async with _semaphore:
        # A cancel may have arrived while this run was still queued on the
        # semaphore. cancel_guide already wrote the final state, so do not
        # overwrite it with "generating".
        if guide_id in _cancelled:
            _cancelled.discard(guide_id)
            return
        now = datetime.now(timezone.utc).isoformat()
        await update_guide(guide_id, status="generating", progress="Recherche…", updated_at=now)
        html_path, pdf_path = final_paths(topic, format_name)
        # Set before the try so the timeout handler can always build its message.
        current_step = "Generierung"
        current_timeout = AGENT_TIMEOUT
        try:
            if guide_id in _cancelled:
                return

            # Step 1: the generator agent creates the HTML.
            await _set_progress(guide_id, "Generiere HTML…")
            gen_prompt = _build_generator_prompt(topic, format_name, html_path, instructions)
            returncode, _, stderr = await _run_claude(guide_id, gen_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Generator-Fehler: {stderr[:1000]}")
                return
            if not html_path.exists():
                await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
                return

            # Step 2: content review (once, no loop).
            await _set_progress(guide_id, "Prüfe Inhalt…")
            current_step = "Inhalts-Review"
            content_prompt = _build_content_review_prompt(topic, format_name, html_path)
            returncode, review_out, review_err = await _run_claude(guide_id, content_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
            # A cancel kills the process, which yields a non-zero exit code;
            # check for it first so it is not reported as a review failure.
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Inhalts-Review-Fehler: {review_err[:1000]}")
                return
            review_text = review_out.strip()
            if not review_text.startswith("PASS"):
                # Strip only the leading verdict; the word may legitimately
                # occur inside the problem descriptions.
                feedback = review_text.removeprefix("FAIL").strip()
                await _set_progress(guide_id, "Korrigiere Inhalt…")
                current_step = "Inhalts-Korrektur"
                fix_prompt = _build_fix_prompt(topic, format_name, html_path, feedback)
                returncode, _, fix_err = await _run_claude(guide_id, fix_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
                if guide_id in _cancelled:
                    return
                if returncode != 0:
                    await _fail(guide_id, f"Fix-Fehler: {fix_err[:1000]}")
                    return

            # Step 3: render the PDF.
            await _set_progress(guide_id, "Rendere PDF…")
            current_step = "PDF-Rendering"
            current_timeout = 120  # mirrors the default limit applied inside _render_pdf
            ok, err = await _render_pdf(html_path, pdf_path)
            if guide_id in _cancelled:
                return
            if not ok:
                await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
                return
            now = datetime.now(timezone.utc).isoformat()
            await update_guide(
                guide_id, status="done", progress=None, updated_at=now,
            )
        except asyncio.TimeoutError:
            await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
        except Exception as e:
            await _fail(guide_id, str(e)[:2000])
        finally:
            _active_processes.pop(guide_id, None)
            _cancelled.discard(guide_id)
async def rework_guide(guide_id: str, topic: str, format_name: str, instructions: str) -> None:
    """Revise an existing guide according to user instructions.

    The current HTML is copied to a temporary file, the agent rewrites that
    copy, the copy is rendered to a temporary PDF, and only after both steps
    succeeded are the temporary files moved over the final ones. The temporary
    files are removed in every case.

    The result is reported only through the guide record: status "done" on
    success, status "error" with a message on failure or timeout. When
    cancel_guide was called for this id, the run stops at the next checkpoint
    without touching the record or the final files.

    Args:
        guide_id: Id of the guide record to update; also used to derive the
            temporary paths.
        topic: Guide topic; used to derive the final paths.
        format_name: Name of the format template to follow.
        instructions: User instructions describing the requested changes.
    """
    async with _semaphore:
        # A cancel may have arrived while this run was still queued on the
        # semaphore. cancel_guide already wrote the final state, so do not
        # overwrite it with "generating".
        if guide_id in _cancelled:
            _cancelled.discard(guide_id)
            return
        now = datetime.now(timezone.utc).isoformat()
        await update_guide(guide_id, status="generating", progress="Überarbeite…", updated_at=now)
        final_html, final_pdf = final_paths(topic, format_name)
        tmp_html, tmp_pdf = temp_paths(guide_id)
        # Set before the try so the timeout handler can always build its message.
        current_step = "Überarbeitung"
        current_timeout = AGENT_TIMEOUT
        try:
            if guide_id in _cancelled:
                return
            if not final_html.exists():
                await _fail(guide_id, "Original-HTML nicht gefunden")
                return
            shutil.copy2(final_html, tmp_html)
            rework_prompt = _build_rework_prompt(topic, format_name, tmp_html, instructions)
            returncode, _, stderr = await _run_claude(guide_id, rework_prompt, AGENT_TIMEOUT, model=MODEL_GUIDE)
            if guide_id in _cancelled:
                return
            if returncode != 0:
                await _fail(guide_id, f"Rework-Fehler: {stderr[:1000]}")
                return
            if not tmp_html.exists():
                await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
                return
            await _set_progress(guide_id, "Rendere PDF…")
            current_step = "PDF-Rendering"
            current_timeout = 120  # mirrors the default limit applied inside _render_pdf
            ok, err = await _render_pdf(tmp_html, tmp_pdf)
            # A cancel during rendering must not replace the final files.
            if guide_id in _cancelled:
                return
            if not ok:
                await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
                return
            # Move temp -> final. Each rename is atomic on its own; the pair is not.
            tmp_html.replace(final_html)
            tmp_pdf.replace(final_pdf)
            now = datetime.now(timezone.utc).isoformat()
            await update_guide(
                guide_id, status="done", progress=None, updated_at=now,
            )
        except asyncio.TimeoutError:
            await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
        except Exception as e:
            await _fail(guide_id, str(e)[:2000])
        finally:
            _active_processes.pop(guide_id, None)
            _cancelled.discard(guide_id)
            tmp_html.unlink(missing_ok=True)
            tmp_pdf.unlink(missing_ok=True)
async def _fail(guide_id: str, msg: str) -> None:
    """Mark the guide as failed: status "error", progress cleared, `msg` stored as error_msg."""
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg=msg,
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
# --- Bausteine (building blocks) ---
# Topics with a suggestion run in flight: generate_suggestions adds the topic
# on entry and discards it when it finishes.
_suggestions_generating: set[str] = set()
# Topics with a sort run in flight: sort_bausteine adds the topic on entry and
# discards it when it finishes.
_sorting: set[str] = set()
def is_suggestions_generating(topic: str) -> bool:
    """Report whether `topic` is currently listed in the set of running suggestion runs."""
    in_flight = topic in _suggestions_generating
    return in_flight
def is_sorting(topic: str) -> bool:
    """Report whether `topic` is currently listed in the set of running sort runs."""
    in_flight = topic in _sorting
    return in_flight
def _parse_json(text: str):
    """Parse JSON from model output, tolerating code fences and surrounding prose.

    First the whole text is tried after stripping one leading and one trailing
    Markdown fence. If that fails, the content of the first fenced block
    anywhere in the text is searched, then the full text, for the first
    position at which a JSON array or object can be decoded.

    Returns:
        The decoded Python value.

    Raises:
        json.JSONDecodeError: no JSON array or object could be decoded; the
            error from the first, direct attempt is raised.
    """
    text = text.strip()
    unfenced = re.sub(r"^```(?:json)?\s*", "", text)
    unfenced = re.sub(r"\s*```$", "", unfenced)
    try:
        return json.loads(unfenced)
    except json.JSONDecodeError as first_error:
        candidates = []
        fenced = re.search(r"```(?:json)?\s*(.*?)```", text, flags=re.DOTALL | re.IGNORECASE)
        if fenced:
            # Prefer the fenced block: prose around it may contain brackets too.
            candidates.append(fenced.group(1))
        candidates.append(text)
        decoder = json.JSONDecoder()
        for candidate in candidates:
            for opener in re.finditer(r"[\[{]", candidate):
                try:
                    # raw_decode ignores anything after the decoded document.
                    value, _ = decoder.raw_decode(candidate[opener.start():])
                except json.JSONDecodeError:
                    continue
                return value
        raise first_error
def _build_suggestions_prompt(topic: str, html_paths: list[Path], existing_titles: list[str]) -> str:
    """Assemble the prompt that asks for 40 Baustein suggestions as a JSON array.

    Specification and reference are read from TEMPLATES_DIR/Format/Baustein.md
    and TEMPLATES_DIR/Referenz/Baustein.md. When `html_paths` is non-empty a
    reading step listing each path is inserted; `existing_titles` are listed as
    already present, or "(keine)" when there are none.
    """
    format_spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
    style_reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")

    if existing_titles:
        already_present = "\n".join(f"- {title}" for title in existing_titles)
    else:
        already_present = "(keine)"

    prompt_lines = [f'Schlage fundamentale Bausteine (Kernkonzepte) zum Thema "{topic}" vor.']
    if html_paths:
        prompt_lines.append("SCHRITT 1 — Guides lesen:")
        prompt_lines.append("\n".join(f"- Lies: {path}" for path in html_paths))
    prompt_lines += [
        "Bereits vorhandene Bausteine (NICHT erneut vorschlagen):",
        already_present,
        "FORMAT-SPEZIFIKATION:",
        f"{format_spec}",
        "REFERENZ-BEISPIEL:",
        f"{style_reference}",
        "Schlage 40 Bausteine vor. Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element hat:",
        '- "title"',
        '- "description"',
        '- "purpose"',
        '- "examples": Array mit 1 Objekt {"label": "...", "code": "..."}',
        "Orientiere dich an der Spezifikation und Referenz. NUR das JSON-Array, kein weiterer Text.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
def _build_baustein_detail_prompt(topic: str, title: str, instructions: str = "") -> str:
    """Assemble the prompt that asks for description, purpose and examples of one Baustein as JSON.

    Specification and reference are read from TEMPLATES_DIR/Format/Baustein.md
    and TEMPLATES_DIR/Referenz/Baustein.md. Non-empty `instructions` are
    inserted as an extra user section before the answer-format rules.
    """
    format_spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
    style_reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")
    if instructions:
        user_addendum = f"\n\nZUSÄTZLICHE INFOS VOM NUTZER:\n{instructions}\n"
    else:
        user_addendum = ""
    prompt_lines = [
        f'Generiere Details für den Baustein "{title}" im Kontext des Themas "{topic}".',
        "FORMAT-SPEZIFIKATION:",
        f"{format_spec}",
        "REFERENZ-BEISPIEL:",
        f"{style_reference}",
        # Always occupies its own line, even when empty.
        user_addendum,
        'Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "description", "purpose", "examples".',
        '"examples" ist ein Array mit 1 Objekt {"label": "...", "code": "..."}.',
        "Orientiere dich an der Spezifikation und Referenz. Kein weiterer Text, nur das JSON.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
async def generate_suggestions(topic: str, html_paths: list[Path]) -> None:
    """Ask the model for Baustein suggestions for `topic` and store them as pending.

    Existing pending suggestions for the topic are deleted first. The model may
    use the Read tool only when `html_paths` is non-empty. At most the first 40
    returned items are considered; items that are not JSON objects are skipped
    instead of aborting the whole batch.

    This is a best-effort job: nothing is raised to the caller. Failures (CLI
    error, timeout, unparsable output) are printed and otherwise ignored. The
    topic is listed in _suggestions_generating for the duration of the call.
    """
    _suggestions_generating.add(topic)
    try:
        existing = await list_bausteine(topic)
        existing_titles = [b["title"] for b in existing]
        await delete_pending_suggestions(topic)
        prompt = _build_suggestions_prompt(topic, html_paths, existing_titles)
        tools = "Read" if html_paths else None
        returncode, stdout, stderr = await _run_claude("suggestions-" + topic, prompt, 1800, tools=tools, model=MODEL_BAUSTEIN_GEN)
        if returncode != 0:
            print(f"[suggestions] topic={topic} CLI exited with {returncode}: {stderr[:1000]}")
            return
        items = _parse_json(stdout)
        if not isinstance(items, list):
            print(f"[suggestions] topic={topic} expected a JSON array, got {type(items).__name__}")
            return
        now = datetime.now(timezone.utc).isoformat()
        suggestions = [
            {
                "id": str(uuid.uuid4()),
                "topic": topic,
                "title": item.get("title", ""),
                "description": item.get("description", ""),
                "purpose": item.get("purpose", ""),
                "example": json.dumps(item.get("examples", []), ensure_ascii=False),
                "status": "pending",
                "created_at": now,
            }
            for item in items[:40]
            # One malformed element must not discard the whole batch.
            if isinstance(item, dict)
        ]
        if suggestions:
            await create_suggestions(suggestions)
    except Exception as e:
        # Best-effort job: report the failure instead of hiding it.
        print(f"[suggestions] topic={topic} Exception: {type(e).__name__}: {e}")
    finally:
        _suggestions_generating.discard(topic)
async def generate_baustein_detail(baustein_id: str, topic: str, title: str, instructions: str = "") -> None:
    """Generate description, purpose and examples for one Baustein and store them.

    The model is run without tools and with a 60 second limit. Its answer must
    be a JSON object; the fields "description", "purpose" and "examples" are
    written to the Baustein record (missing fields become empty).

    This is a best-effort job: nothing is raised to the caller. Failures (CLI
    error, timeout, unparsable output) are printed and the record is left
    unchanged.
    """
    try:
        prompt = _build_baustein_detail_prompt(topic, title, instructions)
        returncode, stdout, stderr = await _run_claude("baustein-" + baustein_id, prompt, 60, tools=None, model=MODEL_BAUSTEIN_GEN)
        if returncode != 0:
            print(f"[baustein] id={baustein_id} CLI exited with {returncode}: {stderr[:1000]}")
            return
        data = _parse_json(stdout)
        if not isinstance(data, dict):
            print(f"[baustein] id={baustein_id} expected a JSON object, got {type(data).__name__}")
            return
        now = datetime.now(timezone.utc).isoformat()
        await update_baustein(
            baustein_id,
            description=data.get("description", ""),
            purpose=data.get("purpose", ""),
            example=json.dumps(data.get("examples", []), ensure_ascii=False),
            updated_at=now,
        )
    except Exception as e:
        # Best-effort job: report the failure instead of hiding it.
        print(f"[baustein] id={baustein_id} Exception: {type(e).__name__}: {e}")
async def rework_baustein(baustein_id: str, topic: str, title: str, current: dict, instructions: str) -> None:
    """Revise one Baustein according to user instructions and store the result.

    The model is run without tools and with a 60 second limit. Its answer must
    be a JSON object with "title", "description", "purpose" and "examples".
    Any field the model leaves out keeps its current value (`title`, or the
    matching entry of `current`) instead of being blanked.

    Args:
        baustein_id: Id of the Baustein record to update.
        topic: Topic the Baustein belongs to.
        title: Current title.
        current: Current content; read via the keys "description", "purpose"
            and "examples".
        instructions: User instructions describing the requested changes.

    This is a best-effort job: nothing is raised to the caller. Failures (CLI
    error, timeout, unparsable output) are printed and the record is left
    unchanged.
    """
    try:
        prompt = _build_baustein_rework_prompt(topic, title, current, instructions)
        returncode, stdout, stderr = await _run_claude("baustein-" + baustein_id, prompt, 60, tools=None, model=MODEL_BAUSTEIN_REWORK)
        if returncode != 0:
            print(f"[baustein] id={baustein_id} CLI exited with {returncode}: {stderr[:1000]}")
            return
        data = _parse_json(stdout)
        if not isinstance(data, dict):
            print(f"[baustein] id={baustein_id} expected a JSON object, got {type(data).__name__}")
            return
        now = datetime.now(timezone.utc).isoformat()
        await update_baustein(
            baustein_id,
            title=data.get("title", title),
            # A partial answer must not wipe content that already exists.
            description=data.get("description", current.get("description", "")),
            purpose=data.get("purpose", current.get("purpose", "")),
            example=json.dumps(data.get("examples", current.get("examples", [])), ensure_ascii=False),
            updated_at=now,
        )
    except Exception as e:
        # Best-effort job: report the failure instead of hiding it.
        print(f"[baustein] id={baustein_id} Exception: {type(e).__name__}: {e}")
def _build_baustein_rework_prompt(topic: str, title: str, current: dict, instructions: str) -> str:
    """Assemble the prompt that asks the model to revise one Baustein and answer as JSON.

    The current state (title plus "description", "purpose" and "examples" from
    `current`, each defaulting to empty) is embedded as pretty-printed JSON; the
    specification is read from TEMPLATES_DIR/Format/Baustein.md.
    """
    format_spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
    snapshot = {
        "title": title,
        "description": current.get("description", ""),
        "purpose": current.get("purpose", ""),
        "examples": current.get("examples", []),
    }
    snapshot_json = json.dumps(snapshot, ensure_ascii=False, indent=2)
    prompt_lines = [
        f'Überarbeite den Baustein "{title}" zum Thema "{topic}" gemäß den Anweisungen.',
        "AKTUELLER STAND:",
        snapshot_json,
        "ANWEISUNGEN VOM NUTZER:",
        f"{instructions}",
        "FORMAT-SPEZIFIKATION:",
        f"{format_spec}",
        'Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "title", "description", "purpose", "examples".',
        '"examples" ist ein Array mit Objekten {"label": "...", "code": "..."}.',
        "Orientiere dich an der Spezifikation. Kein weiterer Text, nur das JSON.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
def _build_sort_prompt(topic: str, bausteine: list[dict], instructions: str) -> str:
    """Assemble the prompt that asks the model to order Bausteine and answer with a JSON array of ids.

    Each Baustein is listed on one line with its "id", "title", "description"
    and "purpose". Non-empty `instructions` become the sort criterion;
    otherwise the default criterion is beginner-to-expert.
    """
    rows = []
    for entry in bausteine:
        rows.append(
            f"- id={entry['id']} | {entry['title']} | {entry['description']} | {entry['purpose']}"
        )
    criterion = (
        f'Sortiere die folgenden Bausteine zum Thema "{topic}" STRIKT nach diesem Kriterium:\n\n{instructions}'
        if instructions
        else f'Sortiere die folgenden Bausteine zum Thema "{topic}" von Anfaenger zu Experte (erstes = einfachster, letztes = komplexester).'
    )
    prompt_lines = [
        criterion,
        "BAUSTEINE:",
        # Kept as a single element so an empty list still yields an empty line.
        "\n".join(rows),
        "Antworte AUSSCHLIESSLICH mit einem JSON-Array der IDs in der gewuenschten Reihenfolge.",
        'Beispiel: ["id1", "id2", "id3"]',
        "Kein weiterer Text, nur das JSON-Array.",
    ]
    # The prompt ends with a newline after its last line.
    return "\n".join(prompt_lines) + "\n"
async def sort_bausteine(topic: str, bausteine: list[dict], instructions: str = "") -> None:
    """Let the model order the given Bausteine and store the resulting positions.

    The model's answer must be a JSON array of ids. Only ids that belong to
    `bausteine` are accepted, each at its first occurrence; Bausteine the model
    left out are appended afterwards in their given order, so every Baustein
    receives a position. Nothing is stored when the answer contains no usable
    id, and the model is not called for an empty list.

    This is a best-effort job: nothing is raised to the caller. Failures are
    printed. The topic is listed in _sorting for the duration of the call.

    Args:
        topic: Topic whose Bausteine are sorted.
        bausteine: Bausteine to sort; each needs "id", "title", "description"
            and "purpose".
        instructions: Optional custom sort criterion.
    """
    _sorting.add(topic)
    try:
        if not bausteine:
            return
        prompt = _build_sort_prompt(topic, bausteine, instructions)
        returncode, stdout, stderr = await _run_claude("sort-" + topic, prompt, 300, tools=None, model=MODEL_BAUSTEIN_GEN)
        if returncode != 0:
            print(f"[sort] topic={topic} CLI exited with {returncode}: {stderr[:1000]}")
            return
        ids = _parse_json(stdout)
        if not isinstance(ids, list):
            print(f"[sort] topic={topic} expected a JSON array, got {type(ids).__name__}")
            return
        # Ids are compared as strings because that is how they appear in the
        # prompt and in the model's JSON answer.
        known_ids = [str(b["id"]) for b in bausteine]
        known = set(known_ids)
        ordered: list[str] = []
        seen: set[str] = set()
        for bid in ids:
            if isinstance(bid, str) and bid in known and bid not in seen:
                seen.add(bid)
                ordered.append(bid)
        if not ordered:
            print(f"[sort] topic={topic} answer contained no known ids")
            return
        # Keep a total order: anything the model omitted goes to the end.
        ordered.extend(bid for bid in known_ids if bid not in seen)
        order_map = {bid: position for position, bid in enumerate(ordered)}
        await update_baustein_sort_orders(topic, order_map)
    except Exception as e:
        print(f"[sort] topic={topic} Exception: {type(e).__name__}: {e}")
    finally:
        _sorting.discard(topic)