add chunk limiter
This commit is contained in:
@@ -9,43 +9,90 @@ use App\Index\IndexMetaManager;
|
||||
use App\Knowledge\ChunkManager;
|
||||
use App\Knowledge\Ingest\KnowledgeIngestService;
|
||||
use App\Vector\VectorIndexBuilder;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
final readonly class IngestFlow
|
||||
{
|
||||
/**
|
||||
* Realistische Betriebsgrenze für dieses Systemdesign (CPU Embedding + FlatIP + Full Rebuild).
|
||||
* Wird beim lokalen Ingest (Dokumentversion) enforced.
|
||||
*/
|
||||
private const CHUNK_LIMIT_HARD = 120000;
|
||||
|
||||
/**
|
||||
* Ab hier nur Warnung (keine Blockade) – damit man frühzeitig reagieren kann.
|
||||
*/
|
||||
private const CHUNK_LIMIT_WARN = 100000;
|
||||
|
||||
public function __construct(
|
||||
private KnowledgeIngestService $knowledgeIngestService,
|
||||
private ChunkManager $chunkManager,
|
||||
private VectorIndexBuilder $vectorBuilder,
|
||||
private IndexMetaManager $metaManager,
|
||||
)
|
||||
{
|
||||
private LoggerInterface $logger,
|
||||
) {
|
||||
}
|
||||
|
||||
|
||||
public function ingestDocumentVersion(
|
||||
DocumentVersion $version
|
||||
): void
|
||||
public function ingestDocumentVersion(DocumentVersion $version): void
|
||||
{
|
||||
$this->metaManager->validateAgainstCurrent();
|
||||
$this->chunkManager->compactByDocument(
|
||||
$version->getDocument()->getId()
|
||||
);
|
||||
$records = $this->knowledgeIngestService
|
||||
->buildChunkRecords($version);
|
||||
|
||||
// Entfernt alte Chunks dieses Dokuments -> danach ist "existing" der Basis-Index ohne dieses Dokument.
|
||||
$this->chunkManager->compactByDocument($version->getDocument()->getId());
|
||||
|
||||
// ------------------------------
|
||||
// Chunk-Limit Guardrail (Hard Cap)
|
||||
// ------------------------------
|
||||
$existing = $this->chunkManager->countAllChunks();
|
||||
|
||||
// buildChunkRecords() ist generatorbasiert; für einen sauberen Hard-Cap materialisieren wir lokal,
|
||||
// damit wir vor dem Append abbrechen können (keine Partial Writes).
|
||||
$recordsIterable = $this->knowledgeIngestService->buildChunkRecords($version);
|
||||
$records = is_array($recordsIterable)
|
||||
? $recordsIterable
|
||||
: iterator_to_array($recordsIterable, false);
|
||||
|
||||
$incoming = count($records);
|
||||
$total = $existing + $incoming;
|
||||
|
||||
if ($total >= self::CHUNK_LIMIT_WARN) {
|
||||
$this->logger->warning('RAG chunk count approaching limit.', [
|
||||
'existing' => $existing,
|
||||
'incoming' => $incoming,
|
||||
'total' => $total,
|
||||
'warn_at' => self::CHUNK_LIMIT_WARN,
|
||||
'hard_cap' => self::CHUNK_LIMIT_HARD,
|
||||
'document_id' => $version->getDocument()->getId()->toRfc4122(),
|
||||
'version_id' => $version->getId()->toRfc4122(),
|
||||
]);
|
||||
}
|
||||
|
||||
if ($total > self::CHUNK_LIMIT_HARD) {
|
||||
throw new \RuntimeException(sprintf(
|
||||
'Chunk limit reached: %d existing + %d incoming = %d (hard cap: %d). Reduce knowledge base or move to a scaled vector setup (IVF/HNSW/GPU/sharding).',
|
||||
$existing,
|
||||
$incoming,
|
||||
$total,
|
||||
self::CHUNK_LIMIT_HARD
|
||||
));
|
||||
}
|
||||
|
||||
$this->chunkManager->appendChunks($records);
|
||||
$this->vectorBuilder->rebuildFromNdjson();
|
||||
}
|
||||
|
||||
$chunkCount = $this->chunkManager->countAllChunks();
|
||||
$this->metaManager->updateRuntimeStats($chunkCount);
|
||||
}
|
||||
|
||||
public function globalReindex(): void
|
||||
{
|
||||
$allRecords = $this->knowledgeIngestService
|
||||
->buildAllActiveChunkRecords();
|
||||
$allRecords = $this->knowledgeIngestService->buildAllActiveChunkRecords();
|
||||
|
||||
// Optional (wenn du willst): Hier könnte man ebenfalls ein Hard-Cap enforce'n (rewriteAll mit Limit).
|
||||
$this->chunkManager->rewriteAll($allRecords);
|
||||
|
||||
$this->vectorBuilder->rebuildFromNdjson();
|
||||
|
||||
$this->metaManager->writeMetaForGlobalReindex();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user