optimize tag and rebuilding
This commit is contained in:
@@ -36,68 +36,90 @@ final class TagRebuildRunJobCommand extends Command
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$jobId = (string)$input->getArgument('jobId');
|
||||
$jobId = (string) $input->getArgument('jobId');
|
||||
|
||||
/** @var TagRebuildJob|null $job */
|
||||
$job = $this->em->getRepository(TagRebuildJob::class)->find($jobId);
|
||||
|
||||
if (!$job instanceof TagRebuildJob) {
|
||||
$output->writeln('<error>Job not found.</error>');
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// Global lock to avoid parallel rebuilds
|
||||
// ---------------------------------------------------------
|
||||
$lockDir = \dirname($this->lockFilePath);
|
||||
if (!\is_dir($lockDir)) {
|
||||
@\mkdir($lockDir, 0775, true);
|
||||
}
|
||||
|
||||
$fh = @\fopen($this->lockFilePath, 'c+');
|
||||
if (!$fh) {
|
||||
$job->markFailed('Cannot open lock file: ' . $this->lockFilePath);
|
||||
$this->em->flush();
|
||||
$output->writeln('<error>Cannot open lock file.</error>');
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
// If another rebuild runs, we fail fast (simple & safe).
|
||||
if (!@\flock($fh, LOCK_EX | LOCK_NB)) {
|
||||
\fclose($fh);
|
||||
$job->markFailed('Another tag rebuild is currently running (lock busy).');
|
||||
$this->em->flush();
|
||||
$output->writeln('<error>Lock busy. Another rebuild is running.</error>');
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
// mark running
|
||||
$job->markRunning();
|
||||
$this->em->flush();
|
||||
$fh = null;
|
||||
|
||||
try {
|
||||
// ---------------------------------------------------------
|
||||
// LOCK INITIALIZATION
|
||||
// ---------------------------------------------------------
|
||||
$lockDir = \dirname($this->lockFilePath);
|
||||
if (!\is_dir($lockDir) && !@\mkdir($lockDir, 0775, true) && !\is_dir($lockDir)) {
|
||||
throw new \RuntimeException('Cannot create lock directory.');
|
||||
}
|
||||
|
||||
$fh = @\fopen($this->lockFilePath, 'c+');
|
||||
if (!$fh) {
|
||||
throw new \RuntimeException('Cannot open lock file: ' . $this->lockFilePath);
|
||||
}
|
||||
|
||||
if (!@\flock($fh, LOCK_EX | LOCK_NB)) {
|
||||
throw new \RuntimeException('Another tag rebuild is currently running (lock busy).');
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// MARK RUNNING
|
||||
// ---------------------------------------------------------
|
||||
$job->markRunning();
|
||||
$this->em->flush();
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// EXPORT TAGS (NDJSON)
|
||||
// ---------------------------------------------------------
|
||||
$export = $this->exporter->export();
|
||||
|
||||
if (
|
||||
!isset($export['path']) ||
|
||||
!\is_string($export['path']) ||
|
||||
!\file_exists($export['path'])
|
||||
) {
|
||||
throw new \RuntimeException('Export failed: NDJSON file missing.');
|
||||
}
|
||||
|
||||
if (isset($export['count']) && (int)$export['count'] === 0) {
|
||||
throw new \RuntimeException('Export produced zero tags.');
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// BUILD VECTOR INDEX
|
||||
// ---------------------------------------------------------
|
||||
$this->builder->build();
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// MARK COMPLETED
|
||||
// ---------------------------------------------------------
|
||||
$job->markCompleted();
|
||||
$this->em->flush();
|
||||
|
||||
$output->writeln('<info>OK</info>');
|
||||
$output->writeln('tags.ndjson: ' . $export['path']);
|
||||
} catch (\Throwable $e) {
|
||||
$output->writeln('<info>Tag rebuild successful.</info>');
|
||||
$output->writeln('NDJSON: ' . $export['path']);
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
catch (\Throwable $e) {
|
||||
|
||||
$job->markFailed($e->getMessage());
|
||||
$this->em->flush();
|
||||
|
||||
$output->writeln('<error>FAILED: ' . $e->getMessage() . '</error>');
|
||||
|
||||
@\flock($fh, LOCK_UN);
|
||||
@\fclose($fh);
|
||||
|
||||
return Command::FAILURE;
|
||||
}
|
||||
finally {
|
||||
|
||||
@\flock($fh, LOCK_UN);
|
||||
@\fclose($fh);
|
||||
|
||||
return Command::SUCCESS;
|
||||
if ($fh) {
|
||||
@\flock($fh, LOCK_UN);
|
||||
@\fclose($fh);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,27 +6,33 @@ namespace App\Controller\Admin;
|
||||
|
||||
use App\Entity\Document;
|
||||
use App\Entity\Tag;
|
||||
use App\Entity\TagRebuildJob;
|
||||
use App\Service\TagRebuildJobService;
|
||||
use Doctrine\DBAL\Types\Types;
|
||||
use App\Tag\TagService;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\JsonResponse;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
use Symfony\Component\HttpFoundation\Response;
|
||||
use Symfony\Component\Routing\Attribute\Route;
|
||||
use Symfony\Component\Uid\Uuid;
|
||||
|
||||
#[Route('/admin/documents')]
|
||||
final class DocumentTagController extends AbstractController
|
||||
{
|
||||
#[Route('/{id}/tags', name: 'admin_document_tags_edit', methods: ['GET'])]
|
||||
public function edit(string $id, EntityManagerInterface $em): Response
|
||||
{
|
||||
public function edit(
|
||||
string $id,
|
||||
EntityManagerInterface $em,
|
||||
TagRebuildJobService $jobs
|
||||
): Response {
|
||||
|
||||
$document = $em->getRepository(Document::class)->find($id);
|
||||
if (!$document instanceof Document) {
|
||||
throw $this->createNotFoundException('Document not found');
|
||||
}
|
||||
|
||||
// 🔹 Alle verfügbaren Tags laden (fehlte!)
|
||||
$allTags = $em->createQueryBuilder()
|
||||
->select('t')
|
||||
->from(Tag::class, 't')
|
||||
@@ -34,15 +40,16 @@ final class DocumentTagController extends AbstractController
|
||||
->getQuery()
|
||||
->getResult();
|
||||
|
||||
$assigned = [];
|
||||
foreach ($document->getTags() as $tag) {
|
||||
$assigned[(string)$tag->getId()] = true;
|
||||
}
|
||||
$latestJob = $jobs->getLatestJob();
|
||||
|
||||
return $this->render('admin/document_tags/edit.html.twig', [
|
||||
'document' => $document,
|
||||
'allTags' => $allTags,
|
||||
'assigned' => $assigned,
|
||||
'document' => $document,
|
||||
'allTags' => $allTags, // ✅ jetzt vorhanden
|
||||
'latestJob' => $latestJob,
|
||||
'statusRunning' => TagRebuildJob::STATUS_RUNNING,
|
||||
'statusQueued' => TagRebuildJob::STATUS_QUEUED,
|
||||
'statusCompleted' => TagRebuildJob::STATUS_COMPLETED,
|
||||
'statusFailed' => TagRebuildJob::STATUS_FAILED,
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -51,7 +58,7 @@ final class DocumentTagController extends AbstractController
|
||||
string $id,
|
||||
Request $request,
|
||||
EntityManagerInterface $em,
|
||||
TagRebuildJobService $jobs
|
||||
TagService $tagService
|
||||
): RedirectResponse {
|
||||
|
||||
$document = $em->getRepository(Document::class)->find($id);
|
||||
@@ -61,33 +68,23 @@ final class DocumentTagController extends AbstractController
|
||||
|
||||
$selected = $request->request->all('tag_ids') ?? [];
|
||||
|
||||
$uuidObjects = [];
|
||||
foreach ($selected as $value) {
|
||||
try {
|
||||
$uuidObjects[] = \Symfony\Component\Uid\Uuid::fromString($value);
|
||||
} catch (\Throwable) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
$tagService->syncDocumentTags($document, $selected);
|
||||
$this->addFlash('success', 'Tags wurden aktualisiert. Rebuild läuft im Hintergrund.');
|
||||
} catch (\Throwable $e) {
|
||||
$this->addFlash('danger', $e->getMessage());
|
||||
}
|
||||
|
||||
// Remove
|
||||
foreach ($document->getTags() as $tag) {
|
||||
if (!in_array($tag->getId(), $uuidObjects, false)) {
|
||||
$document->removeTag($tag);
|
||||
}
|
||||
}
|
||||
|
||||
// Add
|
||||
foreach ($uuidObjects as $uuid) {
|
||||
$tag = $em->find(\App\Entity\Tag::class, $uuid);
|
||||
if ($tag && !$document->hasTag($tag)) {
|
||||
$document->addTag($tag);
|
||||
}
|
||||
}
|
||||
|
||||
$em->flush();
|
||||
$jobs->enqueueAndStartAsync();
|
||||
|
||||
return $this->redirectToRoute('admin_document_tags_edit', ['id' => $id]);
|
||||
}
|
||||
|
||||
#[Route('/admin/tags/status', name: 'admin_tags_status', methods: ['GET'])]
|
||||
public function status(TagRebuildJobService $jobs): JsonResponse
|
||||
{
|
||||
$job = $jobs->getLatestJob();
|
||||
|
||||
return $this->json([
|
||||
'status' => $job?->getStatus(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,8 @@ declare(strict_types=1);
|
||||
namespace App\Controller\Admin;
|
||||
|
||||
use App\Entity\Tag;
|
||||
use App\Service\TagRebuildJobService;
|
||||
use App\Entity\TagRebuildJob;
|
||||
use App\Tag\TagService;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
@@ -17,85 +18,76 @@ use Symfony\Component\Routing\Attribute\Route;
|
||||
final class TagController extends AbstractController
|
||||
{
|
||||
#[Route('', name: 'admin_tags_index', methods: ['GET'])]
|
||||
public function index(EntityManagerInterface $em): Response
|
||||
{
|
||||
$tags = $em->createQueryBuilder()
|
||||
->select('t')
|
||||
->from(Tag::class, 't')
|
||||
->orderBy('t.label', 'ASC')
|
||||
->getQuery()
|
||||
->getResult();
|
||||
public function index(
|
||||
EntityManagerInterface $em,
|
||||
\App\Service\TagRebuildJobService $jobs
|
||||
): Response {
|
||||
|
||||
$tags = $em->getRepository(\App\Entity\Tag::class)
|
||||
->findBy([], ['label' => 'ASC']);
|
||||
|
||||
return $this->render('admin/tag/index.html.twig', [
|
||||
'tags' => $tags,
|
||||
'latestJob' => $jobs->getLatestJob(),
|
||||
'hasActiveJob' => $jobs->hasActiveJob(),
|
||||
'statusRunning' => TagRebuildJob::STATUS_RUNNING,
|
||||
'statusQueued' => TagRebuildJob::STATUS_QUEUED,
|
||||
'statusCompleted' => TagRebuildJob::STATUS_COMPLETED,
|
||||
'statusFailed' => TagRebuildJob::STATUS_FAILED,
|
||||
]);
|
||||
}
|
||||
|
||||
#[Route('/create', name: 'admin_tags_create', methods: ['POST'])]
|
||||
public function create(Request $request, EntityManagerInterface $em, TagRebuildJobService $jobs): RedirectResponse
|
||||
{
|
||||
$token = (string)$request->request->get('_token', '');
|
||||
public function create(
|
||||
Request $request,
|
||||
TagService $tagService
|
||||
): RedirectResponse {
|
||||
$token = (string) $request->request->get('_token', '');
|
||||
|
||||
if (!$this->isCsrfTokenValid('admin_tag_create', $token)) {
|
||||
$this->addFlash('danger', 'Ungültiges CSRF Token.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
}
|
||||
|
||||
$label = trim((string)$request->request->get('label', ''));
|
||||
$slug = trim((string)$request->request->get('slug', ''));
|
||||
$desc = trim((string)$request->request->get('description', ''));
|
||||
try {
|
||||
$tagService->create(
|
||||
(string) $request->request->get('slug', ''),
|
||||
(string) $request->request->get('label', ''),
|
||||
$request->request->get('description')
|
||||
? (string) $request->request->get('description')
|
||||
: null
|
||||
);
|
||||
|
||||
if ($label === '' || $slug === '') {
|
||||
$this->addFlash('danger', 'Label und Slug sind Pflichtfelder.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
$this->addFlash('success', 'Tag wurde erstellt. Rebuild läuft im Hintergrund.');
|
||||
} catch (\Throwable $e) {
|
||||
$this->addFlash('danger', $e->getMessage());
|
||||
}
|
||||
|
||||
$exists = (int)$em->createQueryBuilder()
|
||||
->select('COUNT(t.id)')
|
||||
->from(Tag::class, 't')
|
||||
->where('t.slug = :slug')
|
||||
->setParameter('slug', $slug)
|
||||
->getQuery()
|
||||
->getSingleScalarResult();
|
||||
|
||||
if ($exists > 0) {
|
||||
$this->addFlash('danger', 'Slug existiert bereits.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
}
|
||||
|
||||
$tag = new Tag($slug, $label, $desc !== '' ? $desc : null);
|
||||
|
||||
$em->persist($tag);
|
||||
$em->flush();
|
||||
|
||||
// enqueue async rebuild
|
||||
$jobs->enqueueAndStartAsync();
|
||||
|
||||
$this->addFlash('success', 'Tag wurde erstellt. Rebuild läuft im Hintergrund.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
}
|
||||
|
||||
#[Route('/{id}/delete', name: 'admin_tags_delete', methods: ['POST'])]
|
||||
public function delete(string $id, Request $request, EntityManagerInterface $em, TagRebuildJobService $jobs): RedirectResponse
|
||||
{
|
||||
$token = (string)$request->request->get('_token', '');
|
||||
public function delete(
|
||||
string $id,
|
||||
Request $request,
|
||||
TagService $tagService
|
||||
): RedirectResponse {
|
||||
$token = (string) $request->request->get('_token', '');
|
||||
|
||||
if (!$this->isCsrfTokenValid('admin_tag_delete_' . $id, $token)) {
|
||||
$this->addFlash('danger', 'Ungültiges CSRF Token.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
}
|
||||
|
||||
$tag = $em->getRepository(Tag::class)->find($id);
|
||||
if (!$tag instanceof Tag) {
|
||||
$this->addFlash('danger', 'Tag nicht gefunden.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
try {
|
||||
$tagService->deleteById($id);
|
||||
$this->addFlash('success', 'Tag wurde gelöscht. Rebuild läuft im Hintergrund.');
|
||||
} catch (\Throwable $e) {
|
||||
$this->addFlash('danger', $e->getMessage());
|
||||
}
|
||||
|
||||
$em->remove($tag);
|
||||
$em->flush();
|
||||
|
||||
// enqueue async rebuild
|
||||
$jobs->enqueueAndStartAsync();
|
||||
|
||||
$this->addFlash('success', 'Tag wurde gelöscht. Rebuild läuft im Hintergrund.');
|
||||
return $this->redirectToRoute('admin_tags_index');
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
62
src/Controller/Admin/TagRebuildStreamController.php
Normal file
62
src/Controller/Admin/TagRebuildStreamController.php
Normal file
@@ -0,0 +1,62 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Controller\Admin;
|
||||
|
||||
use App\Entity\TagRebuildJob;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Component\HttpFoundation\StreamedResponse;
|
||||
use Symfony\Component\Routing\Attribute\Route;
|
||||
|
||||
final class TagRebuildStreamController
|
||||
{
|
||||
#[Route('/admin/tags/rebuild/stream', name: 'admin_tags_rebuild_stream')]
|
||||
public function stream(EntityManagerInterface $em): StreamedResponse
|
||||
{
|
||||
$response = new StreamedResponse(function () use ($em) {
|
||||
|
||||
// Sofort erstes Event senden (wichtig!)
|
||||
echo "event: ping\n";
|
||||
echo "data: " . json_encode(['init' => true]) . "\n\n";
|
||||
|
||||
@ob_flush();
|
||||
@flush();
|
||||
|
||||
while (!connection_aborted()) {
|
||||
|
||||
$em->clear();
|
||||
|
||||
$job = $em->createQueryBuilder()
|
||||
->select('j')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
|
||||
if ($job) {
|
||||
echo "event: message\n";
|
||||
echo "data: " . json_encode([
|
||||
'status' => $job->getStatus(),
|
||||
'startedAt' => $job->getStartedAt()?->format(DATE_ATOM),
|
||||
'finishedAt' => $job->getFinishedAt()?->format(DATE_ATOM),
|
||||
'error' => $job->getErrorMessage(),
|
||||
]) . "\n\n";
|
||||
|
||||
@ob_flush();
|
||||
@flush();
|
||||
}
|
||||
|
||||
sleep(2);
|
||||
}
|
||||
});
|
||||
|
||||
$response->headers->set('Content-Type', 'text/event-stream');
|
||||
$response->headers->set('Cache-Control', 'no-cache');
|
||||
$response->headers->set('Connection', 'keep-alive');
|
||||
$response->headers->set('X-Accel-Buffering', 'no'); // 🔥 wichtig bei nginx
|
||||
|
||||
return $response;
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,12 @@ use Psr\Log\LoggerInterface;
|
||||
|
||||
final readonly class TagRebuildJobService
|
||||
{
|
||||
/**
|
||||
* Wenn ein QUEUED-Job länger nicht startet, gilt er als "stale" und wird auf FAILED gesetzt,
|
||||
* damit das System nicht dauerhaft blockiert.
|
||||
*/
|
||||
private const STALE_QUEUED_AFTER_SECONDS = 300; // 5 Minuten
|
||||
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
private LoggerInterface $agentLogger,
|
||||
@@ -28,24 +34,153 @@ final readonly class TagRebuildJobService
|
||||
return $job;
|
||||
}
|
||||
|
||||
public function enqueueIfIdle(): ?TagRebuildJob
|
||||
{
|
||||
// Coalescing: Wenn ein Job läuft oder queued ist -> nichts tun
|
||||
if ($this->hasActiveJob()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $this->enqueueAndStartAsync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Letzter Job (egal welcher Status).
|
||||
*/
|
||||
public function getLatestJob(): ?TagRebuildJob
|
||||
{
|
||||
return $this->em->createQueryBuilder()
|
||||
->select('j')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Letzter Job mit Status COMPLETED.
|
||||
*/
|
||||
public function getLatestCompletedJob(): ?TagRebuildJob
|
||||
{
|
||||
return $this->em->createQueryBuilder()
|
||||
->select('j')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->where('j.status = :status')
|
||||
->setParameter('status', TagRebuildJob::STATUS_COMPLETED)
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ob gerade ein Job aktiv ist:
|
||||
* - RUNNING ist immer aktiv
|
||||
* - QUEUED ist nur aktiv, wenn er nicht stale ist
|
||||
*
|
||||
* Zusätzlich: stale QUEUED Jobs werden auf FAILED gesetzt (Recovery).
|
||||
*/
|
||||
public function hasActiveJob(): bool
|
||||
{
|
||||
$this->markStaleQueuedJobsFailed();
|
||||
|
||||
$cutoff = new \DateTimeImmutable('-' . self::STALE_QUEUED_AFTER_SECONDS . ' seconds');
|
||||
|
||||
$qb = $this->em->createQueryBuilder();
|
||||
$qb->select('COUNT(j.id)')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->where(
|
||||
$qb->expr()->orX(
|
||||
'j.status = :running',
|
||||
$qb->expr()->andX(
|
||||
'j.status = :queued',
|
||||
'j.createdAt >= :cutoff'
|
||||
)
|
||||
)
|
||||
)
|
||||
->setParameter('running', TagRebuildJob::STATUS_RUNNING)
|
||||
->setParameter('queued', TagRebuildJob::STATUS_QUEUED)
|
||||
->setParameter('cutoff', $cutoff);
|
||||
|
||||
return (int) $qb->getQuery()->getSingleScalarResult() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Startet den Job async über bin/console.
|
||||
* Wichtige Fixes:
|
||||
* - php explizit verwenden
|
||||
* - --no-interaction
|
||||
* - Logfile statt /dev/null
|
||||
*/
|
||||
private function startAsync(TagRebuildJob $job): void
|
||||
{
|
||||
$php = PHP_BINARY; // safest in runtime
|
||||
$console = rtrim($this->projectDir, '/') . '/bin/console';
|
||||
$projectDir = rtrim($this->projectDir, '/');
|
||||
$console = $projectDir . '/bin/console';
|
||||
|
||||
$jobId = (string) $job->getId();
|
||||
|
||||
$logDir = $projectDir . '/var/log/tags';
|
||||
if (!is_dir($logDir)) {
|
||||
@mkdir($logDir, 0777, true);
|
||||
}
|
||||
$logFile = $logDir . '/job_' . $jobId . '.log';
|
||||
|
||||
// Robust: cd ins Projekt, dann nohup php bin/console ...
|
||||
$cmd = sprintf(
|
||||
'%s %s %s %s > /dev/null 2>&1 &',
|
||||
escapeshellarg($php),
|
||||
'cd %s && nohup %s %s %s %s --no-interaction >> %s 2>&1 &',
|
||||
escapeshellarg($projectDir),
|
||||
escapeshellcmd('php'),
|
||||
escapeshellarg($console),
|
||||
'mto:agent:tags:job:run',
|
||||
escapeshellarg((string)$job->getId())
|
||||
escapeshellarg('mto:agent:tags:job:run'),
|
||||
escapeshellarg($jobId),
|
||||
escapeshellarg($logFile)
|
||||
);
|
||||
|
||||
$this->agentLogger->info('[tags] enqueue job async', [
|
||||
'job' => (string)$job->getId(),
|
||||
'job' => $jobId,
|
||||
'cmd' => $cmd,
|
||||
'log' => $logFile,
|
||||
]);
|
||||
|
||||
@exec($cmd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recovery gegen "ewig QUEUED":
|
||||
* Setzt alte QUEUED Jobs auf FAILED, damit enqueueIfIdle() nicht dauerhaft blockiert.
|
||||
*/
|
||||
private function markStaleQueuedJobsFailed(): void
|
||||
{
|
||||
$cutoff = new \DateTimeImmutable('-' . self::STALE_QUEUED_AFTER_SECONDS . ' seconds');
|
||||
|
||||
$qb = $this->em->createQueryBuilder();
|
||||
$qb->select('j')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->where('j.status = :queued')
|
||||
->andWhere('j.createdAt < :cutoff')
|
||||
->setParameter('queued', TagRebuildJob::STATUS_QUEUED)
|
||||
->setParameter('cutoff', $cutoff)
|
||||
->setMaxResults(25);
|
||||
|
||||
/** @var TagRebuildJob[] $stale */
|
||||
$stale = $qb->getQuery()->getResult();
|
||||
|
||||
if (!$stale) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($stale as $job) {
|
||||
$jobId = (string) $job->getId();
|
||||
|
||||
$job->markFailed('Stale QUEUED job detected (async start likely failed).');
|
||||
|
||||
$this->agentLogger->warning('[tags] stale QUEUED job marked FAILED', [
|
||||
'job' => $jobId,
|
||||
'cutoff' => $cutoff->format(\DateTimeInterface::ATOM),
|
||||
]);
|
||||
}
|
||||
|
||||
$this->em->flush();
|
||||
}
|
||||
}
|
||||
134
src/Tag/TagService.php
Normal file
134
src/Tag/TagService.php
Normal file
@@ -0,0 +1,134 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Tag;
|
||||
|
||||
use App\Entity\Tag;
|
||||
use App\Entity\Document;
|
||||
use App\Entity\DocumentTag;
|
||||
use App\Service\TagRebuildJobService;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
|
||||
final class TagService
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
private TagRebuildJobService $jobs,
|
||||
) {}
|
||||
|
||||
// =========================================================
|
||||
// TAG CREATE
|
||||
// =========================================================
|
||||
|
||||
public function create(string $slug, string $label, ?string $description = null): Tag
|
||||
{
|
||||
$slug = trim($slug);
|
||||
$label = trim($label);
|
||||
|
||||
if ($label === '' || $slug === '') {
|
||||
throw new \InvalidArgumentException('Label und Slug sind Pflichtfelder.');
|
||||
}
|
||||
|
||||
if ($this->slugExists($slug)) {
|
||||
throw new \RuntimeException('Slug existiert bereits.');
|
||||
}
|
||||
|
||||
$tag = new Tag($slug, $label, $description);
|
||||
|
||||
$this->em->persist($tag);
|
||||
$this->em->flush();
|
||||
|
||||
$this->triggerRebuildIfIdle();
|
||||
|
||||
return $tag;
|
||||
}
|
||||
|
||||
// =========================================================
|
||||
// TAG DELETE
|
||||
// =========================================================
|
||||
|
||||
public function deleteById(string $tagId): void
|
||||
{
|
||||
$tag = $this->em->getRepository(Tag::class)->find($tagId);
|
||||
|
||||
if (!$tag instanceof Tag) {
|
||||
throw new \RuntimeException('Tag nicht gefunden.');
|
||||
}
|
||||
|
||||
$this->delete($tag);
|
||||
}
|
||||
|
||||
public function delete(Tag $tag): void
|
||||
{
|
||||
$this->em->remove($tag);
|
||||
$this->em->flush();
|
||||
|
||||
$this->triggerRebuildIfIdle();
|
||||
}
|
||||
|
||||
// =========================================================
|
||||
// DOCUMENT TAG SYNC
|
||||
// =========================================================
|
||||
|
||||
/**
|
||||
* Synchronisiert alle Tags eines Dokuments.
|
||||
* Löst einen Rebuild aus, da document_ids Teil des NDJSON sind.
|
||||
*/
|
||||
public function syncDocumentTags(Document $document, array $newTagIds): void
|
||||
{
|
||||
$newTagIds = array_unique($newTagIds);
|
||||
|
||||
$currentRelations = $this->em
|
||||
->getRepository(DocumentTag::class)
|
||||
->findBy(['document' => $document]);
|
||||
|
||||
$currentTagIds = array_map(
|
||||
fn(DocumentTag $dt) => (string) $dt->getTag()->getId(),
|
||||
$currentRelations
|
||||
);
|
||||
|
||||
$toAdd = array_diff($newTagIds, $currentTagIds);
|
||||
$toRemove = array_diff($currentTagIds, $newTagIds);
|
||||
|
||||
foreach ($toAdd as $tagId) {
|
||||
$tag = $this->em->getRepository(Tag::class)->find($tagId);
|
||||
if ($tag instanceof Tag) {
|
||||
$this->em->persist(new DocumentTag($document, $tag));
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($currentRelations as $relation) {
|
||||
if (in_array((string) $relation->getTag()->getId(), $toRemove, true)) {
|
||||
$this->em->remove($relation);
|
||||
}
|
||||
}
|
||||
|
||||
if ($toAdd || $toRemove) {
|
||||
$this->em->flush();
|
||||
$this->triggerRebuildIfIdle();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================
|
||||
// INTERNAL HELPERS
|
||||
// =========================================================
|
||||
|
||||
private function slugExists(string $slug): bool
|
||||
{
|
||||
return (int) $this->em->createQueryBuilder()
|
||||
->select('COUNT(t.id)')
|
||||
->from(Tag::class, 't')
|
||||
->where('t.slug = :slug')
|
||||
->setParameter('slug', $slug)
|
||||
->getQuery()
|
||||
->getSingleScalarResult() > 0;
|
||||
}
|
||||
|
||||
private function triggerRebuildIfIdle(): void
|
||||
{
|
||||
if (!$this->jobs->hasActiveJob()) {
|
||||
$this->jobs->enqueueAndStartAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user