first commit
This commit is contained in:
@@ -8,65 +8,99 @@ use App\Entity\Document;
|
||||
use App\Entity\Tag;
|
||||
use App\Service\TagRebuildJobService;
|
||||
use App\Tag\TagService;
|
||||
use App\Tag\TagTypes;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use RuntimeException;
|
||||
|
||||
final class DocumentTagAdminService
|
||||
final readonly class DocumentTagAdminService
|
||||
{
|
||||
public function __construct(
|
||||
private readonly EntityManagerInterface $em,
|
||||
private readonly TagService $tagService,
|
||||
private readonly TagRebuildJobService $jobs,
|
||||
) {}
|
||||
private EntityManagerInterface $em,
|
||||
private TagService $tagService,
|
||||
private TagRebuildJobService $jobs,
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{
|
||||
* document: Document,
|
||||
* allTags: list<Tag>,
|
||||
* latestJob: mixed
|
||||
* latestJob: mixed,
|
||||
* hasActiveJob: bool
|
||||
* }
|
||||
*/
|
||||
public function getEditData(string $documentId): array
|
||||
{
|
||||
$document = $this->em->getRepository(Document::class)->find($documentId);
|
||||
if (!$document instanceof Document) {
|
||||
throw new \RuntimeException('Document not found');
|
||||
}
|
||||
$document = $this->findDocumentById($documentId);
|
||||
|
||||
/** @var list<Tag> $allTags */
|
||||
$allTags = $this->em->createQueryBuilder()
|
||||
->select('t')
|
||||
->from(Tag::class, 't')
|
||||
->orderBy('t.label', 'ASC')
|
||||
->getQuery()
|
||||
->getResult();
|
||||
$allTags = $this->em->getRepository(Tag::class)->findAll();
|
||||
|
||||
$latestJob = $this->jobs->getLatestJob();
|
||||
usort(
|
||||
$allTags,
|
||||
static function (Tag $left, Tag $right): int {
|
||||
$typeOrder = [
|
||||
TagTypes::CATALOG_ENTITY => 10,
|
||||
TagTypes::GENERIC => 20,
|
||||
TagTypes::SALES_SIGNAL => 30,
|
||||
];
|
||||
|
||||
$leftTypeRank = $typeOrder[$left->getType()] ?? 999;
|
||||
$rightTypeRank = $typeOrder[$right->getType()] ?? 999;
|
||||
|
||||
if ($leftTypeRank !== $rightTypeRank) {
|
||||
return $leftTypeRank <=> $rightTypeRank;
|
||||
}
|
||||
|
||||
$labelComparison = strcasecmp($left->getLabel(), $right->getLabel());
|
||||
|
||||
if ($labelComparison !== 0) {
|
||||
return $labelComparison;
|
||||
}
|
||||
|
||||
return strcmp($left->getSlug(), $right->getSlug());
|
||||
}
|
||||
);
|
||||
|
||||
return [
|
||||
'document' => $document,
|
||||
'allTags' => $allTags,
|
||||
'latestJob' => $latestJob,
|
||||
'latestJob' => $this->jobs->getLatestJob(),
|
||||
'hasActiveJob' => $this->jobs->hasActiveJob(),
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Speichert die Tag-Auswahl für ein Dokument (inkl. Sync-Logik).
|
||||
* Persists the selected tag set for a document via the central domain service.
|
||||
*
|
||||
* @param array<mixed> $selectedTagIds
|
||||
*/
|
||||
public function saveTags(string $documentId, array $selectedTagIds): void
|
||||
{
|
||||
$document = $this->em->getRepository(Document::class)->find($documentId);
|
||||
if (!$document instanceof Document) {
|
||||
throw new \RuntimeException('Document not found');
|
||||
}
|
||||
$document = $this->findDocumentById($documentId);
|
||||
|
||||
// Delegation an deine Domain-Logik (bleibt dort, wo sie hingehört)
|
||||
$this->tagService->syncDocumentTags($document, $selectedTagIds);
|
||||
}
|
||||
|
||||
public function getLatestRebuildStatus(): ?string
|
||||
{
|
||||
$job = $this->jobs->getLatestJob();
|
||||
return $this->jobs->getLatestJob()?->getStatus();
|
||||
}
|
||||
|
||||
return $job?->getStatus();
|
||||
private function findDocumentById(string $documentId): Document
|
||||
{
|
||||
$documentId = trim($documentId);
|
||||
|
||||
if ($documentId === '') {
|
||||
throw new RuntimeException('Document not found.');
|
||||
}
|
||||
|
||||
$document = $this->em->getRepository(Document::class)->find($documentId);
|
||||
|
||||
if (!$document instanceof Document) {
|
||||
throw new RuntimeException('Document not found.');
|
||||
}
|
||||
|
||||
return $document;
|
||||
}
|
||||
}
|
||||
@@ -9,23 +9,29 @@ use App\Entity\DocumentTag;
|
||||
use App\Entity\Tag;
|
||||
use App\Service\TagRebuildJobService;
|
||||
use App\Tag\TagService;
|
||||
use App\Tag\TagTypes;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use RuntimeException;
|
||||
|
||||
final readonly class TagAdminService
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
private TagService $tagService,
|
||||
private TagRebuildJobService $jobs,
|
||||
) {}
|
||||
private TagService $tagService,
|
||||
private TagRebuildJobService $jobs,
|
||||
) {
|
||||
}
|
||||
|
||||
public function getIndexData(): array
|
||||
{
|
||||
/** @var list<Tag> $tags */
|
||||
$tags = $this->em->getRepository(Tag::class)
|
||||
->findBy([], ['label' => 'ASC']);
|
||||
->findBy([], ['type' => 'ASC', 'label' => 'ASC']);
|
||||
|
||||
return [
|
||||
'tags' => $tags,
|
||||
'tagTypeChoices' => TagTypes::choices(),
|
||||
'documentCountByTagId' => $this->buildDocumentCountByTagId(),
|
||||
'latestJob' => $this->jobs->getLatestJob(),
|
||||
'hasActiveJob' => $this->jobs->hasActiveJob(),
|
||||
];
|
||||
@@ -35,7 +41,7 @@ final readonly class TagAdminService
|
||||
string $slug,
|
||||
string $label,
|
||||
?string $description,
|
||||
string $type = 'generic' // NEU
|
||||
string $type = TagTypes::GENERIC,
|
||||
): void {
|
||||
$this->tagService->create($slug, $label, $description, $type);
|
||||
}
|
||||
@@ -47,35 +53,47 @@ final readonly class TagAdminService
|
||||
|
||||
public function getAssignData(string $tagId): array
|
||||
{
|
||||
$tag = $this->em->getRepository(Tag::class)->find($tagId);
|
||||
$tag = $this->findTagById($tagId);
|
||||
|
||||
if (!$tag instanceof Tag) {
|
||||
throw new \RuntimeException('Tag nicht gefunden.');
|
||||
}
|
||||
|
||||
$documents = $this->em->getRepository(Document::class)->findAll();
|
||||
/** @var list<Document> $documents */
|
||||
$documents = $this->em->getRepository(Document::class)->findBy(
|
||||
['status' => Document::STATUS_ACTIVE],
|
||||
['title' => 'ASC']
|
||||
);
|
||||
|
||||
$documentsData = array_map(
|
||||
fn(Document $d) => [
|
||||
'id' => (string)$d->getId(),
|
||||
'title' => $d->getTitle(),
|
||||
static fn (Document $document): array => [
|
||||
'id' => (string) $document->getId(),
|
||||
'title' => $document->getTitle(),
|
||||
],
|
||||
$documents
|
||||
);
|
||||
|
||||
/** @var list<DocumentTag> $existingRelations */
|
||||
$existingRelations = $this->em
|
||||
->getRepository(DocumentTag::class)
|
||||
->findBy(['tag' => $tag]);
|
||||
|
||||
$assignedDocIds = array_map(
|
||||
fn(DocumentTag $dt) => (string)$dt->getDocument()->getId(),
|
||||
$existingRelations
|
||||
$activeDocumentIds = array_map(
|
||||
static fn (Document $document): string => (string) $document->getId(),
|
||||
$documents
|
||||
);
|
||||
|
||||
$assignedDocIds = [];
|
||||
|
||||
foreach ($existingRelations as $relation) {
|
||||
$documentId = (string) $relation->getDocument()->getId();
|
||||
|
||||
if (in_array($documentId, $activeDocumentIds, true)) {
|
||||
$assignedDocIds[] = $documentId;
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'tag' => $tag,
|
||||
'documents' => $documentsData,
|
||||
'assignedDocIds' => $assignedDocIds,
|
||||
'assignedDocIds' => array_values(array_unique($assignedDocIds)),
|
||||
'tagTypeChoices' => TagTypes::choices(),
|
||||
'latestJob' => $this->jobs->getLatestJob(),
|
||||
'hasActiveJob' => $this->jobs->hasActiveJob(),
|
||||
];
|
||||
@@ -83,12 +101,55 @@ final readonly class TagAdminService
|
||||
|
||||
public function syncAssignments(string $tagId, array $selectedDocIds): void
|
||||
{
|
||||
$tag = $this->findTagById($tagId);
|
||||
$this->tagService->syncTagDocuments($tag, $selectedDocIds);
|
||||
}
|
||||
|
||||
private function findTagById(string $tagId): Tag
|
||||
{
|
||||
$tagId = trim($tagId);
|
||||
|
||||
if ($tagId === '') {
|
||||
throw new RuntimeException('Tag nicht gefunden.');
|
||||
}
|
||||
|
||||
$tag = $this->em->getRepository(Tag::class)->find($tagId);
|
||||
|
||||
if (!$tag instanceof Tag) {
|
||||
throw new \RuntimeException('Tag nicht gefunden.');
|
||||
throw new RuntimeException('Tag nicht gefunden.');
|
||||
}
|
||||
|
||||
$this->tagService->syncTagDocuments($tag, $selectedDocIds);
|
||||
return $tag;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<string, int>
|
||||
*/
|
||||
private function buildDocumentCountByTagId(): array
|
||||
{
|
||||
$rows = $this->em->createQueryBuilder()
|
||||
->select('t AS tag', 'COUNT(d.id) AS documentCount')
|
||||
->from(Tag::class, 't')
|
||||
->leftJoin(DocumentTag::class, 'dt', 'WITH', 'dt.tag = t')
|
||||
->leftJoin('dt.document', 'd', 'WITH', 'd.status = :status')
|
||||
->groupBy('t.id')
|
||||
->setParameter('status', Document::STATUS_ACTIVE)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
|
||||
$counts = [];
|
||||
|
||||
foreach ($rows as $row) {
|
||||
$tag = $row[0] ?? $row['tag'] ?? null;
|
||||
$documentCount = (int) ($row['documentCount'] ?? 0);
|
||||
|
||||
if (!$tag instanceof Tag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$counts[$tag->getId()->toRfc4122()] = $documentCount;
|
||||
}
|
||||
|
||||
return $counts;
|
||||
}
|
||||
}
|
||||
@@ -1,29 +1,33 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Service;
|
||||
|
||||
use App\Entity\Document;
|
||||
use App\Entity\DocumentVersion;
|
||||
use App\Entity\User;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use RuntimeException;
|
||||
|
||||
class DocumentService
|
||||
final readonly class DocumentService
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
) {}
|
||||
private TagRebuildJobService $tagRebuildJobService,
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Erstellt ein neues Dokument inkl. Version 1
|
||||
* Creates a new document including version 1.
|
||||
*/
|
||||
public function createDocument(
|
||||
string $title,
|
||||
string $filePath,
|
||||
User $user
|
||||
): Document {
|
||||
|
||||
$document = new Document();
|
||||
$document->setTitle($title);
|
||||
$document->setTitle(trim($title));
|
||||
$document->setCreatedBy($user);
|
||||
|
||||
$version = new DocumentVersion();
|
||||
@@ -44,14 +48,13 @@ class DocumentService
|
||||
}
|
||||
|
||||
/**
|
||||
* Fügt neue Version hinzu (immutable)
|
||||
* Adds a new immutable version to an existing document.
|
||||
*/
|
||||
public function addVersion(
|
||||
Document $document,
|
||||
string $filePath,
|
||||
User $user
|
||||
): DocumentVersion {
|
||||
|
||||
$nextVersionNumber = $this->getNextVersionNumber($document);
|
||||
|
||||
$version = new DocumentVersion();
|
||||
@@ -70,7 +73,7 @@ class DocumentService
|
||||
}
|
||||
|
||||
/**
|
||||
* Aktiviert eine Version
|
||||
* Activates a document version and marks it for re-ingest.
|
||||
*/
|
||||
public function activateVersion(DocumentVersion $version): void
|
||||
{
|
||||
@@ -82,41 +85,77 @@ class DocumentService
|
||||
|
||||
$version->setActive(true);
|
||||
$document->setCurrentVersion($version);
|
||||
|
||||
$version->setIngestStatus(DocumentVersion::INGEST_PENDING);
|
||||
|
||||
$this->em->flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Archiviert Dokument
|
||||
* Archives a document.
|
||||
*
|
||||
* If the document had tag assignments, the tag index is rebuilt so the
|
||||
* routing layer no longer works with an outdated active document set.
|
||||
*/
|
||||
public function archive(Document $document): void
|
||||
{
|
||||
if ($document->getStatus() === Document::STATUS_ARCHIVED) {
|
||||
return;
|
||||
}
|
||||
|
||||
$shouldRebuildTags = $this->hasTagAssignments($document);
|
||||
|
||||
$document->archive();
|
||||
$this->em->flush();
|
||||
}
|
||||
|
||||
public function delete(Document $document): void
|
||||
{
|
||||
$this->em->remove($document);
|
||||
$this->em->flush();
|
||||
if ($shouldRebuildTags) {
|
||||
$this->triggerTagRebuildIfIdle();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Berechnet SHA256 Checksum
|
||||
* Deletes a document.
|
||||
*
|
||||
* If the document had tag assignments, the tag index is rebuilt after the
|
||||
* removal so stale document references disappear from tag-based routing.
|
||||
*/
|
||||
public function delete(Document $document): void
|
||||
{
|
||||
$shouldRebuildTags = $this->hasTagAssignments($document);
|
||||
|
||||
$this->em->remove($document);
|
||||
$this->em->flush();
|
||||
|
||||
if ($shouldRebuildTags) {
|
||||
$this->triggerTagRebuildIfIdle();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the SHA256 checksum for a file path.
|
||||
*/
|
||||
private function calculateChecksum(string $filePath): string
|
||||
{
|
||||
if (!file_exists($filePath)) {
|
||||
throw new \RuntimeException('File not found for checksum.');
|
||||
$filePath = trim($filePath);
|
||||
|
||||
if ($filePath === '') {
|
||||
throw new RuntimeException('File path must not be empty.');
|
||||
}
|
||||
|
||||
return hash_file('sha256', $filePath);
|
||||
if (!is_file($filePath)) {
|
||||
throw new RuntimeException('File not found for checksum.');
|
||||
}
|
||||
|
||||
$checksum = hash_file('sha256', $filePath);
|
||||
|
||||
if ($checksum === false) {
|
||||
throw new RuntimeException('Could not calculate file checksum.');
|
||||
}
|
||||
|
||||
return $checksum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ermittelt nächste Versionsnummer
|
||||
* Determines the next version number for a document.
|
||||
*/
|
||||
private function getNextVersionNumber(Document $document): int
|
||||
{
|
||||
@@ -128,4 +167,16 @@ class DocumentService
|
||||
|
||||
return $max + 1;
|
||||
}
|
||||
}
|
||||
|
||||
private function hasTagAssignments(Document $document): bool
|
||||
{
|
||||
return $document->getDocumentTags()->count() > 0;
|
||||
}
|
||||
|
||||
private function triggerTagRebuildIfIdle(): void
|
||||
{
|
||||
if (!$this->tagRebuildJobService->hasActiveJob()) {
|
||||
$this->tagRebuildJobService->enqueueAndStartAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,16 +11,24 @@ use Psr\Log\LoggerInterface;
|
||||
final readonly class TagRebuildJobService
|
||||
{
|
||||
/**
|
||||
* Wenn ein QUEUED-Job länger nicht startet, gilt er als "stale" und wird auf FAILED gesetzt,
|
||||
* damit das System nicht dauerhaft blockiert.
|
||||
* If a QUEUED job does not transition into RUNNING in time,
|
||||
* it is treated as stale so the system does not stay blocked forever.
|
||||
*/
|
||||
private const STALE_QUEUED_AFTER_SECONDS = 300; // 5 Minuten
|
||||
private const STALE_QUEUED_AFTER_SECONDS = 300;
|
||||
|
||||
/**
|
||||
* The background runner should switch the job from QUEUED to RUNNING almost
|
||||
* immediately because markRunning() happens at the top of the command.
|
||||
*/
|
||||
private const ASYNC_START_TIMEOUT_SECONDS = 3;
|
||||
private const ASYNC_START_POLL_INTERVAL_MICROSECONDS = 250000;
|
||||
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em,
|
||||
private LoggerInterface $agentLogger,
|
||||
private string $projectDir,
|
||||
) {}
|
||||
private LoggerInterface $agentLogger,
|
||||
private string $projectDir,
|
||||
) {
|
||||
}
|
||||
|
||||
public function enqueueAndStartAsync(): TagRebuildJob
|
||||
{
|
||||
@@ -29,14 +37,25 @@ final readonly class TagRebuildJobService
|
||||
$this->em->persist($job);
|
||||
$this->em->flush();
|
||||
|
||||
$this->startAsync($job);
|
||||
try {
|
||||
$this->startAsync($job);
|
||||
} catch (\Throwable $e) {
|
||||
$job->markFailed('Async tag rebuild start failed: ' . $e->getMessage());
|
||||
$this->em->flush();
|
||||
|
||||
$this->agentLogger->error('[tags] async job start failed', [
|
||||
'job' => (string) $job->getId(),
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
|
||||
throw $e;
|
||||
}
|
||||
|
||||
return $job;
|
||||
}
|
||||
|
||||
public function enqueueIfIdle(): ?TagRebuildJob
|
||||
{
|
||||
// Coalescing: Wenn ein Job läuft oder queued ist -> nichts tun
|
||||
if ($this->hasActiveJob()) {
|
||||
return null;
|
||||
}
|
||||
@@ -44,23 +63,18 @@ final readonly class TagRebuildJobService
|
||||
return $this->enqueueAndStartAsync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Letzter Job (egal welcher Status).
|
||||
*/
|
||||
public function getLatestJob(): ?TagRebuildJob
|
||||
{
|
||||
return $this->em->createQueryBuilder()
|
||||
->select('j')
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->addOrderBy('j.id', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Letzter Job mit Status COMPLETED.
|
||||
*/
|
||||
public function getLatestCompletedJob(): ?TagRebuildJob
|
||||
{
|
||||
return $this->em->createQueryBuilder()
|
||||
@@ -69,18 +83,12 @@ final readonly class TagRebuildJobService
|
||||
->where('j.status = :status')
|
||||
->setParameter('status', TagRebuildJob::STATUS_COMPLETED)
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->addOrderBy('j.id', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ob gerade ein Job aktiv ist:
|
||||
* - RUNNING ist immer aktiv
|
||||
* - QUEUED ist nur aktiv, wenn er nicht stale ist
|
||||
*
|
||||
* Zusätzlich: stale QUEUED Jobs werden auf FAILED gesetzt (Recovery).
|
||||
*/
|
||||
public function hasActiveJob(): bool
|
||||
{
|
||||
$this->markStaleQueuedJobsFailed();
|
||||
@@ -106,31 +114,33 @@ final readonly class TagRebuildJobService
|
||||
return (int) $qb->getQuery()->getSingleScalarResult() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Startet den Job async über bin/console.
|
||||
* Wichtige Fixes:
|
||||
* - php explizit verwenden
|
||||
* - --no-interaction
|
||||
* - Logfile statt /dev/null
|
||||
*/
|
||||
private function startAsync(TagRebuildJob $job): void
|
||||
{
|
||||
$projectDir = rtrim($this->projectDir, '/');
|
||||
$console = $projectDir . '/bin/console';
|
||||
$projectDir = rtrim(trim($this->projectDir), '/');
|
||||
$console = $projectDir . '/bin/console';
|
||||
|
||||
if ($projectDir === '' || !is_dir($projectDir)) {
|
||||
throw new \RuntimeException('Project directory is invalid.');
|
||||
}
|
||||
|
||||
if (!is_file($console)) {
|
||||
throw new \RuntimeException('bin/console not found: ' . $console);
|
||||
}
|
||||
|
||||
$phpBinary = $this->resolvePhpBinary();
|
||||
$jobId = (string) $job->getId();
|
||||
|
||||
$logDir = $projectDir . '/var/log/tags';
|
||||
if (!is_dir($logDir)) {
|
||||
@mkdir($logDir, 0777, true);
|
||||
if (!is_dir($logDir) && !@mkdir($logDir, 0775, true) && !is_dir($logDir)) {
|
||||
throw new \RuntimeException('Could not create tag job log directory.');
|
||||
}
|
||||
|
||||
$logFile = $logDir . '/job_' . $jobId . '.log';
|
||||
|
||||
// Robust: cd ins Projekt, dann nohup php bin/console ...
|
||||
$cmd = sprintf(
|
||||
'cd %s && nohup %s %s %s %s --no-interaction >> %s 2>&1 &',
|
||||
'cd %s && nohup %s %s %s %s --no-interaction >> %s 2>&1 & echo $!',
|
||||
escapeshellarg($projectDir),
|
||||
escapeshellcmd('php'),
|
||||
escapeshellarg($phpBinary),
|
||||
escapeshellarg($console),
|
||||
escapeshellarg('mto:agent:tags:job:run'),
|
||||
escapeshellarg($jobId),
|
||||
@@ -141,15 +151,92 @@ final readonly class TagRebuildJobService
|
||||
'job' => $jobId,
|
||||
'cmd' => $cmd,
|
||||
'log' => $logFile,
|
||||
'php_binary' => $phpBinary,
|
||||
]);
|
||||
|
||||
@exec($cmd);
|
||||
$output = [];
|
||||
$exitCode = 0;
|
||||
@exec($cmd, $output, $exitCode);
|
||||
|
||||
$pid = isset($output[0]) ? trim((string) $output[0]) : '';
|
||||
|
||||
if ($exitCode !== 0) {
|
||||
throw new \RuntimeException('Async process bootstrap failed with exit code ' . $exitCode . '.');
|
||||
}
|
||||
|
||||
if ($pid === '' || !ctype_digit($pid)) {
|
||||
throw new \RuntimeException('Async process bootstrap did not return a valid PID.');
|
||||
}
|
||||
|
||||
$this->agentLogger->info('[tags] async job process started', [
|
||||
'job' => $jobId,
|
||||
'pid' => $pid,
|
||||
'log' => $logFile,
|
||||
'php_binary' => $phpBinary,
|
||||
]);
|
||||
|
||||
$this->waitForAsyncJobTransition($job, $logFile);
|
||||
}
|
||||
|
||||
private function resolvePhpBinary(): string
|
||||
{
|
||||
$envCandidates = [
|
||||
trim((string) ($_SERVER['PHP_CLI_BINARY'] ?? '')),
|
||||
trim((string) ($_ENV['PHP_CLI_BINARY'] ?? '')),
|
||||
trim((string) getenv('PHP_CLI_BINARY')),
|
||||
];
|
||||
|
||||
foreach ($envCandidates as $candidate) {
|
||||
if ($this->isValidCliPhpBinary($candidate)) {
|
||||
return $candidate;
|
||||
}
|
||||
}
|
||||
|
||||
$phpBinary = defined('PHP_BINARY') ? trim((string) PHP_BINARY) : '';
|
||||
if ($this->isValidCliPhpBinary($phpBinary)) {
|
||||
return $phpBinary;
|
||||
}
|
||||
|
||||
$fallbackCandidates = [
|
||||
'/usr/bin/php',
|
||||
'/usr/local/bin/php',
|
||||
'/bin/php',
|
||||
'/opt/homebrew/bin/php',
|
||||
];
|
||||
|
||||
foreach ($fallbackCandidates as $candidate) {
|
||||
if ($this->isValidCliPhpBinary($candidate)) {
|
||||
return $candidate;
|
||||
}
|
||||
}
|
||||
|
||||
$whichPhp = trim((string) @shell_exec('command -v php 2>/dev/null'));
|
||||
if ($this->isValidCliPhpBinary($whichPhp)) {
|
||||
return $whichPhp;
|
||||
}
|
||||
|
||||
throw new \RuntimeException(
|
||||
'Could not resolve a CLI PHP binary. Set PHP_CLI_BINARY explicitly, e.g. /usr/bin/php.'
|
||||
);
|
||||
}
|
||||
|
||||
private function isValidCliPhpBinary(string $path): bool
|
||||
{
|
||||
$path = trim($path);
|
||||
|
||||
if ($path === '' || !is_file($path) || !is_executable($path)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$basename = strtolower(basename($path));
|
||||
|
||||
if (str_contains($basename, 'fpm') || str_contains($basename, 'cgi')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recovery gegen "ewig QUEUED":
|
||||
* Setzt alte QUEUED Jobs auf FAILED, damit enqueueIfIdle() nicht dauerhaft blockiert.
|
||||
*/
|
||||
private function markStaleQueuedJobsFailed(): void
|
||||
{
|
||||
$cutoff = new \DateTimeImmutable('-' . self::STALE_QUEUED_AFTER_SECONDS . ' seconds');
|
||||
@@ -161,12 +248,13 @@ final readonly class TagRebuildJobService
|
||||
->andWhere('j.createdAt < :cutoff')
|
||||
->setParameter('queued', TagRebuildJob::STATUS_QUEUED)
|
||||
->setParameter('cutoff', $cutoff)
|
||||
->orderBy('j.createdAt', 'ASC')
|
||||
->setMaxResults(25);
|
||||
|
||||
/** @var TagRebuildJob[] $stale */
|
||||
/** @var list<TagRebuildJob> $stale */
|
||||
$stale = $qb->getQuery()->getResult();
|
||||
|
||||
if (!$stale) {
|
||||
if ($stale === []) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -183,4 +271,46 @@ final readonly class TagRebuildJobService
|
||||
|
||||
$this->em->flush();
|
||||
}
|
||||
|
||||
private function waitForAsyncJobTransition(TagRebuildJob $job, string $logFile): void
|
||||
{
|
||||
$deadline = microtime(true) + self::ASYNC_START_TIMEOUT_SECONDS;
|
||||
|
||||
while (microtime(true) < $deadline) {
|
||||
usleep(self::ASYNC_START_POLL_INTERVAL_MICROSECONDS);
|
||||
$this->em->refresh($job);
|
||||
|
||||
if (!$job->isQueued()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$logHint = $this->readLogTail($logFile);
|
||||
|
||||
throw new \RuntimeException(
|
||||
'Async tag rebuild runner did not transition from QUEUED to RUNNING within '
|
||||
. self::ASYNC_START_TIMEOUT_SECONDS
|
||||
. ' seconds.'
|
||||
. ($logHint !== null ? ' Log tail: ' . $logHint : '')
|
||||
);
|
||||
}
|
||||
|
||||
private function readLogTail(string $logFile): ?string
|
||||
{
|
||||
if (!is_file($logFile) || !is_readable($logFile)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$content = @file_get_contents($logFile);
|
||||
|
||||
if (!is_string($content) || trim($content) === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$content = trim($content);
|
||||
$tail = mb_substr($content, -800);
|
||||
$tail = preg_replace('/\s+/u', ' ', $tail) ?? $tail;
|
||||
|
||||
return trim($tail) !== '' ? trim($tail) : null;
|
||||
}
|
||||
}
|
||||
@@ -11,29 +11,76 @@ final readonly class TagRebuildStatusProvider
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $em
|
||||
) {}
|
||||
) {
|
||||
}
|
||||
|
||||
public function getLatestStatus(): ?array
|
||||
{
|
||||
$this->em->clear();
|
||||
|
||||
$job = $this->em->createQueryBuilder()
|
||||
->select('j')
|
||||
$row = $this->em->createQueryBuilder()
|
||||
->select(
|
||||
'j.status AS status',
|
||||
'j.createdAt AS createdAt',
|
||||
'j.startedAt AS startedAt',
|
||||
'j.finishedAt AS finishedAt',
|
||||
'j.errorMessage AS errorMessage'
|
||||
)
|
||||
->from(TagRebuildJob::class, 'j')
|
||||
->orderBy('j.createdAt', 'DESC')
|
||||
->addOrderBy('j.id', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
->getOneOrNullResult(\Doctrine\ORM\Query::HYDRATE_ARRAY);
|
||||
|
||||
if (!$job instanceof TagRebuildJob) {
|
||||
if (!is_array($row)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$status = trim((string) ($row['status'] ?? ''));
|
||||
|
||||
if ($status === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
return [
|
||||
'status' => $job->getStatus(),
|
||||
'startedAt' => $job->getStartedAt()?->format(DATE_ATOM),
|
||||
'finishedAt' => $job->getFinishedAt()?->format(DATE_ATOM),
|
||||
'error' => $job->getErrorMessage(),
|
||||
'status' => $status,
|
||||
'createdAt' => $this->formatDateValue($row['createdAt'] ?? null),
|
||||
'startedAt' => $this->formatDateValue($row['startedAt'] ?? null),
|
||||
'finishedAt' => $this->formatDateValue($row['finishedAt'] ?? null),
|
||||
'error' => $this->normalizeNullableString($row['errorMessage'] ?? null),
|
||||
'hasActiveJob' => in_array($status, [
|
||||
TagRebuildJob::STATUS_QUEUED,
|
||||
TagRebuildJob::STATUS_RUNNING,
|
||||
], true),
|
||||
];
|
||||
}
|
||||
|
||||
private function formatDateValue(mixed $value): ?string
|
||||
{
|
||||
if ($value instanceof \DateTimeInterface) {
|
||||
return $value->format(DATE_ATOM);
|
||||
}
|
||||
|
||||
if (is_string($value)) {
|
||||
$value = trim($value);
|
||||
|
||||
if ($value === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return (new \DateTimeImmutable($value))->format(DATE_ATOM);
|
||||
} catch (\Throwable) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private function normalizeNullableString(mixed $value): ?string
|
||||
{
|
||||
$value = trim((string) $value);
|
||||
|
||||
return $value !== '' ? $value : null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user