harden code and ingester

This commit is contained in:
team 1
2026-02-12 14:31:29 +01:00
parent 5a52e07edc
commit 994f582f35
8 changed files with 77 additions and 496 deletions

View File

@@ -6,7 +6,7 @@ namespace App\Command;
use App\Entity\DocumentVersion;
use App\Entity\User;
use App\Ingest\IngestFlow;
use App\Service\IngestOrchestrator;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
@@ -18,7 +18,7 @@ use Symfony\Component\Console\Output\OutputInterface;
class KnowledgeIngestCommand extends Command
{
public function __construct(
private readonly IngestFlow $ingestFlow,
private readonly IngestOrchestrator $orchestrator,
private readonly EntityManagerInterface $em,
) {
parent::__construct();
@@ -33,8 +33,8 @@ class KnowledgeIngestCommand extends Command
protected function execute(InputInterface $input, OutputInterface $output): int
{
$versionId = $input->getArgument('versionId');
$userId = $input->getArgument('userId');
$versionId = (string) $input->getArgument('versionId');
$userId = (string) $input->getArgument('userId');
$version = $this->em->getRepository(DocumentVersion::class)->find($versionId);
$user = $this->em->getRepository(User::class)->find($userId);
@@ -46,9 +46,9 @@ class KnowledgeIngestCommand extends Command
$output->writeln('Starting ingest...');
$this->ingestFlow->ingestDocumentVersion($version, $user);
$job = $this->orchestrator->runForVersion($version, $user, false);
$output->writeln('<info>Ingest completed.</info>');
$output->writeln(sprintf('<info>Ingest completed. Job: %s</info>', (string) $job->getId()));
return Command::SUCCESS;
}

View File

@@ -5,6 +5,7 @@ namespace App\Controller\Admin;
use App\Entity\Document;
use App\Entity\DocumentVersion;
use App\Entity\IngestJob;
use App\Service\DocumentService;
use App\Service\IngestOrchestrator;
use Doctrine\ORM\EntityManagerInterface;
@@ -183,16 +184,16 @@ class DocumentController extends AbstractController
#[Route(
'/version/{versionId}/ingest',
name: 'admin_document_version_ingest',
methods: ['POST'],
requirements: ['versionId' => '[0-9a-fA-F\-]{36}']
requirements: ['versionId' => '[0-9a-fA-F\-]{36}'],
methods: ['POST']
)]
public function ingestVersion(
string $versionId,
Request $request,
EntityManagerInterface $em,
IngestOrchestrator $orchestrator
): RedirectResponse {
): ?RedirectResponse {
$dryRun = false;
if (!$this->isCsrfTokenValid('ingest_version', $request->request->get('_token'))) {
throw $this->createAccessDeniedException();
}
@@ -203,10 +204,20 @@ class DocumentController extends AbstractController
throw $this->createNotFoundException();
}
$existing = $em->getRepository(IngestJob::class)
->findOneBy(
['documentVersionId' => $version->getId()],
['startedAt' => 'DESC']
);
if ($existing && $existing->getStartedAt() > new \DateTimeImmutable('-3 seconds')) {
return null;
}
$orchestrator->runForVersion(
$version,
$this->getUser(),
true // erstmal DryRun
$dryRun
);
return $this->redirectToRoute('admin_document_show', [

View File

@@ -5,178 +5,52 @@ declare(strict_types=1);
namespace App\Ingest;
use App\Entity\DocumentVersion;
use App\Entity\IngestJob;
use App\Entity\User;
use App\Index\IndexMetaManager;
use App\Index\IndexStructureChangedException;
use App\Knowledge\ChunkManager;
use App\Service\IngestJobService;
use App\Service\LockService;
use App\Knowledge\Ingest\KnowledgeIngestService;
use App\Vector\VectorIndexBuilder;
use Doctrine\ORM\EntityManagerInterface;
final class IngestFlow
final readonly class IngestFlow
{
public function __construct(
private readonly LockService $lockService,
private readonly IngestJobService $jobService,
private readonly KnowledgeIngestService $knowledgeIngestService,
private readonly ChunkManager $chunkManager,
private readonly VectorIndexBuilder $vectorBuilder,
private readonly IndexMetaManager $metaManager,
private readonly EntityManagerInterface $em,
) {
private KnowledgeIngestService $knowledgeIngestService,
private ChunkManager $chunkManager,
private VectorIndexBuilder $vectorBuilder,
private IndexMetaManager $metaManager,
)
{
}
// ============================================================
// LOCAL DOCUMENT INGEST
// ============================================================
public function ingestDocumentVersion(
DocumentVersion $version,
User $user
): IngestJob {
DocumentVersion $version
): void
{
$this->metaManager->validateAgainstCurrent();
if (!$this->lockService->acquire()) {
throw new \RuntimeException('Another ingest job is already running.');
}
$this->chunkManager->compactByDocument(
$version->getDocument()->getId()
);
$job = null;
$records = $this->knowledgeIngestService
->buildChunkRecords($version);
try {
$this->chunkManager->appendChunks($records);
$job = $this->jobService->startJob(
IngestJob::TYPE_DOCUMENT,
$user,
$version->getDocument()->getId(),
$version->getId(),
);
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
// --------------------------------------------------
// Guardrail: Struktur prüfen
// --------------------------------------------------
$this->metaManager->validateAgainstCurrent();
// --------------------------------------------------
// Alte Chunks dieses Dokuments entfernen (Streaming)
// --------------------------------------------------
$this->chunkManager->compactByDocument(
$version->getDocument()->getId()
);
// --------------------------------------------------
// Neue Chunks erzeugen
// --------------------------------------------------
$records = $this->knowledgeIngestService
->buildChunkRecords($version);
// --------------------------------------------------
// Append in NDJSON
// --------------------------------------------------
$this->chunkManager->appendChunks($records);
// --------------------------------------------------
// FAISS komplett neu bauen (deterministisch)
// --------------------------------------------------
$logPath = $job->getLogPath();
$this->vectorBuilder->rebuildFromNdjson($logPath);
// --------------------------------------------------
// Erfolg
// --------------------------------------------------
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job);
$this->em->flush();
} catch (IndexStructureChangedException $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e;
} catch (\Throwable $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e;
} finally {
$this->lockService->release();
}
return $job;
$this->vectorBuilder->rebuildFromNdjson();
}
// ============================================================
// GLOBAL REINDEX
// ============================================================
public function globalReindex(User $user): IngestJob
public function globalReindex(): void
{
if (!$this->lockService->acquire()) {
throw new \RuntimeException('Another ingest job is already running.');
}
$allRecords = $this->knowledgeIngestService
->buildAllActiveChunkRecords();
$job = null;
$this->chunkManager->rewriteAll($allRecords);
try {
$this->vectorBuilder->rebuildFromNdjson();
$job = $this->jobService->startJob(
IngestJob::TYPE_GLOBAL_REINDEX,
$user
);
// --------------------------------------------------
// Alle aktiven Dokumente neu ingestieren
// --------------------------------------------------
$allRecords = $this->knowledgeIngestService
->buildAllActiveChunkRecords();
// --------------------------------------------------
// Komplettes NDJSON neu schreiben
// --------------------------------------------------
$this->chunkManager->rewriteAll($allRecords);
// --------------------------------------------------
// FAISS komplett neu bauen
// --------------------------------------------------
$logPath = $job->getLogPath();
$this->vectorBuilder->rebuildFromNdjson($logPath);
// --------------------------------------------------
// Meta aktualisieren + index_version++
// --------------------------------------------------
$this->metaManager->writeMetaForGlobalReindex();
$this->jobService->markCompleted($job);
} catch (\Throwable $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
throw $e;
} finally {
$this->lockService->release();
}
return $job;
$this->metaManager->writeMetaForGlobalReindex();
}
}

View File

@@ -17,8 +17,6 @@ final class DocumentLoader
return match ($ext) {
'txt', 'md' => $this->loadText($path),
// später:
// 'pdf' => $this->loadPdf($path),
// 'docx' => $this->loadDocx($path),
@@ -34,4 +32,5 @@ final class DocumentLoader
}
return $content;
}
}

