diff --git a/config/services.yaml b/config/services.yaml index 06c0bc5..211b646 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -3,15 +3,9 @@ # ------------------------------------------------------------ parameters: - # ------------------------------------------------------------ - # Root - # ------------------------------------------------------------ mto.root: '%kernel.project_dir%' mto.kernel.dir: '%mto.root%' - # ------------------------------------------------------------ - # Knowledge Root (ZENTRAL) - # ------------------------------------------------------------ mto.knowledge.root: '%mto.root%/var/knowledge' mto.knowledge.ndjson: '%mto.knowledge.root%/index.ndjson' @@ -21,42 +15,25 @@ parameters: mto.runtime.meta: '%mto.knowledge.root%/index_runtime.json' mto.knowledge.upload: '%mto.knowledge.root%/uploads' - # ------------------------------------------------------------ - # Tags (Document Routing) - # ------------------------------------------------------------ mto.knowledge.tags_ndjson: '%mto.knowledge.root%/tags.ndjson' - - # Tag vector index outputs mto.knowledge.vector_tags_index: '%mto.knowledge.root%/vector_tags.index' mto.knowledge.vector_tags_index_meta: '%mto.knowledge.root%/vector_tags.index.meta.json' - # ------------------------------------------------------------ - # Vector Script Directory (A2) - # ------------------------------------------------------------ mto.vector.script_dir: '%mto.root%/python/vector' - # Tag vector scripts mto.vector.ingest_tags_script: '%mto.vector.script_dir%/vector_ingest_tags.py' mto.vector.search_tags_script: '%mto.vector.script_dir%/vector_search_tags.py' - # Lock for tag rebuild jobs mto.tags.rebuild_lock: '%mto.knowledge.root%/locks/tag_rebuild.lock' - # Backward compatibility alias mto.vector.data.upload.path: '%mto.knowledge.upload%' - # ------------------------------------------------------------ - # Index Configuration (Fallback Guardrails) - # ------------------------------------------------------------ mto.index.chunk_size: 800 mto.index.chunk_overlap: 100 mto.index.embedding_model: 'all-MiniLM-L6-v2' mto.index.embedding_dimension: 768 mto.index.scoring_version: 1 - # ------------------------------------------------------------ - # Python / Vector Runtime - # ------------------------------------------------------------ mto.vector.python_bin: '/var/www/html/.venv/bin/python3' mto.vector.ingest_script: '%mto.vector.script_dir%/vector_ingest.py' mto.vector.search_script: '%mto.vector.script_dir%/vector_search.py' @@ -131,7 +108,7 @@ services: alias: App\Knowledge\Retrieval\CachedRetriever # ------------------------------------------------------------ - # Index Configuration Provider (DB + Fallback) + # Index Configuration Provider # ------------------------------------------------------------ App\Index\IndexConfigurationProvider: @@ -144,7 +121,7 @@ services: $fallbackScoringVersion: '%mto.index.scoring_version%' # ------------------------------------------------------------ - # Index Meta Manager (uses Provider) + # Index Meta Manager # ------------------------------------------------------------ App\Index\IndexMetaManager: @@ -167,13 +144,24 @@ services: $pythonBin: '%mto.vector.python_bin%' $scriptPath: '%mto.vector.ingest_script%' $indexNdjsonPath: '%mto.knowledge.ndjson%' - $indexMetaPath: '%mto.knowledge.index_meta%' $vectorIndexPath: '%mto.knowledge.vector_index%' $timeoutSeconds: '%mto.vector.timeout%' $configurationProvider: '@App\Index\IndexConfigurationProvider' # ------------------------------------------------------------ - # Tags Export (Document Routing) + # Ingest Layer (Phase B Refactor) + # ------------------------------------------------------------ + + App\Ingest\GuardrailValidator: ~ + + App\Ingest\ChunkWriteService: ~ + + App\Ingest\VectorRebuildService: ~ + + App\Ingest\IngestFlow: ~ + + # ------------------------------------------------------------ + # Tags Export # ------------------------------------------------------------ App\Tag\TagNdjsonExporter: @@ -202,7 +190,7 @@ services: App\Tag\TagRoutingService: ~ # ------------------------------------------------------------ - # Tag Rebuild Jobs (8A) + # Tag Rebuild Jobs # ------------------------------------------------------------ App\Service\TagRebuildJobService: diff --git a/python/vector/vector_ingest.py b/python/vector/vector_ingest.py index ebc847b..12973dd 100644 --- a/python/vector/vector_ingest.py +++ b/python/vector/vector_ingest.py @@ -11,7 +11,8 @@ from pathlib import Path parser = argparse.ArgumentParser(description="Build FAISS index from NDJSON") parser.add_argument("--index", required=True, help="Path to index.ndjson") -parser.add_argument("--out", required=True, help="Path to output vector.index") +parser.add_argument("--out", required=True, help="Path to output vector.index (tmp)") + parser.add_argument("--model", default="all-MiniLM-L6-v2", help="SentenceTransformer model") args = parser.parse_args() @@ -82,13 +83,7 @@ with open(index_path, "r", encoding="utf-8") as f: if not texts: print("No chunks found. Removing vector index.") - if out_path.exists(): - out_path.unlink() - - meta_path = out_path.with_suffix(".meta.json") - if meta_path.exists(): - meta_path.unlink() - + # Entferne final erst später in PHP atomar sys.exit(0) print(f"Loaded {len(texts)} chunks.") @@ -119,16 +114,19 @@ index.add(embeddings) # Ensure output directory exists out_path.parent.mkdir(parents=True, exist_ok=True) +# --------------------------------------------------------- +# Write FAISS index (tmp) +# --------------------------------------------------------- print(f"Writing FAISS index to {out_path}") faiss.write_index(index, str(out_path)) # --------------------------------------------------------- -# Write ID mapping meta +# Write ID mapping meta (tmp) # --------------------------------------------------------- -meta_path = out_path.with_suffix(".meta.json") +meta_tmp_path = Path(str(out_path) + ".meta.json") -with open(meta_path, "w", encoding="utf-8") as f: +with open(meta_tmp_path, "w", encoding="utf-8") as f: json.dump(ids, f) print(f"Indexed {len(ids)} chunks successfully.") -sys.exit(0) +sys.exit(0) \ No newline at end of file diff --git a/src/Controller/Admin/DocumentController.php b/src/Controller/Admin/DocumentController.php index 3f6a476..b428e25 100644 --- a/src/Controller/Admin/DocumentController.php +++ b/src/Controller/Admin/DocumentController.php @@ -21,12 +21,10 @@ use Symfony\Component\HttpFoundation\Response; use Symfony\Component\HttpKernel\Exception\NotFoundHttpException; use Symfony\Component\Routing\Attribute\Route; use Symfony\Component\Uid\Uuid; -use function function_exists; #[Route('/admin/documents')] class DocumentController extends AbstractController { - #[Route('', name: 'admin_documents')] public function index(EntityManagerInterface $em): Response { @@ -41,7 +39,7 @@ class DocumentController extends AbstractController ->getResult(); return $this->render('admin/document/index.html.twig', [ - 'documents' => $documents + 'documents' => $documents, ]); } @@ -54,7 +52,7 @@ class DocumentController extends AbstractController { try { $uuid = Uuid::fromString($id); - } catch (\Exception $e) { + } catch (\Exception) { throw new NotFoundHttpException(); } @@ -65,7 +63,7 @@ class DocumentController extends AbstractController } return $this->render('admin/document/show.html.twig', [ - 'document' => $document + 'document' => $document, ]); } @@ -76,92 +74,72 @@ class DocumentController extends AbstractController FormatText $formatText, IngestJobService $jobService, ParameterBagInterface $params - ): Response - { - if ($request->isMethod('POST')) { - - /** @var UploadedFile|null $file */ - $file = $request->files->get('file'); - - if (!$file instanceof UploadedFile) { - throw new \InvalidArgumentException('No valid file uploaded.'); - } - - $rawTitle = $request->request->get('title'); - - $title = is_string($rawTitle) && $rawTitle !== '' - ? $rawTitle - : $formatText->slugify($file->getClientOriginalName()); - - - if (!$title) { - $this->addFlash('error', 'Titel ist erforderlich.'); - return $this->redirectToRoute('admin_document_new'); - } - - $uploadDir = $params->get('mto.vector.data.upload.path'); - - if (!is_dir($uploadDir)) { - mkdir($uploadDir, 0777, true); - } - - $newFilename = uniqid() . '_' . $file->getClientOriginalName(); - - try { - $file->move($uploadDir, $newFilename); - } catch (FileException $e) { - throw new \RuntimeException('File upload failed.'); - } - - $filePath = $uploadDir . '/' . $newFilename; - - // Dokument erstellen - $document = $documentService->createDocument( - $title, - $filePath, - $this->getUser() - ); - - // --------------------------------------------------------- - // AUTO-INTEGRATION: gleicher Flow wie "Version aktivieren" - // --------------------------------------------------------- - - $version = $document->getCurrentVersion(); - - $job = $jobService->startJob( - IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE, - $this->getUser(), - $version->getDocument()->getId(), - $version->getId(), - null, - IngestJob::STATUS_QUEUED - ); - - $projectDir = (string)$this->getParameter('kernel.project_dir'); - $console = $projectDir . '/bin/console'; - - $cmd = sprintf( - '%s %s %s %s > /dev/null 2>&1 &', - escapeshellarg($console), - escapeshellarg('mto:agent:ingest:run'), - escapeshellarg((string)$job->getId()), - escapeshellarg('--no-interaction'), - ); - - if (!function_exists('exec')) { - $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); - $this->addFlash('danger', 'Dokument erstellt, aber Ingest konnte nicht asynchron gestartet werden.'); - return $this->redirectToRoute('admin_documents'); - } - - exec($cmd); - - return $this->redirectToRoute('admin_job_show', [ - 'id' => (string)$job->getId(), - ]); + ): Response { + if (!$request->isMethod('POST')) { + return $this->render('admin/document/new.html.twig'); } - return $this->render('admin/document/new.html.twig'); + /** @var UploadedFile|null $file */ + $file = $request->files->get('file'); + if (!$file instanceof UploadedFile) { + throw new \InvalidArgumentException('No valid file uploaded.'); + } + + $rawTitle = $request->request->get('title'); + $title = is_string($rawTitle) && $rawTitle !== '' + ? $rawTitle + : $formatText->slugify($file->getClientOriginalName()); + + if (!$title) { + $this->addFlash('error', 'Titel ist erforderlich.'); + return $this->redirectToRoute('admin_document_new'); + } + + $uploadDir = (string)$params->get('mto.vector.data.upload.path'); + $this->ensureDir($uploadDir); + + $newFilename = uniqid('', true) . '_' . $file->getClientOriginalName(); + + try { + $file->move($uploadDir, $newFilename); + } catch (FileException) { + throw new \RuntimeException('File upload failed.'); + } + + $filePath = $uploadDir . '/' . $newFilename; + + $document = $documentService->createDocument( + $title, + $filePath, + $this->getUser() + ); + + $version = $document->getCurrentVersion(); + if (!$version instanceof DocumentVersion) { + $this->addFlash('danger', 'Dokument erstellt, aber es wurde keine aktuelle Version erzeugt.'); + return $this->redirectToRoute('admin_documents'); + } + + $job = $jobService->startJob( + IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE, + $this->getUser(), + $version->getDocument()->getId(), + $version->getId(), + null, + IngestJob::STATUS_QUEUED + ); + + if (!$this->canExec()) { + $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); + $this->addFlash('danger', 'Dokument erstellt, aber Ingest konnte nicht asynchron gestartet werden (exec deaktiviert).'); + return $this->redirectToRoute('admin_documents'); + } + + $this->startIngestJob((string)$job->getId()); + + return $this->redirectToRoute('admin_job_show', [ + 'id' => (string)$job->getId(), + ]); } #[Route('/{id}/version/new', name: 'admin_document_version_new', requirements: ['id' => '[0-9a-fA-F\-]{36}'])] @@ -171,52 +149,46 @@ class DocumentController extends AbstractController EntityManagerInterface $em, DocumentService $documentService, ParameterBagInterface $params - ): Response - { - + ): Response { $document = $em->getRepository(Document::class)->find($id); if (!$document) { throw $this->createNotFoundException(); } - if ($request->isMethod('POST')) { - - $file = $request->files->get('file'); - - if (!$file) { - $this->addFlash('error', 'Datei ist erforderlich.'); - return $this->redirectToRoute('admin_document_version_new', ['id' => $id]); - } - - $uploadDir = $params->get('mto.vector.data.upload.path'); - - if (!is_dir($uploadDir)) { - mkdir($uploadDir, 0777, true); - } - - $newFilename = uniqid() . '_' . $file->getClientOriginalName(); - - try { - $file->move($uploadDir, $newFilename); - } catch (FileException $e) { - throw new \RuntimeException('File upload failed.'); - } - - $filePath = $uploadDir . '/' . $newFilename; - - $documentService->addVersion( - $document, - $filePath, - $this->getUser() - ); - - return $this->redirectToRoute('admin_document_show', ['id' => $id]); + if (!$request->isMethod('POST')) { + return $this->render('admin/document/new_version.html.twig', [ + 'document' => $document, + ]); } - return $this->render('admin/document/new_version.html.twig', [ - 'document' => $document - ]); + /** @var UploadedFile|null $file */ + $file = $request->files->get('file'); + if (!$file instanceof UploadedFile) { + $this->addFlash('error', 'Datei ist erforderlich.'); + return $this->redirectToRoute('admin_document_version_new', ['id' => $id]); + } + + $uploadDir = (string)$params->get('mto.vector.data.upload.path'); + $this->ensureDir($uploadDir); + + $newFilename = uniqid('', true) . '_' . $file->getClientOriginalName(); + + try { + $file->move($uploadDir, $newFilename); + } catch (FileException) { + throw new \RuntimeException('File upload failed.'); + } + + $filePath = $uploadDir . '/' . $newFilename; + + $documentService->addVersion( + $document, + $filePath, + $this->getUser() + ); + + return $this->redirectToRoute('admin_document_show', ['id' => $id]); } #[Route( @@ -231,27 +203,18 @@ class DocumentController extends AbstractController EntityManagerInterface $em, DocumentService $documentService, IngestJobService $jobService, - ): RedirectResponse - { - - if (!$this->isCsrfTokenValid('activate_version_' . $versionId, $request->request->get('_token'))) { + ): RedirectResponse { + if (!$this->isCsrfTokenValid('activate_version_' . $versionId, (string)$request->request->get('_token'))) { throw $this->createAccessDeniedException(); } $version = $em->getRepository(DocumentVersion::class)->find($versionId); - if (!$version) { throw $this->createNotFoundException(); } try { $documentService->activateVersion($version); - // --------------------------------------------------------- - // Saubere IngestJob-Integration: - // 1) Job als QUEUED anlegen (spezieller Typ für Aktivierung) - // 2) Symfony-Command im Hintergrund starten - // 3) Direkt auf Job-Detailseite redirecten (Loader + Polling) - // --------------------------------------------------------- $job = $jobService->startJob( IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE, @@ -262,28 +225,15 @@ class DocumentController extends AbstractController IngestJob::STATUS_QUEUED ); - // Hintergrundprozess starten (Provider-kompatibel, kein Worker/Daemon) - $projectDir = (string)$this->getParameter('kernel.project_dir'); - $console = $projectDir . '/bin/console'; - - $cmd = sprintf( - '%s %s %s %s > /dev/null 2>&1 &', - escapeshellarg($console), - escapeshellarg('mto:agent:ingest:run'), - escapeshellarg((string)$job->getId()), - escapeshellarg('--no-interaction'), - ); - - // Best effort: wenn exec deaktiviert ist, sauber abbrechen. - if (!function_exists('exec')) { + if (!$this->canExec()) { $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); $this->addFlash('danger', 'Aktivierung ok, aber Ingest konnte nicht asynchron gestartet werden (exec deaktiviert).'); return $this->redirectToRoute('admin_document_show', [ - 'id' => $version->getDocument()->getId() + 'id' => $version->getDocument()->getId(), ]); } - exec($cmd); + $this->startIngestJob((string)$job->getId()); $this->addFlash('success', 'Version aktiviert. Ingest-Job wurde erstellt und gestartet.'); @@ -295,7 +245,7 @@ class DocumentController extends AbstractController } return $this->redirectToRoute('admin_document_show', [ - 'id' => $version->getDocument()->getId() + 'id' => $version->getDocument()->getId(), ]); } @@ -310,19 +260,17 @@ class DocumentController extends AbstractController Request $request, EntityManagerInterface $em, IngestJobService $jobService, - ): ?RedirectResponse - { - $dryRun = false; - if (!$this->isCsrfTokenValid('ingest_version_' . $versionId, $request->request->get('_token'))) { + ): ?RedirectResponse { + if (!$this->isCsrfTokenValid('ingest_version_' . $versionId, (string)$request->request->get('_token'))) { throw $this->createAccessDeniedException(); } $version = $em->getRepository(DocumentVersion::class)->find($versionId); - if (!$version) { throw $this->createNotFoundException(); } + /** @var IngestJob|null $existing */ $existing = $em->getRepository(IngestJob::class) ->findOneBy( ['documentVersionId' => $version->getId()], @@ -333,13 +281,6 @@ class DocumentController extends AbstractController return null; } - // --------------------------------------------------------- - // Asynchroner Ingest (ohne Messenger): - // 1) Job als QUEUED anlegen - // 2) Symfony-Command im Hintergrund starten - // 3) Direkt auf Job-Detailseite redirecten (Loader + Polling) - // --------------------------------------------------------- - $job = $jobService->startJob( IngestJob::TYPE_DOCUMENT, $this->getUser(), @@ -349,28 +290,15 @@ class DocumentController extends AbstractController IngestJob::STATUS_QUEUED ); - // Hintergrundprozess starten (Provider-kompatibel, kein Worker/Daemon) - $projectDir = (string)$this->getParameter('kernel.project_dir'); - $console = $projectDir . '/bin/console'; - - $cmd = sprintf( - '%s %s %s %s > /dev/null 2>&1 &', - escapeshellarg($console), - escapeshellarg('mto:agent:ingest:run'), - escapeshellarg((string)$job->getId()), - escapeshellarg('--no-interaction'), - ); - - // Best effort: wenn exec deaktiviert ist, sauber abbrechen. - if (!function_exists('exec')) { + if (!$this->canExec()) { $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); $this->addFlash('error', 'Ingest konnte nicht asynchron gestartet werden (exec deaktiviert).'); return $this->redirectToRoute('admin_document_show', [ - 'id' => $version->getDocument()->getId() + 'id' => $version->getDocument()->getId(), ]); } - exec($cmd); + $this->startIngestJob((string)$job->getId()); return $this->redirectToRoute('admin_job_show', [ 'id' => (string)$job->getId(), @@ -384,17 +312,21 @@ class DocumentController extends AbstractController )] public function resetCompleteSystem(ParameterBagInterface $params, Connection $connection): ?RedirectResponse { - if (!function_exists('exec')) { + if (!$this->canExec()) { $this->addFlash('danger', 'Der Reset konnte nicht gestartet werden (exec deaktiviert).'); return $this->redirectToRoute('admin_dashboard'); } - @unlink($params->get('mto.knowledge.ndjson')); - @unlink($params->get('mto.knowledge.vector_index')); - @unlink($params->get('mto.knowledge.vector_index_meta')); - @unlink($params->get('mto.knowledge.index_meta')); - @unlink($params->get('mto.runtime.meta')); - exec('rm -rf ' . $params->get('mto.knowledge.upload')); + @unlink((string)$params->get('mto.knowledge.ndjson')); + @unlink((string)$params->get('mto.knowledge.vector_index')); + @unlink((string)$params->get('mto.knowledge.vector_index_meta')); + @unlink((string)$params->get('mto.knowledge.index_meta')); + @unlink((string)$params->get('mto.runtime.meta')); + + $uploadDir = (string)$params->get('mto.knowledge.upload'); + if ($uploadDir !== '' && is_dir($uploadDir)) { + exec('rm -rf ' . escapeshellarg($uploadDir)); + } $sql = ' SET FOREIGN_KEY_CHECKS = 0; @@ -425,39 +357,29 @@ class DocumentController extends AbstractController EntityManagerInterface $em, IngestJobService $jobService, LockService $lockService, - DocumentService $documentService - ): RedirectResponse - { - if (!$this->isCsrfTokenValid('delete_document_' . $id, $request->request->get('_token'))) { + ): RedirectResponse { + if (!$this->isCsrfTokenValid('delete_document_' . $id, (string)$request->request->get('_token'))) { throw $this->createAccessDeniedException(); } try { $uuid = Uuid::fromString($id); - } catch (\Exception $e) { + } catch (\Exception) { throw $this->createNotFoundException(); } + /** @var Document|null $document */ $document = $em->getRepository(Document::class)->find($uuid); - if (!$document) { throw $this->createNotFoundException(); } - // --------------------------------------------------------- - // 🔒 Delete nur erlauben wenn kein anderer Job läuft - // --------------------------------------------------------- if (!$lockService->acquire()) { $this->addFlash('danger', 'Ein Ingest-Job läuft bereits. Löschen derzeit nicht möglich.'); return $this->redirectToRoute('admin_documents'); } - - // Nur Test-Lock – echter Lock im Orchestrator $lockService->release(); - // --------------------------------------------------------- - // 1) Delete-Job anlegen (QUEUED) - // --------------------------------------------------------- $job = $jobService->startJob( IngestJob::TYPE_DOCUMENT_DELETE, $this->getUser(), @@ -467,27 +389,13 @@ class DocumentController extends AbstractController IngestJob::STATUS_QUEUED ); - // --------------------------------------------------------- - // 2) Hintergrundprozess starten - // --------------------------------------------------------- - $projectDir = (string)$this->getParameter('kernel.project_dir'); - $console = $projectDir . '/bin/console'; - - $cmd = sprintf( - '%s %s %s %s > /dev/null 2>&1 &', - escapeshellarg($console), - escapeshellarg('mto:agent:ingest:run'), - escapeshellarg((string)$job->getId()), - escapeshellarg('--no-interaction'), - ); - - if (!function_exists('exec')) { + if (!$this->canExec()) { $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); $this->addFlash('danger', 'Löschen konnte nicht gestartet werden (exec deaktiviert).'); return $this->redirectToRoute('admin_documents'); } - exec($cmd); + $this->startIngestJob((string)$job->getId()); $this->addFlash('success', 'Löschvorgang gestartet. Dokument wird nach Index-Rebuild entfernt.'); @@ -495,4 +403,42 @@ class DocumentController extends AbstractController 'id' => (string)$job->getId(), ]); } -} + + // ========================================================= + // Helpers + // ========================================================= + + private function canExec(): bool + { + return function_exists('exec'); + } + + private function ensureDir(string $dir): void + { + if ($dir === '') { + throw new \RuntimeException('Upload directory not configured.'); + } + + if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) { + throw new \RuntimeException('Unable to create upload directory.'); + } + } + + private function startIngestJob(string $jobId): void + { + $projectDir = (string)$this->getParameter('kernel.project_dir'); + $console = $projectDir . '/bin/console'; + + // WICHTIG: --no-interaction ist ein GLOBAL-Flag und muss VOR dem Command stehen! + $cmd = sprintf( + '%s %s %s %s %s > /dev/null 2>&1 &', + escapeshellarg(PHP_BINARY), + escapeshellarg($console), + '--no-interaction', + escapeshellarg('mto:agent:ingest:run'), + escapeshellarg($jobId), + ); + + exec($cmd); + } +} \ No newline at end of file diff --git a/src/Index/IndexMetaManager.php b/src/Index/IndexMetaManager.php index a769be9..34ec93d 100644 --- a/src/Index/IndexMetaManager.php +++ b/src/Index/IndexMetaManager.php @@ -15,15 +15,13 @@ final class IndexMetaManager string $runTimePath, IndexConfigurationProvider $provider ) { - $this->metaPath = $metaPath; - $this->provider = $provider; - - // runtime liegt im selben Verzeichnis + $this->metaPath = $metaPath; $this->runtimePath = $runTimePath; + $this->provider = $provider; } // ===================================================== - // META (Governance – unverändert lassen!) + // META (Governance – unverändert inhaltlich) // ===================================================== public function ensureExists(): void @@ -39,10 +37,12 @@ final class IndexMetaManager return null; } - return json_decode( + $data = json_decode( (string) file_get_contents($this->metaPath), true ); + + return is_array($data) ? $data : null; } public function validateAgainstCurrent(): void @@ -85,18 +85,7 @@ final class IndexMetaManager $config->toStructureArray() ); - $dir = dirname($this->metaPath); - if (!is_dir($dir)) { - mkdir($dir, 0777, true); - } - - file_put_contents( - $this->metaPath, - json_encode( - $payload, - JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES - ) - ); + $this->atomicWriteJson($this->metaPath, $payload); } // ===================================================== @@ -109,20 +98,12 @@ final class IndexMetaManager return; } - $dir = dirname($this->runtimePath); - if (!is_dir($dir)) { - mkdir($dir, 0777, true); - } - $payload = [ - 'chunk_count' => 0, + 'chunk_count' => 0, 'last_rebuild_at' => null, ]; - file_put_contents( - $this->runtimePath, - json_encode($payload, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) - ); + $this->atomicWriteJson($this->runtimePath, $payload); } public function updateRuntimeStats(int $chunkCount): void @@ -130,14 +111,11 @@ final class IndexMetaManager $this->ensureRuntimeFileExists(); $payload = [ - 'chunk_count' => $chunkCount, + 'chunk_count' => $chunkCount, 'last_rebuild_at' => (new \DateTimeImmutable())->format(DATE_ATOM), ]; - file_put_contents( - $this->runtimePath, - json_encode($payload, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) - ); + $this->atomicWriteJson($this->runtimePath, $payload); } public function getRuntimeChunkCount(): int @@ -151,5 +129,37 @@ final class IndexMetaManager return (int)($data['chunk_count'] ?? 0); } -} + // ===================================================== + // INTERNAL ATOMIC JSON WRITE + // ===================================================== + + private function atomicWriteJson(string $path, array $payload): void + { + $dir = dirname($path); + + if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) { + throw new \RuntimeException('Unable to create directory for meta/runtime'); + } + + $tmpPath = $path . '.tmp'; + + $json = json_encode( + $payload, + JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES + ); + + if ($json === false) { + throw new \RuntimeException('Unable to encode JSON payload'); + } + + if (file_put_contents($tmpPath, $json) === false) { + throw new \RuntimeException('Unable to write temporary JSON file'); + } + + if (!rename($tmpPath, $path)) { + @unlink($tmpPath); + throw new \RuntimeException('Atomic switch failed for JSON file'); + } + } +} \ No newline at end of file diff --git a/src/Ingest/ChunkWriteService.php b/src/Ingest/ChunkWriteService.php new file mode 100644 index 0000000..4d445da --- /dev/null +++ b/src/Ingest/ChunkWriteService.php @@ -0,0 +1,72 @@ +chunkManager->getIndexPath(); + } + + public function countAllChunks(): int + { + return $this->chunkManager->countAllChunks(); + } + + public function compactByDocumentId(Uuid $documentId): void + { + $this->chunkManager->compactByDocument($documentId); + } + + /** + * @param iterable> $chunks + */ + public function appendChunks(iterable $chunks): void + { + $this->chunkManager->appendChunks($chunks); + } + + /** + * Lokaler Ingest für eine einzelne DocumentVersion. + * + * Ablauf: + * 1. Entfernt bestehende Chunks dieses Dokuments + * 2. Appendet neue Chunks + * + * @param iterable> $chunks + */ + public function writeForDocumentVersion( + DocumentVersion $version, + iterable $chunks + ): void { + $documentId = $version->getDocument()->getId(); + + if (!$documentId instanceof Uuid) { + throw new \RuntimeException('Document ID must be a Uuid instance'); + } + + $this->chunkManager->compactByDocument($documentId); + $this->chunkManager->appendChunks($chunks); + } + + /** + * Vollständiger Rewrite des NDJSON-Index (Global Reindex). + * + * @param iterable> $allChunks + */ + public function rewriteAll(iterable $allChunks): void + { + $this->chunkManager->rewriteAll($allChunks); + } +} \ No newline at end of file diff --git a/src/Ingest/GuardrailValidator.php b/src/Ingest/GuardrailValidator.php new file mode 100644 index 0000000..50282d6 --- /dev/null +++ b/src/Ingest/GuardrailValidator.php @@ -0,0 +1,23 @@ +metaManager->validateAgainstCurrent(); + } +} \ No newline at end of file diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php index e36f34a..dd6ae36 100644 --- a/src/Ingest/IngestFlow.php +++ b/src/Ingest/IngestFlow.php @@ -7,9 +7,7 @@ namespace App\Ingest; use App\Entity\Document; use App\Entity\DocumentVersion; use App\Index\IndexMetaManager; -use App\Knowledge\ChunkManager; use App\Knowledge\Ingest\KnowledgeIngestService; -use App\Vector\VectorIndexBuilder; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Uid\Uuid; @@ -21,45 +19,149 @@ final readonly class IngestFlow public function __construct( private KnowledgeIngestService $knowledgeIngestService, - private ChunkManager $chunkManager, - private VectorIndexBuilder $vectorBuilder, + private GuardrailValidator $guardrailValidator, + private ChunkWriteService $chunkWriteService, + private VectorRebuildService $vectorRebuildService, private IndexMetaManager $metaManager, + private IngestLockService $lockService, private LoggerInterface $logger, private EntityManagerInterface $em, ) {} // ========================================================= - // DOCUMENT INGEST (STREAMING SAFE) + // DOCUMENT INGEST // ========================================================= public function ingestDocumentVersion(DocumentVersion $version): void { - $this->metaManager->validateAgainstCurrent(); + $this->withLock(function () use ($version): void { - $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); - $this->em->flush(); + $this->guardrailValidator->validateOrThrow(); - try { + $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); + $this->em->flush(); - $this->chunkManager->compactByDocument($version->getDocument()->getId()); + try { + $documentId = $version->getDocument()->getId(); + if (!$documentId instanceof Uuid) { + throw new \RuntimeException('Document ID must be a Uuid instance'); + } - $existing = $this->chunkManager->countAllChunks(); + // 1) Alte Chunks entfernen + $this->chunkWriteService->compactByDocumentId($documentId); + // 2) Existing Chunks nach Compaction zählen + $existing = $this->chunkWriteService->countAllChunks(); + + $incoming = 0; + $warned = false; + + $generator = $this->knowledgeIngestService->buildChunkRecords($version); + + $wrappedGenerator = (function () use ($generator, $existing, &$incoming, &$warned) { + foreach ($generator as $record) { + $incoming++; + $total = $existing + $incoming; + + if (!$warned && $total >= self::CHUNK_LIMIT_WARN) { + $warned = true; + } + + if ($total > self::CHUNK_LIMIT_HARD) { + throw new \RuntimeException('Chunk limit exceeded.'); + } + + yield $record; + } + })(); + + // 3) Streaming Append + $this->chunkWriteService->appendChunks($wrappedGenerator); + + $total = $existing + $incoming; + + if ($warned) { + $this->logger->warning('Chunk count approaching limit.', [ + 'existing' => $existing, + 'incoming' => $incoming, + 'total' => $total, + 'document' => (string)$documentId, + 'version' => (string)$version->getId(), + ]); + } + + // 4) Vector Rebuild + Runtime Update + $this->vectorRebuildService->rebuild(); + + $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); + $this->em->flush(); + + } catch (\Throwable $e) { + $version->setIngestStatus(DocumentVersion::INGEST_FAILED); + $this->em->flush(); + throw $e; + } + }); + } + + // ========================================================= + // GLOBAL REINDEX + // ========================================================= + + public function globalReindex(): void + { + $this->withLock(function (): void { + + // Global Reindex ist der Drift-Fix → keine Guardrail-Blockade hier. + + $activeDocuments = $this->em + ->getRepository(Document::class) + ->createQueryBuilder('d') + ->where('d.status = :status') + ->setParameter('status', Document::STATUS_ACTIVE) + ->getQuery() + ->getResult(); + + if (empty($activeDocuments)) { + throw new \RuntimeException('Global Reindex aborted: no active documents found.'); + } + + $existing = 0; // rewriteAll ersetzt alles $incoming = 0; - $generator = $this->knowledgeIngestService->buildChunkRecords($version); + $warned = false; - $wrappedGenerator = (function () use ($generator, $existing, &$incoming) { + $generator = $this->knowledgeIngestService->buildAllActiveChunkRecords(); + + // 1) "Peek" ohne RAM: erstes Element ziehen + $first = null; + foreach ($generator as $record) { + $first = $record; + $incoming++; + break; + } + + if ($first === null) { + throw new \RuntimeException('Global Reindex aborted: no chunks generated.'); + } + + // 2) Stream bauen, Limits prüfen + $stream = (function () use ($first, $generator, $existing, &$incoming, &$warned) { + // first + $total = $existing + $incoming; + if (!$warned && $total >= self::CHUNK_LIMIT_WARN) { + $warned = true; + } + if ($total > self::CHUNK_LIMIT_HARD) { + throw new \RuntimeException('Chunk limit exceeded.'); + } + yield $first; foreach ($generator as $record) { - $incoming++; $total = $existing + $incoming; - if ($total >= self::CHUNK_LIMIT_WARN) { - // Nur einmal warnen - if ($incoming === 1 || $total === self::CHUNK_LIMIT_WARN) { - // Logging erfolgt außerhalb des Streams final - } + if (!$warned && $total >= self::CHUNK_LIMIT_WARN) { + $warned = true; } if ($total > self::CHUNK_LIMIT_HARD) { @@ -68,89 +170,23 @@ final readonly class IngestFlow yield $record; } - })(); - $this->chunkManager->appendChunks($wrappedGenerator); + // 3) Rewrite + Rebuild + $this->chunkWriteService->rewriteAll($stream); - $total = $existing + $incoming; - - if ($total >= self::CHUNK_LIMIT_WARN) { - $this->logger->warning('Chunk count approaching limit.', [ - 'existing' => $existing, + if ($warned) { + $this->logger->warning('Chunk count approaching limit after global reindex.', [ 'incoming' => $incoming, - 'total' => $total, + 'total' => $incoming, ]); } - $this->rebuildIndex(false); + $this->vectorRebuildService->rebuild(); - $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); - $this->em->flush(); - - } catch (\Throwable $e) { - - $version->setIngestStatus(DocumentVersion::INGEST_FAILED); - $this->em->flush(); - throw $e; - } - } - - // ========================================================= - // GLOBAL REINDEX (STREAMING SAFE) - // ========================================================= - - public function globalReindex(): void - { - $activeDocuments = $this->em - ->getRepository(Document::class) - ->createQueryBuilder('d') - ->where('d.status = :status') - ->setParameter('status', Document::STATUS_ACTIVE) - ->getQuery() - ->getResult(); - - if (empty($activeDocuments)) { - throw new \RuntimeException( - 'Global Reindex abgebrochen: Es sind keine aktiven Dokumente vorhanden.' - ); - } - - $incoming = 0; - - $generator = $this->knowledgeIngestService->buildAllActiveChunkRecords(); - - $wrappedGenerator = (function () use ($generator, &$incoming) { - - foreach ($generator as $record) { - $incoming++; - yield $record; - } - - })(); - - // Prüfen ob überhaupt etwas kommt (ohne alles in RAM zu ziehen) - $peekIterator = $wrappedGenerator instanceof \Iterator - ? $wrappedGenerator - : (function () use ($wrappedGenerator) { - foreach ($wrappedGenerator as $item) { - yield $item; - } - })(); - - if (!$peekIterator->valid()) { - $peekIterator->rewind(); - } - - if (!$peekIterator->valid()) { - throw new \RuntimeException( - 'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt.' - ); - } - - $this->chunkManager->rewriteAll($peekIterator); - - $this->rebuildIndex(true); + // Governance: Version erhöhen + $this->metaManager->writeMetaForGlobalReindex(); + }); } // ========================================================= @@ -159,42 +195,54 @@ final readonly class IngestFlow public function deleteDocument(Uuid $documentId): void { - $this->metaManager->validateAgainstCurrent(); + $this->withLock(function () use ($documentId): void { - $document = $this->em - ->getRepository(Document::class) - ->find($documentId); + $this->guardrailValidator->validateOrThrow(); - if (!$document) { - throw new \RuntimeException('Document not found.'); - } + /** @var Document|null $document */ + $document = $this->em + ->getRepository(Document::class) + ->find($documentId); - $this->chunkManager->compactByDocument($documentId); + if (!$document) { + throw new \RuntimeException('Document not found.'); + } - $this->em->remove($document); - $this->em->flush(); + // 1) Chunks entfernen + $this->chunkWriteService->compactByDocumentId($documentId); - $this->rebuildIndex(false); + // 2) FK-sicher löschen: currentVersion lösen (verhindert „Version zeigt noch auf DocumentVersion“) + if (method_exists($document, 'getCurrentVersion') && method_exists($document, 'setCurrentVersion')) { + if ($document->getCurrentVersion() !== null) { + $document->setCurrentVersion(null); + $this->em->flush(); + } + } + + // 3) Dokument entfernen + $this->em->remove($document); + $this->em->flush(); + + // 4) Vector rebuild + runtime update + $this->vectorRebuildService->rebuild(); + }); } // ========================================================= - // CENTRAL REBUILD + // INTERNALS // ========================================================= - private function rebuildIndex(bool $isGlobal): void + /** + * @param callable():void $fn + */ + private function withLock(callable $fn): void { - $this->vectorBuilder->rebuildFromNdjson(); + $this->lockService->acquire(); - if ($isGlobal) { - $this->metaManager->writeMetaForGlobalReindex(); + try { + $fn(); + } finally { + $this->lockService->release(); } - - $this->updateChunkCount(); - } - - private function updateChunkCount(): void - { - $chunkCount = $this->chunkManager->countAllChunks(); - $this->metaManager->updateRuntimeStats($chunkCount); } } \ No newline at end of file diff --git a/src/Ingest/IngestLockService.php b/src/Ingest/IngestLockService.php new file mode 100644 index 0000000..9469d71 --- /dev/null +++ b/src/Ingest/IngestLockService.php @@ -0,0 +1,51 @@ +lockFilePath = rtrim($projectDir, '/') . '/var/knowledge/locks/ingest.lock'; + } + + public function acquire(): void + { + $dir = dirname($this->lockFilePath); + + if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) { + throw new \RuntimeException('Unable to create lock directory.'); + } + + $this->handle = fopen($this->lockFilePath, 'c'); + + if ($this->handle === false) { + throw new \RuntimeException('Unable to open ingest lock file.'); + } + + if (!flock($this->handle, LOCK_EX | LOCK_NB)) { + throw new \RuntimeException('Another ingest process is already running.'); + } + } + + public function release(): void + { + if ($this->handle !== null) { + flock($this->handle, LOCK_UN); + fclose($this->handle); + $this->handle = null; + } + } + + public function __destruct() + { + $this->release(); + } +} \ No newline at end of file diff --git a/src/Ingest/VectorRebuildService.php b/src/Ingest/VectorRebuildService.php new file mode 100644 index 0000000..c264125 --- /dev/null +++ b/src/Ingest/VectorRebuildService.php @@ -0,0 +1,38 @@ +vectorBuilder->rebuildFromNdjson($logPath); + + // 2️⃣ Chunk Count streaming-safe zählen + $chunkCount = $this->chunkManager->countAllChunks(); + + // 3️⃣ Runtime-Stats aktualisieren (atomar) + $this->metaManager->updateRuntimeStats($chunkCount); + } +} \ No newline at end of file diff --git a/src/Vector/VectorIndexBuilder.php b/src/Vector/VectorIndexBuilder.php index fe7a3c6..40c99a8 100644 --- a/src/Vector/VectorIndexBuilder.php +++ b/src/Vector/VectorIndexBuilder.php @@ -13,7 +13,6 @@ final class VectorIndexBuilder private string $pythonBin; private string $scriptPath; private string $indexNdjsonPath; - private string $indexMetaPath; private string $vectorIndexPath; private string $vectorMetaPath; private int $timeoutSeconds; @@ -24,7 +23,6 @@ final class VectorIndexBuilder string $pythonBin, string $scriptPath, string $indexNdjsonPath, - string $indexMetaPath, string $vectorIndexPath, int $timeoutSeconds, IndexConfigurationProvider $configurationProvider @@ -32,54 +30,30 @@ final class VectorIndexBuilder $this->pythonBin = $pythonBin; $this->scriptPath = $scriptPath; $this->indexNdjsonPath = $indexNdjsonPath; - $this->indexMetaPath = $indexMetaPath; $this->vectorIndexPath = $vectorIndexPath; $this->vectorMetaPath = $vectorIndexPath . '.meta.json'; $this->timeoutSeconds = $timeoutSeconds; $this->configurationProvider = $configurationProvider; } - /** - * Rebuild FAISS Index deterministisch aus index.ndjson. - */ public function rebuildFromNdjson(?string $logPath = null): void { $this->assertPreconditions(); - // -------------------------------------------- - // 🔵 FALL: NDJSON ist leer → kein Vector Index - // -------------------------------------------- if (!is_file($this->indexNdjsonPath) || filesize($this->indexNdjsonPath) === 0) { - @unlink($this->vectorIndexPath); @unlink($this->vectorMetaPath); - - if ($logPath !== null) { - @file_put_contents( - $logPath, - "NDJSON empty → Vector index removed\n", - FILE_APPEND - ); - } - return; } - // -------------------------------------------- - // 🟢 FALL: NDJSON enthält Chunks - // -------------------------------------------- - - if (!is_file($this->indexMetaPath)) { - $this->initializeIndexMeta(); - } - - $indexMeta = $this->readIndexMeta(); - $embeddingModel = $indexMeta['embedding_model']; + $config = $this->configurationProvider->getConfiguration(); + $embeddingModel = $config->getEmbeddingModel(); $tmpVectorIndexPath = $this->vectorIndexPath . '.tmp'; + $tmpVectorMetaPath = $tmpVectorIndexPath . '.meta.json'; @unlink($tmpVectorIndexPath); - @unlink($this->vectorMetaPath); + @unlink($tmpVectorMetaPath); $cmd = [ $this->pythonBin, @@ -94,107 +68,51 @@ final class VectorIndexBuilder $this->runProcess($process, $logPath); - $this->validatePythonOutputs($tmpVectorIndexPath); + $this->validateOutputs($tmpVectorIndexPath, $tmpVectorMetaPath); - $this->atomicSwitch($tmpVectorIndexPath); + $this->atomicSwitchPair( + $tmpVectorIndexPath, + $tmpVectorMetaPath + ); } - // ----------------------------------------------------- - // Internals - // ----------------------------------------------------- - private function assertPreconditions(): void { if (!is_file($this->scriptPath)) { - throw new \RuntimeException( - 'Vector build script not found at: ' . $this->scriptPath - ); + throw new \RuntimeException('Vector build script not found.'); } - if (!is_file($this->indexNdjsonPath)) { - throw new \RuntimeException( - 'index.ndjson not found at: ' . $this->indexNdjsonPath - ); + throw new \RuntimeException('index.ndjson not found.'); } } - private function readIndexMeta(): array + private function validateOutputs(string $tmpIndex, string $tmpMeta): void { - $meta = json_decode( - (string) file_get_contents($this->indexMetaPath), - true - ); - - if (!is_array($meta) || empty($meta['embedding_model'])) { - throw new \RuntimeException('Invalid index_meta.json'); - } - - return $meta; - } - - private function initializeIndexMeta(): void - { - $dir = dirname($this->indexMetaPath); - - if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) { - throw new \RuntimeException('Cannot create knowledge directory'); - } - - $config = $this->configurationProvider->getConfiguration(); - - $data = [ - 'index_version' => 1, - 'created_at' => (new \DateTimeImmutable())->format(DATE_ATOM), - 'embedding_model' => $config->getEmbeddingModel(), - 'embedding_dimension' => $config->getEmbeddingDimension(), - 'chunk_size' => $config->getChunkSize(), - 'chunk_overlap' => $config->getChunkOverlap(), - 'scoring_version' => $config->getScoringVersion(), - 'index_format' => 'ndjson', - 'vector_backend' => 'faiss', - ]; - - file_put_contents( - $this->indexMetaPath, - json_encode($data, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) - ); - } - - private function validatePythonOutputs(string $tmpVectorIndexPath): void - { - if (!is_file($tmpVectorIndexPath) || filesize($tmpVectorIndexPath) === 0) { + if (!is_file($tmpIndex) || filesize($tmpIndex) === 0) { throw new \RuntimeException('Vector index tmp missing or empty'); } - - if (!is_file($this->vectorMetaPath) || filesize($this->vectorMetaPath) === 0) { - throw new \RuntimeException('Vector meta missing or empty'); + if (!is_file($tmpMeta) || filesize($tmpMeta) === 0) { + throw new \RuntimeException('Vector meta tmp missing or empty'); } } - private function atomicSwitch(string $tmpVectorIndexPath): void + private function atomicSwitchPair(string $tmpIndex, string $tmpMeta): void { - if (!rename($tmpVectorIndexPath, $this->vectorIndexPath)) { + if (!rename($tmpIndex, $this->vectorIndexPath)) { throw new \RuntimeException('Atomic switch failed for vector index'); } + + if (!rename($tmpMeta, $this->vectorMetaPath)) { + throw new \RuntimeException('Atomic switch failed for vector meta'); + } } private function runProcess(Process $process, ?string $logPath): void { - if ($logPath !== null) { - @file_put_contents($logPath, "=== VectorIndexBuilder START ===\n", FILE_APPEND); - } - $process->run(); if (!$process->isSuccessful()) { - if ($logPath !== null) { - @file_put_contents($logPath, $process->getErrorOutput(), FILE_APPEND); - } throw new ProcessFailedException($process); } - - if ($logPath !== null) { - @file_put_contents($logPath, "=== VectorIndexBuilder OK ===\n", FILE_APPEND); - } } } \ No newline at end of file