Files
creator/backend/bausteine.py
2026-06-12 17:18:42 +02:00

436 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Bausteine-Pipeline: Recherche-Konsens + Klärungs-Loop — reines Inventar, unsortiert.
5x Recherche (min. 3, Grace) → Mapping (Konsens/Rest) → Klärungs-Loop (max.
KONSENS_MAX_RUNDEN Runden): 3 Auswahl-Agenten (min. 2, Grace) entscheiden
über den strittigen Rest, ein Mapping-Agent sortiert in aufnehmen/verwerfen/
weiter strittig. Leerer Rest beendet den Loop; die letzte Runde muss alles
entscheiden. Races nutzen ein Grace-Fenster statt „erste N gewinnen": Nach dem
ersten gültigen Ergebnis dürfen die übrigen Agenten KONSENS_GRACE Sekunden
fertig werden. Der Konsens wird im Code akkumuliert — kein Agent re-emittiert
die Gesamtliste.
"""
import asyncio
import logging
import shutil
import subprocess
from pathlib import Path
from agents import kill_process
from config import KONSENS_GRACE, KONSENS_MAX_RUNDEN, DEFAULT_PROVIDER
from fsutil import atomic_write_text
from jsonio import read_json_file as _json_datei
from paths import arbeit_dir, bausteine_path, project_dir
from pipeline import (
CANCELLED, FAILED, GenContext, _extra, _log, _prompt, _race, _rest_schema,
_runde_schema, _semaphore, _str_liste, _timeout, run_single_slot,
)
from textkit import _eindeutige_titel, _parse_auswahl, _titel_aufloesen, _titel_index, _vormerge
log = logging.getLogger("creator.bausteine")
_bausteine_progress: dict[str, str] = {}
_bausteine_errors: dict[str, str] = {}
_bausteine_cancelled: set[str] = set()
_bausteine_step: dict[str, int] = {}
BAUSTEINE_STEPS = ("Recherche", "Konsolidierung", "Klärung")
def _bausteine_steps(topic: str) -> tuple:
"""Projekte haben einen zusätzlichen Schritt: Themenfeld-Ergänzung per Web-Recherche."""
if project_dir(topic).is_dir():
return BAUSTEINE_STEPS + ("Ergänzung",)
return BAUSTEINE_STEPS
def _bausteine_files(topic: str) -> dict:
arbeit = arbeit_dir(topic)
runden = range(1, KONSENS_MAX_RUNDEN + 1)
return {
"final": bausteine_path(topic),
"arbeit": arbeit,
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4, 5)],
"recherche_mapping": arbeit / "recherche-mapping.json",
"auswahl": {n: [arbeit / f"auswahl-r{n}-{i}.json" for i in (1, 2, 3)] for n in runden},
"mapping": {n: arbeit / f"auswahl-mapping-r{n}.json" for n in runden},
"ergaenzung": arbeit / "ergaenzung.json",
}
def _alle_slot_dateien(files: dict) -> list[Path]:
return [
*files["recherche"], files["recherche_mapping"],
*(p for slots in files["auswahl"].values() for p in slots),
*files["mapping"].values(), files["ergaenzung"],
]
def cancel_bausteine(topic: str) -> bool:
if topic not in _bausteine_progress:
return False
_bausteine_cancelled.add(topic)
kill_process(f"bausteine-{topic}-")
return True
def _resume_step(topic: str) -> int:
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
files = _bausteine_files(topic)
if sum(p.exists() for p in files["recherche"]) < 3:
return 0
if not files["recherche_mapping"].exists():
return 1
mapping = _mapping_schema(_json_datei(files["recherche_mapping"]))
geklaert = mapping is not None and (
not mapping[1] # kein strittiger Rest
or any((r := _runde_schema(_json_datei(p))) is not None and not r[1] for p in files["mapping"].values())
)
if not geklaert:
return 2
if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
return 3
return len(_bausteine_steps(topic))
def bausteine_status(topic: str) -> dict:
steps = _bausteine_steps(topic)
ready = bausteine_path(topic).exists()
generating = topic in _bausteine_progress
partial = False
if generating:
current = _bausteine_step.get(topic)
states = [
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
for i in range(len(steps))
]
elif ready:
states = ["done"] * len(steps)
else:
nxt = _resume_step(topic)
partial = nxt > 0
states = ["done" if i < nxt else "pending" for i in range(len(steps))]
return {
"ready": ready,
"generating": generating,
"progress": _bausteine_progress.get(topic),
"error": _bausteine_errors.get(topic),
"partial": partial,
"steps": [{"label": label, "state": s} for label, s in zip(steps, states)],
}
def active_bausteine() -> list[dict]:
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
def reset_bausteine(topic: str) -> None:
files = _bausteine_files(topic)
files["final"].unlink(missing_ok=True)
shutil.rmtree(files["arbeit"], ignore_errors=True)
_bausteine_errors.pop(topic, None)
def _ergaenzung_schema(data):
"""{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None."""
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
return None
out = []
for b in data["bausteine"]:
if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str):
return None
titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip()
if not titel:
return None
out.append((titel, beschreibung))
return out
def _pdfs_konvertieren(project: Path) -> None:
"""PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern.
Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die
.txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet.
Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt
unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten).
"""
pdfs = list(project.rglob("*.pdf"))
if not pdfs:
return
if shutil.which("pdftotext") is None:
raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
for pdf in pdfs:
txt = pdf.with_suffix(".txt")
if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
continue
try:
subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
_log(project.name, f"PDF konvertiert: {pdf.name}{txt.name}")
except Exception as e:
raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
if project:
source = _prompt("Bausteine-Quelle-Projekt", project=project)
else:
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
return _prompt(
"Bausteine-Recherche",
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
)
def _file_payload(path: Path):
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
if not path.exists():
return None
text = path.read_text(encoding="utf-8")
return text if _parse_auswahl(text) else None
def _mapping_schema(data):
"""{"bausteine": [str, ≥1], "rest": [str]} → (bausteine, rest) · sonst None."""
if not isinstance(data, dict):
return None
bausteine = _str_liste(data.get("bausteine"))
rest = _str_liste(data.get("rest"))
if not bausteine or rest is None:
return None
return bausteine, rest
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
if topic in _bausteine_progress:
return
_bausteine_progress[topic] = "Wartend…"
_bausteine_errors.pop(topic, None)
files = _bausteine_files(topic)
final_path = files["final"]
project = project_dir(topic) if project_dir(topic).is_dir() else None
def set_p(msg: str, step: int | None = None) -> None:
_bausteine_progress[topic] = msg
if step is not None:
_bausteine_step[topic] = step
def is_cancelled() -> bool:
return topic in _bausteine_cancelled
def abgebrochen() -> None:
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled)
try:
async with _semaphore:
files["arbeit"].mkdir(parents=True, exist_ok=True)
if project:
await asyncio.to_thread(_pdfs_konvertieren, project)
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
if final_path.exists():
for p_alt in _alle_slot_dateien(files):
p_alt.unlink(missing_ok=True)
# Schritt 1: 5 Recherche-Agenten, min. 3 mit Grace-Fenster — alle gültigen
# Slot-Dateien fließen ins Mapping (kein Kappen mehr bei 3)
recherchen: list[str] = []
offen = []
for i, path in enumerate(files["recherche"], 1):
text = _file_payload(path)
if text is not None:
recherchen.append(text)
else:
offen.append((i, path))
vorhanden = len(recherchen)
set_p(f"Recherche läuft ({vorhanden} gültig, min. 3)…", step=0)
if vorhanden < 3:
caps = "files" if project else "full"
slots = [
{
"key": f"bausteine-{topic}-recherche-{i}",
"prompt": _build_recherche_prompt(topic, path, instructions, project),
"role": "quick", "capabilities": caps,
"payload": (lambda result, p=path: _file_payload(p)),
}
for i, path in offen
]
neue = await _race(
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c} gültig, min. 3)…"),
cancelled=is_cancelled, grace=KONSENS_GRACE,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Minimum nicht erreicht)"
return
recherchen += neue
# Schritt 2: Recherche-Mapping — Code-Vormerge (exakte Titel) + 1 Agent
# für semantische Dubletten und Konsens/Rest-Teilung (fatal)
mapping = _mapping_schema(_json_datei(files["recherche_mapping"]))
if mapping is None:
set_p("Konsolidiere Recherche…", step=1)
files["recherche_mapping"].unlink(missing_ok=True)
gemergt = _vormerge([_parse_auswahl(t) for t in recherchen])
eintraege = "\n".join(f"{i}. {text} ({n}× genannt)" for i, (text, n) in enumerate(gemergt, 1))
status, mapping = await run_single_slot(
ctx, "Recherche-Mapping",
key=f"bausteine-{topic}-recherche-mapping",
prompt=_prompt(
"Bausteine-Recherche-Mapping",
topic=topic, n=len(recherchen), eintraege=eintraege,
out_path=files["recherche_mapping"],
),
role="judge", capabilities="files",
payload=lambda result: _mapping_schema(_json_datei(files["recherche_mapping"])),
timeout=_timeout("recherche_mapping", len(gemergt)),
)
if status == CANCELLED:
abgebrochen()
return
if status == FAILED:
_bausteine_errors[topic] = "Recherche-Mapping fehlgeschlagen"
return
konsens, rest = mapping
# Klärungs-Loop: 3 Auswahl-Agenten entscheiden über den Rest, ein
# Mapping-Agent sortiert in aufnehmen/verwerfen/weiter strittig.
# Leerer Rest beendet den Loop; Runde KONSENS_MAX_RUNDEN muss
# alles entscheiden. Der Konsens wächst nur hier im Code.
runde = 0
while rest and runde < KONSENS_MAX_RUNDEN:
runde += 1
final_runde = runde == KONSENS_MAX_RUNDEN
set_p(f"Klärung läuft (Runde {runde}/{KONSENS_MAX_RUNDEN})…", step=2)
mapping_path = files["mapping"][runde]
# Resume: fertiges Runden-Mapping wird direkt übernommen
ergebnis = _runde_schema(_json_datei(mapping_path), final=final_runde)
if ergebnis is None:
mapping_path.unlink(missing_ok=True)
konsens_block = "\n".join(f"- {t}" for t in konsens)
rest_block = "\n".join(f"- {t}" for t in rest)
# 3 Auswahl-Agenten, min. 2 mit Grace-Fenster
entscheidungen = []
offen = []
for i, path in enumerate(files["auswahl"][runde], 1):
res = _rest_schema(_json_datei(path))
if res is not None:
entscheidungen.append(res)
else:
offen.append((i, path))
if len(entscheidungen) < 2:
slots = [
{
"key": f"bausteine-{topic}-auswahl-r{runde}-{i}",
"prompt": _prompt(
"Bausteine-Auswahl",
topic=topic, konsens=konsens_block, rest=rest_block, out_path=path,
),
"role": "fast", "capabilities": "files",
"payload": (lambda result, p=path: _rest_schema(_json_datei(p))),
}
for i, path in offen
]
neue = await _race(
topic, f"Auswahl r{runde}", slots, 2 - len(entscheidungen),
_timeout("auswahl", len(rest)), provider,
cancelled=is_cancelled, grace=KONSENS_GRACE,
)
if is_cancelled():
abgebrochen()
return
if neue is None:
_bausteine_errors[topic] = f"Auswahl fehlgeschlagen (Runde {runde}, Minimum nicht erreicht)"
return
entscheidungen += neue
# Votum pro Rest-Eintrag deterministisch zählen
indizes = [_titel_index(dict(enumerate(e, 1))) for e in entscheidungen]
voten = "\n".join(
f"{i}. {text} (von {sum(1 for idx in indizes if _titel_aufloesen(idx, text) is not None)}"
f"/{len(entscheidungen)} Agenten übernommen)"
for i, text in enumerate(rest, 1)
)
final_zusatz = (
"\n- LETZTE RUNDE: Es gibt keine weitere Runde. `rest` MUSS leer sein"
" — entscheide JEDEN Eintrag selbst: aufnehmen oder verwerfen."
if final_runde else ""
)
status, ergebnis = await run_single_slot(
ctx, f"Auswahl-Mapping r{runde}",
key=f"bausteine-{topic}-auswahl-mapping-r{runde}",
prompt=_prompt(
"Bausteine-Auswahl-Mapping",
topic=topic, n=len(entscheidungen), konsens=konsens_block,
rest=voten, final=final_zusatz, out_path=mapping_path,
),
role="judge", capabilities="files",
payload=lambda result, p=mapping_path, f=final_runde: _runde_schema(_json_datei(p), final=f),
timeout=_timeout("auswahl_mapping", len(rest)),
)
if status == CANCELLED:
abgebrochen()
return
if status == FAILED:
_bausteine_errors[topic] = f"Auswahl-Mapping fehlgeschlagen (Runde {runde})"
return
aufnehmen, rest = ergebnis
_log(topic, f"Klärung Runde {runde}: {len(aufnehmen)} aufgenommen, {len(rest)} weiter strittig")
konsens = konsens + aufnehmen
entries = {i: t for i, t in enumerate(konsens, 1)}
# Nur Projekte: Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt,
# ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung].
if project:
set_p("Ergänze Themenfeld…", step=3)
erg_path = files["ergaenzung"]
ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
if ergaenzungen is None:
erg_path.unlink(missing_ok=True)
status, ergaenzungen = await run_single_slot(
ctx, "Ergänzung",
key=f"bausteine-{topic}-ergaenzung-1",
prompt=_prompt(
"Bausteine-Ergaenzung",
topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
out_path=erg_path, extra=_extra(instructions),
),
role="quick", capabilities="full",
payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)),
timeout=_timeout("ergaenzung"),
)
if status == CANCELLED:
abgebrochen()
return
if status == FAILED:
_bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
return
idx = _titel_index(entries)
neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
if neu:
_log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
start = max(entries, default=0) + 1
for off, (t, b) in enumerate(neu):
entries[start + off] = f"{t}{b} [Ergänzung]"
# Titel eindeutig machen und unsortiertes Inventar schreiben
entries = _eindeutige_titel(entries)
atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n")
except Exception as e:
log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic)
_bausteine_errors[topic] = str(e)[:2000]
finally:
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
_bausteine_progress.pop(topic, None)
_bausteine_step.pop(topic, None)
_bausteine_cancelled.discard(topic)