"""Bausteine-Pipeline: 4x Recherche (3 nötig) → 2x Auswahl (1) → Prüfung — reines Inventar, unsortiert.""" import asyncio import logging import shutil import subprocess from pathlib import Path from agents import kill_process from config import DEFAULT_PROVIDER from fsutil import atomic_write_text from jsonio import read_json_file as _json_datei from paths import arbeit_dir, bausteine_path, project_dir from pipeline import ( CANCELLED, FAILED, GenContext, _extra, _log, _prompt, _race, _semaphore, _timeout, run_single_slot, ) from textkit import _eindeutige_titel, _parse_auswahl, _titel, _titel_aufloesen, _titel_index log = logging.getLogger("creator.bausteine") _bausteine_progress: dict[str, str] = {} _bausteine_errors: dict[str, str] = {} _bausteine_cancelled: set[str] = set() _bausteine_step: dict[str, int] = {} BAUSTEINE_STEPS = ("Recherche", "Auswahl", "Prüfung") def _bausteine_steps(topic: str) -> tuple: """Projekte haben einen 4. Schritt: Themenfeld-Ergänzung per Web-Recherche.""" if project_dir(topic).is_dir(): return BAUSTEINE_STEPS + ("Ergänzung",) return BAUSTEINE_STEPS def _bausteine_files(topic: str) -> dict: arbeit = arbeit_dir(topic) return { "final": bausteine_path(topic), "arbeit": arbeit, "recherche": [arbeit / f"recherche-{i}.md" for i in (1, 2, 3, 4)], "auswahl": [arbeit / f"auswahl-{i}.md" for i in (1, 2)], "auswahl_check": arbeit / "auswahl-check.json", "ergaenzung": arbeit / "ergaenzung.json", } def _alle_slot_dateien(files: dict) -> list[Path]: return [*files["recherche"], *files["auswahl"], files["auswahl_check"], files["ergaenzung"]] def cancel_bausteine(topic: str) -> bool: if topic not in _bausteine_progress: return False _bausteine_cancelled.add(topic) kill_process(f"bausteine-{topic}-") return True def _resume_step(topic: str) -> int: """Erster noch offener Schritt anhand der persistierten Zwischendateien.""" files = _bausteine_files(topic) if sum(p.exists() for p in files["recherche"]) < 3: return 0 if not any(p.exists() for p in files["auswahl"]): return 1 if not files["auswahl_check"].exists(): return 2 if project_dir(topic).is_dir() and not files["ergaenzung"].exists(): return 3 return len(_bausteine_steps(topic)) def bausteine_status(topic: str) -> dict: steps = _bausteine_steps(topic) ready = bausteine_path(topic).exists() generating = topic in _bausteine_progress partial = False if generating: current = _bausteine_step.get(topic) states = [ "pending" if current is None else "done" if i < current else "active" if i == current else "pending" for i in range(len(steps)) ] elif ready: states = ["done"] * len(steps) else: nxt = _resume_step(topic) partial = nxt > 0 states = ["done" if i < nxt else "pending" for i in range(len(steps))] return { "ready": ready, "generating": generating, "progress": _bausteine_progress.get(topic), "error": _bausteine_errors.get(topic), "partial": partial, "steps": [{"label": label, "state": s} for label, s in zip(steps, states)], } def active_bausteine() -> list[dict]: return [{"topic": t, "progress": p} for t, p in _bausteine_progress.items()] def reset_bausteine(topic: str) -> None: files = _bausteine_files(topic) files["final"].unlink(missing_ok=True) shutil.rmtree(files["arbeit"], ignore_errors=True) _bausteine_errors.pop(topic, None) def _ergaenzung_schema(data): """{"bausteine": [{"titel", "beschreibung"}]} → Liste (leer erlaubt) · sonst None.""" if not isinstance(data, dict) or not isinstance(data.get("bausteine"), list): return None out = [] for b in data["bausteine"]: if not isinstance(b, dict) or not isinstance(b.get("titel"), str) or not isinstance(b.get("beschreibung"), str): return None titel, beschreibung = b["titel"].strip(), b["beschreibung"].strip() if not titel: return None out.append((titel, beschreibung)) return out def _pdfs_konvertieren(project: Path) -> None: """PDFs im Projekt in .txt wandeln (pdftotext) — Agenten lesen Text statt Seiten-Bildern. Wird vor jeder Projekt-Generierung aufgerufen; konvertiert nur, wenn die .txt fehlt oder älter als das PDF ist. Das Original bleibt unangetastet. Fehlt pdftotext und das Projekt enthält PDFs → harter Fehler statt unzuverlässigem Direkt-Lese-Modus (MiniMax-Bilderlimit, Vision-Kosten). """ pdfs = list(project.rglob("*.pdf")) if not pdfs: return if shutil.which("pdftotext") is None: raise RuntimeError("pdftotext fehlt (poppler-utils installieren) — PDFs im Projekt können nicht gelesen werden") for pdf in pdfs: txt = pdf.with_suffix(".txt") if txt.exists() and txt.stat().st_mtime >= pdf.stat().st_mtime: continue try: subprocess.run(["pdftotext", "-layout", str(pdf), str(txt)], check=True, timeout=120) _log(project.name, f"PDF konvertiert: {pdf.name} → {txt.name}") except Exception as e: raise RuntimeError(f"PDF-Konvertierung fehlgeschlagen ({pdf.name}): {e}") from e def _build_recherche_prompt(topic: str, out_path: Path, instructions: str = "", project: Path | None = None) -> str: if project: source = _prompt("Bausteine-Quelle-Projekt", project=project) else: source = _prompt("Bausteine-Quelle-Thema", topic=topic) return _prompt( "Bausteine-Recherche", topic=topic, source=source, bausteine_path=out_path, extra=_extra(instructions), ) def _file_payload(path: Path): """Gültig, wenn die Slot-Datei existiert und nummerierte Einträge enthält.""" if not path.exists(): return None text = path.read_text(encoding="utf-8") return text if _parse_auswahl(text) else None def _auswahl_payload(path: Path): if not path.exists(): return None text = path.read_text(encoding="utf-8") entries = _parse_auswahl(text) return (text, entries) if entries else None def _auswahl_check_schema(data): """{"nachtraege": [...], "streichen": [...]} — None bei Schema-Verstoß.""" if not isinstance(data, dict): return None nach = data.get("nachtraege", []) streich = data.get("streichen", []) if not isinstance(nach, list) or not isinstance(streich, list): return None if not all(isinstance(x, str) for x in [*nach, *streich]): return None return {"nachtraege": nach, "streichen": streich} async def generate_bausteine(topic: str, instructions: str = "", provider: str = DEFAULT_PROVIDER) -> None: if topic in _bausteine_progress: return _bausteine_progress[topic] = "Wartend…" _bausteine_errors.pop(topic, None) files = _bausteine_files(topic) final_path = files["final"] project = project_dir(topic) if project_dir(topic).is_dir() else None def set_p(msg: str, step: int | None = None) -> None: _bausteine_progress[topic] = msg if step is not None: _bausteine_step[topic] = step def is_cancelled() -> bool: return topic in _bausteine_cancelled def abgebrochen() -> None: _bausteine_errors[topic] = "Abgebrochen — Fortschritt bleibt erhalten" ctx = GenContext(topic=topic, provider=provider, is_cancelled=is_cancelled) try: async with _semaphore: files["arbeit"].mkdir(parents=True, exist_ok=True) if project: await asyncio.to_thread(_pdfs_konvertieren, project) # „Neu erstellen": fertige Bausteine → kompletter Frischstart. # Sonst sind Slot-Dateien Reste eines Abbruchs/Fehlers → Resume. if final_path.exists(): for p_alt in _alle_slot_dateien(files): p_alt.unlink(missing_ok=True) # Schritt 1: 4 Recherche-Agenten, 3 gültige nötig — vorhandene Slot-Dateien zählen recherchen: list[str] = [] offen = [] for i, path in enumerate(files["recherche"], 1): text = _file_payload(path) if text is not None and len(recherchen) < 3: recherchen.append(text) else: offen.append((i, path)) vorhanden = len(recherchen) set_p(f"Recherche läuft ({vorhanden}/3 gültig)…", step=0) if vorhanden < 3: caps = "files" if project else "full" slots = [ { "key": f"bausteine-{topic}-recherche-{i}", "prompt": _build_recherche_prompt(topic, path, instructions, project), "role": "quick", "capabilities": caps, "payload": (lambda result, p=path: _file_payload(p)), } for i, path in offen ] neue = await _race( topic, "Recherche", slots, 3 - vorhanden, _timeout("recherche"), provider, on_update=lambda c: set_p(f"Recherche läuft ({vorhanden + c}/3 gültig)…"), cancelled=is_cancelled, ) if is_cancelled(): abgebrochen() return if neue is None: _bausteine_errors[topic] = "Recherche fehlgeschlagen (Quorum nicht erreicht)" return recherchen += neue # Schritt 2: 2 Auswahl-Agenten, der erste gewinnt — vorhandene gültige Datei wird übernommen n_est = max(len(_parse_auswahl(t)) for t in recherchen) bestehende = next((res for p in files["auswahl"] if (res := _auswahl_payload(p)) is not None), None) if bestehende is not None: flat, entries = bestehende else: set_p("Konsolidiere Recherche…", step=1) results_block = "\n\n".join(f"### Recherche {i}\n\n{text}" for i, text in enumerate(recherchen, 1)) slots = [ { "key": f"bausteine-{topic}-auswahl-{i}", "prompt": _prompt("Bausteine-Auswahl", topic=topic, results=results_block, out_path=path), "role": "fast", "capabilities": "files", "payload": (lambda result, p=path: _auswahl_payload(p)), } for i, path in enumerate(files["auswahl"], 1) ] auswahl = await _race(topic, "Auswahl", slots, 1, _timeout("auswahl", n_est), provider, cancelled=is_cancelled) if is_cancelled(): abgebrochen() return if auswahl is None: _bausteine_errors[topic] = "Auswahl fehlgeschlagen (kein gültiges Ergebnis)" return flat, entries = auswahl[0] # Schritt 2b: Auswahl-Prüfung gegen die Recherche-Titel (JSON, nicht fatal) set_p("Prüfe Auswahl…", step=2) check_path = files["auswahl_check"] patch = _auswahl_check_schema(_json_datei(check_path)) if patch is None: check_path.unlink(missing_ok=True) titel_listen = "\n\n".join( f"### Recherche {i}\n" + "\n".join(f"- {_titel(t)}" for t in _parse_auswahl(text).values()) for i, text in enumerate(recherchen, 1) ) status, check = await run_single_slot( ctx, "Auswahl-Check", key=f"bausteine-{topic}-auswahlcheck-1", prompt=_prompt("Bausteine-Auswahl-Check", topic=topic, results=titel_listen, auswahl=flat, out_path=check_path), role="fast", capabilities="files", payload=lambda result: _auswahl_check_schema(_json_datei(check_path)), timeout=_timeout("auswahl_check", len(entries)), ) if status == CANCELLED: abgebrochen() return if status == FAILED: _log(topic, "Auswahl-Check fehlgeschlagen — fahre ohne Korrekturen fort") else: patch = check if patch is not None and (patch["streichen"] or patch["nachtraege"]): idx = _titel_index(entries) weg = {num for t in patch["streichen"] if (num := _titel_aufloesen(idx, t)) is not None} if weg: _log(topic, f"Auswahl-Check streicht Duplikate: {sorted(weg)}") entries = {n: t for n, t in entries.items() if n not in weg} if patch["nachtraege"]: _log(topic, f"Auswahl-Check ergänzt {len(patch['nachtraege'])} Bausteine") texts = [t for _, t in sorted(entries.items())] + list(patch["nachtraege"]) entries = {i: t for i, t in enumerate(texts, 1)} # Schritt 4 (nur Projekte): Themenfeld-Ergänzung — Skript/Projekt ist ein Ausschnitt, # ein Web-Agent ergänzt kanonisch fehlende Bausteine, markiert mit [Ergänzung]. if project: set_p("Ergänze Themenfeld…", step=3) erg_path = files["ergaenzung"] ergaenzungen = _ergaenzung_schema(_json_datei(erg_path)) if ergaenzungen is None: erg_path.unlink(missing_ok=True) status, ergaenzungen = await run_single_slot( ctx, "Ergänzung", key=f"bausteine-{topic}-ergaenzung-1", prompt=_prompt( "Bausteine-Ergaenzung", topic=topic, bausteine="\n".join(f"- {t}" for t in entries.values()), out_path=erg_path, extra=_extra(instructions), ), role="quick", capabilities="full", payload=lambda result: _ergaenzung_schema(_json_datei(erg_path)), timeout=_timeout("ergaenzung"), ) if status == CANCELLED: abgebrochen() return if status == FAILED: _bausteine_errors[topic] = "Ergänzung fehlgeschlagen (kein gültiges Ergebnis)" return idx = _titel_index(entries) neu = [(t, b) for t, b in ergaenzungen if _titel_aufloesen(idx, t) is None] if neu: _log(topic, f"Ergänzung: {len(neu)} Baustein(e) aus dem Themenfeld ergänzt") start = max(entries, default=0) + 1 for off, (t, b) in enumerate(neu): entries[start + off] = f"{t} — {b} [Ergänzung]" # Titel eindeutig machen und unsortiertes Inventar schreiben entries = _eindeutige_titel(entries) atomic_write_text(final_path, "\n".join(f"{i}. {t}" for i, t in entries.items()) + "\n") except Exception as e: log.exception("[%s] Bausteine-Generierung fehlgeschlagen", topic) _bausteine_errors[topic] = str(e)[:2000] finally: # Kein Datei-Cleanup: Zwischendateien bleiben für Resume bzw. Nachvollziehbarkeit. _bausteine_progress.pop(topic, None) _bausteine_step.pop(topic, None) _bausteine_cancelled.discard(topic)