712 lines
28 KiB
Python
712 lines
28 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import tempfile
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from agents import run_agent, kill_process
|
||
from config import (
|
||
AGENT_TIMEOUT,
|
||
DEFAULT_PROVIDER,
|
||
TEMPLATES_DIR,
|
||
MAX_CONCURRENT_GENERATIONS,
|
||
STORAGE_DIR,
|
||
)
|
||
from database import (
|
||
update_guide,
|
||
create_baustein,
|
||
create_suggestions,
|
||
delete_pending_suggestions,
|
||
list_bausteine,
|
||
update_baustein,
|
||
update_baustein_sort_orders,
|
||
)
|
||
from paths import final_paths, temp_paths, project_dir, project_cache_path
|
||
|
||
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
|
||
_cancelled: set[str] = set()
|
||
|
||
|
||
async def cancel_guide(guide_id: str) -> bool:
|
||
_cancelled.add(guide_id)
|
||
kill_process(guide_id)
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen", updated_at=now)
|
||
return True
|
||
|
||
|
||
async def _set_progress(guide_id: str, progress: str) -> None:
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(guide_id, progress=progress, updated_at=now)
|
||
|
||
|
||
async def _render_pdf(html_path: Path, pdf_path: Path) -> tuple[bool, str]:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
"weasyprint", str(html_path), str(pdf_path),
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
_, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||
if proc.returncode != 0:
|
||
return False, stderr.decode("utf-8", errors="replace")[:1000]
|
||
return True, ""
|
||
|
||
|
||
def _build_project_index_prompt(name: str, cache_path: Path, has_cache: bool) -> str:
|
||
src = project_dir(name)
|
||
if has_cache:
|
||
return f"""Es existiert bereits eine verdichtete Wissensdatei zum Projekt "{name}" unter {cache_path}.
|
||
|
||
Prüfe sie gegen das echte Projekt unter {src}.
|
||
|
||
SCHRITT 1: Lies die bestehende Wissensdatei {cache_path}.
|
||
SCHRITT 2: Verschaffe dir mit Bash (ls/find) und Read einen vollständigen Überblick über {src}. Lies README, Doku-Ordner und den relevanten Quellcode.
|
||
SCHRITT 3: Ergänze fehlende oder veraltete wichtige Informationen. Ein früherer Lauf war evtl. schlampig und hat Wichtiges ausgelassen.
|
||
SCHRITT 4: Schreibe die vollständige, korrigierte Wissensdatei zurück nach {cache_path}.
|
||
|
||
Die Datei muss als alleinige Faktenbasis für einen Lern-Guide ausreichen. Erfasse: Zweck, Architektur, Abläufe, wichtige Dateien, Konfiguration, Befehle, Datenstrukturen, Besonderheiten. Schreibe NUR diese eine Datei."""
|
||
|
||
return f"""Lies das Projekt "{name}" vollständig ein und erstelle daraus eine verdichtete Wissensdatei.
|
||
|
||
SCHRITT 1: Verschaffe dir mit Bash (ls/find) einen Überblick über {src}.
|
||
SCHRITT 2: Lies README, Doku-Ordner und den relevanten Quellcode mit dem Read-Tool.
|
||
SCHRITT 3: Schreibe eine vollständige Wissensdatei nach {cache_path}.
|
||
|
||
Die Datei muss als alleinige Faktenbasis für einen Lern-Guide ausreichen. Erfasse: Zweck, Architektur, Abläufe, wichtige Dateien, Konfiguration, Befehle, Datenstrukturen, Besonderheiten. Lass nichts Wichtiges aus. Schreibe NUR diese eine Datei."""
|
||
|
||
|
||
def _research_line(topic: str, project_content: str | None) -> str:
|
||
if project_content:
|
||
return (
|
||
f'Die folgenden PROJEKT-INHALTE sind die Quelle der Wahrheit für "{topic}". '
|
||
"Nutze sie als primäre Faktenbasis. Recherchiere per Websuche nur ergänzend, "
|
||
"um fehlende oder sich ändernde Fakten (z. B. aktuelle Versionsnummern externer Tools) zu prüfen.\n\n"
|
||
f"PROJEKT-INHALTE (Quelle der Wahrheit):\n{project_content}"
|
||
)
|
||
return f'Recherchiere zuerst die aktuelle Version und aktuelle Fakten zu "{topic}" per Websuche, damit Versionsnummern und Angaben stimmen.'
|
||
|
||
|
||
def _build_inventory_prompt(topic: str, format_name: str, inventory_path: Path, instructions: str = "", project_content: str | None = None) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
|
||
extra = f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
|
||
|
||
return f"""Erstelle das Themeninventar für einen Lern-Guide zum Thema "{topic}" im Format "{format_name}".
|
||
|
||
{_research_line(topic, project_content)}
|
||
|
||
Das Inventar ist eine durchnummerierte Liste der Bausteine (Konzepte/Funktionen/Features), die der Guide abdecken muss — bereits in sinnvolle Teile/Sektionen gruppiert. Die ABDECKUNGS-Regel der Format-Spezifikation bestimmt die Auswahl; wie viele Punkte das sind, hängt vom Thema ab. Ein anderer Agent schreibt den Guide später strikt nach diesem Inventar — jeder Punkt wird eine Sektion.
|
||
|
||
Schreibe NUR die Inventar-Datei nach: {inventory_path}
|
||
Kein HTML, kein weasyprint.
|
||
|
||
FORMAT-SPEZIFIKATION (relevant sind ABDECKUNG und Seitenrahmen):
|
||
{spec}
|
||
{extra}"""
|
||
|
||
|
||
def _build_generator_prompt(topic: str, format_name: str, html_path: Path, inventory: str, instructions: str = "", project_content: str | None = None) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
|
||
reference = (TEMPLATES_DIR / "Referenz" / f"{format_name}.md").read_text(encoding="utf-8")
|
||
|
||
extra = f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
|
||
|
||
return f"""Erstelle einen Lern-Guide zum Thema "{topic}" im Format "{format_name}".
|
||
|
||
{_research_line(topic, project_content)}
|
||
|
||
Schreibe die HTML-Datei nach: {html_path}
|
||
|
||
Schreibe NUR die HTML-Datei. Führe KEIN weasyprint aus, erzeuge KEINE PDF. Das übernimmt ein anderer Prozess.
|
||
|
||
THEMENINVENTAR (verbindlich — von einem Recherche-Agenten erstellt):
|
||
Jeder Inventar-Punkt muss im Guide als eigene Sektion umgesetzt werden. Nicht zusammenfassen, nicht kürzen, nichts weglassen. Der Umfang des Guides folgt aus diesem Inventar innerhalb des Seitenrahmens der Spezifikation.
|
||
{inventory}
|
||
|
||
FORMAT-SPEZIFIKATION:
|
||
{spec}
|
||
|
||
REFERENZ-IMPLEMENTIERUNG (NUR Stil-Vorlage: Bausteine, CSS, Tonalität. Umfang und Struktur kommen aus dem INVENTAR, nicht aus der Referenz):
|
||
{reference}
|
||
{extra}"""
|
||
|
||
|
||
def _build_rework_prompt(topic: str, format_name: str, html_path: Path, instructions: str) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
|
||
return f"""Überarbeite die bestehende HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".
|
||
|
||
Lies zuerst die aktuelle HTML-Datei mit dem Read-Tool.
|
||
|
||
ANWEISUNGEN VOM NUTZER:
|
||
{instructions}
|
||
|
||
FORMAT-SPEZIFIKATION (muss weiterhin eingehalten werden):
|
||
{spec}
|
||
|
||
Schreibe die überarbeitete Version in dieselbe Datei: {html_path}
|
||
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
|
||
"""
|
||
|
||
|
||
def _build_fix_prompt(topic: str, format_name: str, html_path: Path, feedback: str) -> str:
|
||
return f"""Die HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}" hat Probleme.
|
||
|
||
FEEDBACK VOM PRÜFER:
|
||
{feedback}
|
||
|
||
Behebe die Probleme in der HTML-Datei {html_path}. Schreibe die korrigierte Version in dieselbe Datei.
|
||
Führe KEIN weasyprint aus, erzeuge KEINE PDF.
|
||
"""
|
||
|
||
|
||
def _build_content_review_prompt(topic: str, format_name: str, html_path: Path, project_content: str | None = None) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / f"{format_name}.md").read_text(encoding="utf-8")
|
||
|
||
if project_content:
|
||
check_step = (
|
||
"SCHRITT 2 — Fakten prüfen:\n"
|
||
f'Vergleiche den Inhalt mit den folgenden PROJEKT-INHALTEN (Quelle der Wahrheit) für "{topic}". '
|
||
"Stimmen die Projekt-Fakten? Fehlt Wichtiges aus dem Projekt? "
|
||
"Externe/aktuelle Fakten (Versionsnummern fremder Tools) ergänzend per WebSearch prüfen.\n\n"
|
||
f"PROJEKT-INHALTE:\n{project_content}"
|
||
)
|
||
else:
|
||
check_step = (
|
||
"SCHRITT 2 — Fakten per Websuche prüfen:\n"
|
||
f'Recherchiere mit WebSearch, ob Versionsnummern, Jahreszahlen und zentrale Fakten zu "{topic}" aktuell und korrekt sind.'
|
||
)
|
||
|
||
return f"""Prüfe den Inhalt der HTML-Datei {html_path} für den "{format_name}" zum Thema "{topic}".
|
||
|
||
SCHRITT 1 — HTML-Datei lesen:
|
||
Öffne die Datei {html_path} mit dem Read-Tool.
|
||
|
||
{check_step}
|
||
|
||
SCHRITT 3 — Vollständigkeit prüfen anhand dieser Spezifikation:
|
||
{spec}
|
||
|
||
Prüfkriterien:
|
||
- Sind alle Pflicht-Kapitel/Sektionen vorhanden?
|
||
- Stimmen Versionsnummern und Fakten?
|
||
- Ist der Inhalt fachlich korrekt und aktuell?
|
||
- Entspricht der Schwierigkeitsgrad dem Format?
|
||
- Sind Pflicht-Elemente vorhanden (Cover, TOC, Recall-Boxen, Callouts, Code-Beispiele)?
|
||
|
||
SCHRITT 4 — Antworte mit GENAU EINEM der folgenden Formate:
|
||
|
||
Bei Bestehen:
|
||
PASS
|
||
|
||
Bei Nicht-Bestehen:
|
||
FAIL
|
||
- Problem 1
|
||
- Problem 2
|
||
- ...
|
||
"""
|
||
|
||
|
||
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", reindex: bool = False, provider: str = DEFAULT_PROVIDER) -> None:
|
||
async with _semaphore:
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(guide_id, status="generating", progress="Recherche…", updated_at=now)
|
||
|
||
html_path, pdf_path = final_paths(topic, format_name)
|
||
|
||
try:
|
||
if guide_id in _cancelled:
|
||
return
|
||
|
||
current_step = "Generierung"
|
||
current_timeout = AGENT_TIMEOUT
|
||
|
||
# Step 0: Projekt einlesen (nur wenn topic ein Projekt ist)
|
||
project_content: str | None = None
|
||
if project_dir(topic).is_dir():
|
||
cache_path = project_cache_path(topic)
|
||
if reindex or not cache_path.exists():
|
||
await _set_progress(guide_id, "Lese Projekt…")
|
||
current_step = "Projekt-Einlesen"
|
||
index_prompt = _build_project_index_prompt(topic, cache_path, cache_path.exists())
|
||
returncode, idx_out, idx_err = await run_agent(
|
||
guide_id, index_prompt, AGENT_TIMEOUT,
|
||
provider=provider, role="fast", capabilities="files",
|
||
)
|
||
if guide_id in _cancelled:
|
||
return
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Projekt-Einlese-Fehler", returncode, idx_out, idx_err))
|
||
return
|
||
if not cache_path.exists():
|
||
await _fail(guide_id, "Projekt-Wissensdatei wurde nicht erstellt")
|
||
return
|
||
project_content = cache_path.read_text(encoding="utf-8")
|
||
|
||
# Step 1: Inventar-Agent recherchiert die Bausteine des Themas
|
||
inventory_path = html_path.with_suffix(".inventar.md")
|
||
await _set_progress(guide_id, "Erstelle Themeninventar…")
|
||
current_step = "Inventar"
|
||
inv_prompt = _build_inventory_prompt(topic, format_name, inventory_path, instructions, project_content)
|
||
returncode, inv_out, inv_err = await run_agent(guide_id, inv_prompt, AGENT_TIMEOUT, provider=provider, role="fast", capabilities="full")
|
||
|
||
if guide_id in _cancelled:
|
||
return
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Inventar-Fehler", returncode, inv_out, inv_err))
|
||
return
|
||
if not inventory_path.exists():
|
||
await _fail(guide_id, "Themeninventar wurde nicht erstellt")
|
||
return
|
||
inventory = inventory_path.read_text(encoding="utf-8")
|
||
|
||
# Step 2: Generator-Agent erstellt HTML nach Inventar
|
||
await _set_progress(guide_id, "Generiere HTML…")
|
||
current_step = "Generierung"
|
||
gen_prompt = _build_generator_prompt(topic, format_name, html_path, inventory, instructions, project_content)
|
||
returncode, stdout, stderr = await run_agent(guide_id, gen_prompt, AGENT_TIMEOUT, provider=provider, role="guide", capabilities="full")
|
||
|
||
if guide_id in _cancelled:
|
||
return
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Generator-Fehler", returncode, stdout, stderr))
|
||
return
|
||
|
||
if not html_path.exists():
|
||
await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
|
||
return
|
||
|
||
# Step 2: Inhalts-Review (1x, kein Loop)
|
||
if guide_id in _cancelled:
|
||
return
|
||
|
||
await _set_progress(guide_id, "Prüfe Inhalt…")
|
||
current_step = "Inhalts-Review"
|
||
current_timeout = AGENT_TIMEOUT
|
||
content_prompt = _build_content_review_prompt(topic, format_name, html_path, project_content)
|
||
returncode, review_out, review_err = await run_agent(guide_id, content_prompt, AGENT_TIMEOUT, provider=provider, role="guide", capabilities="full")
|
||
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Inhalts-Review-Fehler", returncode, review_out, review_err))
|
||
return
|
||
|
||
review_text = review_out.strip()
|
||
if not review_text.startswith("PASS"):
|
||
if guide_id in _cancelled:
|
||
return
|
||
|
||
feedback = review_text.replace("FAIL", "").strip()
|
||
await _set_progress(guide_id, "Korrigiere Inhalt…")
|
||
current_step = "Inhalts-Korrektur"
|
||
current_timeout = AGENT_TIMEOUT
|
||
fix_prompt = _build_fix_prompt(topic, format_name, html_path, feedback)
|
||
returncode, fix_out, fix_err = await run_agent(guide_id, fix_prompt, AGENT_TIMEOUT, provider=provider, role="guide", capabilities="full")
|
||
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Fix-Fehler", returncode, fix_out, fix_err))
|
||
return
|
||
|
||
# Step 3: PDF rendern
|
||
if guide_id in _cancelled:
|
||
return
|
||
|
||
await _set_progress(guide_id, "Rendere PDF…")
|
||
ok, err = await _render_pdf(html_path, pdf_path)
|
||
if not ok:
|
||
await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
|
||
return
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(
|
||
guide_id, status="done", progress=None, updated_at=now,
|
||
)
|
||
|
||
except asyncio.TimeoutError:
|
||
await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
|
||
except Exception as e:
|
||
await _fail(guide_id, str(e)[:2000])
|
||
finally:
|
||
_cancelled.discard(guide_id)
|
||
|
||
|
||
async def rework_guide(guide_id: str, topic: str, format_name: str, instructions: str, provider: str = DEFAULT_PROVIDER) -> None:
|
||
async with _semaphore:
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(guide_id, status="generating", progress="Überarbeite…", updated_at=now)
|
||
|
||
final_html, final_pdf = final_paths(topic, format_name)
|
||
tmp_html, tmp_pdf = temp_paths(guide_id)
|
||
|
||
try:
|
||
if guide_id in _cancelled:
|
||
return
|
||
|
||
if not final_html.exists():
|
||
await _fail(guide_id, "Original-HTML nicht gefunden")
|
||
return
|
||
|
||
shutil.copy2(final_html, tmp_html)
|
||
|
||
current_step = "Überarbeitung"
|
||
current_timeout = AGENT_TIMEOUT
|
||
|
||
rework_prompt = _build_rework_prompt(topic, format_name, tmp_html, instructions)
|
||
returncode, stdout, stderr = await run_agent(guide_id, rework_prompt, AGENT_TIMEOUT, provider=provider, role="guide", capabilities="full")
|
||
|
||
if guide_id in _cancelled:
|
||
return
|
||
if returncode != 0:
|
||
await _fail(guide_id, _claude_error("Rework-Fehler", returncode, stdout, stderr))
|
||
return
|
||
|
||
if not tmp_html.exists():
|
||
await _fail(guide_id, "HTML-Datei wurde nicht erstellt")
|
||
return
|
||
|
||
await _set_progress(guide_id, "Rendere PDF…")
|
||
ok, err = await _render_pdf(tmp_html, tmp_pdf)
|
||
if not ok:
|
||
await _fail(guide_id, f"WeasyPrint-Fehler: {err}")
|
||
return
|
||
|
||
# Atomar: Temp → Final umbenennen
|
||
tmp_html.replace(final_html)
|
||
tmp_pdf.replace(final_pdf)
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(
|
||
guide_id, status="done", progress=None, updated_at=now,
|
||
)
|
||
|
||
except asyncio.TimeoutError:
|
||
await _fail(guide_id, f"Timeout bei {current_step} nach {current_timeout}s")
|
||
except Exception as e:
|
||
await _fail(guide_id, str(e)[:2000])
|
||
finally:
|
||
_cancelled.discard(guide_id)
|
||
tmp_html.unlink(missing_ok=True)
|
||
tmp_pdf.unlink(missing_ok=True)
|
||
|
||
|
||
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
|
||
stderr = (stderr or "").strip()
|
||
if stderr:
|
||
return f"{label}: {stderr[:1000]}"
|
||
tail = (stdout or "").strip()[-500:]
|
||
if tail:
|
||
return f"{label} (exit {returncode}, stderr leer): …{tail}"
|
||
return f"{label} (exit {returncode}, ohne Ausgabe)"
|
||
|
||
|
||
async def _fail(guide_id: str, msg: str) -> None:
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
|
||
|
||
|
||
# --- Bausteine ---
|
||
|
||
_suggestions_generating: set[str] = set()
|
||
_sorting: set[str] = set()
|
||
|
||
|
||
def is_suggestions_generating(topic: str) -> bool:
|
||
return topic in _suggestions_generating
|
||
|
||
|
||
def is_sorting(topic: str) -> bool:
|
||
return topic in _sorting
|
||
|
||
|
||
def _parse_json(text: str):
|
||
text = text.strip()
|
||
text = re.sub(r"^```(?:json)?\s*", "", text)
|
||
text = re.sub(r"\s*```$", "", text)
|
||
return json.loads(text)
|
||
|
||
|
||
def _build_suggestions_prompt(topic: str, html_paths: list[Path], existing_titles: list[str]) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
|
||
reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")
|
||
existing_list = "\n".join(f"- {t}" for t in existing_titles) if existing_titles else "(keine)"
|
||
|
||
if html_paths:
|
||
read_instructions = "\n".join(f"- Lies: {p}" for p in html_paths)
|
||
guides_section = f"""SCHRITT 1 — Guides lesen:
|
||
{read_instructions}
|
||
|
||
"""
|
||
else:
|
||
guides_section = ""
|
||
|
||
return f"""Schlage fundamentale Bausteine (Kernkonzepte) zum Thema "{topic}" vor.
|
||
|
||
{guides_section}Bereits vorhandene Bausteine (NICHT erneut vorschlagen):
|
||
{existing_list}
|
||
|
||
FORMAT-SPEZIFIKATION:
|
||
{spec}
|
||
|
||
REFERENZ-BEISPIEL:
|
||
{reference}
|
||
|
||
Schlage 40 Bausteine vor. Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element hat:
|
||
- "title"
|
||
- "description"
|
||
- "purpose"
|
||
- "examples": Array mit 1 Objekt {{"label": "...", "code": "..."}}
|
||
|
||
Orientiere dich an der Spezifikation und Referenz. NUR das JSON-Array, kein weiterer Text.
|
||
"""
|
||
|
||
|
||
def _build_baustein_detail_prompt(topic: str, title: str, instructions: str = "") -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
|
||
reference = (TEMPLATES_DIR / "Referenz" / "Baustein.md").read_text(encoding="utf-8")
|
||
extra = f"\n\nZUSÄTZLICHE INFOS VOM NUTZER:\n{instructions}\n" if instructions else ""
|
||
|
||
return f"""Generiere Details für den Baustein "{title}" im Kontext des Themas "{topic}".
|
||
|
||
FORMAT-SPEZIFIKATION:
|
||
{spec}
|
||
|
||
REFERENZ-BEISPIEL:
|
||
{reference}
|
||
{extra}
|
||
Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "description", "purpose", "examples".
|
||
"examples" ist ein Array mit 1 Objekt {{"label": "...", "code": "..."}}.
|
||
Orientiere dich an der Spezifikation und Referenz. Kein weiterer Text, nur das JSON.
|
||
"""
|
||
|
||
|
||
async def generate_suggestions(topic: str, html_paths: list[Path], provider: str = DEFAULT_PROVIDER) -> None:
|
||
_suggestions_generating.add(topic)
|
||
try:
|
||
existing = await list_bausteine(topic)
|
||
existing_titles = [b["title"] for b in existing]
|
||
|
||
await delete_pending_suggestions(topic)
|
||
|
||
prompt = _build_suggestions_prompt(topic, html_paths, existing_titles)
|
||
capabilities = "read" if html_paths else "none"
|
||
returncode, stdout, stderr = await run_agent("suggestions-" + topic, prompt, 1800, provider=provider, role="fast", capabilities=capabilities)
|
||
|
||
if returncode != 0:
|
||
return
|
||
|
||
items = _parse_json(stdout)
|
||
if not isinstance(items, list):
|
||
return
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
suggestions = []
|
||
for item in items[:40]:
|
||
suggestions.append({
|
||
"id": str(uuid.uuid4()),
|
||
"topic": topic,
|
||
"title": item.get("title", ""),
|
||
"description": item.get("description", ""),
|
||
"purpose": item.get("purpose", ""),
|
||
"example": json.dumps(item.get("examples", []), ensure_ascii=False),
|
||
"status": "pending",
|
||
"created_at": now,
|
||
})
|
||
if suggestions:
|
||
await create_suggestions(suggestions)
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
_suggestions_generating.discard(topic)
|
||
|
||
|
||
async def generate_baustein_detail(baustein_id: str, topic: str, title: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
|
||
try:
|
||
prompt = _build_baustein_detail_prompt(topic, title, instructions)
|
||
returncode, stdout, stderr = await run_agent("baustein-" + baustein_id, prompt, 180, provider=provider, role="fast", capabilities="none")
|
||
|
||
if returncode != 0:
|
||
return
|
||
|
||
data = _parse_json(stdout)
|
||
if not isinstance(data, dict):
|
||
return
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_baustein(
|
||
baustein_id,
|
||
description=data.get("description", ""),
|
||
purpose=data.get("purpose", ""),
|
||
example=json.dumps(data.get("examples", []), ensure_ascii=False),
|
||
updated_at=now,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
async def rework_baustein(baustein_id: str, topic: str, title: str, current: dict, instructions: str, provider: str = DEFAULT_PROVIDER) -> None:
|
||
try:
|
||
prompt = _build_baustein_rework_prompt(topic, title, current, instructions)
|
||
returncode, stdout, stderr = await run_agent("baustein-" + baustein_id, prompt, 180, provider=provider, role="fast", capabilities="none")
|
||
|
||
if returncode != 0:
|
||
return
|
||
|
||
data = _parse_json(stdout)
|
||
if not isinstance(data, dict):
|
||
return
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
await update_baustein(
|
||
baustein_id,
|
||
title=data.get("title", title),
|
||
description=data.get("description", ""),
|
||
purpose=data.get("purpose", ""),
|
||
example=json.dumps(data.get("examples", []), ensure_ascii=False),
|
||
updated_at=now,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _build_baustein_rework_prompt(topic: str, title: str, current: dict, instructions: str) -> str:
|
||
spec = (TEMPLATES_DIR / "Format" / "Baustein.md").read_text(encoding="utf-8")
|
||
|
||
current_json = json.dumps({
|
||
"title": title,
|
||
"description": current.get("description", ""),
|
||
"purpose": current.get("purpose", ""),
|
||
"examples": current.get("examples", []),
|
||
}, ensure_ascii=False, indent=2)
|
||
|
||
return f"""Überarbeite den Baustein "{title}" zum Thema "{topic}" gemäß den Anweisungen.
|
||
|
||
AKTUELLER STAND:
|
||
{current_json}
|
||
|
||
ANWEISUNGEN VOM NUTZER:
|
||
{instructions}
|
||
|
||
FORMAT-SPEZIFIKATION:
|
||
{spec}
|
||
|
||
Antworte AUSSCHLIESSLICH mit einem JSON-Objekt mit den Feldern "title", "description", "purpose", "examples".
|
||
"examples" ist ein Array mit Objekten {{"label": "...", "code": "..."}}.
|
||
Orientiere dich an der Spezifikation. Kein weiterer Text, nur das JSON.
|
||
"""
|
||
|
||
|
||
|
||
|
||
def _build_sort_prompt(topic: str, bausteine: list[dict], instructions: str) -> str:
|
||
items = "\n".join(
|
||
f"- id={b['id']} | {b['title']} | {b['description']} | {b['purpose']}"
|
||
for b in bausteine
|
||
)
|
||
if instructions:
|
||
criterion = f"Sortiere die folgenden Bausteine zum Thema \"{topic}\" STRIKT nach diesem Kriterium:\n\n{instructions}"
|
||
else:
|
||
criterion = f"Sortiere die folgenden Bausteine zum Thema \"{topic}\" von Anfaenger zu Experte (erstes = einfachster, letztes = komplexester)."
|
||
|
||
return f"""{criterion}
|
||
|
||
BAUSTEINE:
|
||
{items}
|
||
|
||
Antworte AUSSCHLIESSLICH mit einem JSON-Array der IDs in der gewuenschten Reihenfolge.
|
||
Beispiel: [\"id1\", \"id2\", \"id3\"]
|
||
|
||
Kein weiterer Text, nur das JSON-Array.
|
||
"""
|
||
|
||
|
||
def _build_topic_suggest_prompt(problem: str, existing_topics: list[str]) -> str:
|
||
template = (TEMPLATES_DIR / "Format" / "Suche.md").read_text(encoding="utf-8")
|
||
existing = "\n".join(f"- {t}" for t in existing_topics) if existing_topics else "(keine)"
|
||
return template.replace("{problem}", problem).replace("{existing}", existing)
|
||
|
||
|
||
def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str:
|
||
transcript = "\n".join(
|
||
f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
|
||
for m in messages
|
||
)
|
||
outline_block = outline.strip() or "(keine)"
|
||
section_block = section.strip() or "(kein Abschnitt erkannt)"
|
||
return f"""Du bist ein hilfreicher Tutor zum Lern-Guide "{topic}" (Format: {format_name}). Ein Leser stellt dir Fragen, während er den Guide liest.
|
||
|
||
GLIEDERUNG DES GUIDES:
|
||
{outline_block}
|
||
|
||
AKTUELLER ABSCHNITT, DEN DER LESER GERADE LIEST:
|
||
{section_block}
|
||
|
||
BISHERIGER CHAT-VERLAUF:
|
||
{transcript}
|
||
|
||
Antworte als Assistent auf die letzte Nutzer-Nachricht.
|
||
|
||
WICHTIG – Antwortstil:
|
||
- KURZ und EINFACH: 1–3 Sätze, klare Sprache.
|
||
- Keine Einleitung, keine Wiederholung der Frage, kein Markdown-Drumherum.
|
||
- Beantworte nur die Frage; nutze den Abschnitt und die Gliederung als Kontext.
|
||
|
||
Gib NUR die Antwort aus, kein Präfix wie "Assistent:"."""
|
||
|
||
|
||
async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
|
||
try:
|
||
prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
|
||
returncode, stdout, stderr = await run_agent(
|
||
"chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
|
||
)
|
||
if returncode != 0:
|
||
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
|
||
reply = stdout.strip()
|
||
return reply or "Entschuldigung, ich habe keine Antwort erhalten."
|
||
except Exception:
|
||
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
|
||
|
||
|
||
async def suggest_topics(problem: str, existing_topics: list[str] | None = None, provider: str = DEFAULT_PROVIDER) -> list[dict]:
|
||
try:
|
||
prompt = _build_topic_suggest_prompt(problem, existing_topics or [])
|
||
returncode, stdout, stderr = await run_agent(
|
||
"topic-suggest-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
|
||
)
|
||
if returncode != 0:
|
||
return []
|
||
items = _parse_json(stdout)
|
||
if not isinstance(items, list):
|
||
return []
|
||
result = []
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
title = str(item.get("title", "")).strip()[:100]
|
||
if not title:
|
||
continue
|
||
result.append({"title": title, "reason": str(item.get("reason", "")).strip()})
|
||
return result
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
async def sort_bausteine(topic: str, bausteine: list[dict], instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
|
||
_sorting.add(topic)
|
||
try:
|
||
prompt = _build_sort_prompt(topic, bausteine, instructions)
|
||
returncode, stdout, stderr = await run_agent("sort-" + topic, prompt, 600, provider=provider, role="fast", capabilities="none")
|
||
if returncode != 0:
|
||
return
|
||
ids = _parse_json(stdout)
|
||
if not isinstance(ids, list):
|
||
return
|
||
order_map = {bid: i for i, bid in enumerate(ids) if isinstance(bid, str)}
|
||
if order_map:
|
||
await update_baustein_sort_orders(topic, order_map)
|
||
except Exception as e:
|
||
print(f"[sort] topic={topic} Exception: {type(e).__name__}: {e}")
|
||
finally:
|
||
_sorting.discard(topic)
|