diff --git a/src/Command/KnowledgeIngestCommand.php b/src/Command/KnowledgeIngestCommand.php index 25d9e65..d64ccda 100644 --- a/src/Command/KnowledgeIngestCommand.php +++ b/src/Command/KnowledgeIngestCommand.php @@ -6,7 +6,7 @@ namespace App\Command; use App\Entity\DocumentVersion; use App\Entity\User; -use App\Ingest\IngestFlow; +use App\Service\IngestOrchestrator; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; @@ -18,7 +18,7 @@ use Symfony\Component\Console\Output\OutputInterface; class KnowledgeIngestCommand extends Command { public function __construct( - private readonly IngestFlow $ingestFlow, + private readonly IngestOrchestrator $orchestrator, private readonly EntityManagerInterface $em, ) { parent::__construct(); @@ -33,8 +33,8 @@ class KnowledgeIngestCommand extends Command protected function execute(InputInterface $input, OutputInterface $output): int { - $versionId = $input->getArgument('versionId'); - $userId = $input->getArgument('userId'); + $versionId = (string) $input->getArgument('versionId'); + $userId = (string) $input->getArgument('userId'); $version = $this->em->getRepository(DocumentVersion::class)->find($versionId); $user = $this->em->getRepository(User::class)->find($userId); @@ -46,9 +46,9 @@ class KnowledgeIngestCommand extends Command $output->writeln('Starting ingest...'); - $this->ingestFlow->ingestDocumentVersion($version, $user); + $job = $this->orchestrator->runForVersion($version, $user, false); - $output->writeln('Ingest completed.'); + $output->writeln(sprintf('Ingest completed. Job: %s', (string) $job->getId())); return Command::SUCCESS; } diff --git a/src/Controller/Admin/DocumentController.php b/src/Controller/Admin/DocumentController.php index 28d5a14..bf8d800 100644 --- a/src/Controller/Admin/DocumentController.php +++ b/src/Controller/Admin/DocumentController.php @@ -5,6 +5,7 @@ namespace App\Controller\Admin; use App\Entity\Document; use App\Entity\DocumentVersion; +use App\Entity\IngestJob; use App\Service\DocumentService; use App\Service\IngestOrchestrator; use Doctrine\ORM\EntityManagerInterface; @@ -183,16 +184,16 @@ class DocumentController extends AbstractController #[Route( '/version/{versionId}/ingest', name: 'admin_document_version_ingest', - methods: ['POST'], - requirements: ['versionId' => '[0-9a-fA-F\-]{36}'] + requirements: ['versionId' => '[0-9a-fA-F\-]{36}'], + methods: ['POST'] )] public function ingestVersion( string $versionId, Request $request, EntityManagerInterface $em, IngestOrchestrator $orchestrator - ): RedirectResponse { - + ): ?RedirectResponse { + $dryRun = false; if (!$this->isCsrfTokenValid('ingest_version', $request->request->get('_token'))) { throw $this->createAccessDeniedException(); } @@ -203,10 +204,20 @@ class DocumentController extends AbstractController throw $this->createNotFoundException(); } + $existing = $em->getRepository(IngestJob::class) + ->findOneBy( + ['documentVersionId' => $version->getId()], + ['startedAt' => 'DESC'] + ); + + if ($existing && $existing->getStartedAt() > new \DateTimeImmutable('-3 seconds')) { + return null; + } + $orchestrator->runForVersion( $version, $this->getUser(), - true // erstmal DryRun + $dryRun ); return $this->redirectToRoute('admin_document_show', [ diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php index 568a38a..b74271a 100644 --- a/src/Ingest/IngestFlow.php +++ b/src/Ingest/IngestFlow.php @@ -5,178 +5,52 @@ declare(strict_types=1); namespace App\Ingest; use App\Entity\DocumentVersion; -use App\Entity\IngestJob; -use App\Entity\User; use App\Index\IndexMetaManager; use App\Index\IndexStructureChangedException; use App\Knowledge\ChunkManager; -use App\Service\IngestJobService; -use App\Service\LockService; use App\Knowledge\Ingest\KnowledgeIngestService; use App\Vector\VectorIndexBuilder; -use Doctrine\ORM\EntityManagerInterface; -final class IngestFlow +final readonly class IngestFlow { public function __construct( - private readonly LockService $lockService, - private readonly IngestJobService $jobService, - private readonly KnowledgeIngestService $knowledgeIngestService, - private readonly ChunkManager $chunkManager, - private readonly VectorIndexBuilder $vectorBuilder, - private readonly IndexMetaManager $metaManager, - private readonly EntityManagerInterface $em, - ) { + private KnowledgeIngestService $knowledgeIngestService, + private ChunkManager $chunkManager, + private VectorIndexBuilder $vectorBuilder, + private IndexMetaManager $metaManager, + ) + { } - // ============================================================ - // LOCAL DOCUMENT INGEST - // ============================================================ public function ingestDocumentVersion( - DocumentVersion $version, - User $user - ): IngestJob { + DocumentVersion $version + ): void + { + $this->metaManager->validateAgainstCurrent(); - if (!$this->lockService->acquire()) { - throw new \RuntimeException('Another ingest job is already running.'); - } + $this->chunkManager->compactByDocument( + $version->getDocument()->getId() + ); - $job = null; + $records = $this->knowledgeIngestService + ->buildChunkRecords($version); - try { + $this->chunkManager->appendChunks($records); - $job = $this->jobService->startJob( - IngestJob::TYPE_DOCUMENT, - $user, - $version->getDocument()->getId(), - $version->getId(), - ); - - $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); - $this->em->flush(); - - // -------------------------------------------------- - // Guardrail: Struktur prüfen - // -------------------------------------------------- - $this->metaManager->validateAgainstCurrent(); - - // -------------------------------------------------- - // Alte Chunks dieses Dokuments entfernen (Streaming) - // -------------------------------------------------- - $this->chunkManager->compactByDocument( - $version->getDocument()->getId() - ); - - // -------------------------------------------------- - // Neue Chunks erzeugen - // -------------------------------------------------- - $records = $this->knowledgeIngestService - ->buildChunkRecords($version); - - // -------------------------------------------------- - // Append in NDJSON - // -------------------------------------------------- - $this->chunkManager->appendChunks($records); - - // -------------------------------------------------- - // FAISS komplett neu bauen (deterministisch) - // -------------------------------------------------- - $logPath = $job->getLogPath(); - $this->vectorBuilder->rebuildFromNdjson($logPath); - - // -------------------------------------------------- - // Erfolg - // -------------------------------------------------- - $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); - $this->jobService->markCompleted($job); - - $this->em->flush(); - - } catch (IndexStructureChangedException $e) { - - if ($job) { - $this->jobService->markFailed($job, $e->getMessage()); - } - - $version->setIngestStatus(DocumentVersion::INGEST_FAILED); - $this->em->flush(); - - throw $e; - - } catch (\Throwable $e) { - - if ($job) { - $this->jobService->markFailed($job, $e->getMessage()); - } - - $version->setIngestStatus(DocumentVersion::INGEST_FAILED); - $this->em->flush(); - - throw $e; - - } finally { - $this->lockService->release(); - } - - return $job; + $this->vectorBuilder->rebuildFromNdjson(); } - // ============================================================ - // GLOBAL REINDEX - // ============================================================ - public function globalReindex(User $user): IngestJob + public function globalReindex(): void { - if (!$this->lockService->acquire()) { - throw new \RuntimeException('Another ingest job is already running.'); - } + $allRecords = $this->knowledgeIngestService + ->buildAllActiveChunkRecords(); - $job = null; + $this->chunkManager->rewriteAll($allRecords); - try { + $this->vectorBuilder->rebuildFromNdjson(); - $job = $this->jobService->startJob( - IngestJob::TYPE_GLOBAL_REINDEX, - $user - ); - - // -------------------------------------------------- - // Alle aktiven Dokumente neu ingestieren - // -------------------------------------------------- - $allRecords = $this->knowledgeIngestService - ->buildAllActiveChunkRecords(); - - // -------------------------------------------------- - // Komplettes NDJSON neu schreiben - // -------------------------------------------------- - $this->chunkManager->rewriteAll($allRecords); - - // -------------------------------------------------- - // FAISS komplett neu bauen - // -------------------------------------------------- - $logPath = $job->getLogPath(); - $this->vectorBuilder->rebuildFromNdjson($logPath); - - // -------------------------------------------------- - // Meta aktualisieren + index_version++ - // -------------------------------------------------- - $this->metaManager->writeMetaForGlobalReindex(); - - $this->jobService->markCompleted($job); - - } catch (\Throwable $e) { - - if ($job) { - $this->jobService->markFailed($job, $e->getMessage()); - } - - throw $e; - - } finally { - $this->lockService->release(); - } - - return $job; + $this->metaManager->writeMetaForGlobalReindex(); } } diff --git a/src/Knowledge/Ingest/DocumentLoader.php b/src/Knowledge/Ingest/DocumentLoader.php index a4f91bb..de8391b 100644 --- a/src/Knowledge/Ingest/DocumentLoader.php +++ b/src/Knowledge/Ingest/DocumentLoader.php @@ -17,8 +17,6 @@ final class DocumentLoader return match ($ext) { 'txt', 'md' => $this->loadText($path), - - // später: // 'pdf' => $this->loadPdf($path), // 'docx' => $this->loadDocx($path), @@ -34,4 +32,5 @@ final class DocumentLoader } return $content; } + } diff --git a/src/Knowledge/Retrieval/ChunkIndexLoader.php b/src/Knowledge/Retrieval/ChunkIndexLoader.php deleted file mode 100644 index 39937a0..0000000 --- a/src/Knowledge/Retrieval/ChunkIndexLoader.php +++ /dev/null @@ -1,25 +0,0 @@ -indexPath)) { - return []; - } - - $json = file_get_contents($this->indexPath); - $data = $json ? json_decode($json, true) : null; - - return is_array($data) ? $data : []; - } -} diff --git a/src/Knowledge/Retrieval/ChunkKeywordRetriever.php b/src/Knowledge/Retrieval/ChunkKeywordRetriever.php deleted file mode 100644 index 6619907..0000000 --- a/src/Knowledge/Retrieval/ChunkKeywordRetriever.php +++ /dev/null @@ -1,269 +0,0 @@ -maxChunks; - - // --------------------------------------------------------- - // 1) Prompt → search terms - // --------------------------------------------------------- - $queryTerms = $this->extractTerms($prompt); - - // --------------------------------------------------------- - // 2) Keyword-based candidate discovery - // --------------------------------------------------------- - $result = $queryTerms !== [] - ? $this->findCandidateFiles($queryTerms) - : ['files' => [], 'canonicalTerms' => []]; - - $candidateScores = array_slice( - $result['files'], - 0, - self::MAX_KEYWORD_CANDIDATES, - true - ); - - // Canonical replacement - $effectiveTerms = array_map( - static fn (string $term): string => - $result['canonicalTerms'][$term] ?? $term, - $queryTerms - ); - - // --------------------------------------------------------- - // 3) Keyword scoring - // --------------------------------------------------------- - $scored = []; - - foreach ($candidateScores as $file => $similarityScore) { - $path = $this->chunksDir . '/' . $file; - if (!is_file($path)) { - continue; - } - - $chunk = file_get_contents($path); - if ($chunk === false || $chunk === '') { - continue; - } - - $score = $this->scoreChunk($chunk, $effectiveTerms); - if ($score === 0) { - continue; - } - - $scored[$file] = [ - 'chunk' => trim($chunk), - 'score' => (int) round($score * $similarityScore), - ]; - } - - // --------------------------------------------------------- - // 🔑 EARLY EXIT: Keyword results are sufficient - // --------------------------------------------------------- - if (\count($scored) >= $limit) { - return $this->finalize($scored, $limit); - } - - // --------------------------------------------------------- - // 4) Vector retrieval (semantic fallback) - // --------------------------------------------------------- - $vectorHits = $this->vectorClient->search($prompt, self::VECTOR_TOP_K); - - foreach ($vectorHits as $hit) { - if ( - !isset($hit['chunk_id'], $hit['score']) || - $hit['score'] < self::VECTOR_SCORE_THRESHOLD - ) { - continue; - } - - $file = $hit['chunk_id'] . '.txt'; - $path = $this->chunksDir . '/' . $file; - - if (!is_file($path)) { - continue; - } - - $baseScore = $scored[$file]['score'] ?? 0; - - $vectorBoost = (int) round($hit['score'] * 10); - - if ($vectorBoost <= 0) { - continue; - } - - $chunk = $scored[$file]['chunk'] - ?? trim((string) file_get_contents($path)); - - $scored[$file] = [ - 'chunk' => $chunk, - 'score' => $baseScore + $vectorBoost, - ]; - } - - // --------------------------------------------------------- - // 5) Final fallback - // --------------------------------------------------------- - if ($scored === []) { - return $this->fallbackSearch($prompt); - } - - return $this->finalize($scored, $limit); - } - - // ------------------------------------------------------------- - // FINALIZATION - // ------------------------------------------------------------- - private function finalize(array $scored, int $limit): array - { - uasort($scored, fn ($a, $b) => $b['score'] <=> $a['score']); - - return array_slice( - $this->normalizeResults( - array_column($scored, 'chunk') - ), - 0, - $limit - ); - } - - // ------------------------------------------------------------- - // INDEX LOGIC - // ------------------------------------------------------------- - private function findCandidateFiles(array $terms): array - { - $index = $this->indexLoader->load(); - $files = []; - $canonicalTerms = []; - - foreach ($index as $entry) { - if (!isset($entry['file'], $entry['keywords'])) { - continue; - } - - foreach ($terms as $term) { - foreach ($entry['keywords'] as $indexKeyword) { - $score = KeywordSimilarity::compare($term, $indexKeyword); - - if ($score >= 0.8) { - $files[$entry['file']] = max( - $files[$entry['file']] ?? 0.0, - $score - ); - $canonicalTerms[$term] = $indexKeyword; - break 2; - } - } - } - } - - return [ - 'files' => $files, - 'canonicalTerms' => $canonicalTerms, - ]; - } - - // ------------------------------------------------------------- - // FALLBACK - // ------------------------------------------------------------- - private function fallbackSearch(string $prompt): array - { - $chunkedText = trim($this->chunkedSearch->searchAsText($prompt)); - if ($chunkedText === '') { - return []; - } - - return array_slice( - $this->normalizeResults($this->splitChunks($chunkedText)), - 0, - $this->maxChunks - ); - } - - // ------------------------------------------------------------- - // SCORING - // ------------------------------------------------------------- - private function scoreChunk(string $chunk, array $terms): int - { - $content = mb_strtolower($chunk); - $score = 0; - - foreach ($terms as $term) { - if ( - !\in_array($term, $this->stopWords->getStopWords(), true) && - str_contains($content, $term) - ) { - $score += mb_strlen($term) >= 10 ? 2 : 1; - } - } - - return $score; - } - - // ------------------------------------------------------------- - // UTIL - // ------------------------------------------------------------- - private function extractTerms(string $text): array - { - $text = mb_strtolower( - preg_replace('/[^\p{L}\p{N}\s]/u', '', $text) - ); - - return array_values(array_filter( - explode(' ', $text), - static fn (string $w) => mb_strlen($w) > 2 - )); - } - - private function splitChunks(string $text): array - { - return array_values(array_filter( - array_map('trim', explode("\n\n", $text)), - static fn (string $chunk) => $chunk !== '' - )); - } - - private function normalizeResults(array $chunks): array - { - $seen = []; - $out = []; - - foreach ($chunks as $chunk) { - $key = mb_strtolower(preg_replace('/\s+/u', ' ', $chunk)); - if (!isset($seen[$key])) { - $seen[$key] = true; - $out[] = $chunk; - } - } - - return $out; - } -} diff --git a/src/Service/IngestOrchestrator.php b/src/Service/IngestOrchestrator.php index 01595f6..680172d 100644 --- a/src/Service/IngestOrchestrator.php +++ b/src/Service/IngestOrchestrator.php @@ -5,25 +5,27 @@ namespace App\Service; use App\Entity\DocumentVersion; use App\Entity\IngestJob; use App\Entity\User; +use App\Ingest\IngestFlow; use Doctrine\ORM\EntityManagerInterface; -class IngestOrchestrator +final class IngestOrchestrator { public function __construct( - private LockService $lockService, - private IngestJobService $jobService, - private EntityManagerInterface $em, - ) {} + private readonly LockService $lockService, + private readonly IngestJobService $jobService, + private readonly EntityManagerInterface $em, + private readonly IngestFlow $ingestFlow, + ) { + } /** - * Startet Ingest für eine bestimmte DocumentVersion + * Startet Ingest für eine bestimmte DocumentVersion (1 Job pro Run). */ public function runForVersion( DocumentVersion $version, User $user, bool $dryRun = false ): IngestJob { - if (!$this->lockService->acquire()) { throw new \RuntimeException('Another ingest job is already running.'); } @@ -31,10 +33,16 @@ class IngestOrchestrator $job = null; try { + // Governance: nur PENDING/FAILED erlauben + $status = $version->getIngestStatus(); + if (!in_array($status, [ + DocumentVersion::INGEST_PENDING, + DocumentVersion::INGEST_FAILED, + ], true)) { + throw new \RuntimeException(sprintf('Ingest not allowed for status "%s".', $status)); + } - // -------------------------------------- - // Job anlegen - // -------------------------------------- + // Job anlegen (einmal!) $job = $this->jobService->startJob( IngestJob::TYPE_DOCUMENT, $user, @@ -42,33 +50,25 @@ class IngestOrchestrator $version->getId(), ); - // -------------------------------------- - // Version Status RUNNING - // -------------------------------------- + // Status → RUNNING $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); $this->em->flush(); - // -------------------------------------- - // Simulierter Ablauf (noch kein echter Ingest) - // -------------------------------------- if ($dryRun) { usleep(200000); } else { - // Später: - // - KnowledgeIngestService - // - ChunkWriter - // - VectorIngestCommand + // Fachlogik ausführen (Flow erzeugt keine Jobs!) + $this->ingestFlow->ingestDocumentVersion($version, $job->getLogPath()); } - // -------------------------------------- // Erfolg - // -------------------------------------- $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); $this->jobService->markCompleted($job); $this->em->flush(); - } catch (\Throwable $e) { + return $job; + } catch (\Throwable $e) { if ($job) { $this->jobService->markFailed($job, $e->getMessage()); } @@ -81,12 +81,10 @@ class IngestOrchestrator } finally { $this->lockService->release(); } - - return $job; } /** - * Globaler Reindex + * Globaler Reindex aller aktiven Dokumente. */ public function runGlobal(User $user, bool $dryRun = false): IngestJob { @@ -97,24 +95,19 @@ class IngestOrchestrator $job = null; try { - - $job = $this->jobService->startJob( - IngestJob::TYPE_GLOBAL_REINDEX, - $user - ); + $job = $this->jobService->startJob(IngestJob::TYPE_GLOBAL_REINDEX, $user); if ($dryRun) { usleep(200000); } else { - // Später: - // - Alle aktiven Dokumente neu ingestieren - // - Global vector rebuild + $this->ingestFlow->globalReindex($job->getLogPath()); } $this->jobService->markCompleted($job); - } catch (\Throwable $e) { + return $job; + } catch (\Throwable $e) { if ($job) { $this->jobService->markFailed($job, $e->getMessage()); } @@ -124,7 +117,5 @@ class IngestOrchestrator } finally { $this->lockService->release(); } - - return $job; } } diff --git a/templates/admin/document/show.html.twig b/templates/admin/document/show.html.twig index a85fc7f..7d916fb 100644 --- a/templates/admin/document/show.html.twig +++ b/templates/admin/document/show.html.twig @@ -107,7 +107,7 @@ {% if version.isActive %} - {% if version.ingestStatus != constant('App\\Entity\\DocumentVersion::INGEST_RUNNING') %} + {% if version.ingestStatus in ['PENDING', 'FAILED'] %}
{% else %} - Läuft... + Ingested {% endif %} {% else %}