368 lines
16 KiB
Python
368 lines
16 KiB
Python
"""Bausteine-Pipeline: 4x Recherche (3 nötig) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from agents import kill_process
|
|
from config import DEFAULT_PROVIDER
|
|
from fsutil import atomic_write_text
|
|
from jsonio import read_json_file as _json_datei
|
|
from paths import arbeit_dir, bausteine_path, project_dir
|
|
from pipeline import (
|
|
CANCELLED, FAILED, GenContext, _extra, _log, _prompt, _race,
|
|
_semaphore, _timeout, run_single_slot,
|
|
)
|
|
from textkit import _eindeutige_titel, _parse_auswahl, _titel, _titel_aufloesen, _titel_index
|
|
|
|
log = logging.getLogger("creator.bausteine")
|
|
|
|
_bausteine_progress: dict[str, str] = {}
|
|
_bausteine_errors: dict[str, str] = {}
|
|
_bausteine_cancelled: set[str] = set()
|
|
_bausteine_step: dict[str, int] = {}
|
|
|
|
BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung")
|
|
|
|
|
|
def _bausteine_steps(topic: str) -> tuple:
|
|
"""Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche."""
|
|
if project_dir(topic).is_dir():
|
|
return BAUSTEINE_STEPS + ("Ergänzung",)
|
|
return BAUSTEINE_STEPS
|
|
|
|
|
|
def _bausteine_files(topic: str) -> dict:
|
|
arbeit = arbeit_dir(topic)
|
|
return {
|
|
"final": bausteine_path(topic),
|
|
"arbeit": arbeit,
|
|
"recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)],
|
|
"auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)],
|
|
"auswahl_check": arbeit / "auswahl-check.json",
|
|
"ergaenzung": arbeit / "ergaenzung.json",
|
|
}
|
|
|
|
|
|
def _alle_slot_dateien(files: dict) -> list[Path]:
|
|
return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]]
|
|
|
|
|
|
def cancel_bausteine(topic: str) -> bool:
|
|
if topic not in _bausteine_progress:
|
|
return False
|
|
_bausteine_cancelled.add(topic)
|
|
kill_process(f"bausteine-{topic}-")
|
|
return True
|
|
|
|
|
|
def _resume_step(topic: str) -> int:
|
|
"""Erster noch offener Schritt anhand der persistierten Zwischendateien."""
|
|
files = _bausteine_files(topic)
|
|
if sum(p.exists() for p in files["recherche"]) < 3:
|
|
return 0
|
|
if not any(p.exists() for p in files["auswahl"]):
|
|
return 1
|
|
if not files["auswahl_check"].exists():
|
|
return 2
|
|
if project_dir(topic).is_dir() and not files["ergaenzung"].exists():
|
|
return 3
|
|
return len(_bausteine_steps(topic))
|
|
|
|
|
|
def bausteine_status(topic: str) -> dict:
|
|
steps = _bausteine_steps(topic)
|
|
ready = bausteine_path(topic).exists()
|
|
generating = topic in _bausteine_progress
|
|
partial = False
|
|
if generating:
|
|
current = _bausteine_step.get(topic)
|
|
states = [
|
|
"pending" if current is None else "done" if i < current else "active" if i == current else "pending"
|
|
for i in range(len(steps))
|
|
]
|
|
elif ready:
|
|
states = ["done"] * len(steps)
|
|
else:
|
|
nxt = _resume_step(topic)
|
|
partial = nxt > 0
|
|
states = ["done" if i < nxt else "pending" for i in range(len(steps))]
|
|
return {
|
|
"ready": ready,
|
|
"generating": generating,
|
|
"progress": _bausteine_progress.get(topic),
|
|
"error": _bausteine_errors.get(topic),
|
|
"partial": partial,
|
|
"steps": [{"label": label, "state": s} for label, s in zip(steps, states)],
|
|
}
|
|
|
|
|
|
def active_bausteine() -> list[dict]:
|
|
return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()]
|
|
|
|
|
|
def reset_bausteine(topic: str) -> None:
|
|
files = _bausteine_files(topic)
|
|
files["final"].unlink(missing_ok=True)
|
|
shutil.rmtree(files["arbeit"], ignore_errors=True)
|
|
_bausteine_errors.pop(topic, None)
|
|
|
|
|
|
def _ergaenzung_schema(data):
|
|
"""{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None."""
|
|
if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list):
|
|
return None
|
|
out = []
|
|
for b in data["bausteine"]:
|
|
if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str):
|
|
return None
|
|
titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip()
|
|
if not titel:
|
|
return None
|
|
out.append((titel, beschreibung))
|
|
return out
|
|
|
|
|
|
def _pdfs_konvertieren(project: Path) -> None:
|
|
"""PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern.
|
|
|
|
Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die
|
|
.txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet.
|
|
Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt
|
|
unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten).
|
|
"""
|
|
pdfs = list(project.rglob("*.pdf"))
|
|
if not pdfs:
|
|
return
|
|
if shutil.which("pdftotext") is None:
|
|
raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden")
|
|
for pdf in pdfs:
|
|
txt = pdf.with_suffix(".txt")
|
|
if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime:
|
|
continue
|
|
try:
|
|
subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120)
|
|
_log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e
|
|
|
|
|
|
def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str:
|
|
if project:
|
|
source = _prompt("Bausteine-Quelle-Projekt", project=project)
|
|
else:
|
|
source = _prompt("Bausteine-Quelle-Thema", topic=topic)
|
|
return _prompt(
|
|
"Bausteine-Recherche",
|
|
topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions),
|
|
)
|
|
|
|
|
|
def _file_payload(path: Path):
|
|
"""Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält."""
|
|
if not path.exists():
|
|
return None
|
|
text = path.read_text(encoding="utf-8")
|
|
return text if _parse_auswahl(text) else None
|
|
|
|
|
|
def _auswahl_payload(path: Path):
|
|
if not path.exists():
|
|
return None
|
|
text = path.read_text(encoding="utf-8")
|
|
entries = _parse_auswahl(text)
|
|
return (text, entries) if entries else None
|
|
|
|
|
|
def _auswahl_check_schema(data):
|
|
"""{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß."""
|
|
if not isinstance(data, dict):
|
|
return None
|
|
nach = data.get("nachtraege", [])
|
|
streich = data.get("streichen", [])
|
|
if not isinstance(nach, list) or not isinstance(streich, list):
|
|
return None
|
|
if not all(isinstance(x, str) for x in [*nach, *streich]):
|
|
return None
|
|
return {"nachtraege": nach, "streichen": streich}
|
|
|
|
|
|
async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None:
|
|
if topic in _bausteine_progress:
|
|
return
|
|
_bausteine_progress[topic] = "Wartend…"
|
|
_bausteine_errors.pop(topic, None)
|
|
|
|
files = _bausteine_files(topic)
|
|
final_path = files["final"]
|
|
project = project_dir(topic) if project_dir(topic).is_dir() else None
|
|
|
|
def set_p(msg: str, step: int | None = None) -> None:
|
|
_bausteine_progress[topic] = msg
|
|
if step is not None:
|
|
_bausteine_step[topic] = step
|
|
|
|
def is_cancelled() -> bool:
|
|
return topic in _bausteine_cancelled
|
|
|
|
def abgebrochen() -> None:
|
|
_bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten"
|
|
|
|
ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled)
|
|
|
|
try:
|
|
async with _semaphore:
|
|
files["arbeit"].mkdir(parents=True, exist_ok=True)
|
|
if project:
|
|
await asyncio.to_thread(_pdfs_konvertieren, project)
|
|
# „Neu erstellen": fertige Bausteine → kompletter Frischstart.
|
|
# Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume.
|
|
if final_path.exists():
|
|
for p_alt in _alle_slot_dateien(files):
|
|
p_alt.unlink(missing_ok=True)
|
|
|
|
# Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen
|
|
recherchen: list[str] = []
|
|
offen = []
|
|
for i, path in enumerate(files["recherche"], 1):
|
|
text = _file_payload(path)
|
|
if text is not None and len(recherchen) < 3:
|
|
recherchen.append(text)
|
|
else:
|
|
offen.append((i, path))
|
|
vorhanden = len(recherchen)
|
|
set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0)
|
|
if vorhanden < 3:
|
|
caps = "files" if project else "full"
|
|
slots = [
|
|
{
|
|
"key": f"bausteine-{topic}-recherche-{i}",
|
|
"prompt": _build_recherche_prompt(topic, path, instructions, project),
|
|
"role": "quick", "capabilities": caps,
|
|
"payload": (lambda result, p=path: _file_payload(p)),
|
|
}
|
|
for i, path in offen
|
|
]
|
|
neue = await _race(
|
|
topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider,
|
|
on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"),
|
|
cancelled=is_cancelled,
|
|
)
|
|
if is_cancelled():
|
|
abgebrochen()
|
|
return
|
|
if neue is None:
|
|
_bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)"
|
|
return
|
|
recherchen += neue
|
|
|
|
# Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen
|
|
n_est = max(len(_parse_auswahl(t)) for t in recherchen)
|
|
bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None)
|
|
if bestehende is not None:
|
|
flat, entries = bestehende
|
|
else:
|
|
set_p("Konsolidiere Recherche…", step=1)
|
|
results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1))
|
|
slots = [
|
|
{
|
|
"key": f"bausteine-{topic}-auswahl-{i}",
|
|
"prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path),
|
|
"role": "fast", "capabilities": "files",
|
|
"payload": (lambda result, p=path: _auswahl_payload(p)),
|
|
}
|
|
for i, path in enumerate(files["auswahl"], 1)
|
|
]
|
|
auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled)
|
|
if is_cancelled():
|
|
abgebrochen()
|
|
return
|
|
if auswahl is None:
|
|
_bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)"
|
|
return
|
|
flat, entries = auswahl[0]
|
|
|
|
# Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal)
|
|
set_p("Prüfe Auswahl…", step=2)
|
|
check_path = files["auswahl_check"]
|
|
patch = _auswahl_check_schema(_json_datei(check_path))
|
|
if patch is None:
|
|
check_path.unlink(missing_ok=True)
|
|
titel_listen = "\n\n".join(
|
|
f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values())
|
|
for i, text in enumerate(recherchen, 1)
|
|
)
|
|
status, check = await run_single_slot(
|
|
ctx, "Auswahl-Check",
|
|
key=f"bausteine-{topic}-auswahlcheck-1",
|
|
prompt=_prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path),
|
|
role="fast", capabilities="files",
|
|
payload=lambda result: _auswahl_check_schema(_json_datei(check_path)),
|
|
timeout=_timeout("auswahl_check", len(entries)),
|
|
)
|
|
if status == CANCELLED:
|
|
abgebrochen()
|
|
return
|
|
if status == FAILED:
|
|
_log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort")
|
|
else:
|
|
patch = check
|
|
if patch is not None and (patch["streichen"] or patch["nachtraege"]):
|
|
idx = _titel_index(entries)
|
|
weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None}
|
|
if weg:
|
|
_log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}")
|
|
entries = {n: t for n, t in entries.items() if n not in weg}
|
|
if patch["nachtraege"]:
|
|
_log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine")
|
|
texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"])
|
|
entries = {i: t for i, t in enumerate(texts, 1)}
|
|
|
|
# Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt,
|
|
# ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung].
|
|
if project:
|
|
set_p("Ergänze Themenfeld…", step=3)
|
|
erg_path = files["ergaenzung"]
|
|
ergaenzungen = _ergaenzung_schema(_json_datei(erg_path))
|
|
if ergaenzungen is None:
|
|
erg_path.unlink(missing_ok=True)
|
|
status, ergaenzungen = await run_single_slot(
|
|
ctx, "Ergänzung",
|
|
key=f"bausteine-{topic}-ergaenzung-1",
|
|
prompt=_prompt(
|
|
"Bausteine-Ergaenzung",
|
|
topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()),
|
|
out_path=erg_path, extra=_extra(instructions),
|
|
),
|
|
role="quick", capabilities="full",
|
|
payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)),
|
|
timeout=_timeout("ergaenzung"),
|
|
)
|
|
if status == CANCELLED:
|
|
abgebrochen()
|
|
return
|
|
if status == FAILED:
|
|
_bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)"
|
|
return
|
|
idx = _titel_index(entries)
|
|
neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None]
|
|
if neu:
|
|
_log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt")
|
|
start = max(entries, default=0) + 1
|
|
for off, (t, b) in enumerate(neu):
|
|
entries[start + off] = f"{t} — {b} [Ergänzung]"
|
|
|
|
# Titel eindeutig machen und unsortiertes Inventar schreiben
|
|
entries = _eindeutige_titel(entries)
|
|
atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n")
|
|
except Exception as e:
|
|
log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic)
|
|
_bausteine_errors[topic] = str(e)[:2000]
|
|
finally:
|
|
# Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit.
|
|
_bausteine_progress.pop(topic, None)
|
|
_bausteine_step.pop(topic, None)
|
|
_bausteine_cancelled.discard(topic)
|