1601 lines
67 KiB
Python
1601 lines
67 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
import math
|
||
import shutil
|
||
import subprocess
|
||
import re
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from agents import run_agent, kill_process
|
||
from config import (
|
||
DEFAULT_PROVIDER,
|
||
FORMAT_ANTEIL,
|
||
TEMPLATES_DIR,
|
||
TIMEOUTS,
|
||
MAX_CONCURRENT_GENERATIONS,
|
||
)
|
||
from database import update_guide
|
||
from paths import arbeit_dir, bausteine_path, guide_content_path, project_dir
|
||
|
||
# Shared semaphore sized by MAX_CONCURRENT_GENERATIONS; generations acquire it before doing work.
_semaphore = asyncio.Semaphore(MAX_CONCURRENT_GENERATIONS)
# IDs of guides for which a cancel was requested via cancel_guide().
_cancelled: set[str] = set()
|
||
|
||
|
||
async def cancel_guide(guide_id: str) -> bool:
    """Request cancellation of a guide generation.

    Registers the guide in ``_cancelled``, kills its agent process and stores
    the "cancelled" error state in the database. Always returns True.
    """
    _cancelled.add(guide_id)
    kill_process(guide_id)
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg="Abgebrochen — Fortschritt bleibt erhalten",
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
    return True
|
||
|
||
|
||
async def _set_progress(guide_id: str, progress: str) -> None:
    """Store a new progress text for the guide together with a fresh UTC timestamp."""
    timestamp = datetime.now(timezone.utc).isoformat()
    await update_guide(guide_id, progress=progress, updated_at=timestamp)
|
||
|
||
|
||
def _prompt(name: str, **kwargs) -> str:
    """Read ``TEMPLATES_DIR/Prompt/<name>.md`` and fill it in via ``str.format(**kwargs)``."""
    template_path = TEMPLATES_DIR / "Prompt" / f"{name}.md"
    return template_path.read_text(encoding="utf-8").format(**kwargs)
|
||
|
||
|
||
def _extra(instructions: str) -> str:
    """Return the user-instructions prompt suffix, or an empty string if there are none."""
    if not instructions:
        return ""
    return f"\n\nZUSÄTZLICHE ANWEISUNGEN VOM NUTZER:\n{instructions}\n"
|
||
|
||
|
||
log = logging.getLogger("creator.generator")


def _log(topic: str, msg: str) -> None:
    """Write an INFO record of the form ``[topic] msg`` to the module logger."""
    log.log(logging.INFO, "[%s] %s", topic, msg)
|
||
|
||
|
||
def _claude_error(label: str, returncode: int, stdout: str, stderr: str) -> str:
    """Build a one-line error message for a failed agent run.

    Prefers stderr (first 1000 characters), then the last 500 characters of
    stdout, and finally a message that only carries the exit code.
    """
    err_text = (stderr or "").strip()
    if err_text:
        return f"{label}: {err_text[:1000]}"
    out_tail = (stdout or "").strip()[-500:]
    if not out_tail:
        return f"{label} (exit {returncode}, ohne Ausgabe)"
    return f"{label} (exit {returncode}, stderr leer): …{out_tail}"
|
||
|
||
|
||
def _gather_error(label: str, results: list) -> str:
    """Describe the first failure in a list of gathered agent results.

    Each item is either an exception or a ``(returncode, stdout, stderr)``
    tuple. If no item failed, a generic "no usable result" message is returned.
    """
    for item in results:
        if isinstance(item, BaseException):
            return f"{label}: {type(item).__name__}: {item}"
        code, out, err = item
        if code != 0:
            return _claude_error(label, code, out, err)
    return f"{label}: kein verwertbares Ergebnis"
|
||
|
||
|
||
async def _fail(guide_id: str, msg: str) -> None:
    """Put the guide into the error state with ``msg`` and clear its progress text."""
    timestamp = datetime.now(timezone.utc).isoformat()
    await update_guide(
        guide_id,
        status="error",
        progress=None,
        error_msg=msg,
        updated_at=timestamp,
    )
|
||
|
||
|
||
def _norm_titel(s: str) -> str:
    """Normalize a title for key comparison.

    Removes backticks, quotes and angle brackets, collapses whitespace runs
    to a single space, strips the ends and lower-cases the result.
    """
    s = re.sub(r"[`'\"<>]", "", s)
    return re.sub(r"\s+", " ", s).strip().lower()


def _titel(entry: str) -> str:
    """Return the title part of a ``Titel — Beschreibung`` entry.

    Falls back to the whole entry when the part before the first `` — `` is empty.
    """
    return entry.split(" — ")[0].strip() or entry


def _eindeutige_titel(entries: dict[int, str]) -> dict[int, str]:
    """Make titles unique (suffix " (2)", " (3)" …) so they can serve as keys.

    The first occurrence of a title stays untouched. Every later entry whose
    normalized title is already taken gets the next free numeric suffix. Titles
    produced here are registered as taken too, so a generated "A (2)" can no
    longer collide with a literal "A (2)" elsewhere in the list.

    Returns a new dict with the same numbers in the same order.
    """
    used: set[str] = set()  # normalized titles already handed out
    last_suffix: dict[str, int] = {}  # normalized base title → highest suffix used so far
    out: dict[int, str] = {}
    for num, text in entries.items():
        titel = _titel(text)
        key = _norm_titel(titel)
        if key in used:
            n = last_suffix.get(key, 1)
            while True:
                n += 1
                candidate = f"{titel} ({n})"
                candidate_key = _norm_titel(candidate)
                if candidate_key not in used:
                    break
            last_suffix[key] = n
            # Keep the description (everything after the first " — ") unchanged.
            rest = text.split(" — ", 1)
            text = candidate + (f" — {rest[1]}" if len(rest) == 2 else "")
            key = candidate_key
        used.add(key)
        out[num] = text
    return out


def _titel_index(entries: dict[int, str]) -> dict[str, int]:
    """Map each entry's normalized title to its number (later duplicates win)."""
    return {_norm_titel(_titel(text)): num for num, text in entries.items()}
|
||
|
||
|
||
def _json_datei(path: Path):
    """Read a JSON file, tolerating a surrounding Markdown code fence.

    Returns the parsed value, or None if the file is missing, unreadable or
    not valid JSON (the reason is logged at DEBUG level).
    """
    if not path.exists():
        return None
    try:
        raw = path.read_text(encoding="utf-8").strip()
        # Drop a leading ```/```json fence and a trailing ``` fence.
        unfenced = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw)
        parsed = json.loads(unfenced)
    except Exception as e:
        log.debug("JSON-Datei ungültig: %s (%s)", path, e)
        return None
    return parsed
|
||
|
||
|
||
def _timeout(step: str, n: int = 0) -> int:
    """Return the timeout for ``step``: the configured base plus a per-item share times ``n``."""
    base, per_item = TIMEOUTS[step]
    return base + per_item * n
|
||
|
||
|
||
# Maximum number of restarts per slot in _race() (so up to 1 + _MAX_RESTARTS attempts each).
_MAX_RESTARTS = 2


async def _race(topic: str, label: str, slots: list[dict], quorum: int, timeout: int, provider: str, on_update=None, cancelled=None) -> list | None:
    """Start all slots in parallel and collect `quorum` valid results.

    Slot spec: {key, prompt, role, capabilities, payload}. `payload(result)`
    checks validity and returns the slot result, or None if it is invalid.
    Non-zero return code / timeout / exception / invalid payload → the slot is
    restarted (at most _MAX_RESTARTS times). As soon as the quorum is reached,
    the remaining agents are killed. None = quorum missed.
    `cancelled()` → True aborts (no restarts, returns None).
    `on_update(count)` is called with the number of valid results after each
    newly accepted result.
    """
    # Failed attempts per slot index.
    attempts = {i: 0 for i in range(len(slots))}
    # Running task → index of the slot it belongs to.
    tasks: dict[asyncio.Task, int] = {}

    def spawn(i: int) -> None:
        # Start (or restart) the agent for slot i and register its task.
        slot = slots[i]
        task = asyncio.create_task(run_agent(
            slot["key"], slot["prompt"], timeout,
            provider=provider, role=slot["role"], capabilities=slot["capabilities"],
        ))
        tasks[task] = i

    for i in range(len(slots)):
        spawn(i)

    results: list = []
    try:
        while tasks:
            if cancelled and cancelled():
                return None
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                i = tasks.pop(task)
                payload, err = None, None
                try:
                    result = task.result()
                    # result[0] is the return code; the whole tuple is forwarded to _claude_error.
                    if result[0] != 0:
                        err = _claude_error("Fehler", *result)
                    else:
                        payload = slots[i]["payload"](result)
                        if payload is None:
                            err = "Ergebnis ungültig/nicht parsebar"
                except asyncio.TimeoutError:
                    err = f"Timeout nach {timeout}s"
                except Exception as e:
                    err = f"{type(e).__name__}: {e}"

                if payload is not None:
                    results.append(payload)
                    if on_update:
                        on_update(len(results))
                    if len(results) >= quorum:
                        # The `finally` block below kills whatever is still registered in `tasks`.
                        return results
                    continue

                _log(topic, f"{label} {i + 1} (Versuch {attempts[i] + 1}): {err}")
                attempts[i] += 1
                # Restart the slot unless its budget is used up or a cancel was requested.
                if attempts[i] <= _MAX_RESTARTS and not (cancelled and cancelled()):
                    spawn(i)
        _log(topic, f"{label}: Quorum {quorum} nicht erreicht ({len(results)} gültig)")
        return None
    finally:
        # Clean up on every exit path: kill the agent processes, cancel their
        # tasks and wait for the tasks to finish.
        for task, i in tasks.items():
            kill_process(slots[i]["key"])
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks.keys(), return_exceptions=True)
|
||
|
||
|
||
# --- Bausteine pipeline: 4x research (3 needed) → 2x selection (1 needed) → check — plain inventory, unsorted ---