View File

@@ -1,25 +0,0 @@
<?php
// src/Knowledge/Retrieval/ChunkIndexLoader.php
declare(strict_types=1);
namespace App\Knowledge\Retrieval;
/**
 * Reads the chunk index from a JSON file on disk.
 */
final class ChunkIndexLoader
{
    /**
     * @param string $indexPath Filesystem path of the JSON index file.
     */
    public function __construct(
        private string $indexPath
    ) {
    }

    /**
     * Loads and decodes the index file.
     *
     * @return array The decoded JSON when it is an array; an empty array when
     *               the file does not exist, is empty/unreadable, or does not
     *               decode to an array.
     */
    public function load(): array
    {
        if (is_file($this->indexPath)) {
            $raw = file_get_contents($this->indexPath);

            // Falsy content (read failure, empty file) is never decoded.
            $decoded = null;
            if ($raw) {
                $decoded = json_decode($raw, true);
            }

            if (is_array($decoded)) {
                return $decoded;
            }
        }

        return [];
    }
}

View File

@@ -1,269 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Knowledge\Retrieval;
use App\Knowledge\StopWords;
use App\Knowledge\VectorSearchChunked;
use App\Knowledge\KeywordSimilarity;
use App\Vector\VectorSearchClient;
/**
 * Retrieves text chunks for a prompt.
 *
 * Candidates are first discovered through the keyword index and scored by term
 * occurrence. If that does not yield enough chunks, vector search hits are
 * merged in, and if nothing was found at all the chunked text search is used.
 */
