diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml
index 19db483..4b5d2e4 100644
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -1,22 +1,7 @@
framework:
messenger:
- # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
- # failure_transport: failed
-
transports:
- # https://symfony.com/doc/current/messenger.html#transport-configuration
- # async: '%env(MESSENGER_TRANSPORT_DSN)%'
- # failed: 'doctrine://default?queue_name=failed'
- sync: 'sync://'
+ async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
- # Route your messages to the transports
- # 'App\Message\YourMessage': async
-
-# when@test:
-# framework:
-# messenger:
-# transports:
-# # replace with your transport name here (e.g., my_transport: 'in-memory://')
-# # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test
-# async: 'in-memory://'
+ #'App\Message\IngestDocumentMessage': async
\ No newline at end of file
diff --git a/src/Command/IngestRunJobCommand.php b/src/Command/IngestRunJobCommand.php
new file mode 100644
index 0000000..7ca53f9
--- /dev/null
+++ b/src/Command/IngestRunJobCommand.php
@@ -0,0 +1,60 @@
+addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $jobId = (string) $input->getArgument('jobId');
+
+ /** @var IngestJob|null $job */
+ $job = $this->em->getRepository(IngestJob::class)->find($jobId);
+
+ if (!$job) {
+ $output->writeln('IngestJob not found.');
+ return Command::FAILURE;
+ }
+
+ // Idempotenz: wenn der Job bereits beendet ist, einfach ok zurück.
+ if (in_array($job->getStatus(), [IngestJob::STATUS_COMPLETED, IngestJob::STATUS_FAILED, IngestJob::STATUS_ABORTED], true)) {
+ $output->writeln('Job already finished.');
+ return Command::SUCCESS;
+ }
+
+ try {
+ $output->writeln(sprintf('Running ingest job %s ...', (string) $job->getId()));
+ $this->orchestrator->runExistingJob($job, false);
+ $output->writeln('Job completed.');
+ return Command::SUCCESS;
+ } catch (\Throwable $e) {
+ $output->writeln(sprintf('Job failed: %s', $e->getMessage()));
+ return Command::FAILURE;
+ }
+ }
+}
diff --git a/src/Command/KnowledgeIngestCommand.php b/src/Command/KnowledgeIngestCommand.php
index d64ccda..d3bded9 100644
--- a/src/Command/KnowledgeIngestCommand.php
+++ b/src/Command/KnowledgeIngestCommand.php
@@ -46,7 +46,7 @@ class KnowledgeIngestCommand extends Command
$output->writeln('Starting ingest...');
- $job = $this->orchestrator->runForVersion($version, $user, false);
+ $job = $this->orchestrator->runForVersion($version, $user);
$output->writeln(sprintf('Ingest completed. Job: %s', (string) $job->getId()));
diff --git a/src/Controller/Admin/DocumentController.php b/src/Controller/Admin/DocumentController.php
index bf8d800..e8bcc12 100644
--- a/src/Controller/Admin/DocumentController.php
+++ b/src/Controller/Admin/DocumentController.php
@@ -1,13 +1,13 @@
isMethod('POST')) {
- $title = $request->request->get('title');
+
$file = $request->files->get('file');
+ $title = $request->request->get('title') ?: $file->getClientOriginalName();
+ $title = $this->formatText->slugify($title);
if (!$file || !$title) {
$this->addFlash('error', 'Titel und Datei sind erforderlich.');
@@ -191,7 +198,7 @@ class DocumentController extends AbstractController
string $versionId,
Request $request,
EntityManagerInterface $em,
- IngestOrchestrator $orchestrator
+ IngestJobService $jobService,
): ?RedirectResponse {
$dryRun = false;
if (!$this->isCsrfTokenValid('ingest_version', $request->request->get('_token'))) {
@@ -214,14 +221,47 @@ class DocumentController extends AbstractController
return null;
}
- $orchestrator->runForVersion(
- $version,
+ // ---------------------------------------------------------
+ // Asynchroner Ingest (ohne Messenger):
+ // 1) Job als QUEUED anlegen
+ // 2) Symfony-Command im Hintergrund starten
+ // 3) Direkt auf Job-Detailseite redirecten (Loader + Polling)
+ // ---------------------------------------------------------
+
+ $job = $jobService->startJob(
+ IngestJob::TYPE_DOCUMENT,
$this->getUser(),
- $dryRun
+ $version->getDocument()->getId(),
+ $version->getId(),
+ null,
+ IngestJob::STATUS_QUEUED
);
- return $this->redirectToRoute('admin_document_show', [
- 'id' => $version->getDocument()->getId()
+ // Hintergrundprozess starten (Provider-kompatibel, kein Worker/Daemon)
+ $projectDir = (string) $this->getParameter('kernel.project_dir');
+ $console = $projectDir . '/bin/console';
+
+ $cmd = sprintf(
+ '%s %s %s %s > /dev/null 2>&1 &',
+ escapeshellarg($console),
+ escapeshellarg('mto:agent:ingest:run'),
+ escapeshellarg((string) $job->getId()),
+ escapeshellarg('--no-interaction'),
+ );
+
+ // Best effort: wenn exec deaktiviert ist, sauber abbrechen.
+ if (!\function_exists('exec')) {
+ $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).');
+ $this->addFlash('error', 'Ingest konnte nicht asynchron gestartet werden (exec deaktiviert).');
+ return $this->redirectToRoute('admin_document_show', [
+ 'id' => $version->getDocument()->getId()
+ ]);
+ }
+
+ exec($cmd);
+
+ return $this->redirectToRoute('admin_job_show', [
+ 'id' => (string) $job->getId(),
]);
}
diff --git a/src/Controller/Admin/IngestJobController.php b/src/Controller/Admin/IngestJobController.php
index dfc43db..6884577 100644
--- a/src/Controller/Admin/IngestJobController.php
+++ b/src/Controller/Admin/IngestJobController.php
@@ -1,6 +1,5 @@
'[0-9a-fA-F\-]{36}'],
+ methods: ['GET']
+ )]
+ public function status(string $id, EntityManagerInterface $em): JsonResponse
+ {
+ $this->denyAccessUnlessGranted('ROLE_USER');
+
+ /** @var IngestJob|null $job */
+ $job = $em->getRepository(IngestJob::class)->find($id);
+
+ if (!$job) {
+ throw new NotFoundHttpException();
+ }
+
+ return $this->json([
+ 'id' => (string) $job->getId(),
+ 'type' => $job->getType(),
+ 'status' => $job->getStatus(),
+ 'startedAt' => $job->getStartedAt()->format(DATE_ATOM),
+ 'finishedAt' => $job->getFinishedAt()?->format(DATE_ATOM),
+ 'errorMessage' => $job->getErrorMessage(),
+ ]);
+ }
+
#[Route('/global-reindex', name: 'admin_global_reindex', methods: ['POST'])]
public function globalReindex(
IngestFlow $flow
): RedirectResponse {
$this->denyAccessUnlessGranted('ROLE_SUPER_ADMIN');
- $flow->globalReindex($this->getUser());
+ $flow->globalReindex();
return $this->redirectToRoute('admin_jobs');
}
diff --git a/src/Entity/IngestJob.php b/src/Entity/IngestJob.php
index 8ee801b..b0bd125 100644
--- a/src/Entity/IngestJob.php
+++ b/src/Entity/IngestJob.php
@@ -11,6 +11,7 @@ class IngestJob
public const TYPE_DOCUMENT = 'DOCUMENT';
public const TYPE_GLOBAL_REINDEX = 'GLOBAL_REINDEX';
+ public const STATUS_QUEUED = 'QUEUED';
public const STATUS_RUNNING = 'RUNNING';
public const STATUS_COMPLETED = 'COMPLETED';
public const STATUS_FAILED = 'FAILED';
@@ -94,6 +95,11 @@ class IngestJob
$this->finishedAt = new \DateTimeImmutable();
}
+ public function markRunning(): void
+ {
+ $this->status = self::STATUS_RUNNING;
+ }
+
public function getErrorMessage(): ?string
{
return $this->errorMessage;
diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php
index b74271a..dc6a87a 100644
--- a/src/Ingest/IngestFlow.php
+++ b/src/Ingest/IngestFlow.php
@@ -28,16 +28,12 @@ final readonly class IngestFlow
): void
{
$this->metaManager->validateAgainstCurrent();
-
$this->chunkManager->compactByDocument(
$version->getDocument()->getId()
);
-
$records = $this->knowledgeIngestService
->buildChunkRecords($version);
-
$this->chunkManager->appendChunks($records);
-
$this->vectorBuilder->rebuildFromNdjson();
}
diff --git a/src/Service/FormatText.php b/src/Service/FormatText.php
new file mode 100644
index 0000000..5f26a93
--- /dev/null
+++ b/src/Service/FormatText.php
@@ -0,0 +1,30 @@
+ 'ae',
+ 'ö' => 'oe',
+ 'ü' => 'ue',
+ 'ß' => 'ss'
+ ];
+ $text = str_replace(array_keys($replacements), $replacements, $text);
+
+ // Nicht erlaubte Zeichen entfernen
+ $text = preg_replace('/[^a-z0-9\s.-]/', '', $text);
+
+ // Leerzeichen zu Bindestrichen
+ $text = preg_replace('/[\s-]+/', '-', $text);
+
+ $text = preg_replace('/\./', '-', $text);
+
+ return trim($text, '-');
+ }
+}
\ No newline at end of file
diff --git a/src/Service/IngestJobService.php b/src/Service/IngestJobService.php
index 7689629..1b8039d 100644
--- a/src/Service/IngestJobService.php
+++ b/src/Service/IngestJobService.php
@@ -1,6 +1,5 @@
setStartedBy($user);
$job->setDocumentId($documentId);
$job->setDocumentVersionId($documentVersionId);
diff --git a/src/Service/IngestOrchestrator.php b/src/Service/IngestOrchestrator.php
index d286ed9..c39c2c9 100644
--- a/src/Service/IngestOrchestrator.php
+++ b/src/Service/IngestOrchestrator.php
@@ -7,6 +7,7 @@ use App\Entity\IngestJob;
use App\Entity\User;
use App\Ingest\IngestFlow;
use Doctrine\ORM\EntityManagerInterface;
+use Symfony\Component\Uid\Uuid;
final class IngestOrchestrator
{
@@ -19,14 +20,14 @@ final class IngestOrchestrator
}
/**
- * Startet Ingest für eine bestimmte DocumentVersion (1 Job pro Run).
- * @throws \Throwable
+ * SYNCHRONE Variante (falls noch genutzt)
*/
public function runForVersion(
DocumentVersion $version,
User $user,
bool $dryRun = false
): IngestJob {
+
if (!$this->lockService->acquire()) {
throw new \RuntimeException('Another ingest job is already running.');
}
@@ -34,16 +35,12 @@ final class IngestOrchestrator
$job = null;
try {
- // Governance: nur PENDING/FAILED erlauben
$status = $version->getIngestStatus();
- if (!in_array($status, [
- DocumentVersion::INGEST_PENDING,
- DocumentVersion::INGEST_FAILED,
- ], true)) {
- throw new \RuntimeException(sprintf('Ingest not allowed for status "%s".', $status));
+
+ if ($status === DocumentVersion::INGEST_INDEXED) {
+ throw new \RuntimeException('DocumentVersion already indexed.');
}
- // Job anlegen (einmal!)
$job = $this->jobService->startJob(
IngestJob::TYPE_DOCUMENT,
$user,
@@ -51,18 +48,15 @@ final class IngestOrchestrator
$version->getId(),
);
- // Status → RUNNING
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
if ($dryRun) {
usleep(200000);
} else {
- // Fachlogik ausführen (Flow erzeugt keine Jobs!)
$this->ingestFlow->ingestDocumentVersion($version);
}
- // Erfolg
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job);
$this->em->flush();
@@ -86,7 +80,120 @@ final class IngestOrchestrator
}
/**
- * Globaler Reindex aller aktiven Dokumente.
+ * ASYNCHRONE Variante (Detached CLI)
+ */
+ public function runExistingJob(IngestJob $job, bool $dryRun = false): void
+ {
+ if (!$this->lockService->acquire()) {
+ throw new \RuntimeException('Another ingest job is already running.');
+ }
+
+ try {
+
+ // Falls Job bereits final ist → nichts tun (idempotent)
+ if (in_array($job->getStatus(), [
+ IngestJob::STATUS_COMPLETED,
+ IngestJob::STATUS_FAILED,
+ IngestJob::STATUS_ABORTED,
+ ], true)) {
+ return;
+ }
+
+ $job->markRunning();
+ $this->em->flush();
+
+ // Global Reindex
+ if ($job->getType() === IngestJob::TYPE_GLOBAL_REINDEX) {
+
+ if ($dryRun) {
+ usleep(200000);
+ } else {
+ $this->ingestFlow->globalReindex();
+ }
+
+ $this->jobService->markCompleted($job);
+ return;
+ }
+
+ if ($job->getType() !== IngestJob::TYPE_DOCUMENT) {
+ throw new \RuntimeException(sprintf(
+ 'Unsupported ingest job type "%s".',
+ $job->getType()
+ ));
+ }
+
+ $versionId = $job->getDocumentVersionId();
+
+ if (!$versionId instanceof Uuid) {
+ throw new \RuntimeException('Job has no document version id.');
+ }
+
+ /** @var DocumentVersion|null $version */
+ $version = $this->em
+ ->getRepository(DocumentVersion::class)
+ ->find($versionId);
+
+ if (!$version) {
+ throw new \RuntimeException('DocumentVersion not found.');
+ }
+
+ $status = $version->getIngestStatus();
+
+ // Nur blockieren wenn wirklich schon indexed
+ if ($status === DocumentVersion::INGEST_INDEXED) {
+ throw new \RuntimeException('DocumentVersion already indexed.');
+ }
+
+ // RUNNING darf hier erlaubt sein (async!)
+ if (!in_array($status, [
+ DocumentVersion::INGEST_PENDING,
+ DocumentVersion::INGEST_FAILED,
+ DocumentVersion::INGEST_RUNNING,
+ ], true)) {
+ throw new \RuntimeException(sprintf(
+ 'Ingest not allowed for status "%s".',
+ $status
+ ));
+ }
+
+ $version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
+ $this->em->flush();
+
+ if ($dryRun) {
+ usleep(200000);
+ } else {
+ $this->ingestFlow->ingestDocumentVersion($version);
+ }
+
+ $version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
+ $this->jobService->markCompleted($job);
+ $this->em->flush();
+
+ } catch (\Throwable $e) {
+
+ $this->jobService->markFailed($job, $e->getMessage());
+
+ $versionId = $job->getDocumentVersionId();
+ if ($versionId instanceof Uuid) {
+ $version = $this->em
+ ->getRepository(DocumentVersion::class)
+ ->find($versionId);
+
+ if ($version) {
+ $version->setIngestStatus(DocumentVersion::INGEST_FAILED);
+ $this->em->flush();
+ }
+ }
+
+ throw $e;
+
+ } finally {
+ $this->lockService->release();
+ }
+ }
+
+ /**
+ * Globaler Reindex (synchron)
*/
public function runGlobal(User $user, bool $dryRun = false): IngestJob
{
@@ -97,12 +204,15 @@ final class IngestOrchestrator
$job = null;
try {
- $job = $this->jobService->startJob(IngestJob::TYPE_GLOBAL_REINDEX, $user);
+ $job = $this->jobService->startJob(
+ IngestJob::TYPE_GLOBAL_REINDEX,
+ $user
+ );
if ($dryRun) {
usleep(200000);
} else {
- $this->ingestFlow->globalReindex($job->getLogPath());
+ $this->ingestFlow->globalReindex();
}
$this->jobService->markCompleted($job);
@@ -110,6 +220,7 @@ final class IngestOrchestrator
return $job;
} catch (\Throwable $e) {
+
if ($job) {
$this->jobService->markFailed($job, $e->getMessage());
}
diff --git a/templates/admin/document/new.html.twig b/templates/admin/document/new.html.twig
index 20fac7d..a57d8bb 100644
--- a/templates/admin/document/new.html.twig
+++ b/templates/admin/document/new.html.twig
@@ -9,7 +9,7 @@
-
+
diff --git a/templates/admin/job/index.html.twig b/templates/admin/job/index.html.twig
index eb70a05..083cbf4 100644
--- a/templates/admin/job/index.html.twig
+++ b/templates/admin/job/index.html.twig
@@ -53,6 +53,8 @@
{% if job.status == 'COMPLETED' %}
COMPLETED
+ {% elseif job.status == 'QUEUED' %}
+ QUEUED
{% elseif job.status == 'RUNNING' %}
RUNNING
{% elseif job.status == 'FAILED' %}
@@ -64,7 +66,7 @@
|
{% if job.documentId %}
- {{ job.documentId }}
+ {{ job.documentId }}
{% else %}
-
{% endif %}
diff --git a/templates/admin/job/show.html.twig b/templates/admin/job/show.html.twig
index 08c44f4..28a6f41 100644
--- a/templates/admin/job/show.html.twig
+++ b/templates/admin/job/show.html.twig
@@ -24,8 +24,11 @@
Status:
+
{% if job.status == 'COMPLETED' %}
COMPLETED
+ {% elseif job.status == 'QUEUED' %}
+ QUEUED
{% elseif job.status == 'RUNNING' %}
RUNNING
{% elseif job.status == 'FAILED' %}
@@ -33,6 +36,7 @@
{% else %}
{{ job.status }}
{% endif %}
+
@@ -52,11 +56,13 @@
Beendet:
- {% if job.finishedAt %}
- {{ job.finishedAt|date('d.m.Y H:i:s') }}
- {% else %}
- -
- {% endif %}
+
+ {% if job.finishedAt %}
+ {{ job.finishedAt|date('d.m.Y H:i:s') }}
+ {% else %}
+ -
+ {% endif %}
+
@@ -68,6 +74,18 @@
{% endif %}
+
+
+
+
+ Ingest läuft…
+ Diese Seite aktualisiert den Status automatisch.
+
+
+
+
+
+
{% if job.errorMessage %}
Fehler:
@@ -85,4 +103,67 @@
+
+
{% endblock %}
|