# In-memory state of building-block generations, keyed by topic.
_bausteine_progress: dict[str, str] = {}  # topic → progress text; presence means "generation running"
_bausteine_errors: dict[str, str] = {}  # topic → last error message
_bausteine_cancelled: set[str] = set()  # topics with a pending cancel request
_bausteine_step: dict[str, int] = {}  # topic → index of the currently active step

BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung")
_CATEGORIES = ("KERN", "WICHTIG", "REST")  # only used by the legacy-format reader
|
||
|
||
|
||
def _bausteine_steps(topic: str) -> tuple:
    """Return the step labels for ``topic``.

    Projects (a project directory exists) get a fourth step: topic-field
    completion via web research.
    """
    extra = ("Ergänzung",) if project_dir(topic).is_dir() else ()
    return BAUSTEINE_STEPS + extra
|
||
|
||
|
||
def _bausteine_files(topic: str) -> dict:
    """Return all paths of the building-block pipeline for ``topic``.

    Keys: ``final`` (result file), ``arbeit`` (working directory), ``recherche``
    (four slot files), ``auswahl`` (two slot files), ``auswahl_check`` and
    ``ergaenzung`` (JSON files inside the working directory).
    """
    work = arbeit_dir(topic)
    final = bausteine_path(topic)
    recherche = [work / f"recherche-{i}.md" for i in range(1, 5)]
    auswahl = [work / f"auswahl-{i}.md" for i in range(1, 3)]
    return {
        "final": final,
        "arbeit": work,
        "recherche": recherche,
        "auswahl": auswahl,
        "auswahl_check": work / "auswahl-check.json",
        "ergaenzung": work / "ergaenzung.json",
    }
|
||
|
||
|
||
def _alle_slot_dateien(files: dict) -> list[Path]:
    """Flatten the intermediate slot files: research, selection, selection check, completion."""
    paths = list(files["recherche"])
    paths.extend(files["auswahl"])
    paths.append(files["auswahl_check"])
    paths.append(files["ergaenzung"])
    return paths
|
||
|
||
|
||
def cancel_bausteine(topic: str) -> bool:
    """Request cancellation of a running building-block generation.

    Returns False if nothing is running for ``topic``. Otherwise records the
    cancel request, calls ``kill_process`` with the topic's key prefix and
    returns True.
    """
    running = topic in _bausteine_progress
    if running:
        _bausteine_cancelled.add(topic)
        kill_process(f"bausteine-{topic}-")
    return running
|
||
|
||
|
||
def _resume_step(topic: str) -> int:
    """Return the index of the first open step, judged by the persisted intermediate files."""
    files = _bausteine_files(topic)
    existing_recherchen = [p for p in files["recherche"] if p.exists()]
    if len(existing_recherchen) < 3:
        return 0
    if all(not p.exists() for p in files["auswahl"]):
        return 1
    if not files["auswahl_check"].exists():
        return 2
    # Only projects have the completion step.
    if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
        return 3
    return len(_bausteine_steps(topic))
|
||
|
||
|
||
def bausteine_status(topic: str) -> dict:
    """Return the status of the building-block pipeline for ``topic``.

    Keys: ``ready`` (result file exists), ``generating``, ``progress``,
    ``error``, ``partial`` (resumable intermediate results exist) and ``steps``
    (a list of ``{"label", "state"}`` with state pending/active/done).
    """
    steps = _bausteine_steps(topic)
    ready = bausteine_path(topic).exists()
    generating = topic in _bausteine_progress
    partial = False

    if generating:
        current = _bausteine_step.get(topic)
        states = []
        for i in range(len(steps)):
            if current is None or i > current:
                states.append("pending")
            elif i < current:
                states.append("done")
            else:
                states.append("active")
    elif ready:
        states = ["done" for _ in steps]
    else:
        # Idle and no result yet: derive the state from the files left on disk.
        nxt = _resume_step(topic)
        partial = nxt > 0
        states = ["pending" if i >= nxt else "done" for i in range(len(steps))]

    step_list = [{"label": label, "state": state} for label, state in zip(steps, states)]
    return {
        "ready": ready,
        "generating": generating,
        "progress": _bausteine_progress.get(topic),
        "error": _bausteine_errors.get(topic),
        "partial": partial,
        "steps": step_list,
    }
|
||
|
||
|
||
def active_bausteine() -> list[dict]:
    """List all running building-block generations as ``{"topic", "progress"}`` dicts."""
    active = []
    for topic, progress in _bausteine_progress.items():
        active.append({"topic": topic, "progress": progress})
    return active
|
||
|
||
|
||
def reset_bausteine(topic: str) -> None:
    """Delete the result file and the working directory of ``topic`` and clear its stored error."""
    files = _bausteine_files(topic)
    final_file, work_dir = files["final"], files["arbeit"]
    final_file.unlink(missing_ok=True)
    shutil.rmtree(work_dir, ignore_errors=True)
    _bausteine_errors.pop(topic, None)
|
||
|
||
|
||
def _ergaenzung_schema(data):
    """Validate ``{"bausteine": [{"titel", "beschreibung"}]}``.

    Returns a list of ``(titel, beschreibung)`` tuples with stripped strings
    (an empty list is allowed), or None on any schema violation, including an
    empty title.
    """
    if not isinstance(data, dict):
        return None
    items = data.get("bausteine")
    if not isinstance(items, list):
        return None
    pairs = []
    for item in items:
        if not isinstance(item, dict):
            return None
        titel, beschreibung = item.get("titel"), item.get("beschreibung")
        if not (isinstance(titel, str) and isinstance(beschreibung, str)):
            return None
        titel = titel.strip()
        if not titel:
            return None
        pairs.append((titel, beschreibung.strip()))
    return pairs
|
||
|
||
|
||
def _pdfs_konvertieren(project: Path) -> None:
    """Convert the PDFs of a project to .txt (pdftotext) — agents read text instead of page images.

    Called before every project generation. A PDF is converted only when its
    .txt is missing or older than the PDF; the original stays untouched.
    PDFs are found by suffix regardless of case (``.pdf``, ``.PDF`` …).

    Raises:
        RuntimeError: if the project contains PDFs but pdftotext is not
            installed (hard error instead of an unreliable direct-read mode),
            or if a conversion fails or times out. A partial .txt left by a
            failed conversion is removed so the next run does not mistake it
            for an up-to-date result.
    """
    # Case-insensitive suffix match: rglob("*.pdf") would skip "SCAN.PDF" on POSIX.
    pdfs = [p for p in project.rglob("*") if p.is_file() and p.suffix.lower() == ".pdf"]
    if not pdfs:
        return
    if shutil.which("pdftotext") is None:
        raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
    for pdf in pdfs:
        txt = pdf.with_suffix(".txt")
        if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
            continue
        try:
            subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
        except Exception as e:
            # A killed or failed run may leave a truncated .txt that is newer than the PDF.
            try:
                txt.unlink(missing_ok=True)
            except OSError:
                pass
            raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
        _log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}")
|
||
|
||
|
||
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
    """Assemble the research prompt for one slot.

    The source section comes from the project template when ``project`` is
    given, otherwise from the topic template. ``out_path`` is passed to the
    template as ``bausteine_path``.
    """
    source = (
        _prompt("Bausteine-Quelle-Projekt", project=project)
        if project
        else _prompt("Bausteine-Quelle-Thema", topic=topic)
    )
    return _prompt(
        "Bausteine-Recherche",
        topic=topic,
        source=source,
        bausteine_path=out_path,
        extra=_extra(instructions),
    )
|
||
|
||
|
||
def _parse_auswahl(text: str) -> dict[int, str]:
    """Parse a building-block list with one ``N. Titel — Kurzbeschreibung`` per line.

    Non-empty lines that do not start a new numbered entry are appended
    (space-separated) to the previous entry; lines before the first entry are
    ignored. A repeated number overwrites the earlier entry.
    """
    entries: dict[int, str] = {}
    current = None
    for raw in text.splitlines():
        hit = re.match(r"\s*(\d+)[.)]\s+(.*\S)", raw)
        if hit:
            current = int(hit.group(1))
            entries[current] = hit.group(2)
            continue
        continuation = raw.strip()
        if current is not None and continuation:
            entries[current] += " " + continuation
    return entries
|
||
|
||
|
||
def _parse_kategorien(text: str) -> dict[str, list[str]]:
    """Legacy-format reader: final building-block file with ## KERN/WICHTIG/REST sections.

    Returns category name (upper case) → entry texts in file order. Numbered
    lines before the first category heading are ignored.
    """
    heading = re.compile(r"#+\s*(KERN|WICHTIG|REST)\b", re.IGNORECASE)
    numbered = re.compile(r"(\d+)[.)]\s+(.*\S)")
    cats: dict[str, list[str]] = {}
    active = None
    for raw in text.splitlines():
        stripped = raw.strip()
        head = heading.match(stripped)
        if head:
            active = head.group(1).upper()
            cats.setdefault(active, [])
            continue
        item = numbered.match(stripped)
        if item and active:
            cats[active].append(item.group(2))
    return cats
