phase a audit

This commit is contained in:
team2
2026-02-22 18:04:53 +01:00
parent b3e9110dd1
commit 3b2e1bc772
10 changed files with 608 additions and 516 deletions

View File

@@ -0,0 +1,72 @@
<?php
declare(strict_types=1);
namespace App\Ingest;
use App\Entity\DocumentVersion;
use App\Knowledge\ChunkManager;
use Symfony\Component\Uid\Uuid;
final readonly class ChunkWriteService
{
public function __construct(
private ChunkManager $chunkManager,
) {}
public function getIndexPath(): string
{
return $this->chunkManager->getIndexPath();
}
public function countAllChunks(): int
{
return $this->chunkManager->countAllChunks();
}
public function compactByDocumentId(Uuid $documentId): void
{
$this->chunkManager->compactByDocument($documentId);
}
/**
* @param iterable<array<string,mixed>> $chunks
*/
public function appendChunks(iterable $chunks): void
{
$this->chunkManager->appendChunks($chunks);
}
/**
* Lokaler Ingest für eine einzelne DocumentVersion.
*
* Ablauf:
* 1. Entfernt bestehende Chunks dieses Dokuments
* 2. Appendet neue Chunks
*
* @param iterable<array<string,mixed>> $chunks
*/
public function writeForDocumentVersion(
DocumentVersion $version,
iterable $chunks
): void {
$documentId = $version->getDocument()->getId();
if (!$documentId instanceof Uuid) {
throw new \RuntimeException('Document ID must be a Uuid instance');
}
$this->chunkManager->compactByDocument($documentId);
$this->chunkManager->appendChunks($chunks);
}
/**
* Vollständiger Rewrite des NDJSON-Index (Global Reindex).
*
* @param iterable<array<string,mixed>> $allChunks
*/
public function rewriteAll(iterable $allChunks): void
{
$this->chunkManager->rewriteAll($allChunks);
}
}

View File

@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace App\Ingest;
use App\Index\IndexMetaManager;
final readonly class GuardrailValidator
{
public function __construct(
private IndexMetaManager $metaManager,
) {}
/**
* Wirft eine Exception, wenn ein lokaler Ingest nicht kompatibel ist
* und ein Global Reindex erforderlich ist.
*/
public function validateOrThrow(): void
{
$this->metaManager->validateAgainstCurrent();
}
}

View File

