harden document versions
This commit is contained in:
@@ -5,15 +5,12 @@ namespace App\Service;
|
||||
use App\Entity\Document;
|
||||
use App\Entity\DocumentVersion;
|
||||
use App\Entity\User;
|
||||
use App\Ingest\IngestFlow;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
|
||||
class DocumentService
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
private LockService $lockService,
|
||||
private IngestFlow $ingestFlow,
|
||||
) {}
|
||||
|
||||
/**
|
||||
@@ -83,39 +80,21 @@ class DocumentService
|
||||
*/
|
||||
public function activateVersion(DocumentVersion $version): void
|
||||
{
|
||||
if (!$this->lockService->acquire()) {
|
||||
throw new \RuntimeException('Another ingest job is already running.');
|
||||
$document = $version->getDocument();
|
||||
|
||||
// Aktiv-Status in DB konsistent setzen (genau 1 aktive Version)
|
||||
foreach ($document->getVersions() as $existingVersion) {
|
||||
$existingVersion->setActive(false);
|
||||
}
|
||||
|
||||
try {
|
||||
$document = $version->getDocument();
|
||||
$version->setActive(true);
|
||||
$document->setCurrentVersion($version);
|
||||
|
||||
// 1) Aktiv-Status in DB konsistent setzen (genau 1 aktive Version)
|
||||
foreach ($document->getVersions() as $existingVersion) {
|
||||
$existingVersion->setActive(false);
|
||||
}
|
||||
// Wichtig: Aktivierung soll einen Job auslösen. Damit der Job NICHT an "INDEXED" scheitert,
|
||||
// setzen wir hier bewusst auf PENDING.
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_PENDING);
|
||||
|
||||
$version->setActive(true);
|
||||
$document->setCurrentVersion($version);
|
||||
|
||||
// 2) Ingest-Status (UI) – wird im Fehlerfall auf FAILED gesetzt
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
|
||||
$this->em->flush();
|
||||
|
||||
// 3) Deterministischer Re-Ingest: alte Chunks raus, neue rein, FAISS rebuild
|
||||
$this->ingestFlow->ingestDocumentVersion($version);
|
||||
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
|
||||
$this->em->flush();
|
||||
|
||||
} catch (\Throwable $e) {
|
||||
// Aktivierung bleibt in DB bestehen, aber Index ist ggf. nicht aktuell → Status markieren
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
|
||||
$this->em->flush();
|
||||
throw $e;
|
||||
} finally {
|
||||
$this->lockService->release();
|
||||
}
|
||||
$this->em->flush();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -152,4 +131,4 @@ class DocumentService
|
||||
|
||||
return $max + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +115,9 @@ final class IngestOrchestrator
|
||||
return;
|
||||
}
|
||||
|
||||
if ($job->getType() !== IngestJob::TYPE_DOCUMENT) {
|
||||
$isActivateJob = $job->getType() === IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE;
|
||||
|
||||
if (!$isActivateJob && $job->getType() !== IngestJob::TYPE_DOCUMENT) {
|
||||
throw new \RuntimeException(sprintf(
|
||||
'Unsupported ingest job type "%s".',
|
||||
$job->getType()
|
||||
@@ -139,21 +141,27 @@ final class IngestOrchestrator
|
||||
|
||||
$status = $version->getIngestStatus();
|
||||
|
||||
// Nur blockieren wenn wirklich schon indexed
|
||||
if ($status === DocumentVersion::INGEST_INDEXED) {
|
||||
throw new \RuntimeException('DocumentVersion already indexed.');
|
||||
// Bei Aktivierungs-Jobs IMMER re-ingestieren (auch wenn die Version früher schon indexed war).
|
||||
// Hintergrund: nach Aktivierung soll der Index deterministisch die aktive Version widerspiegeln.
|
||||
if (!$isActivateJob) {
|
||||
// Nur blockieren wenn wirklich schon indexed
|
||||
if ($status === DocumentVersion::INGEST_INDEXED) {
|
||||
throw new \RuntimeException('DocumentVersion already indexed.');
|
||||
}
|
||||
}
|
||||
|
||||
// RUNNING darf hier erlaubt sein (async!)
|
||||
if (!in_array($status, [
|
||||
DocumentVersion::INGEST_PENDING,
|
||||
DocumentVersion::INGEST_FAILED,
|
||||
DocumentVersion::INGEST_RUNNING,
|
||||
], true)) {
|
||||
throw new \RuntimeException(sprintf(
|
||||
'Ingest not allowed for status "%s".',
|
||||
$status
|
||||
));
|
||||
if (!$isActivateJob) {
|
||||
if (!in_array($status, [
|
||||
DocumentVersion::INGEST_PENDING,
|
||||
DocumentVersion::INGEST_FAILED,
|
||||
DocumentVersion::INGEST_RUNNING,
|
||||
], true)) {
|
||||
throw new \RuntimeException(sprintf(
|
||||
'Ingest not allowed for status "%s".',
|
||||
$status
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
|
||||
|
||||
Reference in New Issue
Block a user