|
||
|
||
|
||
def _lade_bausteine(text: str) -> dict[int, str]:
    """Load the final building-block file: plain numbered list (new) or categories (legacy).

    A file counts as legacy when it contains a KERN heading. Its entries are
    then renumbered from 1 in the order KERN, WICHTIG, REST.
    """
    is_legacy = re.search(r"^#+\s*KERN\b", text, re.IGNORECASE | re.MULTILINE) is not None
    if not is_legacy:
        return _parse_auswahl(text)
    cats = _parse_kategorien(text)
    ordered: list[str] = []
    for cat in _CATEGORIES:
        ordered.extend(cats.get(cat, []))
    return dict(enumerate(ordered, 1))
|
||
|
||
|
||
def _file_payload(path: Path):
    """Return the slot file's text if the file exists and holds numbered entries, else None."""
    if not path.exists():
        return None
    content = path.read_text(encoding="utf-8")
    if _parse_auswahl(content):
        return content
    return None
|
||
|
||
|
||
def _auswahl_payload(path: Path):
    """Return ``(text, entries)`` for a selection file with numbered entries, else None."""
    if not path.exists():
        return None
    content = path.read_text(encoding="utf-8")
    parsed = _parse_auswahl(content)
    if not parsed:
        return None
    return content, parsed
|
||
|
||
|
||
def _auswahl_check_schema(data):
    """Validate ``{"nachtraege": [...], "streichen": [...]}``; None on a schema violation.

    Both keys are optional and default to an empty list. Both values must be
    lists of strings. Other keys are dropped from the result.
    """
    if not isinstance(data, dict):
        return None
    nachtraege = data.get("nachtraege", [])
    streichen = data.get("streichen", [])
    for values in (nachtraege, streichen):
        if not isinstance(values, list):
            return None
    if any(not isinstance(x, str) for x in (*nachtraege, *streichen)):
        return None
    return {"nachtraege": nachtraege, "streichen": streichen}
|
||
|
||
|
||
def _titel_aufloesen(idx: dict[str, int], t: str) -> int | None:
|
||
"""Titel → Nummer; toleriert mitgeschleppte Beschreibungen ("Titel — …")."""
|
||
if not isinstance(t, str):
|
||
return None
|
||
return idx.get(_norm_titel(t)) or idx.get(_norm_titel(_titel(t)))
|
||
|
||
|
||
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
    """Run the building-block pipeline for `topic` and write the final inventory file.

    Steps: research (4 slots, 3 valid results needed) → selection (2 slots,
    first valid result wins) → selection check (JSON patch, non-fatal) →
    for projects only: topic-field completion. Valid intermediate files found
    on disk are reused (resume). If the final file already exists, all slot
    files are deleted first for a fresh start.

    Does nothing if a generation for `topic` is already running. Errors and
    cancellation are reported through `_bausteine_errors[topic]`; exceptions
    are caught and logged, not raised.
    """
    if topic in _bausteine_progress:
        return
    _bausteine_progress[topic] = "Wartend…"
    _bausteine_errors.pop(topic, None)

    files = _bausteine_files(topic)
    final_path = files["final"]
    project = project_dir(topic) if project_dir(topic).is_dir() else None

    def set_p(msg: str, step: int | None = None) -> None:
        # Update the progress text and, if given, the active step index.
        _bausteine_progress[topic] = msg
        if step is not None:
            _bausteine_step[topic] = step

    def is_cancelled() -> bool:
        return topic in _bausteine_cancelled

    def abgebrochen() -> None:
        # Record the cancellation as the topic's error message.
        _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"

    try:
        async with _semaphore:
            files["arbeit"].mkdir(parents=True, exist_ok=True)
            if project:
                # PDF conversion is blocking → run it in a worker thread.
                await asyncio.to_thread(_pdfs_konvertieren, project)
            # "Create anew": finished building blocks → complete fresh start.
            # Otherwise slot files are leftovers of a cancel/error → resume.
            if final_path.exists():
                for p_alt in _alle_slot_dateien(files):
                    p_alt.unlink(missing_ok=True)

            # Step 1: 4 research agents, 3 valid results needed — existing slot files count
            recherchen: list[str] = []
            offen = []
            for i, path in enumerate(files["recherche"], 1):
                text = _file_payload(path)
                if text is not None and len(recherchen) < 3:
                    recherchen.append(text)
                else:
                    offen.append((i, path))
            vorhanden = len(recherchen)
            set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
            if vorhanden < 3:
                caps = "files" if project else "full"
                slots = [
                    {
                        "key": f"bausteine-{topic}-recherche-{i}",
                        "prompt": _build_recherche_prompt(topic, path, instructions, project),
                        "role": "quick", "capabilities": caps,
                        # p=path binds the slot's own file (avoids the late-binding closure pitfall).
                        "payload": (lambda result, p=path: _file_payload(p)),
                    }
                    for i, path in offen
                ]
                neue = await _race(
                    topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
                    on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
                    cancelled=is_cancelled,
                )
                if is_cancelled():
                    abgebrochen()
                    return
                if neue is None:
                    _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
                    return
                recherchen += neue

            # Step 2: 2 selection agents, the first one wins — an existing valid file is reused
            # Largest research list size; scales the selection timeout.
            n_est = max(len(_parse_auswahl(t)) for t in recherchen)
            bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None)
            if bestehende is not None:
                flat, entries = bestehende
            else:
                set_p("Konsolidiere Recherche…", step=1)
                results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
                slots = [
                    {
                        "key": f"bausteine-{topic}-auswahl-{i}",
                        "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
                        "role": "fast", "capabilities": "files",
                        "payload": (lambda result, p=path: _auswahl_payload(p)),
                    }
                    for i, path in enumerate(files["auswahl"], 1)
                ]
                auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled)
                if is_cancelled():
                    abgebrochen()
                    return
                if auswahl is None:
                    _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
                    return
                flat, entries = auswahl[0]

            # Step 2b: selection check against the research titles (JSON, non-fatal)
            set_p("Prüfe Auswahl…", step=2)
            check_path = files["auswahl_check"]
            patch = _auswahl_check_schema(_json_datei(check_path))
            if patch is None:
                # Missing or invalid check file → remove it and let an agent write a new one.
                check_path.unlink(missing_ok=True)
                titel_listen = "\n\n".join(
                    f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values())
                    for i, text in enumerate(recherchen, 1)
                )
                slots = [{
                    "key": f"bausteine-{topic}-auswahlcheck-1",
                    "prompt": _prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path),
                    "role": "fast", "capabilities": "files",
                    "payload": (lambda result: _auswahl_check_schema(_json_datei(check_path))),
                }]
                checks = await _race(topic, "Auswahl-Check", slots, 1, _timeout("auswahl_check", len(entries)), provider, cancelled=is_cancelled)
                if is_cancelled():
                    abgebrochen()
                    return
                if checks is None:
                    _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
                else:
                    patch = checks[0]
            if patch is not None and (patch["streichen"] or patch["nachtraege"]):
                idx = _titel_index(entries)
                # Titles to delete that do not resolve to an entry are ignored.
                weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None}
                if weg:
                    _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}")
                    entries = {n: t for n, t in entries.items() if n not in weg}
                if patch["nachtraege"]:
                    _log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine")
                    # NOTE(review): the source's indentation was lost; the next two lines are assumed
                    # to belong to this branch (append additions, then renumber from 1) — confirm.
                    texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"])
                    entries = {i: t for i, t in enumerate(texts, 1)}

            # Step 4 (projects only): topic-field completion — the script/project is an excerpt,
            # a web agent adds canonically missing building blocks, marked with [Ergänzung].
            if project:
                set_p("Ergänze Themenfeld…", step=3)
                erg_path = files["ergaenzung"]
                ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
                if ergaenzungen is None:
                    erg_path.unlink(missing_ok=True)
                    slots = [{
                        "key": f"bausteine-{topic}-ergaenzung-1",
                        "prompt": _prompt(
                            "Bausteine-Ergaenzung",
                            topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
                            out_path=erg_path, extra=_extra(instructions),
                        ),
                        "role": "quick", "capabilities": "full",
                        "payload": (lambda result: _ergaenzung_schema(_json_datei(erg_path))),
                    }]
                    res = await _race(topic, "Ergänzung", slots, 1, _timeout("ergaenzung"), provider, cancelled=is_cancelled)
                    if is_cancelled():
                        abgebrochen()
                        return
                    if res is None:
                        _bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
                        return
                    ergaenzungen = res[0]
                idx = _titel_index(entries)
                # Keep only additions whose title is not in the inventory yet.
                neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
                if neu:
                    _log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
                    # Continue numbering after the highest existing number.
                    start = max(entries, default=0) + 1
                    for off, (t, b) in enumerate(neu):
                        entries[start + off] = f"{t} — {b} [Ergänzung]"

            # Make titles unique and write the unsorted inventory
            entries = _eindeutige_titel(entries)
            final_path.write_text(
                "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n",
                encoding="utf-8",
            )
    except Exception as e:
        log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic)
        _bausteine_errors[topic] = str(e)[:2000]
    finally:
        # No file cleanup: intermediate files stay for resume and traceability.
        _bausteine_progress.pop(topic, None)
        _bausteine_step.pop(topic, None)
        _bausteine_cancelled.discard(topic)
|
||
|
||
|
||
# --- Guide generation: 6 steps with a check after every phase (OnePager takes its own path) ---
# Check agents only note problems; the adjustment is done by the respective producer type.
# Step files stay in place → a cancel keeps the progress, ▶ resumes at the open step.

GUIDE_STEPS = ("Auswahl", "Auswahl-Prüfung", "Gliederung", "Gliederungs-Prüfung", "Schreiben", "Lese-Prüfung")

