harden code

This commit is contained in:
team 1
2026-02-15 16:01:08 +01:00
parent 5b100039e0
commit c099f72703
13 changed files with 397 additions and 59 deletions

View File

@@ -7,6 +7,7 @@ use App\Entity\IngestJob;
use App\Entity\User;
use App\Ingest\IngestFlow;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Uid\Uuid;
final class IngestOrchestrator
{
@@ -19,14 +20,14 @@ final class IngestOrchestrator
}
/**
* Startet Ingest für eine bestimmte DocumentVersion (1 Job pro Run).
* @throws \Throwable
* SYNCHRONE Variante (falls noch genutzt)
*/
public function runForVersion(
DocumentVersion $version,
User $user,
bool $dryRun = false
): IngestJob {
if (!$this->lockService->acquire()) {
throw new \RuntimeException('Another ingest job is already running.');
}
@@ -34,16 +35,12 @@ final class IngestOrchestrator
$job = null;
try {
// Governance: nur PENDING/FAILED erlauben
$status = $version->getIngestStatus();
if (!in_array($status, [
DocumentVersion::INGEST_PENDING,
DocumentVersion::INGEST_FAILED,
], true)) {
throw new \RuntimeException(sprintf('Ingest not allowed for status "%s".', $status));
if ($status === DocumentVersion::INGEST_INDEXED) {
throw new \RuntimeException('DocumentVersion already indexed.');
}
// Job anlegen (einmal!)
$job = $this->jobService->startJob(
IngestJob::TYPE_DOCUMENT,
$user,
@@ -51,18 +48,15 @@ final class IngestOrchestrator
$version->getId(),
);
// Status → RUNNING
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
if ($dryRun) {
usleep(200000);
} else {
// Fachlogik ausführen (Flow erzeugt keine Jobs!)
$this->ingestFlow->ingestDocumentVersion($version);
}
// Erfolg
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job);
$this->em->flush();
@@ -86,7 +80,120 @@ final class IngestOrchestrator
}
/**
 * Asynchronous variant: runs an ingest job that already exists
 * (noted in the original comment as the "detached CLI" path).
 *
 * Flow:
 *  - acquires the ingest lock (released again in all cases),
 *  - returns without doing anything when the job is already COMPLETED, FAILED or ABORTED,
 *  - marks the job as running,
 *  - for TYPE_GLOBAL_REINDEX runs the global reindex,
 *  - for TYPE_DOCUMENT loads the referenced DocumentVersion, checks its ingest
 *    status and runs the document ingest.
 *
 * On any error the job is marked as failed and the exception is rethrown. The
 * DocumentVersion is only set to INGEST_FAILED when this run had already passed
 * the status checks; a rejected run (e.g. version already indexed) must not
 * overwrite the version's existing status.
 *
 * @param IngestJob $job    the previously created job to execute
 * @param bool      $dryRun when true, the ingest flow is skipped and replaced by a short sleep
 *
 * @throws \RuntimeException when the lock cannot be acquired, the job type is unsupported,
 *                           the job has no document version id, the version cannot be found,
 *                           or the version's ingest status does not permit an ingest
 * @throws \Throwable        any error raised by the ingest flow or persistence is rethrown
 */
public function runExistingJob(IngestJob $job, bool $dryRun = false): void
{
    if (!$this->lockService->acquire()) {
        throw new \RuntimeException('Another ingest job is already running.');
    }

    // True only once this run has passed the governance checks and is about to
    // change the version's ingest status. Guards the failure handler so that a
    // rejected run cannot flip an INDEXED version to FAILED.
    $versionClaimed = false;

    try {
        // Job is already in a final state: nothing to do.
        if (in_array($job->getStatus(), [
            IngestJob::STATUS_COMPLETED,
            IngestJob::STATUS_FAILED,
            IngestJob::STATUS_ABORTED,
        ], true)) {
            return;
        }

        $job->markRunning();
        $this->em->flush();

        // Global reindex
        if ($job->getType() === IngestJob::TYPE_GLOBAL_REINDEX) {
            if ($dryRun) {
                usleep(200000);
            } else {
                $this->ingestFlow->globalReindex();
            }

            // NOTE(review): no explicit flush follows here, unlike the document
            // branch below; presumably markCompleted() persists the job itself — confirm.
            $this->jobService->markCompleted($job);

            return;
        }

        if ($job->getType() !== IngestJob::TYPE_DOCUMENT) {
            throw new \RuntimeException(sprintf(
                'Unsupported ingest job type "%s".',
                $job->getType()
            ));
        }

        $versionId = $job->getDocumentVersionId();
        if (!$versionId instanceof Uuid) {
            throw new \RuntimeException('Job has no document version id.');
        }

        /** @var DocumentVersion|null $version */
        $version = $this->em
            ->getRepository(DocumentVersion::class)
            ->find($versionId);
        if ($version === null) {
            throw new \RuntimeException('DocumentVersion not found.');
        }

        $status = $version->getIngestStatus();

        // Only block when the version really is indexed already.
        if ($status === DocumentVersion::INGEST_INDEXED) {
            throw new \RuntimeException('DocumentVersion already indexed.');
        }

        // RUNNING is accepted here as well (asynchronous execution).
        if (!in_array($status, [
            DocumentVersion::INGEST_PENDING,
            DocumentVersion::INGEST_FAILED,
            DocumentVersion::INGEST_RUNNING,
        ], true)) {
            throw new \RuntimeException(sprintf(
                'Ingest not allowed for status "%s".',
                $status
            ));
        }

        // From here on this run owns the version's ingest status.
        $versionClaimed = true;
        $version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
        $this->em->flush();

        if ($dryRun) {
            usleep(200000);
        } else {
            $this->ingestFlow->ingestDocumentVersion($version);
        }

        $version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
        $this->jobService->markCompleted($job);
        $this->em->flush();
    } catch (\Throwable $e) {
        $this->jobService->markFailed($job, $e->getMessage());

        // Only touch the version when this run actually started working on it.
        if ($versionClaimed) {
            $versionId = $job->getDocumentVersionId();
            if ($versionId instanceof Uuid) {
                // Looked up again rather than reusing the variable from the try block.
                $version = $this->em
                    ->getRepository(DocumentVersion::class)
                    ->find($versionId);
                if ($version !== null) {
                    $version->setIngestStatus(DocumentVersion::INGEST_FAILED);
                    $this->em->flush();
                }
            }
        }

        throw $e;
    } finally {
        $this->lockService->release();
    }
}
/**
* Globaler Reindex (synchron)
*/
public function runGlobal(User $user, bool $dryRun = false): IngestJob
{
@@ -97,12 +204,15 @@ final class IngestOrchestrator
$job = null;
try {
$job = $this->jobService->startJob(IngestJob::TYPE_GLOBAL_REINDEX, $user);
$job = $this->jobService->startJob(
IngestJob::TYPE_GLOBAL_REINDEX,
$user
);
if ($dryRun) {
usleep(200000);
} else {
$this->ingestFlow->globalReindex($job->getLogPath());
$this->ingestFlow->globalReindex();
}
$this->jobService->markCompleted($job);
@@ -110,6 +220,7 @@ final class IngestOrchestrator
return $job;
} catch (\Throwable $e) {
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}