Files
MtoRagSystem/src/Service/IngestOrchestrator.php
2026-02-27 15:49:01 +01:00

178 lines
4.9 KiB
PHP

<?php
namespace App\Service;
use App\Entity\DocumentVersion;
use App\Entity\IngestJob;
use App\Entity\User;
use App\Ingest\IngestFlow;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Uid\Uuid;
/**
 * Coordinates ingest runs.
 *
 * Every run is serialized through LockService (a run only starts if the lock can be acquired),
 * its lifecycle is recorded via IngestJobService / the job entity, and the actual work is
 * delegated to IngestFlow. In dry-run mode the IngestFlow call is replaced by a short sleep so
 * the job bookkeeping can be exercised without invoking IngestFlow.
 */
final readonly class IngestOrchestrator
{
    /** How long a dry run sleeps in place of the real IngestFlow call (200 ms). */
    private const DRY_RUN_DELAY_MICROSECONDS = 200_000;

    public function __construct(
        private LockService $lockService,
        private IngestJobService $jobService,
        private EntityManagerInterface $em,
        private IngestFlow $ingestFlow,
    ) {
    }

    /**
     * Creates a new document ingest job for $version and runs it immediately.
     *
     * @param DocumentVersion $version Version to ingest; must not already be indexed.
     * @param User            $user    User passed to IngestJobService::startJob().
     * @param bool            $dryRun  When true, sleep briefly instead of calling IngestFlow.
     *
     * @return IngestJob The created job after it has been marked completed.
     *
     * @throws \RuntimeException When the ingest lock is already held or the version is already indexed.
     * @throws \Throwable        Anything thrown while running; if the job was already created it is
     *                           marked failed before the exception is rethrown.
     */
    public function runForVersion(
        DocumentVersion $version,
        User $user,
        bool $dryRun = false,
    ): IngestJob {
        return $this->withLock(function () use ($version, $user, $dryRun): IngestJob {
            $job = null;

            try {
                // Checked before startJob(), so no job is created for an already indexed version.
                $this->assertNotIndexed($version);

                $job = $this->jobService->startJob(
                    IngestJob::TYPE_DOCUMENT,
                    $user,
                    $version->getDocument()->getId(),
                    $version->getId(),
                );

                $this->runOrSimulate(
                    $dryRun,
                    fn () => $this->ingestFlow->ingestDocumentVersion($version),
                );

                $this->jobService->markCompleted($job);

                return $job;
            } catch (\Throwable $e) {
                // NOTE(review): if markFailed() itself throws (e.g. the EntityManager was closed by a
                // failed flush — confirm what markFailed does), that exception replaces $e.
                if ($job !== null) {
                    $this->jobService->markFailed($job, $e->getMessage());
                }

                throw $e;
            }
        });
    }

    /**
     * Runs a previously created job, dispatching on its type.
     *
     * Jobs that are already completed, failed or aborted are left untouched, so calling this
     * again for the same job is harmless.
     *
     * @param IngestJob $job    Job to run.
     * @param bool      $dryRun When true, sleep briefly instead of calling IngestFlow.
     *
     * @throws \RuntimeException When the ingest lock is already held, the job type is unsupported,
     *                           or the job lacks the ids / entities its type requires.
     * @throws \Throwable        Anything thrown after the idempotency check; the job is marked
     *                           failed before the exception is rethrown.
     */
    public function runExistingJob(IngestJob $job, bool $dryRun = false): void
    {
        $this->withLock(function () use ($job, $dryRun): void {
            try {
                // Idempotency: never re-run a job that already reached a final status.
                if ($this->isFinished($job)) {
                    return;
                }

                $job->markRunning();
                $this->em->flush();

                $handler = match ($job->getType()) {
                    IngestJob::TYPE_GLOBAL_REINDEX => fn () => $this->runGlobalReindex($dryRun),
                    IngestJob::TYPE_DOCUMENT_DELETE => fn () => $this->runDocumentDelete($job, $dryRun),
                    IngestJob::TYPE_DOCUMENT,
                    IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE => fn () => $this->runDocumentIngest($job, $dryRun),
                    default => throw new \RuntimeException(sprintf(
                        'Unsupported ingest job type "%s".',
                        $job->getType(),
                    )),
                };
                $handler();

                $this->jobService->markCompleted($job);
            } catch (\Throwable $e) {
                // NOTE(review): a throwing markFailed() would mask $e here as well — confirm.
                $this->jobService->markFailed($job, $e->getMessage());

                throw $e;
            }
        });
    }

    /**
     * Runs $operation while holding the ingest lock and always releases the lock afterwards.
     *
     * The lock is acquired outside the try block, so a failed acquisition never triggers a release.
     *
     * @template T
     * @param callable(): T $operation
     * @return T
     *
     * @throws \RuntimeException When the lock cannot be acquired.
     */
    private function withLock(callable $operation): mixed
    {
        if (!$this->lockService->acquire()) {
            throw new \RuntimeException('Another ingest job is already running.');
        }

        try {
            return $operation();
        } finally {
            $this->lockService->release();
        }
    }

    /**
     * Calls $work, or, for a dry run, only sleeps for DRY_RUN_DELAY_MICROSECONDS instead.
     */
    private function runOrSimulate(bool $dryRun, callable $work): void
    {
        if ($dryRun) {
            usleep(self::DRY_RUN_DELAY_MICROSECONDS);

            return;
        }

        $work();
    }

    /** Whether the job is already completed, failed or aborted. */
    private function isFinished(IngestJob $job): bool
    {
        return in_array($job->getStatus(), [
            IngestJob::STATUS_COMPLETED,
            IngestJob::STATUS_FAILED,
            IngestJob::STATUS_ABORTED,
        ], true);
    }

    /** Rejects versions whose ingest status is already INGEST_INDEXED. */
    private function assertNotIndexed(DocumentVersion $version): void
    {
        if ($version->getIngestStatus() === DocumentVersion::INGEST_INDEXED) {
            throw new \RuntimeException('DocumentVersion already indexed.');
        }
    }

    /** Handles TYPE_GLOBAL_REINDEX jobs. */
    private function runGlobalReindex(bool $dryRun): void
    {
        $this->runOrSimulate($dryRun, fn () => $this->ingestFlow->globalReindex());
    }

    /** Handles TYPE_DOCUMENT_DELETE jobs; the job must carry a document id. */
    private function runDocumentDelete(IngestJob $job, bool $dryRun): void
    {
        $documentId = $job->getDocumentId();
        if (!$documentId instanceof Uuid) {
            throw new \RuntimeException('Job has no document id.');
        }

        $this->runOrSimulate($dryRun, fn () => $this->ingestFlow->deleteDocument($documentId));
    }

    /**
     * Handles TYPE_DOCUMENT and TYPE_DOCUMENT_VERSION_ACTIVATE jobs.
     *
     * Both ingest the referenced DocumentVersion; only plain document jobs refuse a version that
     * is already indexed, while activate jobs ingest it regardless of its ingest status.
     */
    private function runDocumentIngest(IngestJob $job, bool $dryRun): void
    {
        $isActivateJob = $job->getType() === IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE;

        $versionId = $job->getDocumentVersionId();
        if (!$versionId instanceof Uuid) {
            throw new \RuntimeException('Job has no document version id.');
        }

        /** @var DocumentVersion|null $version */
        $version = $this->em
            ->getRepository(DocumentVersion::class)
            ->find($versionId);
        if ($version === null) {
            throw new \RuntimeException('DocumentVersion not found.');
        }

        if (!$isActivateJob) {
            $this->assertNotIndexed($version);
        }

        $this->runOrSimulate($dryRun, fn () => $this->ingestFlow->ingestDocumentVersion($version));
    }
}