# Writers scale with the number of sections: 1 writer per ~30 sections (capped).
# Small packages avoid lazy output on long lists and limit the damage
# of a failed writer.
WRITER_SECTIONS = 30
WRITER_MAX = 20
|
||
|
||
|
||
def _guide_files(content_path: Path) -> dict:
    """Return the fixed step files of a guide, located next to its content file.

    Chunk, read-check and fix files are dynamic (``{stem}.chunk-i.md`` etc.)
    and therefore not listed here.
    """
    folder, stem = content_path.parent, content_path.stem
    step_names = ("auswahl", "auswahl-check", "gliederung", "gliederung-check")
    return {name.replace("-", "_"): folder / f"{stem}.{name}.json" for name in step_names}
|
||
|
||
|
||
def guide_slot_dateien(content_path: Path) -> list[Path]:
    """Return all step files of a guide (used for a fresh start).

    These are the siblings named ``<stem>.*``, excluding the content file itself.
    """
    pattern = f"{content_path.stem}.*"
    found = []
    for candidate in content_path.parent.glob(pattern):
        if candidate != content_path:
            found.append(candidate)
    return found
|
||
|
||
|
||
async def _set_step(guide_id: str, step: int, progress: str) -> None:
    """Store the active step index and progress text of a guide with a fresh UTC timestamp."""
    timestamp = datetime.now(timezone.utc).isoformat()
    await update_guide(guide_id, step=step, progress=progress, updated_at=timestamp)
|
||
|
||
|
||
def _resolve_auswahl(data, entries: dict[int, str], k_min: int, k_max: int) -> list[int] | None:
|
||
"""{"bausteine": [Titel]} → Nummern; None bei Schema-Verstoß/Drift/falschem Umfang."""
|
||
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
|
||
return None
|
||
idx = _titel_index(entries)
|
||
nums: list[int] = []
|
||
seen: set[int] = set()
|
||
total = unknown = 0
|
||
for t in data["bausteine"]:
|
||
total += 1
|
||
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
|
||
if num is None:
|
||
unknown += 1
|
||
elif num not in seen:
|
||
seen.add(num)
|
||
nums.append(num)
|
||
if total == 0 or (total - unknown) / total < 0.85:
|
||
return None
|
||
if len(nums) < 0.9 * k_min or len(nums) > 1.1 * k_max:
|
||
return None
|
||
return nums
|
||
|
||
|
||
def _probleme_schema(data):
    """``{"ok": true}`` → [] · ``{"probleme": [str]}`` → list · otherwise None.

    Items are converted with ``str()`` and stripped; empty ones are dropped.
    A list that ends up empty counts as invalid (None).
    """
    if not isinstance(data, dict):
        return None
    if data.get("ok") is True:
        return []
    raw = data.get("probleme")
    if not (isinstance(raw, list) and raw):
        return None
    cleaned = [s for s in (str(x).strip() for x in raw) if s]
    return cleaned if cleaned else None
|
||
|
||
|
||
def _lese_probleme_schema(data):
    """``{"ok": true}`` → [] · ``{"probleme": [{"section", "problem"}]}`` → list · otherwise None.

    Every problem must be a dict with string values for ``section`` and
    ``problem``; both are stripped in the result.
    """
    if not isinstance(data, dict):
        return None
    if data.get("ok") is True:
        return []
    raw = data.get("probleme")
    if not (isinstance(raw, list) and raw):
        return None
    cleaned = []
    for item in raw:
        if not isinstance(item, dict):
            return None
        section, problem = item.get("section"), item.get("problem")
        if not isinstance(section, str) or not isinstance(problem, str):
            return None
        cleaned.append({"section": section.strip(), "problem": problem.strip()})
    return cleaned or None
|
||
|
||
|
||
def _resolve_gliederung(data, entries: dict[int, str], soll_min: int, soll_max: int) -> list[dict] | None:
|
||
"""{"kapitel": [{"titel", "bausteine": [Titel]}]} → [{"title", "nums"}].
|
||
|
||
`soll_min`/`soll_max` = erlaubte Spanne gewählter Bausteine (mit kleiner Toleranz).
|
||
"""
|
||
if not isinstance(data, dict) or not isinstance(data.get("kapitel"), list):
|
||
return None
|
||
idx = _titel_index(entries)
|
||
chapters: list[dict] = []
|
||
seen: set[int] = set()
|
||
total = unknown = 0
|
||
for ch in data["kapitel"]:
|
||
if not isinstance(ch, dict) or not isinstance(ch.get("bausteine"), list):
|
||
return None
|
||
nums = []
|
||
for t in ch["bausteine"]:
|
||
total += 1
|
||
num = _titel_aufloesen(idx, t) if isinstance(t, str) else None
|
||
if num is None:
|
||
unknown += 1
|
||
elif num not in seen:
|
||
nums.append(num)
|
||
seen.add(num)
|
||
if nums:
|
||
chapters.append({"title": str(ch.get("titel", "")).strip() or "Kapitel", "nums": nums})
|
||
if not chapters or total == 0:
|
||
return None
|
||
if (total - unknown) / total < 0.85:
|
||
return None
|
||
if len(seen) < 0.9 * soll_min or len(seen) > 1.1 * soll_max:
|
||
return None
|
||
return chapters
|
||
|
||
|
||
def _split_chunks(chapters: list[dict], n: int) -> list[list[dict]]:
    """Split chapters into up to ``n`` contiguous chunks, balanced by section count.

    A chunk is closed as soon as it holds at least its fair share of the
    sections still to be distributed. The last chunk takes whatever is left.
    """
    chunks_left = max(1, min(n, len(chapters)))
    sections_left = sum(len(c["nums"]) for c in chapters)
    result: list[list[dict]] = []
    bucket: list[dict] = []
    bucket_size = 0
    for chapter in chapters:
        bucket.append(chapter)
        bucket_size += len(chapter["nums"])
        if chunks_left <= 1 or bucket_size < sections_left / chunks_left:
            continue
        result.append(bucket)
        sections_left -= bucket_size
        chunks_left -= 1
        bucket, bucket_size = [], 0
    if bucket:
        result.append(bucket)
    return result
|
||
|
||
|
||
def _zuteilung_text(chunk: list[dict], entries: dict[int, str]) -> str:
    """Render a chunk as text: a ``KAPITEL: <title>`` line per chapter, followed by ``- <entry>`` lines."""
    blocks = []
    for chapter in chunk:
        header = f"KAPITEL: {chapter['title']}"
        items = [f"- {entries[num]}" for num in chapter["nums"]]
        blocks.append("\n".join([header, *items]))
    return "\n".join(blocks)
|
||
|
||
|
||
# Marker comments in writer files: <!-- kapitel: … --> and <!-- section: … -->
_FRAGMENT_KAPITEL_RE = re.compile(r"<!--\s*kapitel\s*:\s*(.*?)\s*-->", re.IGNORECASE)
_FRAGMENT_SECTION_RE = re.compile(r"<!--\s*section\s*:\s*(.*?)\s*-->", re.IGNORECASE)


def _parse_fragment(text: str) -> list[dict]:
    """Parse a writer file into ``[{"kapitel", "titel", "md"}]`` in file order.

    A chapter marker sets the chapter for the following sections and closes
    the open section. A section marker opens a new section. Lines outside any
    section are dropped. ``md`` is the stripped text of the section's lines.
    """
    sections: list[dict] = []
    chapter = None
    body = None  # line buffer of the currently open section, None when outside a section
    for line in text.splitlines():
        stripped = line.strip()
        if (hit := _FRAGMENT_KAPITEL_RE.match(stripped)):
            chapter, body = hit.group(1), None
        elif (hit := _FRAGMENT_SECTION_RE.match(stripped)):
            body = []
            sections.append({"kapitel": chapter, "titel": hit.group(1), "md": body})
        elif body is not None:
            body.append(line)
    return [{**sec, "md": "\n".join(sec["md"]).strip()} for sec in sections]