final class ChunkKeywordRetriever implements RetrieverInterface
{
    private const MAX_KEYWORD_CANDIDATES = 200;
    private const VECTOR_SCORE_THRESHOLD = 0.65;
    private const VECTOR_TOP_K = 3;

    /**
     * @param string $chunksDir Directory containing the chunk files referenced by the index.
     * @param int    $maxChunks Default number of chunks returned when no limit is passed.
     */
    public function __construct(
        private VectorSearchChunked $chunkedSearch,
        private ChunkIndexLoader $indexLoader,
        private StopWords $stopWords,
        private VectorSearchClient $vectorClient,
        private string $chunksDir,
        private int $maxChunks = 3,
    ) {
    }

    /**
     * {@inheritdoc}
     *
     * @param string   $prompt Free-text query.
     * @param int|null $limit  Maximum number of chunks to return; defaults to $maxChunks.
     *
     * @return list<string> De-duplicated chunk texts, highest score first.
     */
    public function retrieve(string $prompt, ?int $limit = null): array
    {
        $limit ??= $this->maxChunks;

        // ---------------------------------------------------------
        // 1) Prompt -> search terms
        // ---------------------------------------------------------
        $queryTerms = $this->extractTerms($prompt);

        // ---------------------------------------------------------
        // 2) Keyword-based candidate discovery
        // ---------------------------------------------------------
        $result = $queryTerms !== []
            ? $this->findCandidateFiles($queryTerms)
            : ['files' => [], 'canonicalTerms' => []];

        $candidateScores = array_slice(
            $result['files'],
            0,
            self::MAX_KEYWORD_CANDIDATES,
            true
        );

        // Replace each query term by the index keyword it matched, if any.
        $effectiveTerms = array_map(
            static fn (string $term): string => $result['canonicalTerms'][$term] ?? $term,
            $queryTerms
        );

        // ---------------------------------------------------------
        // 3) Keyword scoring
        // ---------------------------------------------------------
        $scored = [];

        foreach ($candidateScores as $file => $similarityScore) {
            $path = $this->chunksDir . '/' . $file;

            if (!is_file($path)) {
                continue;
            }

            $chunk = file_get_contents($path);
            if ($chunk === false || $chunk === '') {
                continue;
            }

            $score = $this->scoreChunk($chunk, $effectiveTerms);
            if ($score === 0) {
                continue;
            }

            $scored[$file] = [
                'chunk' => trim($chunk),
                'score' => (int) round($score * $similarityScore),
            ];
        }

        // ---------------------------------------------------------
        // Early exit: keyword results are sufficient
        // ---------------------------------------------------------
        if (\count($scored) >= $limit) {
            return $this->finalize($scored, $limit);
        }

        // ---------------------------------------------------------
        // 4) Vector retrieval (semantic fallback)
        // ---------------------------------------------------------
        $vectorHits = $this->vectorClient->search($prompt, self::VECTOR_TOP_K);

        foreach ($vectorHits as $hit) {
            if (
                !isset($hit['chunk_id'], $hit['score'])
                || $hit['score'] < self::VECTOR_SCORE_THRESHOLD
            ) {
                continue;
            }

            $file = $hit['chunk_id'] . '.txt';
            $path = $this->chunksDir . '/' . $file;

            if (!is_file($path)) {
                continue;
            }

            $vectorBoost = (int) round($hit['score'] * 10);
            if ($vectorBoost <= 0) {
                continue;
            }

            // Reuse the chunk text already loaded by the keyword pass if present.
            $chunk = $scored[$file]['chunk']
                ?? trim((string) file_get_contents($path));

            // Mirror the keyword pass: never return an empty/unreadable chunk.
            if ($chunk === '') {
                continue;
            }

            $scored[$file] = [
                'chunk' => $chunk,
                'score' => ($scored[$file]['score'] ?? 0) + $vectorBoost,
            ];
        }

        // ---------------------------------------------------------
        // 5) Final fallback
        // ---------------------------------------------------------
        if ($scored === []) {
            return $this->fallbackSearch($prompt, $limit);
        }

        return $this->finalize($scored, $limit);
    }