@@ -7,9 +7,7 @@ namespace App\Ingest;
use App\Entity\Document;
use App\Entity\DocumentVersion;
use App\Index\IndexMetaManager;
use App\Knowledge\ChunkManager;
use App\Knowledge\Ingest\KnowledgeIngestService;
use App\Vector\VectorIndexBuilder;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Uid\Uuid;
@@ -21,45 +19,149 @@ final readonly class IngestFlow
public function __construct(
private KnowledgeIngestService $knowledgeIngestService,
private ChunkManager $chunkManager,
private VectorIndexBuilder $vectorBuilder,
private GuardrailValidator $guardrailValidator,
private ChunkWriteService $chunkWriteService,
private VectorRebuildService $vectorRebuildService,
private IndexMetaManager $metaManager,
private IngestLockService $lockService,
private LoggerInterface $logger,
private EntityManagerInterface $em,
) {}
// =========================================================
// DOCUMENT INGEST (STREAMING SAFE)
// DOCUMENT INGEST
// =========================================================
public function ingestDocumentVersion(DocumentVersion $version): void
{
$this->metaManager->validateAgainstCurrent();
$this->withLock(function () use ($version): void {
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
$this->guardrailValidator->validateOrThrow();
try {
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
$this->chunkManager->compactByDocument($version->getDocument()->getId());
try {
$documentId = $version->getDocument()->getId();
if (!$documentId instanceof Uuid) {
throw new \RuntimeException('Document ID must be a Uuid instance');
}
$existing = $this->chunkManager->countAllChunks();
// 1) Alte Chunks entfernen
$this->chunkWriteService->compactByDocumentId($documentId);
// 2) Existing Chunks nach Compaction zählen
$existing = $this->chunkWriteService->countAllChunks();
$incoming = 0;
$warned = false;
$generator = $this->knowledgeIngestService->buildChunkRecords($version);
$wrappedGenerator = (function () use ($generator, $existing, &$incoming, &$warned) {
foreach ($generator as $record) {
$incoming++;
$total = $existing + $incoming;
if (!$warned && $total >= self::CHUNK_LIMIT_WARN) {
$warned = true;
}
if ($total > self::CHUNK_LIMIT_HARD) {
throw new \RuntimeException('Chunk limit exceeded.');
}
yield $record;
}
})();
// 3) Streaming Append
$this->chunkWriteService->appendChunks($wrappedGenerator);
$total = $existing + $incoming;
if ($warned) {
$this->logger->warning('Chunk count approaching limit.', [
'existing' => $existing,
'incoming' => $incoming,
'total' => $total,
'document' => (string)$documentId,
'version' => (string)$version->getId(),
]);
}
// 4) Vector Rebuild + Runtime Update
$this->vectorRebuildService->rebuild();
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->em->flush();
} catch (\Throwable $e) {
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e;
}
});
}
// =========================================================
// GLOBAL REINDEX
// =========================================================
public function globalReindex(): void
{
$this->withLock(function (): void {
// Global Reindex ist der Drift-Fix → keine Guardrail-Blockade hier.
$activeDocuments = $this->em
->getRepository(Document::class)
->createQueryBuilder('d')
->where('d.status = :status')
->setParameter('status', Document::STATUS_ACTIVE)
->getQuery()
->getResult();
if (empty($activeDocuments)) {
throw new \RuntimeException('Global Reindex aborted: no active documents found.');
}
$existing = 0; // rewriteAll ersetzt alles
$incoming = 0;
$generator = $this->knowledgeIngestService->buildChunkRecords($version);
$warned = false;
$wrappedGenerator = (function () use ($generator, $existing, &$incoming) {
$generator = $this->knowledgeIngestService->buildAllActiveChunkRecords();
// 1) "Peek" ohne RAM: erstes Element ziehen
$first = null;
foreach ($generator as $record) {
$first = $record;
$incoming++;
break;
}
if ($first === null) {
throw new \RuntimeException('Global Reindex aborted: no chunks generated.');
}
// 2) Stream bauen, Limits prüfen
$stream = (function () use ($first, $generator, $existing, &$incoming, &$warned) {
// first
$total = $existing + $incoming;
if (!$warned && $total >= self::CHUNK_LIMIT_WARN) {
$warned = true;
}
if ($total > self::CHUNK_LIMIT_HARD) {
throw new \RuntimeException('Chunk limit exceeded.');
}
yield $first;
foreach ($generator as $record) {
$incoming++;
$total = $existing + $incoming;
if ($total >= self::CHUNK_LIMIT_WARN) {
// Nur einmal warnen
if ($incoming === 1 || $total === self::CHUNK_LIMIT_WARN) {
// Logging erfolgt außerhalb des Streams final
}
if (!$warned && $total >= self::CHUNK_LIMIT_WARN) {
$warned = true;
}
if ($total > self::CHUNK_LIMIT_HARD) {
@@ -68,89 +170,23 @@ final readonly class IngestFlow
yield $record;
}
})();
$this->chunkManager->appendChunks($wrappedGenerator);
// 3) Rewrite + Rebuild
$this->chunkWriteService->rewriteAll($stream);
$total = $existing + $incoming;
if ($total >= self::CHUNK_LIMIT_WARN) {
$this->logger->warning('Chunk count approaching limit.', [
'existing' => $existing,
if ($warned) {
$this->logger->warning('Chunk count approaching limit after global reindex.', [
'incoming' => $incoming,
'total' => $total,
'total' => $incoming,
]);
}
$this->rebuildIndex(false);
$this->vectorRebuildService->rebuild();
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->em->flush();
} catch (\Throwable $e) {
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e;
}
}
// =========================================================
// GLOBAL REINDEX (STREAMING SAFE)
// =========================================================
public function globalReindex(): void
{
$activeDocuments = $this->em
->getRepository(Document::class)
->createQueryBuilder('d')
->where('d.status = :status')
->setParameter('status', Document::STATUS_ACTIVE)
->getQuery()
->getResult();
if (empty($activeDocuments)) {
throw new \RuntimeException(
'Global Reindex abgebrochen: Es sind keine aktiven Dokumente vorhanden.'
);
}
$incoming = 0;
$generator = $this->knowledgeIngestService->buildAllActiveChunkRecords();
$wrappedGenerator = (function () use ($generator, &$incoming) {
foreach ($generator as $record) {
$incoming++;
yield $record;
}
})();
// Prüfen ob überhaupt etwas kommt (ohne alles in RAM zu ziehen)
$peekIterator = $wrappedGenerator instanceof \Iterator
? $wrappedGenerator
: (function () use ($wrappedGenerator) {
foreach ($wrappedGenerator as $item) {
yield $item;
}
})();
if (!$peekIterator->valid()) {
$peekIterator->rewind();
}
if (!$peekIterator->valid()) {
throw new \RuntimeException(
'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt.'
);
}
$this->chunkManager->rewriteAll($peekIterator);
$this->rebuildIndex(true);
// Governance: Version erhöhen
$this->metaManager->writeMetaForGlobalReindex();
});
}
// =========================================================
@@ -159,42 +195,54 @@ final readonly class IngestFlow
public function deleteDocument(Uuid $documentId): void
{
$this->metaManager->validateAgainstCurrent();
$this->withLock(function () use ($documentId): void {
$document = $this->em
->getRepository(Document::class)
->find($documentId);
$this->guardrailValidator->validateOrThrow();
if (!$document) {
throw new \RuntimeException('Document not found.');
}
/** @var Document|null $document */
$document = $this->em
->getRepository(Document::class)
->find($documentId);
$this->chunkManager->compactByDocument($documentId);
if (!$document) {
throw new \RuntimeException('Document not found.');
}
$this->em->remove($document);
$this->em->flush();
// 1) Chunks entfernen
$this->chunkWriteService->compactByDocumentId($documentId);
$this->rebuildIndex(false);
// 2) FK-sicher löschen: currentVersion lösen (verhindert „Version zeigt noch auf DocumentVersion“)
if (method_exists($document, 'getCurrentVersion') && method_exists($document, 'setCurrentVersion')) {
if ($document->getCurrentVersion() !== null) {
$document->setCurrentVersion(null);
$this->em->flush();
}
}
// 3) Dokument entfernen
$this->em->remove($document);
$this->em->flush();
// 4) Vector rebuild + runtime update
$this->vectorRebuildService->rebuild();
});
}
// =========================================================
// CENTRAL REBUILD
// INTERNALS
// =========================================================
private function rebuildIndex(bool $isGlobal): void
/**
* @param callable():void $fn
*/
private function withLock(callable $fn): void
{
$this->vectorBuilder->rebuildFromNdjson();
$this->lockService->acquire();
if ($isGlobal) {
$this->metaManager->writeMetaForGlobalReindex();
try {
$fn();
} finally {
$this->lockService->release();
}
$this->updateChunkCount();
}
private function updateChunkCount(): void
{
$chunkCount = $this->chunkManager->countAllChunks();
$this->metaManager->updateRuntimeStats($chunkCount);
}
}

View File

@@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
namespace App\Ingest;
final class IngestLockService
{
private string $lockFilePath;
/** @var resource|null */
private $handle = null;
public function __construct(string $projectDir)
{
$this->lockFilePath = rtrim($projectDir, '/') . '/var/knowledge/locks/ingest.lock';
}
public function acquire(): void
{
$dir = dirname($this->lockFilePath);
if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
throw new \RuntimeException('Unable to create lock directory.');
}
$this->handle = fopen($this->lockFilePath, 'c');
if ($this->handle === false) {
throw new \RuntimeException('Unable to open ingest lock file.');
}
if (!flock($this->handle, LOCK_EX | LOCK_NB)) {
throw new \RuntimeException('Another ingest process is already running.');
}
}
public function release(): void
{
if ($this->handle !== null) {
flock($this->handle, LOCK_UN);
fclose($this->handle);
$this->handle = null;
}
}
public function __destruct()
{
$this->release();
}
}

View File

@@ -0,0 +1,38 @@
<?php
declare(strict_types=1);
namespace App\Ingest;
use App\Index\IndexMetaManager;
use App\Knowledge\ChunkManager;
use App\Vector\VectorIndexBuilder;
final readonly class VectorRebuildService
{
public function __construct(
private VectorIndexBuilder $vectorBuilder,
private IndexMetaManager $metaManager,
private ChunkManager $chunkManager,
) {}
/**
* Führt einen vollständigen, deterministischen FAISS-Rebuild aus.
*
* Ablauf:
* 1. Rebuild des Vector Index aus index.ndjson
* 2. Chunk-Zählung via ChunkManager
* 3. Runtime-Stats atomar aktualisieren
*/
public function rebuild(?string $logPath = null): void
{
// 1⃣ Vector Index neu bauen
$this->vectorBuilder->rebuildFromNdjson($logPath);
// 2⃣ Chunk Count streaming-safe zählen
$chunkCount = $this->chunkManager->countAllChunks();
// 3⃣ Runtime-Stats aktualisieren (atomar)
$this->metaManager->updateRuntimeStats($chunkCount);
}
}