|
||
|
||
|
||
async def _generate_onepager(
    guide_id: str, topic: str, instructions: str, provider: str,
    project: Path | None, content_path: Path,
) -> list[dict] | None:
    """Run the OnePager pipeline: research, research check, build, card check.

    Each step writes its result to a file next to ``content_path``; files that
    already exist are reused instead of being regenerated (resume).

    Args:
        guide_id: Guide being generated; also used for the cancellation check.
        topic: Topic inserted into the prompts.
        instructions: Extra user instructions appended to the prompts.
        provider: Provider handed through to ``_race``.
        project: Project directory, or ``None`` for a plain topic.
        content_path: Target file; only its directory and stem are used here.

    Returns:
        A one-element chapter list (title = topic, one section per card), or
        ``None`` if the run was cancelled or a mandatory step failed (in the
        latter case ``_fail`` has been called).
    """
    def is_cancelled() -> bool:
        return guide_id in _cancelled

    # 3×3 grid: 7 cards with fixed keys (order = reading order on mobile)
    KARTEN_KEYS = ("info", "eigenschaften", "beispiel", "zusammenhaenge", "voraussetzungen", "modern", "veraltet")

    def karten_schema(data):
        """Turn ``{"karten": {key: {titel, md}}}`` into an ordered list; ``None`` if invalid."""
        if not isinstance(data, dict):
            return None
        karten = data.get("karten")
        if not isinstance(karten, dict):
            return None
        out = []
        for key in KARTEN_KEYS:
            k = karten.get(key)
            if not isinstance(k, dict) or not isinstance(k.get("titel"), str) or not isinstance(k.get("md"), str):
                return None
            titel, md = k["titel"].strip(), k["md"].strip()
            if not titel or len(md) < 5:  # truncated/empty cards are invalid
                return None
            out.append({"key": key, "titel": titel, "md": md})
        return out

    d, stem = content_path.parent, content_path.stem
    recherche_path = d / f"{stem}.recherche.md"
    recherche_check_path = d / f"{stem}.recherche-check.json"
    karten_path = d / f"{stem}.karten.json"
    check_path = d / f"{stem}.onepager-check.json"

    # Projects get their own research dimensions — product questions
    # (version, licence, alternatives) lead nowhere there.
    if project:
        source = _prompt("OnePager-Quelle-Projekt", project=project)
        recherche_template = "OnePager-Recherche-Projekt"
        recherche_check_template = "OnePager-Recherche-Check-Projekt"
    else:
        source = _prompt("OnePager-Quelle-Thema", topic=topic)
        recherche_template = "OnePager-Recherche"
        recherche_check_template = "OnePager-Recherche-Check"

    def recherche_payload(result=None):
        # Non-empty content of the research file, else None.
        if not recherche_path.exists():
            return None
        text = recherche_path.read_text(encoding="utf-8").strip()
        return text or None

    # Step 1: research — an existing file is reused (resume)
    recherche = recherche_payload()
    if recherche is None:
        await _set_step(guide_id, 0, "Recherchiere…")
        slots = [{
            "key": f"{guide_id}-recherche",
            "prompt": _prompt(recherche_template, topic=topic, source=source, out_path=recherche_path, extra=_extra(instructions)),
            "role": "quick", "capabilities": "files" if project else "full",
            "payload": recherche_payload,
        }]
        res = await _race(topic, "OnePager-Recherche", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "OnePager-Recherche fehlgeschlagen")
            return None
        recherche = res[0]

    # Step 2: research check — notes problems; a research agent applies the fix
    if not recherche_check_path.exists():
        await _set_step(guide_id, 1, "Prüfe Recherche…")
        slots = [{
            "key": f"{guide_id}-recherche-check",
            "prompt": _prompt(recherche_check_template, topic=topic, recherche=recherche, out_path=recherche_check_path),
            "role": "fast", "capabilities": "files",
            "payload": (lambda result: _probleme_schema(_json_datei(recherche_check_path))),
        }]
        res = await _race(topic, "Recherche-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Recherche-Prüfung fehlgeschlagen")
            return None
        probleme = res[0]
        if probleme:
            _log(topic, f"Recherche-Prüfung: {len(probleme)} Problem(e) notiert")
            await _set_step(guide_id, 1, "Passe Recherche an…")
            slots = [{
                "key": f"{guide_id}-recherche-fix",
                "prompt": _prompt(
                    "OnePager-Recherche-Fix",
                    topic=topic, source=source, recherche=recherche,
                    probleme="\n".join(f"- {p}" for p in probleme),
                    out_path=recherche_path, extra=_extra(instructions),
                ),
                "role": "quick", "capabilities": "files" if project else "full",
                "payload": recherche_payload,
            }]
            res = await _race(topic, "Recherche-Fix", slots, 1, _timeout("onepager_recherche"), provider, cancelled=is_cancelled)
            if is_cancelled():
                return None
            if res is None:
                _log(topic, "Recherche-Fix ungültig — ursprüngliche Recherche bleibt")
                # The fix agent is told to write to recherche_path itself; put
                # the accepted text back so a later resume does not pick up a
                # clobbered file (the card fix below restores its file too).
                recherche_path.write_text(recherche, encoding="utf-8")
            else:
                recherche = res[0]

    # Step 3: build — cards only from the fact base (resume: a valid file is reused)
    karten = karten_schema(_json_datei(karten_path))
    if karten is None:
        await _set_step(guide_id, 2, "Baue OnePager…")
        karten_path.unlink(missing_ok=True)
        slots = [{
            "key": f"{guide_id}-bauen",
            "prompt": _prompt("OnePager-Bauen", topic=topic, recherche=recherche, out_path=karten_path, extra=_extra(instructions)),
            "role": "fast", "capabilities": "files",
            "payload": (lambda result: karten_schema(_json_datei(karten_path))),
        }]
        res = await _race(topic, "OnePager-Bauen", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "OnePager-Bau fehlgeschlagen")
            return None
        karten = res[0]

    def karten_block() -> str:
        # Cards rendered as markdown for the check/fix prompts.
        return "\n\n".join(f"### {k['titel']} [{k['key']}]\n{k['md']}" for k in karten)

    # Step 4: card check — notes problems; a build agent applies the fix
    if not check_path.exists():
        await _set_step(guide_id, 3, "Prüfe OnePager…")
        slots = [{
            "key": f"{guide_id}-verify",
            "prompt": _prompt("OnePager-Verifikation", topic=topic, recherche=recherche, karten=karten_block(), out_path=check_path),
            "role": "fast", "capabilities": "files",
            "payload": (lambda result: _probleme_schema(_json_datei(check_path))),
        }]
        res = await _race(topic, "OnePager-Prüfung", slots, 1, _timeout("onepager_verify"), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "OnePager-Prüfung fehlgeschlagen")
            return None
        probleme = res[0]
        if probleme:
            _log(topic, f"OnePager-Prüfung: {len(probleme)} Problem(e) notiert")
            await _set_step(guide_id, 3, "Passe OnePager an…")
            slots = [{
                "key": f"{guide_id}-karten-fix",
                "prompt": _prompt(
                    "OnePager-Fix",
                    topic=topic, recherche=recherche, karten=karten_block(),
                    probleme="\n".join(f"- {p}" for p in probleme),
                    out_path=karten_path, extra=_extra(instructions),
                ),
                "role": "fast", "capabilities": "files",
                "payload": (lambda result: karten_schema(_json_datei(karten_path))),
            }]
            res = await _race(topic, "OnePager-Fix", slots, 1, _timeout("onepager_bauen"), provider, cancelled=is_cancelled)
            if is_cancelled():
                return None
            if res is None:
                _log(topic, "OnePager-Fix ungültig — ursprüngliche Karten bleiben")
                # Write the accepted cards back over whatever the fix agent left.
                karten_path.write_text(
                    json.dumps({"karten": {k["key"]: {"titel": k["titel"], "md": k["md"]} for k in karten}}, ensure_ascii=False),
                    encoding="utf-8",
                )
            else:
                karten = res[0]

    sections = [
        {"num": i, "title": k["titel"], "md": k["md"], "key": k["key"]}
        for i, k in enumerate(karten, 1)
    ]
    return [{"title": topic, "sections": sections}]
