Files
creator/backend/generator.py
2026-06-06 19:13:18 +02:00

976 lines
39 KiB
Python

import asyncio
import json
import math
import shutil
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from agents import run_agent, kill_process
from config import (
DEFAULT_PROVIDER,
FORMAT_ANTEIL,
TEMPLATES_DIR,
TIMEOUTS,
MAX_CONCURRENT_GENERATIONS,
)
from database import update_guide
from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
_cancelled: set[str] = set()
async def cancel_guide(guide_id: str) -> bool:
_cancelled.add(guide_id)
kill_process(guide_id)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg="Abgebrochen", updated_at=now)
return True
async def _set_progress(guide_id: str, progress: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, progress=progress, updated_at=now)
def _prompt(name: str, **kwargs) -> str:
template = (TEMPLATES_DIR / "Prompt" / f"{name}.md").read_text(encoding="utf-8")
return template.format(**kwargs)
def _extra(instructions: str) -> str:
return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n" if instructions else ""
def _log(topic: str, msg: str) -> None:
print(f"[generator] {topic}: {msg}", flush=True)
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
stderr = (stderr or "").strip()
if stderr:
return f"{label}: {stderr[:1000]}"
tail = (stdout or "").strip()[-500:]
if tail:
return f"{label} (exit {returncode}, stderr leer): …{tail}"
return f"{label} (exit {returncode}, ohne Ausgabe)"
def _gather_error(label: str, results: list) -> str:
for r in results:
if isinstance(r, BaseException):
return f"{label}: {type(r).__name__}: {r}"
returncode, stdout, stderr = r
if returncode != 0:
return _claude_error(label, returncode, stdout, stderr)
return f"{label}: kein verwertbares Ergebnis"
async def _fail(guide_id: str, msg: str) -> None:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="error", progress=None, error_msg=msg, updated_at=now)
def _norm_titel(s: str) -> str:
"""Normalisiert einen Titel für den Schlüssel-Vergleich."""
s = re.sub(r"[`'\"<>]", "", s)
return re.sub(r"\s+", " ", s).strip().lower()
def _titel(entry: str) -> str:
return entry.split("")[0].strip() or entry
def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]:
"""Macht Titel eindeutig (Suffix " (2)", " (3)" …), damit sie als Schlüssel taugen."""
seen: dict[str, int] = {}
out: dict[int, str] = {}
for num, text in entries.items():
titel = _titel(text)
key = _norm_titel(titel)
seen[key] = seen.get(key, 0) + 1
if seen[key] > 1:
rest = text.split("", 1)
text = f"{titel} ({seen[key]})" + (f"{rest[1]}" if len(rest) == 2 else "")
# zweiter Durchlauf nicht nötig: Suffixe kollidieren praktisch nicht
out[num] = text
return out
def _titel_index(entries: dict[int, str]) -> dict[str, int]:
return {_norm_titel(_titel(text)): num for num, text in entries.items()}
def _json_datei(path: Path):
"""Liest eine JSON-Datei (Code-Fences tolerant); None bei fehlend/ungültig."""
if not path.exists():
return None
try:
text = path.read_text(encoding="utf-8").strip()
text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text)
return json.loads(text)
except Exception:
return None
def _resolve_liste(data, entries: dict[int, str], min_match: float = 0.85) -> list[int] | None:
"""{"reihenfolge": [Titel, …]} → [nums]; None bei zu vielen unbekannten Titeln
oder zu geringer Abdeckung der Einträge."""
if not isinstance(data, dict) or not isinstance(data.get("reihenfolge"), list):
return None
idx = _titel_index(entries)
nums: list[int] = []
total = unknown = 0
for t in data["reihenfolge"]:
if not isinstance(t, str):
return None
total += 1
num = _titel_aufloesen(idx, t)
if num is None:
unknown += 1
elif num not in nums:
nums.append(num)
if total == 0:
return None
if (total - unknown) / total < min_match or len(nums) / len(entries) < min_match:
return None
return nums
def _merge_sortierungen(topic: str, listen: list[list[int]], entries: dict[int, str]) -> list[int]:
"""Median-Rang über mehrere Sortierungen; Bausteine ohne Stimmen ans Ende."""
raenge: dict[int, list[int]] = {num: [] for num in entries}
for liste in listen:
for rang, num in enumerate(liste):
if num in raenge:
raenge[num].append(rang)
ohne = [num for num, r in raenge.items() if not r]
if ohne:
_log(topic, f"Sortierung: keine Stimmen für {[_titel(entries[n]) for n in ohne]} → ans Ende")
def key(num: int):
r = sorted(raenge[num])
if not r:
return (10**9, 10**9, num)
return (r[len(r) // 2], sum(r) / len(r), num)
return sorted(entries, key=key)
def _timeout(step: str, n: int = 0) -> int:
base, per = TIMEOUTS[step]
return base + per * n
_MAX_RESTARTS = 2
async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None:
"""Startet alle Slots parallel und sammelt `quorum` gültige Ergebnisse.
Slot-Spec: {key, prompt, role, capabilities, payload}. `payload(result)`
prüft die Gültigkeit und liefert das Slot-Ergebnis oder None.
Fehler/Timeout/ungültig → Slot-Neustart (max. _MAX_RESTARTS). Sobald das
Quorum steht, werden die übrigen Agenten gekillt. None = Quorum verfehlt.
`cancelled()` → True bricht ab (keine Restarts, Rückgabe None).
"""
attempts = {i: 0 for i in range(len(slots))}
tasks: dict[asyncio.Task, int] = {}
def spawn(i: int) -> None:
slot = slots[i]
task = asyncio.create_task(run_agent(
slot["key"], slot["prompt"], timeout,
provider=provider, role=slot["role"], capabilities=slot["capabilities"],
))
tasks[task] = i
for i in range(len(slots)):
spawn(i)
results: list = []
try:
while tasks:
if cancelled and cancelled():
return None
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
i = tasks.pop(task)
payload, err = None, None
try:
result = task.result()
if result[0] != 0:
err = _claude_error("Fehler", *result)
else:
payload = slots[i]["payload"](result)
if payload is None:
err = "Ergebnis ungültig/nicht parsebar"
except asyncio.TimeoutError:
err = f"Timeout nach {timeout}s"
except Exception as e:
err = f"{type(e).__name__}: {e}"
if payload is not None:
results.append(payload)
if on_update:
on_update(len(results))
if len(results) >= quorum:
return results
continue
_log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
attempts[i] += 1
if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
spawn(i)
_log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
return None
finally:
for task, i in tasks.items():
kill_process(slots[i]["key"])
task.cancel()
if tasks:
await asyncio.gather(*tasks.keys(), return_exceptions=True)
# --- Bausteine-Pipeline: 4x Recherche (3) → 2x Auswahl (1) → Check → 3x Sortierung (Median-Rang) ---
_bausteine_progress: dict[str, str] = {}
_bausteine_errors: dict[str, str] = {}
_bausteine_cancelled: set[str] = set()
_bausteine_step: dict[str, int] = {}
BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung", "Sortierung")
_CATEGORIES = ("KERN", "WICHTIG", "REST") # nur noch für den Altformat-Reader
def _bausteine_files(topic: str) -> dict:
arbeit = arbeit_dir(topic)
return {
"final": bausteine_path(topic),
"arbeit": arbeit,
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)],
"auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)],
"auswahl_check": arbeit / "auswahl-check.json",
"sortierung": [arbeit / f"sortierung-{i}.json" for i in (1, 2, 3)],
}
def _alle_slot_dateien(files: dict) -> list[Path]:
return [*files["recherche"], *files["auswahl"], files["auswahl_check"], *files["sortierung"]]
def cancel_bausteine(topic: str) -> bool:
if topic not in _bausteine_progress:
return False
_bausteine_cancelled.add(topic)
kill_process(f"bausteine-{topic}-")
return True
def _resume_step(topic: str) -> int:
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
files = _bausteine_files(topic)
if sum(p.exists() for p in files["recherche"]) < 3:
return 0
if not any(p.exists() for p in files["auswahl"]):
return 1
if not files["auswahl_check"].exists():
return 2
return 3
def bausteine_status(topic: str) -> dict:
ready = bausteine_path(topic).exists()
generating = topic in _bausteine_progress
partial = False
if generating:
current = _bausteine_step.get(topic)
states = [
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
for i in range(len(BAUSTEINE_STEPS))
]
elif ready:
states = ["done"] * len(BAUSTEINE_STEPS)
else:
nxt = _resume_step(topic)
partial = nxt > 0
states = ["done" if i < nxt else "pending" for i in range(len(BAUSTEINE_STEPS))]
return {
"ready": ready,
"generating": generating,
"progress": _bausteine_progress.get(topic),
"error": _bausteine_errors.get(topic),
"partial": partial,
"steps": [{"label": label, "state": s} for label, s in zip(BAUSTEINE_STEPS, states)],
}
def active_bausteine() -> list[dict]:
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
def reset_bausteine(topic: str) -> None:
files = _bausteine_files(topic)
files["final"].unlink(missing_ok=True)
shutil.rmtree(files["arbeit"], ignore_errors=True)
_bausteine_errors.pop(topic, None)
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
if project:
source = _prompt("Bausteine-Quelle-Projekt", project=project)
else:
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
return _prompt(
"Bausteine-Recherche",
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
)
def _parse_auswahl(text: str) -> dict[int, str]:
"""Parst eine Baustein-Liste: `N. Titel — Kurzbeschreibung` pro Zeile."""
entries: dict[int, str] = {}
last = None
for line in text.splitlines():
m = re.match(r"\s*(\d+)[.)]\s+(.*\S)", line)
if m:
last = int(m.group(1))
entries[last] = m.group(2)
elif last is not None and line.strip():
entries[last] += " " + line.strip()
return entries
def _parse_kategorien(text: str) -> dict[str, list[str]]:
"""Altformat-Reader: finale Baustein-Datei mit ## KERN/WICHTIG/REST-Abschnitten."""
cats: dict[str, list[str]] = {}
current = None
for line in text.splitlines():
s = line.strip()
m = re.match(r"#+\s*(KERN|WICHTIG|REST)\b", s, re.IGNORECASE)
if m:
current = m.group(1).upper()
cats.setdefault(current, [])
continue
m = re.match(r"(\d+)[.)]\s+(.*\S)", s)
if m and current:
cats[current].append(m.group(2))
return cats
def _lade_bausteine(text: str) -> dict[int, str]:
"""Lädt die finale Baustein-Datei — sortierte Liste (neu) oder Kategorien (Altformat)."""
if re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE):
cats = _parse_kategorien(text)
texts = [t for cat in _CATEGORIES for t in cats.get(cat, [])]
return {i: t for i, t in enumerate(texts, 1)}
return _parse_auswahl(text)
def _file_payload(path: Path):
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
return text if _parse_auswahl(text) else None
def _auswahl_payload(path: Path):
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
entries = _parse_auswahl(text)
return (text, entries) if entries else None
def _auswahl_check_schema(data):
"""{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß."""
if not isinstance(data, dict):
return None
nach = data.get("nachtraege", [])
streich = data.get("streichen", [])
if not isinstance(nach, list) or not isinstance(streich, list):
return None
if not all(isinstance(x, str) for x in [*nach, *streich]):
return None
return {"nachtraege": nach, "streichen": streich}
def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None:
"""Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …")."""
if not isinstance(t, str):
return None
return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t)))
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
if topic in _bausteine_progress:
return
_bausteine_progress[topic] = "Wartend…"
_bausteine_errors.pop(topic, None)
files = _bausteine_files(topic)
final_path = files["final"]
project = project_dir(topic) if project_dir(topic).is_dir() else None
def set_p(msg: str, step: int | None = None) -> None:
_bausteine_progress[topic] = msg
if step is not None:
_bausteine_step[topic] = step
def is_cancelled() -> bool:
return topic in _bausteine_cancelled
def abgebrochen() -> None:
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
try:
async with _semaphore:
files["arbeit"].mkdir(parents=True, exist_ok=True)
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
if final_path.exists():
for p_alt in _alle_slot_dateien(files):
p_alt.unlink(missing_ok=True)
# Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen
recherchen: list[str] = []
offen = []
for i, path in enumerate(files["recherche"], 1):
text = _file_payload(path)
if text is not None and len(recherchen) < 3:
recherchen.append(text)
else:
offen.append((i, path))
vorhanden = len(recherchen)
set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
if vorhanden < 3:
caps = "files" if project else "full"
slots = [
{
"key": f"bausteine-{topic}-recherche-{i}",
"prompt": _build_recherche_prompt(topic, path, instructions, project),
"role": "quick", "capabilities": caps,
"payload": (lambda result, p=path: _file_payload(p)),
}
for i, path in offen
]
neue = await _race(
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
cancelled=is_cancelled,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
return
recherchen += neue
# Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen
n_est = max(len(_parse_auswahl(t)) for t in recherchen)
bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None)
if bestehende is not None:
flat, entries = bestehende
else:
set_p("Konsolidiere Recherche…", step=1)
results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
slots = [
{
"key": f"bausteine-{topic}-auswahl-{i}",
"prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=path: _auswahl_payload(p)),
}
for i, path in enumerate(files["auswahl"], 1)
]
auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if auswahl is None:
_bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
return
flat, entries = auswahl[0]
# Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal)
set_p("Prüfe Auswahl…", step=2)
check_path = files["auswahl_check"]
patch = _auswahl_check_schema(_json_datei(check_path))
if patch is None:
check_path.unlink(missing_ok=True)
titel_listen = "\n\n".join(
f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values())
for i, text in enumerate(recherchen, 1)
)
slots = [{
"key": f"bausteine-{topic}-auswahlcheck-1",
"prompt": _prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: _auswahl_check_schema(_json_datei(check_path))),
}]
checks = await _race(topic, "Auswahl-Check", slots, 1, _timeout("auswahl_check", len(entries)), provider, cancelled=is_cancelled)
if is_cancelled():
abgebrochen()
return
if checks is None:
_log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
else:
patch = checks[0]
if patch is not None and (patch["streichen"] or patch["nachtraege"]):
idx = _titel_index(entries)
weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None}
if weg:
_log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}")
entries = {n: t for n, t in entries.items() if n not in weg}
if patch["nachtraege"]:
_log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine")
texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"])
entries = {i: t for i, t in enumerate(texts, 1)}
# Ab hier ist der Titel der Schlüssel — eindeutig machen
entries = _eindeutige_titel(entries)
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
# Schritt 3: 3 Sortier-Agenten, ALLE nötig — Merge per Median-Rang
n = len(entries)
sortierungen: list[list[int]] = []
offen = []
for i, path in enumerate(files["sortierung"], 1):
liste = _resolve_liste(_json_datei(path), entries)
if liste is not None and len(sortierungen) < 3:
sortierungen.append(liste)
else:
path.unlink(missing_ok=True)
offen.append((i, path))
vorhanden = len(sortierungen)
set_p(f"Sortierung läuft ({vorhanden}/3 gültig)…", step=3)
if vorhanden < 3:
slots = [
{
"key": f"bausteine-{topic}-sortierung-{i}",
"prompt": _prompt("Bausteine-Sortierung", topic=topic, bausteine=bausteine_liste, out_path=path),
"role": "quick", "capabilities": "files",
"payload": (lambda result, p=path: _resolve_liste(_json_datei(p), entries)),
}
for i, path in offen
]
neue = await _race(
topic, "Sortierung", slots, 3 - vorhanden, _timeout("sortierung", n), provider,
on_update=lambda c: set_p(f"Sortierung läuft ({vorhanden + c}/3 gültig)…"),
cancelled=is_cancelled,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = "Sortierung fehlgeschlagen (Quorum nicht erreicht)"
return
sortierungen += neue
reihenfolge = _merge_sortierungen(topic, sortierungen, entries)
final_path.write_text(
"\n".join(f"{i}. {entries[num]}" for i, num in enumerate(reihenfolge, 1)) + "\n",
encoding="utf-8",
)
except Exception as e:
_bausteine_errors[topic] = str(e)[:2000]
finally:
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
_bausteine_progress.pop(topic, None)
_bausteine_step.pop(topic, None)
_bausteine_cancelled.discard(topic)
# --- Guide-Generierung: Bausteine → (Plan) → Writer → JSON ---
# Parallele Writer pro Format (OnePager hat einen eigenen Weg).
WRITER_COUNT = {"MiniGuide": 1, "Guide": 2, "FullGuide": 4}
def _resolve_gliederung(data, entries: dict[int, str]) -> list[dict] | None:
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}]; None bei Schema-/Titel-Fehlern."""
if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list):
return None
idx = _titel_index(entries)
chapters: list[dict] = []
seen: set[int] = set()
total = unknown = 0
for ch in data["kapitel"]:
if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list):
return None
nums = []
for t in ch["bausteine"]:
total += 1
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
if num is None:
unknown += 1
elif num not in seen:
nums.append(num)
seen.add(num)
if nums:
chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums})
if not chapters or total == 0:
return None
if (total - unknown) / total < 0.85 or len(seen) / len(entries) < 0.85:
return None
missing = sorted(set(entries) - seen)
if missing:
chapters.append({"title": "Weitere Themen", "nums": missing})
return chapters
def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]:
"""Teilt Kapitel in bis zu n zusammenhängende Chunks, balanciert nach Section-Anzahl."""
n = max(1, min(n, len(chapters)))
chunks: list[list[dict]] = []
current: list[dict] = []
count = 0
remaining_total = sum(len(c["nums"]) for c in chapters)
remaining_chunks = n
for ch in chapters:
current.append(ch)
count += len(ch["nums"])
if remaining_chunks > 1 and count >= remaining_total / remaining_chunks:
chunks.append(current)
remaining_total -= count
remaining_chunks -= 1
current = []
count = 0
if current:
chunks.append(current)
return chunks
def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str:
lines = []
for ch in chunk:
lines.append(f"KAPITEL: {ch['title']}")
lines.extend(f"- {entries[num]}" for num in ch["nums"])
return "\n".join(lines)
_FRAGMENT_KAPITEL_RE = re.compile(r"<!--\s*kapitel\s*:\s*(.*?)\s*-->", re.IGNORECASE)
_FRAGMENT_SECTION_RE = re.compile(r"<!--\s*section\s*:\s*(.*?)\s*-->", re.IGNORECASE)
def _parse_fragment(text: str) -> list[dict]:
"""Parst eine Writer-Datei → [{"kapitel", "titel", "md"}] in Datei-Reihenfolge."""
sections: list[dict] = []
kapitel = None
current = None
for line in text.splitlines():
s = line.strip()
m = _FRAGMENT_KAPITEL_RE.match(s)
if m:
kapitel = m.group(1)
current = None
continue
m = _FRAGMENT_SECTION_RE.match(s)
if m:
current = {"kapitel": kapitel, "titel": m.group(1), "md": []}
sections.append(current)
continue
if current is not None:
current["md"].append(line)
for sec in sections:
sec["md"] = "\n".join(sec["md"]).strip()
return sections
async def _generate_onepager(
guide_id: str, topic: str, instructions: str, provider: str,
project: Path | None, content_path: Path, fragment_paths: list[Path],
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
PFLICHT_KARTEN = ("was ist", "welches problem", "wann nehmen", "einordnung", "so sieht", "fakten", "erste schritte")
def karten_schema(data):
if not isinstance(data, dict):
return None
if data.get("ok") is True:
return "ok"
karten = data.get("karten")
if not isinstance(karten, list) or not karten:
return None
out = []
for k in karten:
if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("merksatz"), str):
return None
titel, merksatz = k["titel"].strip(), k["merksatz"].strip()
if len(merksatz) < 5: # abgebrochene/leere Karten ("Per") sind ungültig
return None
out.append({"titel": titel, "merksatz": merksatz})
vorhanden = [k["titel"].lower() for k in out]
for pflicht in PFLICHT_KARTEN:
if not any(t.startswith(pflicht) for t in vorhanden):
return None
return out
# Schritt 1: Recherche — eigene Faktenbasis, unabhängig von den Bausteinen
await _set_progress(guide_id, "Recherchiere…")
recherche_path = content_path.parent / f"{content_path.stem}.recherche.md"
fragment_paths.append(recherche_path)
recherche_path.unlink(missing_ok=True)
if project:
source = _prompt("OnePager-Quelle-Projekt", project=project)
else:
source = _prompt("OnePager-Quelle-Thema", topic=topic)
slots = [{
"key": f"{guide_id}-recherche",
"prompt": _prompt("OnePager-Recherche", topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
"role": "quick", "capabilities": "files" if project else "full",
"payload": (lambda result: recherche_path.read_text(encoding="utf-8") if recherche_path.exists() else None),
}]
res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
return None
recherche = res[0]
# Schritt 2: Bauen — Karten nur aus der Faktenbasis (JSON)
await _set_progress(guide_id, "Baue OnePager…")
karten_path = content_path.parent / f"{content_path.stem}.karten.json"
fragment_paths.append(karten_path)
karten_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-bauen",
"prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
"role": "fast", "capabilities": "files",
"payload": (lambda result: (k if isinstance(k := karten_schema(_json_datei(karten_path)), list) else None)),
}]
res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
return None
karten = res[0]
# Schritt 3: Verifizieren — {"ok": true} oder vollständig korrigierte Liste (nicht fatal)
await _set_progress(guide_id, "Verifiziere OnePager…")
check_path = content_path.parent / f"{content_path.stem}.onepager-check.json"
fragment_paths.append(check_path)
check_path.unlink(missing_ok=True)
karten_block = "\n".join(f"- {k['titel']}{k['merksatz']}" for k in karten)
slots = [{
"key": f"{guide_id}-verify",
"prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block, out_path=check_path),
"role": "fast", "capabilities": "files",
"payload": (lambda result: karten_schema(_json_datei(check_path))),
}]
res = await _race(topic, "OnePager-Verifikation", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
_log(topic, "OnePager-Verifikation fehlgeschlagen — ungeprüfte Version wird verwendet")
elif isinstance(res[0], list):
_log(topic, "OnePager-Verifikation hat Korrekturen geliefert")
karten = res[0]
sections = [
{"num": i, "title": k["titel"], "md": k["merksatz"]}
for i, k in enumerate(karten, 1)
]
return [{"title": topic, "sections": sections}]
async def _generate_sections(
guide_id: str, topic: str, format_name: str, entries: dict[int, str],
facts: str, instructions: str, provider: str,
content_path: Path, fragment_paths: list[Path],
) -> list[dict] | None:
def is_cancelled() -> bool:
return guide_id in _cancelled
spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
if format_name == "MiniGuide":
# Ein Writer, gliedert selbst in Kapitel
plan = None
zuteilungen = [bausteine_liste]
chunk_sizes = [len(entries)]
else:
await _set_progress(guide_id, "Plane Gliederung…")
plan_path = content_path.parent / f"{content_path.stem}.gliederung.json"
fragment_paths.append(plan_path)
plan_path.unlink(missing_ok=True)
slots = [{
"key": f"{guide_id}-plan",
"prompt": _prompt("Guide-Plan", topic=topic, format_name=format_name, bausteine=bausteine_liste, out_path=plan_path, extra=_extra(instructions)),
"role": "guide", "capabilities": "files",
"payload": (lambda result: _resolve_gliederung(_json_datei(plan_path), entries)),
}]
res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", len(entries)), provider, cancelled=is_cancelled)
if is_cancelled():
return None
if res is None:
await _fail(guide_id, "Gliederung fehlgeschlagen")
return None
plan = res[0]
chunks = _split_chunks(plan, WRITER_COUNT[format_name])
zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
writer_count = len(zuteilungen)
await _set_progress(guide_id, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
fragment_paths.extend(paths)
results = await asyncio.gather(*[
run_agent(
f"{guide_id}-w{i}",
_prompt(
"Guide-Writer",
topic=topic, format_name=format_name, zuteilung=zuteilung,
facts=facts, spec=spec, out_path=path, extra=_extra(instructions),
),
_timeout("writer", size), provider=provider, role="guide", capabilities="full",
)
for i, (zuteilung, path, size) in enumerate(zip(zuteilungen, paths, chunk_sizes), 1)
], return_exceptions=True)
if is_cancelled():
return None
for i, (r, p) in enumerate(zip(results, paths), 1):
if isinstance(r, BaseException):
_log(topic, f"Writer {i}: {type(r).__name__}: {r}")
elif r[0] != 0:
_log(topic, f"Writer {i}: {_claude_error('Fehler', *r)}")
elif not p.exists():
_log(topic, f"Writer {i}: keine Ausgabedatei erstellt")
fragments: list[dict] = []
for p in paths:
if p.exists():
fragments.extend(_parse_fragment(p.read_text(encoding="utf-8")))
if not fragments:
await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
return None
await _set_progress(guide_id, "Setze zusammen…")
idx = _titel_index(entries)
by_num: dict[int, dict] = {}
fragment_order: list[int] = []
for sec in fragments:
num = _titel_aufloesen(idx, sec["titel"])
if num is None:
_log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
continue
if num not in by_num:
by_num[num] = sec
fragment_order.append(num)
def section_json(num: int) -> dict:
sec = by_num[num]
return {"num": num, "title": _titel(entries[num]), "md": sec["md"]}
chapters: list[dict] = []
if plan is None:
# MiniGuide: Kapitel aus den Fragment-Markern in Datei-Reihenfolge
for num in fragment_order:
title = by_num[num]["kapitel"] or topic
if not chapters or chapters[-1]["title"] != title:
chapters.append({"title": title, "sections": []})
chapters[-1]["sections"].append(section_json(num))
else:
for ch in plan:
sections = [section_json(num) for num in ch["nums"] if num in by_num]
if sections:
chapters.append({"title": ch["title"], "sections": sections})
missing = sorted(set(entries) - set(by_num))
if missing:
_log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}")
if not chapters:
await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
return None
return chapters
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
async with _semaphore:
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)
content_path = guide_content_path(topic, format_name)
content_path.parent.mkdir(parents=True, exist_ok=True)
project = project_dir(topic) if project_dir(topic).is_dir() else None
fragment_paths: list[Path] = []
try:
if guide_id in _cancelled:
return
if format_name == "OnePager":
chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path, fragment_paths)
else:
alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
if not alle:
await _fail(guide_id, "Keine Bausteine gefunden")
return
anteil, minimum = FORMAT_ANTEIL[format_name]
k = min(len(alle), max(minimum, math.ceil(anteil * len(alle))))
selected = [text for _, text in sorted(alle.items())][:k]
entries = _eindeutige_titel({i: text for i, text in enumerate(selected, 1)})
facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
chapters = await _generate_sections(
guide_id, topic, format_name, entries,
facts, instructions, provider, content_path, fragment_paths,
)
if chapters is None or guide_id in _cancelled:
return
content_path.write_text(
json.dumps({"topic": topic, "format": format_name, "chapters": chapters}, ensure_ascii=False, indent=1),
encoding="utf-8",
)
now = datetime.now(timezone.utc).isoformat()
await update_guide(guide_id, status="done", progress=None, updated_at=now)
except asyncio.TimeoutError:
await _fail(guide_id, "Timeout bei der Generierung")
except FileNotFoundError:
await _fail(guide_id, "Bausteine fehlen")
except Exception as e:
await _fail(guide_id, str(e)[:2000])
finally:
_cancelled.discard(guide_id)
for p in fragment_paths:
p.unlink(missing_ok=True)
# --- Tutor-Chat ---
def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str:
transcript = "\n".join(
f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
for m in messages
)
return _prompt(
"Chat",
topic=topic, format_name=format_name,
outline_block=outline.strip() or "(keine)",
section_block=section.strip() or "(kein Abschnitt erkannt)",
transcript=transcript,
)
async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
try:
prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
returncode, stdout, stderr = await run_agent(
"chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
)
if returncode != 0:
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
reply = stdout.strip()
return reply or "Entschuldigung, ich habe keine Antwort erhalten."
except Exception:
return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."