    // -------------------------------------------------------------
    // FINALIZATION
    // -------------------------------------------------------------

    /**
     * Orders scored chunks by descending score, removes duplicates and
     * truncates the list to $limit entries.
     *
     * @param array<string, array{chunk: string, score: int}> $scored
     *
     * @return list<string>
     */
    private function finalize(array $scored, int $limit): array
    {
        uasort($scored, static fn (array $a, array $b): int => $b['score'] <=> $a['score']);

        return array_slice(
            $this->normalizeResults(array_column($scored, 'chunk')),
            0,
            $limit
        );
    }

    // -------------------------------------------------------------
    // INDEX LOGIC
    // -------------------------------------------------------------

    /**
     * Finds index entries having a keyword similar (score >= 0.8) to a term.
     *
     * For each index entry the search stops at the first matching
     * term/keyword pair, so at most one canonical term is recorded per entry.
     *
     * @param list<string> $terms
     *
     * @return array{files: array<string, float>, canonicalTerms: array<string, string>}
     */
    private function findCandidateFiles(array $terms): array
    {
        $index = $this->indexLoader->load();

        $files = [];
        $canonicalTerms = [];

        foreach ($index as $entry) {
            if (!isset($entry['file'], $entry['keywords']) || !\is_array($entry['keywords'])) {
                // Malformed index entry: nothing to compare against.
                continue;
            }

            foreach ($terms as $term) {
                foreach ($entry['keywords'] as $indexKeyword) {
                    $score = KeywordSimilarity::compare($term, $indexKeyword);

                    if ($score >= 0.8) {
                        $files[$entry['file']] = max(
                            $files[$entry['file']] ?? 0.0,
                            $score
                        );

                        $canonicalTerms[$term] = $indexKeyword;
                        // First hit qualifies the entry; move on to the next entry.
                        break 2;
                    }
                }
            }
        }

        return [
            'files' => $files,
            'canonicalTerms' => $canonicalTerms,
        ];
    }

    // -------------------------------------------------------------
    // FALLBACK
    // -------------------------------------------------------------

    /**
     * Last-resort search through the chunked text search.
     *
     * @param int|null $limit Maximum number of chunks; defaults to $maxChunks
     *                        so the caller's limit is honored on this path too.
     *
     * @return list<string>
     */
    private function fallbackSearch(string $prompt, ?int $limit = null): array
    {
        $chunkedText = trim($this->chunkedSearch->searchAsText($prompt));

        if ($chunkedText === '') {
            return [];
        }

        return array_slice(
            $this->normalizeResults($this->splitChunks($chunkedText)),
            0,
            $limit ?? $this->maxChunks
        );
    }

    // -------------------------------------------------------------
    // SCORING
    // -------------------------------------------------------------

    /**
     * Scores a chunk by the non-stop-word terms it contains: 2 points for
     * terms of at least 10 characters, 1 point otherwise.
     *
     * @param list<string> $terms
     */
    private function scoreChunk(string $chunk, array $terms): int
    {
        $content = mb_strtolower($chunk);
        // Loop-invariant: fetch the stop word list once per chunk.
        $stopWords = $this->stopWords->getStopWords();
        $score = 0;

        foreach ($terms as $term) {
            if (\in_array($term, $stopWords, true)) {
                continue;
            }

            if (str_contains($content, $term)) {
                $score += mb_strlen($term) >= 10 ? 2 : 1;
            }
        }

        return $score;
    }

    // -------------------------------------------------------------
    // UTIL
    // -------------------------------------------------------------

    /**
     * Lower-cases the text, strips everything except letters, digits and
     * whitespace, and returns the words longer than two characters.
     *
     * @return list<string>
     */
    private function extractTerms(string $text): array
    {
        $cleaned = preg_replace('/[^\p{L}\p{N}\s]/u', '', $text);
        if ($cleaned === null) {
            // PCRE failure (e.g. invalid UTF-8): no usable terms.
            return [];
        }

        // Split on any whitespace run; the filter above keeps tabs and newlines.
        $words = preg_split('/\s+/u', mb_strtolower($cleaned), -1, PREG_SPLIT_NO_EMPTY);
        if ($words === false) {
            return [];
        }

        return array_values(array_filter(
            $words,
            static fn (string $w): bool => mb_strlen($w) > 2
        ));
    }

