436 lines
19 KiB
Python
436 lines
19 KiB
Python
"""Bausteine-Pipeline: Recherche-Konsens + Klärungs-Loop — reines Inventar, unsortiert.
|
||
|
||
5x Recherche (min. 3, Grace) → Mapping (Konsens/Rest) → Klärungs-Loop (max.
|
||
KONSENS_MAX_RUNDEN Runden): 3 Auswahl-Agenten (min. 2, Grace) entscheiden
|
||
über den strittigen Rest, ein Mapping-Agent sortiert in aufnehmen/verwerfen/
|
||
weiter strittig. Leerer Rest beendet den Loop; die letzte Runde muss alles
|
||
entscheiden. Races nutzen ein Grace-Fenster statt „erste N gewinnen": Nach dem
|
||
ersten gültigen Ergebnis dürfen die übrigen Agenten KONSENS_GRACE Sekunden
|
||
fertig werden. Der Konsens wird im Code akkumuliert — kein Agent re-emittiert
|
||
die Gesamtliste.
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import shutil
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
from agents import kill_process
|
||
from config import KONSENS_GRACE, KONSENS_MAX_RUNDEN, DEFAULT_PROVIDER
|
||
from fsutil import atomic_write_text
|
||
from jsonio import read_json_file as _json_datei
|
||
from paths import arbeit_dir, bausteine_path, project_dir
|
||
from pipeline import (
|
||
CANCELLED, FAILED, GenContext, _extra, _log, _prompt, _race, _rest_schema,
|
||
_runde_schema, _semaphore, _str_liste, _timeout, run_single_slot,
|
||
)
|
||
from textkit import _eindeutige_titel, _parse_auswahl, _titel_aufloesen, _titel_index, _vormerge
|
||
|
||
log = logging.getLogger("creator.bausteine")
|
||
|
||
_bausteine_progress: dict[str, str] = {}
|
||
_bausteine_errors: dict[str, str] = {}
|
||
_bausteine_cancelled: set[str] = set()
|
||
_bausteine_step: dict[str, int] = {}
|
||
|
||
BAUSTEINE_STEPS = ("Recherche", "Konsolidierung", "Klärung")
|
||
|
||
|
||
def _bausteine_steps(topic: str) -> tuple:
|
||
"""Projekte haben einen zusätzlichen Schritt: Themenfeld-Ergänzung per Web-Recherche."""
|
||
if project_dir(topic).is_dir():
|
||
return BAUSTEINE_STEPS + ("Ergänzung",)
|
||
return BAUSTEINE_STEPS
|
||
|
||
|
||
def _bausteine_files(topic: str) -> dict:
|
||
arbeit = arbeit_dir(topic)
|
||
runden = range(1, KONSENS_MAX_RUNDEN + 1)
|
||
return {
|
||
"final": bausteine_path(topic),
|
||
"arbeit": arbeit,
|
||
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4, 5)],
|
||
"recherche_mapping": arbeit / "recherche-mapping.json",
|
||
"auswahl": {n: [arbeit / f"auswahl-r{n}-{i}.json" for i in (1, 2, 3)] for n in runden},
|
||
"mapping": {n: arbeit / f"auswahl-mapping-r{n}.json" for n in runden},
|
||
"ergaenzung": arbeit / "ergaenzung.json",
|
||
}
|
||
|
||
|
||
def _alle_slot_dateien(files: dict) -> list[Path]:
|
||
return [
|
||
*files["recherche"], files["recherche_mapping"],
|
||
*(p for slots in files["auswahl"].values() for p in slots),
|
||
*files["mapping"].values(), files["ergaenzung"],
|
||
]
|
||
|
||
|
||
def cancel_bausteine(topic: str) -> bool:
|
||
if topic not in _bausteine_progress:
|
||
return False
|
||
_bausteine_cancelled.add(topic)
|
||
kill_process(f"bausteine-{topic}-")
|
||
return True
|
||
|
||
|
||
def _resume_step(topic: str) -> int:
|
||
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
|
||
files = _bausteine_files(topic)
|
||
if sum(p.exists() for p in files["recherche"]) < 3:
|
||
return 0
|
||
if not files["recherche_mapping"].exists():
|
||
return 1
|
||
mapping = _mapping_schema(_json_datei(files["recherche_mapping"]))
|
||
geklaert = mapping is not None and (
|
||
not mapping[1] # kein strittiger Rest
|
||
or any((r := _runde_schema(_json_datei(p))) is not None and not r[1] for p in files["mapping"].values())
|
||
)
|
||
if not geklaert:
|
||
return 2
|
||
if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
|
||
return 3
|
||
return len(_bausteine_steps(topic))
|
||
|
||
|
||
def bausteine_status(topic: str) -> dict:
|
||
steps = _bausteine_steps(topic)
|
||
ready = bausteine_path(topic).exists()
|
||
generating = topic in _bausteine_progress
|
||
partial = False
|
||
if generating:
|
||
current = _bausteine_step.get(topic)
|
||
states = [
|
||
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
|
||
for i in range(len(steps))
|
||
]
|
||
elif ready:
|
||
states = ["done"] * len(steps)
|
||
else:
|
||
nxt = _resume_step(topic)
|
||
partial = nxt > 0
|
||
states = ["done" if i < nxt else "pending" for i in range(len(steps))]
|
||
return {
|
||
"ready": ready,
|
||
"generating": generating,
|
||
"progress": _bausteine_progress.get(topic),
|
||
"error": _bausteine_errors.get(topic),
|
||
"partial": partial,
|
||
"steps": [{"label": label, "state": s} for label, s in zip(steps, states)],
|
||
}
|
||
|
||
|
||
def active_bausteine() -> list[dict]:
|
||
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
|
||
|
||
|
||
def reset_bausteine(topic: str) -> None:
|
||
files = _bausteine_files(topic)
|
||
files["final"].unlink(missing_ok=True)
|
||
shutil.rmtree(files["arbeit"], ignore_errors=True)
|
||
_bausteine_errors.pop(topic, None)
|
||
|
||
|
||
def _ergaenzung_schema(data):
|
||
"""{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None."""
|
||
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
|
||
return None
|
||
out = []
|
||
for b in data["bausteine"]:
|
||
if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str):
|
||
return None
|
||
titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip()
|
||
if not titel:
|
||
return None
|
||
out.append((titel, beschreibung))
|
||
return out
|
||
|
||
|
||
def _pdfs_konvertieren(project: Path) -> None:
|
||
"""PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern.
|
||
|
||
Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die
|
||
.txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet.
|
||
Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt
|
||
unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten).
|
||
"""
|
||
pdfs = list(project.rglob("*.pdf"))
|
||
if not pdfs:
|
||
return
|
||
if shutil.which("pdftotext") is None:
|
||
raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
|
||
for pdf in pdfs:
|
||
txt = pdf.with_suffix(".txt")
|
||
if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
|
||
continue
|
||
try:
|
||
subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
|
||
_log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}")
|
||
except Exception as e:
|
||
raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
|
||
|
||
|
||
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
|
||
if project:
|
||
source = _prompt("Bausteine-Quelle-Projekt", project=project)
|
||
else:
|
||
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
|
||
return _prompt(
|
||
"Bausteine-Recherche",
|
||
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
|
||
)
|
||
|
||
|
||
def _file_payload(path: Path):
|
||
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
|
||
if not path.exists():
|
||
return None
|
||
text = path.read_text(encoding="utf-8")
|
||
return text if _parse_auswahl(text) else None
|
||
|
||
|
||
def _mapping_schema(data):
|
||
"""{"bausteine": [str, ≥1], "rest": [str]} → (bausteine, rest) · sonst None."""
|
||
if not isinstance(data, dict):
|
||
return None
|
||
bausteine = _str_liste(data.get("bausteine"))
|
||
rest = _str_liste(data.get("rest"))
|
||
if not bausteine or rest is None:
|
||
return None
|
||
return bausteine, rest
|
||
|
||
|
||
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
|
||
if topic in _bausteine_progress:
|
||
return
|
||
_bausteine_progress[topic] = "Wartend…"
|
||
_bausteine_errors.pop(topic, None)
|
||
|
||
files = _bausteine_files(topic)
|
||
final_path = files["final"]
|
||
project = project_dir(topic) if project_dir(topic).is_dir() else None
|
||
|
||
def set_p(msg: str, step: int | None = None) -> None:
|
||
_bausteine_progress[topic] = msg
|
||
if step is not None:
|
||
_bausteine_step[topic] = step
|
||
|
||
def is_cancelled() -> bool:
|
||
return topic in _bausteine_cancelled
|
||
|
||
def abgebrochen() -> None:
|
||
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
|
||
|
||
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled)
|
||
|
||
try:
|
||
async with _semaphore:
|
||
files["arbeit"].mkdir(parents=True, exist_ok=True)
|
||
if project:
|
||
await asyncio.to_thread(_pdfs_konvertieren, project)
|
||
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
|
||
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
|
||
if final_path.exists():
|
||
for p_alt in _alle_slot_dateien(files):
|
||
p_alt.unlink(missing_ok=True)
|
||
|
||
# Schritt 1: 5 Recherche-Agenten, min. 3 mit Grace-Fenster — alle gültigen
|
||
# Slot-Dateien fließen ins Mapping (kein Kappen mehr bei 3)
|
||
recherchen: list[str] = []
|
||
offen = []
|
||
for i, path in enumerate(files["recherche"], 1):
|
||
text = _file_payload(path)
|
||
if text is not None:
|
||
recherchen.append(text)
|
||
else:
|
||
offen.append((i, path))
|
||
vorhanden = len(recherchen)
|
||
set_p(f"Recherche läuft ({vorhanden} gültig, min. 3)…", step=0)
|
||
if vorhanden < 3:
|
||
caps = "files" if project else "full"
|
||
slots = [
|
||
{
|
||
"key": f"bausteine-{topic}-recherche-{i}",
|
||
"prompt": _build_recherche_prompt(topic, path, instructions, project),
|
||
"role": "quick", "capabilities": caps,
|
||
"payload": (lambda result, p=path: _file_payload(p)),
|
||
}
|
||
for i, path in offen
|
||
]
|
||
neue = await _race(
|
||
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
|
||
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c} gültig, min. 3)…"),
|
||
cancelled=is_cancelled, grace=KONSENS_GRACE,
|
||
)
|
||
if is_cancelled():
|
||
abgebrochen()
|
||
return
|
||
if neue is None:
|
||
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Minimum nicht erreicht)"
|
||
return
|
||
recherchen += neue
|
||
|
||
# Schritt 2: Recherche-Mapping — Code-Vormerge (exakte Titel) + 1 Agent
|
||
# für semantische Dubletten und Konsens/Rest-Teilung (fatal)
|
||
mapping = _mapping_schema(_json_datei(files["recherche_mapping"]))
|
||
if mapping is None:
|
||
set_p("Konsolidiere Recherche…", step=1)
|
||
files["recherche_mapping"].unlink(missing_ok=True)
|
||
gemergt = _vormerge([_parse_auswahl(t) for t in recherchen])
|
||
eintraege = "\n".join(f"{i}. {text} ({n}× genannt)" for i, (text, n) in enumerate(gemergt, 1))
|
||
status, mapping = await run_single_slot(
|
||
ctx, "Recherche-Mapping",
|
||
key=f"bausteine-{topic}-recherche-mapping",
|
||
prompt=_prompt(
|
||
"Bausteine-Recherche-Mapping",
|
||
topic=topic, n=len(recherchen), eintraege=eintraege,
|
||
out_path=files["recherche_mapping"],
|
||
),
|
||
role="judge", capabilities="files",
|
||
payload=lambda result: _mapping_schema(_json_datei(files["recherche_mapping"])),
|
||
timeout=_timeout("recherche_mapping", len(gemergt)),
|
||
)
|
||
if status == CANCELLED:
|
||
abgebrochen()
|
||
return
|
||
if status == FAILED:
|
||
_bausteine_errors[topic] = "Recherche-Mapping fehlgeschlagen"
|
||
return
|
||
konsens, rest = mapping
|
||
|
||
# Klärungs-Loop: 3 Auswahl-Agenten entscheiden über den Rest, ein
|
||
# Mapping-Agent sortiert in aufnehmen/verwerfen/weiter strittig.
|
||
# Leerer Rest beendet den Loop; Runde KONSENS_MAX_RUNDEN muss
|
||
# alles entscheiden. Der Konsens wächst nur hier im Code.
|
||
runde = 0
|
||
while rest and runde < KONSENS_MAX_RUNDEN:
|
||
runde += 1
|
||
final_runde = runde == KONSENS_MAX_RUNDEN
|
||
set_p(f"Klärung läuft (Runde {runde}/{KONSENS_MAX_RUNDEN})…", step=2)
|
||
mapping_path = files["mapping"][runde]
|
||
|
||
# Resume: fertiges Runden-Mapping wird direkt übernommen
|
||
ergebnis = _runde_schema(_json_datei(mapping_path), final=final_runde)
|
||
if ergebnis is None:
|
||
mapping_path.unlink(missing_ok=True)
|
||
konsens_block = "\n".join(f"- {t}" for t in konsens)
|
||
rest_block = "\n".join(f"- {t}" for t in rest)
|
||
|
||
# 3 Auswahl-Agenten, min. 2 mit Grace-Fenster
|
||
entscheidungen = []
|
||
offen = []
|
||
for i, path in enumerate(files["auswahl"][runde], 1):
|
||
res = _rest_schema(_json_datei(path))
|
||
if res is not None:
|
||
entscheidungen.append(res)
|
||
else:
|
||
offen.append((i, path))
|
||
if len(entscheidungen) < 2:
|
||
slots = [
|
||
{
|
||
"key": f"bausteine-{topic}-auswahl-r{runde}-{i}",
|
||
"prompt": _prompt(
|
||
"Bausteine-Auswahl",
|
||
topic=topic, konsens=konsens_block, rest=rest_block, out_path=path,
|
||
),
|
||
"role": "fast", "capabilities": "files",
|
||
"payload": (lambda result, p=path: _rest_schema(_json_datei(p))),
|
||
}
|
||
for i, path in offen
|
||
]
|
||
neue = await _race(
|
||
topic, f"Auswahl r{runde}", slots, 2 - len(entscheidungen),
|
||
_timeout("auswahl", len(rest)), provider,
|
||
cancelled=is_cancelled, grace=KONSENS_GRACE,
|
||
)
|
||
if is_cancelled():
|
||
abgebrochen()
|
||
return
|
||
if neue is None:
|
||
_bausteine_errors[topic] = f"Auswahl fehlgeschlagen (Runde {runde}, Minimum nicht erreicht)"
|
||
return
|
||
entscheidungen += neue
|
||
|
||
# Votum pro Rest-Eintrag deterministisch zählen
|
||
indizes = [_titel_index(dict(enumerate(e, 1))) for e in entscheidungen]
|
||
voten = "\n".join(
|
||
f"{i}. {text} (von {sum(1 for idx in indizes if _titel_aufloesen(idx, text) is not None)}"
|
||
f"/{len(entscheidungen)} Agenten übernommen)"
|
||
for i, text in enumerate(rest, 1)
|
||
)
|
||
final_zusatz = (
|
||
"\n- LETZTE RUNDE: Es gibt keine weitere Runde. `rest` MUSS leer sein"
|
||
" — entscheide JEDEN Eintrag selbst: aufnehmen oder verwerfen."
|
||
if final_runde else ""
|
||
)
|
||
status, ergebnis = await run_single_slot(
|
||
ctx, f"Auswahl-Mapping r{runde}",
|
||
key=f"bausteine-{topic}-auswahl-mapping-r{runde}",
|
||
prompt=_prompt(
|
||
"Bausteine-Auswahl-Mapping",
|
||
topic=topic, n=len(entscheidungen), konsens=konsens_block,
|
||
rest=voten, final=final_zusatz, out_path=mapping_path,
|
||
),
|
||
role="judge", capabilities="files",
|
||
payload=lambda result, p=mapping_path, f=final_runde: _runde_schema(_json_datei(p), final=f),
|
||
timeout=_timeout("auswahl_mapping", len(rest)),
|
||
)
|
||
if status == CANCELLED:
|
||
abgebrochen()
|
||
return
|
||
if status == FAILED:
|
||
_bausteine_errors[topic] = f"Auswahl-Mapping fehlgeschlagen (Runde {runde})"
|
||
return
|
||
|
||
aufnehmen, rest = ergebnis
|
||
_log(topic, f"Klärung Runde {runde}: {len(aufnehmen)} aufgenommen, {len(rest)} weiter strittig")
|
||
konsens = konsens + aufnehmen
|
||
|
||
entries = {i: t for i, t in enumerate(konsens, 1)}
|
||
|
||
# Nur Projekte: Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt,
|
||
# ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung].
|
||
if project:
|
||
set_p("Ergänze Themenfeld…", step=3)
|
||
erg_path = files["ergaenzung"]
|
||
ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
|
||
if ergaenzungen is None:
|
||
erg_path.unlink(missing_ok=True)
|
||
status, ergaenzungen = await run_single_slot(
|
||
ctx, "Ergänzung",
|
||
key=f"bausteine-{topic}-ergaenzung-1",
|
||
prompt=_prompt(
|
||
"Bausteine-Ergaenzung",
|
||
topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
|
||
out_path=erg_path, extra=_extra(instructions),
|
||
),
|
||
role="quick", capabilities="full",
|
||
payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)),
|
||
timeout=_timeout("ergaenzung"),
|
||
)
|
||
if status == CANCELLED:
|
||
abgebrochen()
|
||
return
|
||
if status == FAILED:
|
||
_bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
|
||
return
|
||
idx = _titel_index(entries)
|
||
neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
|
||
if neu:
|
||
_log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
|
||
start = max(entries, default=0) + 1
|
||
for off, (t, b) in enumerate(neu):
|
||
entries[start + off] = f"{t} — {b} [Ergänzung]"
|
||
|
||
# Titel eindeutig machen und unsortiertes Inventar schreiben
|
||
entries = _eindeutige_titel(entries)
|
||
atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n")
|
||
except Exception as e:
|
||
log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic)
|
||
_bausteine_errors[topic] = str(e)[:2000]
|
||
finally:
|
||
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
|
||
_bausteine_progress.pop(topic, None)
|
||
_bausteine_step.pop(topic, None)
|
||
_bausteine_cancelled.discard(topic)
|