|
||
|
||
|
||
async def _generate_sections(
    guide_id: str, topic: str, format_name: str, entries: dict[int, str],
    facts: str, instructions: str, provider: str,
    content_path: Path,
) -> list[dict] | None:
    """Run the section pipeline: select, outline, write, readability check, assemble.

    Each step writes its result to a file derived from ``content_path``;
    valid files that already exist are reused instead of being regenerated
    (resume).

    Args:
        guide_id: Guide being generated; also used for the cancellation check.
        topic: Topic inserted into the prompts.
        format_name: Key into ``FORMAT_ANTEIL``; fixes how many building
            blocks must be selected.
        entries: Building blocks by number.
        facts: Fact-source text passed to the writer prompts.
        instructions: Extra user instructions appended to the prompts.
        provider: Provider handed through to the agents.
        content_path: Target file; step files are placed next to it.

    Returns:
        The list of chapters (``title`` + ``sections``), or ``None`` if the
        run was cancelled or a mandatory step failed (in the latter case
        ``_fail`` has been called).
    """
    def is_cancelled() -> bool:
        return guide_id in _cancelled

    spec = (TEMPLATES_DIR / "Format" / "Section.md").read_text(encoding="utf-8")
    files = _guide_files(content_path)
    bausteine_liste = "\n".join(f"- {t}" for t in entries.values())
    n = len(entries)
    anteil_min, anteil_max, minimum, zweck = FORMAT_ANTEIL[format_name]
    # Selection bounds: share of all building blocks, clamped to [minimum, n].
    k_min = min(n, max(minimum, math.ceil(anteil_min * n)))
    k_max = min(n, max(k_min, math.floor(anteil_max * n)))
    auswahl_auftrag = (
        f"Wähle MINDESTENS {k_min} und HÖCHSTENS {k_max} der Bausteine und baue daraus {zweck}. "
        "Wähle, was diesem Zweck dient — lass weg, was dafür nicht nötig ist."
    )

    # Step 1: selection — an existing valid file is reused (resume)
    auswahl = _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)
    if auswahl is None:
        await _set_step(guide_id, 0, "Wähle Bausteine…")
        files["auswahl"].unlink(missing_ok=True)
        slots = [{
            "key": f"{guide_id}-auswahl",
            "prompt": _prompt(
                "Guide-Auswahl",
                topic=topic, format_name=format_name, bausteine=bausteine_liste,
                auswahl_auftrag=auswahl_auftrag, out_path=files["auswahl"], extra=_extra(instructions),
            ),
            "role": "guide", "capabilities": "files",
            "payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
        }]
        res = await _race(topic, "Guide-Auswahl", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Auswahl fehlgeschlagen")
            return None
        auswahl = res[0]

    def auswahl_titel() -> str:
        return "\n".join(f"- {_titel(entries[num])}" for num in auswahl)

    def auswahl_json() -> str:
        return json.dumps({"bausteine": [_titel(entries[num]) for num in auswahl]}, ensure_ascii=False)

    # Step 2: selection check — notes problems; a selection agent applies the fix
    if not files["auswahl_check"].exists():
        await _set_step(guide_id, 1, "Prüfe Auswahl…")
        slots = [{
            "key": f"{guide_id}-auswahl-check",
            "prompt": _prompt(
                "Guide-Auswahl-Check",
                topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
                bausteine=bausteine_liste, auswahl=auswahl_titel(),
                out_path=files["auswahl_check"], extra=_extra(instructions),
            ),
            "role": "fast", "capabilities": "files",
            "payload": (lambda result: _probleme_schema(_json_datei(files["auswahl_check"]))),
        }]
        res = await _race(topic, "Auswahl-Prüfung", slots, 1, _timeout("guide_check", len(auswahl)), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Auswahl-Prüfung fehlgeschlagen")
            return None
        probleme = res[0]
        if probleme:
            _log(topic, f"Auswahl-Prüfung: {len(probleme)} Problem(e) notiert")
            await _set_step(guide_id, 1, "Passe Auswahl an…")
            slots = [{
                "key": f"{guide_id}-auswahl-fix",
                "prompt": _prompt(
                    "Guide-Auswahl-Fix",
                    topic=topic, format_name=format_name, auswahl_auftrag=auswahl_auftrag,
                    bausteine=bausteine_liste, auswahl=auswahl_titel(),
                    probleme="\n".join(f"- {p}" for p in probleme),
                    out_path=files["auswahl"], extra=_extra(instructions),
                ),
                "role": "guide", "capabilities": "files",
                "payload": (lambda result: _resolve_auswahl(_json_datei(files["auswahl"]), entries, k_min, k_max)),
            }]
            res = await _race(topic, "Auswahl-Fix", slots, 1, _timeout("guide_auswahl", n), provider, cancelled=is_cancelled)
            if is_cancelled():
                return None
            if res is None:
                _log(topic, "Auswahl-Fix ungültig — ursprüngliche Auswahl bleibt")
                # Write the accepted selection back over whatever the fix agent left.
                files["auswahl"].write_text(auswahl_json(), encoding="utf-8")
            else:
                auswahl = res[0]

    sel_entries = {num: entries[num] for num in auswahl}
    soll = len(sel_entries)
    sel_liste = "\n".join(f"- {t}" for t in sel_entries.values())

    # Step 3: outline of the fixed selection
    plan = _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)
    if plan is None:
        await _set_step(guide_id, 2, "Plane Gliederung…")
        files["gliederung"].unlink(missing_ok=True)
        slots = [{
            "key": f"{guide_id}-gliederung",
            "prompt": _prompt(
                "Guide-Gliederung",
                topic=topic, format_name=format_name, bausteine=sel_liste,
                out_path=files["gliederung"], extra=_extra(instructions),
            ),
            "role": "guide", "capabilities": "files",
            "payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
        }]
        res = await _race(topic, "Gliederung", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Gliederung fehlgeschlagen")
            return None
        plan = res[0]

    def gliederung_text() -> str:
        return "\n".join(_zuteilung_text([ch], {num: _titel(entries[num]) for num in ch["nums"]}) for ch in plan)

    def gliederung_json() -> str:
        return json.dumps(
            {"kapitel": [{"titel": ch["title"], "bausteine": [_titel(entries[num]) for num in ch["nums"]]} for ch in plan]},
            ensure_ascii=False,
        )

    # Step 4: outline check
    if not files["gliederung_check"].exists():
        await _set_step(guide_id, 3, "Prüfe Gliederung…")
        slots = [{
            "key": f"{guide_id}-gliederung-check",
            "prompt": _prompt(
                "Guide-Gliederung-Check",
                topic=topic, format_name=format_name, zweck=zweck,
                auswahl=auswahl_titel(), gliederung=gliederung_text(),
                out_path=files["gliederung_check"], extra=_extra(instructions),
            ),
            "role": "fast", "capabilities": "files",
            "payload": (lambda result: _probleme_schema(_json_datei(files["gliederung_check"]))),
        }]
        res = await _race(topic, "Gliederungs-Prüfung", slots, 1, _timeout("guide_check", soll), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Gliederungs-Prüfung fehlgeschlagen")
            return None
        probleme = res[0]
        if probleme:
            _log(topic, f"Gliederungs-Prüfung: {len(probleme)} Problem(e) notiert")
            await _set_step(guide_id, 3, "Passe Gliederung an…")
            slots = [{
                "key": f"{guide_id}-gliederung-fix",
                "prompt": _prompt(
                    "Guide-Gliederung-Fix",
                    topic=topic, format_name=format_name,
                    auswahl=auswahl_titel(), gliederung=gliederung_text(),
                    probleme="\n".join(f"- {p}" for p in probleme),
                    out_path=files["gliederung"], extra=_extra(instructions),
                ),
                "role": "guide", "capabilities": "files",
                "payload": (lambda result: _resolve_gliederung(_json_datei(files["gliederung"]), sel_entries, soll, soll)),
            }]
            res = await _race(topic, "Gliederungs-Fix", slots, 1, _timeout("plan", soll), provider, cancelled=is_cancelled)
            if is_cancelled():
                return None
            if res is None:
                _log(topic, "Gliederungs-Fix ungültig — ursprüngliche Gliederung bleibt")
                # Write the accepted outline back over whatever the fix agent left.
                files["gliederung"].write_text(gliederung_json(), encoding="utf-8")
            else:
                plan = res[0]

    # Step 5: writing — existing chunk files are reused (resume)
    total_sections = sum(len(c["nums"]) for c in plan)
    chunks = _split_chunks(plan, min(WRITER_MAX, max(1, math.ceil(total_sections / WRITER_SECTIONS))))
    zuteilungen = [_zuteilung_text(chunk, entries) for chunk in chunks]
    chunk_sizes = [sum(len(c["nums"]) for c in chunk) for chunk in chunks]
    writer_count = len(zuteilungen)
    paths = [content_path.parent / f"{content_path.stem}.chunk-{i}.md" for i in range(1, writer_count + 1)]
    offen = [i for i, p in enumerate(paths) if not p.exists()]
    if offen:
        await _set_step(guide_id, 4, f"Schreibe Sections ({writer_count} Writer)…" if writer_count > 1 else "Schreibe Sections…")
        results = await asyncio.gather(*[
            run_agent(
                f"{guide_id}-w{i + 1}",
                _prompt(
                    "Guide-Writer",
                    topic=topic, format_name=format_name, zuteilung=zuteilungen[i],
                    facts=facts, spec=spec, out_path=paths[i], extra=_extra(instructions),
                ),
                _timeout("writer", chunk_sizes[i]), provider=provider, role="guide", capabilities="full",
            )
            for i in offen
        ], return_exceptions=True)
        if is_cancelled():
            return None
        for i, r in zip(offen, results):
            if isinstance(r, BaseException):
                _log(topic, f"Writer {i + 1}: {type(r).__name__}: {r}")
            elif r[0] != 0:
                _log(topic, f"Writer {i + 1}: {_claude_error('Fehler', *r)}")
            elif not paths[i].exists():
                _log(topic, f"Writer {i + 1}: keine Ausgabedatei erstellt")
        # Individual writer failures are tolerated; only a run without any
        # chunk file at all is fatal.
        if not any(p.exists() for p in paths):
            await _fail(guide_id, _gather_error("Writer-Fehler", list(results)))
            return None

    idx = _titel_index(entries)
    by_num: dict[int, dict] = {}
    for p in paths:
        if not p.exists():
            continue
        for sec in _parse_fragment(p.read_text(encoding="utf-8")):
            num = _titel_aufloesen(idx, sec["titel"])
            if num is None:
                _log(topic, f"Writer lieferte unbekannte Section '{sec['titel'][:40]}' (ignoriert)")
            elif num not in by_num:
                # First occurrence wins.
                by_num[num] = sec
    if not by_num:
        await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
        return None

    # Step 6: readability check per writer package — the fix only asks writers
    # to redo the sections that were objected to
    chunk_nums = [[num for ch in chunk for num in ch["nums"] if num in by_num] for chunk in chunks]
    check_paths = [content_path.parent / f"{content_path.stem}.lese-check-{i}.json" for i in range(1, writer_count + 1)]
    offen_checks = [i for i, p in enumerate(check_paths) if _lese_probleme_schema(_json_datei(p)) is None and chunk_nums[i]]
    if offen_checks:
        await _set_step(guide_id, 5, f"Prüfe Lesbarkeit ({len(offen_checks)} Prüfer)…" if len(offen_checks) > 1 else "Prüfe Lesbarkeit…")

        def sections_text(nums: list[int]) -> str:
            return "\n\n".join(f"SECTION: {_titel(entries[num])}\n{by_num[num]['md']}" for num in nums)

        slots = [{
            "key": f"{guide_id}-lese-check-{i + 1}",
            "prompt": _prompt(
                "Guide-Lese-Check",
                topic=topic, format_name=format_name, spec=spec,
                sections=sections_text(chunk_nums[i]),
                out_path=check_paths[i], extra=_extra(instructions),
            ),
            "role": "fast", "capabilities": "files",
            # p=... binds the path per slot (avoids the late-binding closure trap).
            "payload": (lambda result, p=check_paths[i]: _lese_probleme_schema(_json_datei(p))),
        } for i in offen_checks]
        res = await _race(topic, "Lese-Prüfung", slots, len(slots), _timeout("lese_check", max(chunk_sizes)), provider, cancelled=is_cancelled)
        if is_cancelled():
            return None
        if res is None:
            await _fail(guide_id, "Lese-Prüfung fehlgeschlagen")
            return None

    probleme_by_num: dict[int, str] = {}
    for p in check_paths:
        for item in (_lese_probleme_schema(_json_datei(p)) or []):
            num = _titel_aufloesen(idx, item["section"])
            if num in by_num and num not in probleme_by_num:
                probleme_by_num[num] = item["problem"]

    if probleme_by_num:
        _log(topic, f"Lese-Prüfung: {len(probleme_by_num)} Section(s) beanstandet")
        await _set_step(guide_id, 5, f"Überarbeite {len(probleme_by_num)} Section(s)…")
        fix_chunks = [[num for num in nums if num in probleme_by_num] for nums in chunk_nums]
        fix_offen = [i for i, nums in enumerate(fix_chunks) if nums]
        fix_paths = [content_path.parent / f"{content_path.stem}.fix-{i + 1}.md" for i in range(writer_count)]

        def auftraege_text(nums: list[int]) -> str:
            return "\n\n".join(
                f"SECTION: {_titel(entries[num])}\nPROBLEM: {probleme_by_num[num]}\nAKTUELLER INHALT:\n{by_num[num]['md']}"
                for num in nums
            )

        results = await asyncio.gather(*[
            run_agent(
                f"{guide_id}-fix-w{i + 1}",
                _prompt(
                    "Guide-Sections-Fix",
                    topic=topic, format_name=format_name, facts=facts, spec=spec,
                    auftraege=auftraege_text(fix_chunks[i]),
                    out_path=fix_paths[i], extra=_extra(instructions),
                ),
                _timeout("writer", len(fix_chunks[i])), provider=provider, role="guide", capabilities="full",
            )
            for i in fix_offen
        ], return_exceptions=True)
        if is_cancelled():
            return None
        for i, r in zip(fix_offen, results):
            # `or` short-circuits, so r[0] is only read for non-exception results.
            if isinstance(r, BaseException) or r[0] != 0:
                _log(topic, f"Sections-Fix {i + 1} fehlgeschlagen — Original bleibt")
        ersetzt = 0
        for i in fix_offen:
            if not fix_paths[i].exists():
                continue
            for sec in _parse_fragment(fix_paths[i].read_text(encoding="utf-8")):
                num = _titel_aufloesen(idx, sec["titel"])
                if num in probleme_by_num and sec["md"].strip():
                    by_num[num] = sec
                    ersetzt += 1
        _log(topic, f"Lese-Prüfung: {ersetzt} Section(s) überarbeitet")

    await _set_progress(guide_id, "Setze zusammen…")
    chapters: list[dict] = []
    for ch in plan:
        sections = [
            {"num": num, "title": _titel(entries[num]), "md": by_num[num]["md"]}
            for num in ch["nums"] if num in by_num
        ]
        if sections:
            chapters.append({"title": ch["title"], "sections": sections})
    geplant = {num for ch in plan for num in ch["nums"]}
    missing = sorted(geplant - set(by_num))
    if missing:
        _log(topic, f"Sections fehlen in der Writer-Ausgabe: {[_titel(entries[n]) for n in missing]}")
    if not chapters:
        await _fail(guide_id, "Keine Sections in der Writer-Ausgabe gefunden")
        return None
    return chapters
|
||
|
||
|
||
async def generate_guide(guide_id: str, topic: str, format_name: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
    """Generate one guide and record the outcome through ``update_guide``/``_fail``.

    Runs under the module-level semaphore. On success the chapters are written
    to the guide's content file as JSON and the status is set to ``"done"``.

    Args:
        guide_id: Guide to generate.
        topic: Topic of the guide.
        format_name: ``"OnePager"`` selects the card pipeline; every other
            value selects the section pipeline.
        instructions: Extra user instructions passed on to the pipelines.
        provider: Provider passed on to the pipelines.
    """
    async with _semaphore:
        # Cancelled while still queued behind the semaphore: cancel_guide has
        # already stored the final status, so it must not be overwritten with
        # "generating" (nothing would reset it afterwards).
        if guide_id in _cancelled:
            _cancelled.discard(guide_id)
            return

        now = datetime.now(timezone.utc).isoformat()
        await update_guide(guide_id, status="generating", progress="Starte…", updated_at=now)

        content_path = guide_content_path(topic, format_name)
        content_path.parent.mkdir(parents=True, exist_ok=True)
        project_candidate = project_dir(topic)
        project = project_candidate if project_candidate.is_dir() else None

        try:
            if guide_id in _cancelled:
                return

            if project:
                await asyncio.to_thread(_pdfs_konvertieren, project)

            # "Create again": a finished guide means a complete fresh start.
            # Otherwise step files are leftovers of an abort/error → resume.
            if content_path.exists():
                for p_alt in guide_slot_dateien(content_path):
                    p_alt.unlink(missing_ok=True)

            if format_name == "OnePager":
                chapters = await _generate_onepager(guide_id, topic, instructions, provider, project, content_path)
            else:
                alle = _lade_bausteine(bausteine_path(topic).read_text(encoding="utf-8"))
                if not alle:
                    await _fail(guide_id, "Keine Bausteine gefunden")
                    return
                entries = _eindeutige_titel(alle)
                facts = _prompt("Guide-Fakten-Projekt", project=project) if project else _prompt("Guide-Fakten-Thema")
                chapters = await _generate_sections(
                    guide_id, topic, format_name, entries,
                    facts, instructions, provider, content_path,
                )
            if chapters is None or guide_id in _cancelled:
                return

            content_path.write_text(
                json.dumps({"topic": topic, "format": format_name, "chapters": chapters}, ensure_ascii=False, indent=1),
                encoding="utf-8",
            )

            now = datetime.now(timezone.utc).isoformat()
            await update_guide(guide_id, status="done", progress=None, step=None, updated_at=now)

        except asyncio.TimeoutError:
            await _fail(guide_id, "Timeout bei der Generierung")
        except FileNotFoundError:
            await _fail(guide_id, "Bausteine fehlen")
        except Exception as e:
            log.exception("[%s] Guide-Generierung fehlgeschlagen (%s)", topic, guide_id)
            await _fail(guide_id, str(e)[:2000])
        finally:
            # Clear the flag so it cannot affect a later run of the same guide.
            _cancelled.discard(guide_id)
|
||
|
||
|
||
# --- Tutor-Chat ---
|
||
|
||
def _build_guide_chat_prompt(topic: str, format_name: str, section: str, outline: str, messages: list[dict]) -> str:
    """Fill the "Chat" prompt template for one tutor-chat request.

    The messages become a transcript with one ``Speaker: text`` line each
    (role ``"user"`` is the user, anything else the assistant). A blank
    outline or section is replaced by a placeholder text.
    """
    lines = []
    for message in messages:
        speaker = "Nutzer" if message.get("role") == "user" else "Assistent"
        lines.append(f"{speaker}: {message.get('content', '')}")

    outline_block = outline.strip()
    if not outline_block:
        outline_block = "(keine)"

    section_block = section.strip()
    if not section_block:
        section_block = "(kein Abschnitt erkannt)"

    return _prompt(
        "Chat",
        topic=topic,
        format_name=format_name,
        outline_block=outline_block,
        section_block=section_block,
        transcript="\n".join(lines),
    )
|
||
|
||
|
||
async def chat_with_guide(topic: str, format_name: str, section: str, outline: str, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> str:
    """Answer a tutor-chat request about a guide.

    Returns the agent's trimmed output. Returns a fixed apology text instead
    when the agent exits non-zero, produces no output, or anything raises.
    """
    try:
        prompt = _build_guide_chat_prompt(topic, format_name, section, outline, messages)
        agent_key = "chat-" + str(uuid.uuid4())
        returncode, stdout, stderr = await run_agent(
            agent_key, prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode == 0:
            reply = stdout.strip()
            if reply:
                return reply
            return "Entschuldigung, ich habe keine Antwort erhalten."
        return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
    except Exception:
        # Best effort: the chat must never raise into the caller.
        log.warning("[%s] Guide-Chat fehlgeschlagen", topic, exc_info=True)
        return "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
|
||
|
||
|
||
# --- Elemente (persönliche Zusammenfassung) ---
|
||
|
||
def _parse_json_text(text: str):
|
||
"""Parst JSON aus KI-Output (Code-Fences und Drumherum-Text tolerant).
|
||
|
||
Repariert unescapte Anführungszeichen in Strings (z. B. MiniMax: "Titel „p" geändert"):
|
||
das letzte `"` vor der Fehlerstelle escapen und erneut parsen.
|
||
"""
|
||
text = re.sub(r"^```(?:json)?\s*|\s*```$", "", (text or "").strip())
|
||
start, end = text.find("{"), text.rfind("}")
|
||
if start == -1 or end <= start:
|
||
return None
|
||
candidate = text[start:end + 1]
|
||
for _ in range(20):
|
||
try:
|
||
return json.loads(candidate)
|
||
except json.JSONDecodeError as e:
|
||
if not e.msg.startswith(("Expecting ',' delimiter", "Expecting ':' delimiter")):
|
||
return None
|
||
q = candidate.rfind('"', 0, e.pos)
|
||
if q <= 0:
|
||
return None
|
||
candidate = candidate[:q] + '\\"' + candidate[q + 1:]
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
def _element_fields(data: dict) -> dict | None:
|
||
"""Validiert KI-Element-JSON und normalisiert auf die DB-Felder."""
|
||
if not isinstance(data, dict):
|
||
return None
|
||
title = str(data.get("title", "")).strip()
|
||
if not title:
|
||
return None
|
||
listen = {}
|
||
for key in ("examples", "hints"):
|
||
raw = data.get(key, [])
|
||
listen[key] = [str(e).strip() for e in raw if str(e).strip()] if isinstance(raw, list) else []
|
||
return {
|
||
"title": title[:200],
|
||
"description": str(data.get("description", "")).strip(),
|
||
"examples": listen["examples"],
|
||
"hints": listen["hints"],
|
||
}
|
||
|
||
|
||
def _topic_context(topic: str, limit: int = 12000) -> str:
    """Return building blocks plus guide content of a topic as context text.

    Only the first guide format (in the order FullGuide, Guide, MiniGuide,
    OnePager) that yields content is included. The result is cut to ``limit``
    characters; a placeholder text is returned when nothing is available.
    """
    pieces: list[str] = []

    bausteine_file = bausteine_path(topic)
    if bausteine_file.exists():
        pieces.append(bausteine_file.read_text(encoding="utf-8"))

    for fmt in ("FullGuide", "Guide", "MiniGuide", "OnePager"):
        guide = _json_datei(guide_content_path(topic, fmt))
        if not guide:
            continue
        pieces.extend(
            sec if isinstance(sec, str) else json.dumps(sec, ensure_ascii=False)
            for ch in guide.get("chapters", [])
            for sec in ch.get("sections", [])
        )
        break  # the best available guide is enough

    joined = "\n\n".join(pieces).strip()
    if not joined:
        return "(kein Material vorhanden)"
    return joined[:limit]
|
||
|
||
|
||
async def generate_element(topic: str, hint: str, provider: str = DEFAULT_PROVIDER) -> dict:
    """Create element fields with the AI.

    Falls back to an element that only carries a title (the keyword, or a
    default title if the keyword is blank) when the agent exits non-zero, its
    output cannot be validated, or anything raises.
    """
    stichwort = hint.strip()
    fallback = {"title": stichwort or "Neues Element", "description": "", "examples": [], "hints": []}
    try:
        prompt = _prompt(
            "Element-Create",
            topic=topic,
            hint=stichwort or "(keins — wähle selbst ein Kernkonzept)",
            context=_topic_context(topic),
        )
        agent_key = "element-" + str(uuid.uuid4())
        returncode, stdout, _ = await run_agent(
            agent_key, prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode == 0:
            fields = _element_fields(_parse_json_text(stdout))
            if fields:
                return fields
        return fallback
    except Exception:
        log.warning("[%s] Element-Erstellung fehlgeschlagen", topic, exc_info=True)
        return fallback
|
||
|
||
|
||
def _parse_suggestions(stdout: str) -> list[dict] | None:
    """Validate suggestion JSON from AI output.

    Returns:
        The valid suggestions (``text``, ``target``, ``content``), or ``None``
        if the output does not contain a JSON object. Entries that are not
        dicts, lack text/content or name an unknown target are dropped.
    """
    data = _parse_json_text(stdout)
    if not isinstance(data, dict):
        return None
    raw = data.get("suggestions")
    if not isinstance(raw, list):
        # e.g. "suggestions": null — no suggestions, rather than a TypeError
        return []
    suggestions = []
    for s in raw:
        if not isinstance(s, dict):
            continue
        text = str(s.get("text", "")).strip()
        target = s.get("target")
        content = str(s.get("content", "")).strip()
        if text and content and target in ("description", "examples", "hints"):
            suggestions.append({"text": text, "target": target, "content": content})
    return suggestions
|
||
|
||
|
||
async def check_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None:
    """Check an element for missing information in two steps: research, then verify.

    Returns:
        The verified suggestions; ``[]`` if the research step found no
        candidates; ``None`` if an agent exits non-zero, its output is not a
        JSON object, or anything raises.
    """
    try:
        # Shared helper instead of a second copy of the same serialisation.
        element_json = _element_json(element)
        context = _topic_context(element["topic"])

        # Step 1: research — collect candidates broadly
        prompt = _prompt("Element-Check", topic=element["topic"], element_json=element_json, context=context)
        returncode, stdout, _ = await run_agent(
            "element-check-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode != 0:
            return None
        candidates = _parse_suggestions(stdout)
        if candidates is None:
            return None
        if not candidates:
            return []

        # Step 2: verify — only let important, non-redundant items through
        prompt = _prompt(
            "Element-Verify",
            topic=element["topic"], element_json=element_json,
            candidates_json=json.dumps({"suggestions": candidates}, ensure_ascii=False, indent=1),
            context=context,
        )
        returncode, stdout, _ = await run_agent(
            "element-verify-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode != 0:
            return None
        return _parse_suggestions(stdout)
    except Exception:
        log.warning("[%s] Element-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
        return None
|
||
|
||
|
||
def _element_json(element: dict) -> str:
|
||
return json.dumps(
|
||
{k: element[k] for k in ("title", "description", "examples", "hints")},
|
||
ensure_ascii=False, indent=1,
|
||
)
|
||
|
||
|
||
def _validate_change(c, element: dict) -> dict | None:
|
||
"""Validiert einen Änderungs-Vorschlag aus KI-Output gegen das Element."""
|
||
if not isinstance(c, dict):
|
||
return None
|
||
text = str(c.get("text", "")).strip()
|
||
action = c.get("action")
|
||
target = c.get("target")
|
||
index = c.get("index")
|
||
content = str(c.get("content", "")).strip()
|
||
if not text or action not in ("entfernen", "anpassen", "hinzufuegen"):
|
||
return None
|
||
if target not in ("title", "description", "examples", "hints"):
|
||
return None
|
||
if action in ("anpassen", "hinzufuegen") and not content:
|
||
return None
|
||
if action == "entfernen" and target not in ("examples", "hints"):
|
||
return None
|
||
# Index nur für anpassen/entfernen in Listen-Feldern; muss existieren
|
||
if target in ("examples", "hints") and action in ("anpassen", "entfernen"):
|
||
if not isinstance(index, int) or not (0 <= index < len(element[target])):
|
||
return None
|
||
else:
|
||
index = None
|
||
return {"text": text, "action": action, "target": target, "index": index, "content": content}
|
||
|
||
|
||
async def chat_with_element(element: dict, messages: list[dict], provider: str = DEFAULT_PROVIDER) -> tuple[str, list[dict]]:
    """Chat about an element; this function itself applies no changes.

    Returns:
        ``(reply, change proposals)``. On any failure the reply is a fixed
        apology text and the proposal list is empty.
    """
    fehler = "Entschuldigung, das hat nicht geklappt. Bitte versuche es erneut."
    try:
        transcript = "\n".join(
            f"{'Nutzer' if m.get('role') == 'user' else 'Assistent'}: {m.get('content', '')}"
            for m in messages
        )
        prompt = _prompt("Element-Chat", topic=element["topic"], element_json=_element_json(element), transcript=transcript)
        returncode, stdout, _ = await run_agent(
            "element-chat-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode != 0:
            return fehler, []
        data = _parse_json_text(stdout)
        if not isinstance(data, dict):
            return fehler, []
        raw_changes = data.get("changes")
        if not isinstance(raw_changes, list):
            # e.g. "changes": null — keep the reply instead of failing on iteration
            raw_changes = []
        changes = [v for c in raw_changes if (v := _validate_change(c, element))]
        reply = str(data.get("reply", "")).strip() or ("Vorschläge erstellt." if changes else fehler)
        return reply, changes
    except Exception:
        log.warning("[%s] Element-Chat fehlgeschlagen", element.get("topic", "?"), exc_info=True)
        return fehler, []
|
||
|
||
|
||
async def style_element(element: dict, provider: str = DEFAULT_PROVIDER) -> list[dict] | None:
    """Check an element against the style rules and propose changes.

    Returns:
        The validated change proposals (possibly empty), or ``None`` if the
        agent exits non-zero, its output is not a JSON object, or anything
        raises.
    """
    try:
        prompt = _prompt("Element-Stil", topic=element["topic"], element_json=_element_json(element))
        returncode, stdout, _ = await run_agent(
            "element-stil-" + str(uuid.uuid4()), prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode != 0:
            return None
        data = _parse_json_text(stdout)
        if not isinstance(data, dict):
            return None
        raw_changes = data.get("changes")
        if not isinstance(raw_changes, list):
            # e.g. "changes": null means "nothing to change", not a failure
            return []
        return [v for c in raw_changes if (v := _validate_change(c, element))]
    except Exception:
        log.warning("[%s] Stil-Prüfung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
        return None
|
||
|
||
|
||
async def refine_suggestion(element: dict, suggestion: dict, instruction: str, provider: str = DEFAULT_PROVIDER) -> dict | None:
    """Rework a single proposal according to a user instruction.

    Returns the validated reworked proposal, or ``None`` if the agent exits
    non-zero, its output is not a JSON object, the proposal is invalid, or
    anything raises.
    """
    try:
        suggestion_json = json.dumps(suggestion, ensure_ascii=False, indent=1)
        prompt = _prompt(
            "Element-Refine",
            topic=element["topic"],
            element_json=_element_json(element),
            suggestion_json=suggestion_json,
            instruction=instruction,
        )
        agent_key = "element-refine-" + str(uuid.uuid4())
        returncode, stdout, _ = await run_agent(
            agent_key, prompt, 240, provider=provider, role="fast", capabilities="none"
        )
        if returncode == 0:
            data = _parse_json_text(stdout)
            if isinstance(data, dict):
                return _validate_change(data.get("change"), element)
        return None
    except Exception:
        log.warning("[%s] Vorschlags-Überarbeitung fehlgeschlagen", element.get("topic", "?"), exc_info=True)
        return None
|