    /**
     * Splits text on blank lines into trimmed, non-empty chunks.
     *
     * @return list<string>
     */
    private function splitChunks(string $text): array
    {
        return array_values(array_filter(
            array_map(trim(...), explode("\n\n", $text)),
            static fn (string $chunk): bool => $chunk !== ''
        ));
    }

    /**
     * Removes chunks that are equal after whitespace collapsing and
     * lower-casing, keeping the first occurrence and the original order.
     *
     * @param list<string> $chunks
     *
     * @return list<string>
     */
    private function normalizeResults(array $chunks): array
    {
        $seen = [];
        $out = [];

        foreach ($chunks as $chunk) {
            // preg_replace() yields null on PCRE errors; fall back to the raw text.
            $key = mb_strtolower(preg_replace('/\s+/u', ' ', $chunk) ?? $chunk);

            if (!isset($seen[$key])) {
                $seen[$key] = true;
                $out[] = $chunk;
            }
        }

        return $out;
    }
}

View File

@@ -5,25 +5,27 @@ namespace App\Service;
use App\Entity\DocumentVersion;
use App\Entity\IngestJob;
use App\Entity\User;
use App\Ingest\IngestFlow;
use Doctrine\ORM\EntityManagerInterface;
class IngestOrchestrator
final class IngestOrchestrator
{
public function __construct(
private LockService $lockService,
private IngestJobService $jobService,
private EntityManagerInterface $em,
) {}
private readonly LockService $lockService,
private readonly IngestJobService $jobService,
private readonly EntityManagerInterface $em,
private readonly IngestFlow $ingestFlow,
) {
}
/**
* Startet Ingest für eine bestimmte DocumentVersion
* Startet Ingest für eine bestimmte DocumentVersion (1 Job pro Run).
*/
public function runForVersion(
DocumentVersion $version,
User $user,
bool $dryRun = false
): IngestJob {
if (!$this->lockService->acquire()) {
throw new \RuntimeException('Another ingest job is already running.');
}
@@ -31,10 +33,16 @@ class IngestOrchestrator
$job = null;
try {
// Governance: nur PENDING/FAILED erlauben
$status = $version->getIngestStatus();
if (!in_array($status, [
DocumentVersion::INGEST_PENDING,
DocumentVersion::INGEST_FAILED,
], true)) {
throw new \RuntimeException(sprintf('Ingest not allowed for status "%s".', $status));
}
// --------------------------------------
// Job anlegen
// --------------------------------------
// Job anlegen (einmal!)
$job = $this->jobService->startJob(
IngestJob::TYPE_DOCUMENT,
$user,
@@ -42,33 +50,25 @@ class IngestOrchestrator
$version->getId(),
);
// --------------------------------------
// Version Status RUNNING
// --------------------------------------
// Status → RUNNING
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
// --------------------------------------
// Simulierter Ablauf (noch kein echter Ingest)
// --------------------------------------
if ($dryRun) {
usleep(200000);
} else {
// Später:
// - KnowledgeIngestService
// - ChunkWriter
// - VectorIngestCommand
// Fachlogik ausführen (Flow erzeugt keine Jobs!)
$this->ingestFlow->ingestDocumentVersion($version, $job->getLogPath());
}
// --------------------------------------
// Erfolg
// --------------------------------------
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job);
$this->em->flush();
} catch (\Throwable $e) {
return $job;
} catch (\Throwable $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
@@ -81,12 +81,10 @@ class IngestOrchestrator
} finally {
$this->lockService->release();
}
return $job;
}
/**
* Globaler Reindex
* Globaler Reindex aller aktiven Dokumente.
*/
public function runGlobal(User $user, bool $dryRun = false): IngestJob
{
@@ -97,24 +95,19 @@ class IngestOrchestrator
$job = null;
try {
$job = $this->jobService->startJob(
IngestJob::TYPE_GLOBAL_REINDEX,
$user
);
$job = $this->jobService->startJob(IngestJob::TYPE_GLOBAL_REINDEX, $user);
if ($dryRun) {
usleep(200000);
} else {
// Später:
// - Alle aktiven Dokumente neu ingestieren
// - Global vector rebuild
$this->ingestFlow->globalReindex($job->getLogPath());
}
$this->jobService->markCompleted($job);
} catch (\Throwable $e) {
return $job;
} catch (\Throwable $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
@@ -124,7 +117,5 @@ class IngestOrchestrator
} finally {
$this->lockService->release();
}
return $job;
}
}