fix stream error handling

This commit is contained in:
team 1
2026-04-25 12:19:20 +02:00
parent 2f28ad0416
commit fa65417efe
9 changed files with 435 additions and 62 deletions

View File

@@ -60,6 +60,7 @@ final readonly class AgentRunner
$attemptedShopRepair = false;
$usedShopRepair = false;
$shopRepairQueries = [];
$primaryShopSearchHadSystemFailure = false;
$this->agentLogger->info('Agent run started', [
'userId' => $userId,
@@ -113,7 +114,7 @@ final readonly class AgentRunner
$this->addSource($sources, $this->agentRunnerConfig->getConversationHistorySourceLabel());
}
$optimizedShopQuery = $this->buildOptimizedShopQuery(
$optimizedShopQuery = yield from $this->buildOptimizedShopQuery(
$prompt,
$userId,
$commerceHistoryContext
@@ -142,16 +143,35 @@ final readonly class AgentRunner
$userId,
$commerceHistoryContext
);
$primaryShopSearchHadSystemFailure = $this->shopSearchService->hadLastSearchSystemFailure();
$repairPayload = $this->repairShopResults(
prompt: $prompt,
userId: $userId,
commerceIntent: $commerceIntent,
commerceHistoryContext: $commerceHistoryContext,
primaryQuery: $shopSearchQuery,
primaryShopResults: $primaryShopResults,
knowledgeChunks: $knowledgeChunks
);
if ($primaryShopSearchHadSystemFailure) {
$this->agentLogger->warning('Shop repair skipped after Store API system failure', [
'userId' => $userId,
'commerceIntent' => $commerceIntent,
'shopSearchQuery' => $shopSearchQuery,
'failureReason' => $this->shopSearchService->getLastSearchFailureReason(),
]);
$repairPayload = [
'results' => $primaryShopResults,
'attemptedRepair' => false,
'usedRepair' => false,
'repairQueries' => [],
];
} else {
yield $this->systemMsg('Erweiterte Shopsuche wird geprüft…', 'think');
$repairPayload = $this->repairShopResults(
prompt: $prompt,
userId: $userId,
commerceIntent: $commerceIntent,
commerceHistoryContext: $commerceHistoryContext,
primaryQuery: $shopSearchQuery,
primaryShopResults: $primaryShopResults,
knowledgeChunks: $knowledgeChunks
);
}
$shopResults = $repairPayload['results'];
$attemptedShopRepair = $repairPayload['attemptedRepair'];
@@ -247,6 +267,7 @@ final readonly class AgentRunner
'attemptedShopRepair' => $attemptedShopRepair,
'usedShopRepair' => $usedShopRepair,
'shopRepairQueries' => $shopRepairQueries,
'primaryShopSearchHadSystemFailure' => $primaryShopSearchHadSystemFailure,
'knowledgeChunkCount' => count($knowledgeChunks),
'knowledgeRetrievalPrompt' => $knowledgeRetrievalPrompt,
'usedFollowUpRetrievalContext' => $usedFollowUpRetrievalContext,
@@ -534,11 +555,14 @@ final readonly class AgentRunner
return trim($value);
}
/**
* @return Generator<int, string, mixed, string>
*/
private function buildOptimizedShopQuery(
string $prompt,
string $userId,
string $commerceHistoryContext = ''
): string {
): Generator {
$shopPrompt = trim($this->agentRunnerConfig->getShopPrompt(
$prompt,
$commerceHistoryContext
@@ -549,6 +573,7 @@ final readonly class AgentRunner
}
$optimizedQuery = '';
$lastHeartbeatAt = time();
$this->thinkSuppressor->reset();
try {
@@ -557,6 +582,11 @@ final readonly class AgentRunner
continue;
}
if (time() - $lastHeartbeatAt >= 2) {
yield $this->systemMsg('Shop-Suchanfrage wird optimiert…', 'think');
$lastHeartbeatAt = time();
}
$cleanToken = $this->thinkSuppressor->filter($token);
if ($cleanToken === '') {

View File

@@ -84,6 +84,17 @@ final readonly class SearchRepairService
foreach ($repairQueries as $repairQuery) {
$results = $this->shopSearchService->search($repairQuery, $commerceIntent, '');
if ($this->shopSearchService->hadLastSearchSystemFailure()) {
$this->logger->warning('Shop repair stopped after Store API system failure', [
'commerceIntent' => $commerceIntent,
'primaryQuery' => $primaryQuery,
'failedRepairQuery' => $repairQuery,
'failureReason' => $this->shopSearchService->getLastSearchFailureReason(),
]);
break;
}
if ($results === []) {
continue;
}

View File

@@ -10,30 +10,44 @@ use App\Commerce\Dto\ShopProductResult;
use App\Config\ShopServiceConfig;
use App\Shopware\ShopwareCriteriaBuilder;
use App\Shopware\StoreApiClient;
use App\Shopware\StoreApiException;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
final readonly class ShopSearchService
final class ShopSearchService
{
private const FOCUS_NEUTRAL = 'neutral';
private const FOCUS_DEVICE = 'device';
private const FOCUS_ACCESSORY = 'accessory';
private bool $lastSearchHadSystemFailure = false;
private ?string $lastSearchFailureReason = null;
public function __construct(
private CommerceQueryParser $queryParser,
private ShopwareCriteriaBuilder $criteriaBuilder,
private StoreApiClient $storeApiClient,
private ShopServiceConfig $shopConfig,
private LoggerInterface $logger,
private bool $enabled = true,
private int $maxResults = 25,
private string $baseUrl = ''
private readonly CommerceQueryParser $queryParser,
private readonly ShopwareCriteriaBuilder $criteriaBuilder,
private readonly StoreApiClient $storeApiClient,
private readonly ShopServiceConfig $shopConfig,
private readonly LoggerInterface $logger,
private readonly bool $enabled = true,
private readonly int $maxResults = 25,
private readonly string $baseUrl = ''
) {
}
public function hadLastSearchSystemFailure(): bool
{
return $this->lastSearchHadSystemFailure;
}
public function getLastSearchFailureReason(): ?string
{
return $this->lastSearchFailureReason;
}
/**
* @return ShopProductResult[]
*/
@@ -43,6 +57,8 @@ final readonly class ShopSearchService
string $commerceHistoryContext = '',
?CommerceReferenceContext $referenceContext = null
): array {
$this->resetLastSearchFailure();
if (!$this->enabled) {
$this->logger->info('Shop search skipped because commerce search is disabled', [
'commerceIntent' => $commerceIntent,
@@ -335,30 +351,114 @@ final readonly class ShopSearchService
try {
$response = $this->storeApiClient->searchProducts($criteria);
return $this->mapAndLogSearchResponse(
response: $response,
query: $query,
commerceIntent: $commerceIntent,
originalPrompt: $originalPrompt,
usesHistoryContext: $usesHistoryContext,
usedSafeCriteria: false
);
} catch (StoreApiException $e) {
if ($e->isSafeCriteriaRetryRecommended()) {
$safeResults = $this->retryWithSafeCriteria(
query: $query,
commerceIntent: $commerceIntent,
originalPrompt: $originalPrompt,
usesHistoryContext: $usesHistoryContext,
previousException: $e
);
if ($safeResults !== null) {
return $safeResults;
}
}
$this->recordFailedSearch($e);
$this->logShopSearchFailure($query, $commerceIntent, $originalPrompt, $usesHistoryContext, $e);
return [];
} catch (
ClientExceptionInterface |
RedirectionExceptionInterface |
ServerExceptionInterface |
TransportExceptionInterface |
\RuntimeException $e
ClientExceptionInterface |
RedirectionExceptionInterface |
ServerExceptionInterface |
TransportExceptionInterface |
\RuntimeException $e
) {
$this->logger->warning('Shop search request failed', [
$this->recordFailedSearch($e);
$this->logShopSearchFailure($query, $commerceIntent, $originalPrompt, $usesHistoryContext, $e);
return [];
}
}
/**
* @return ShopProductResult[]|null
*/
private function retryWithSafeCriteria(
CommerceSearchQuery $query,
string $commerceIntent,
string $originalPrompt,
bool $usesHistoryContext,
StoreApiException $previousException
): ?array {
$this->logger->warning('Shop search retrying with safe criteria', [
'commerceIntent' => $commerceIntent,
'originalPrompt' => $originalPrompt,
'normalizedPrompt' => $query->normalizedPrompt,
'searchText' => $query->searchText,
'usesHistoryContext' => $usesHistoryContext,
'previousStatusCode' => $previousException->getStatusCode(),
'previousUtf8Failure' => $previousException->isUtf8Failure(),
'previousExceptionMessage' => $previousException->getMessage(),
]);
try {
$safeCriteria = $this->criteriaBuilder->buildSafe($query, $this->maxResults);
$response = $this->storeApiClient->searchProducts($safeCriteria);
return $this->mapAndLogSearchResponse(
response: $response,
query: $query,
commerceIntent: $commerceIntent,
originalPrompt: $originalPrompt,
usesHistoryContext: $usesHistoryContext,
usedSafeCriteria: true
);
} catch (
ClientExceptionInterface |
RedirectionExceptionInterface |
ServerExceptionInterface |
TransportExceptionInterface |
\RuntimeException $safeException
) {
$this->recordFailedSearch($safeException);
$this->logger->warning('Shop search safe criteria retry failed', [
'commerceIntent' => $commerceIntent,
'originalPrompt' => $originalPrompt,
'normalizedPrompt' => $query->normalizedPrompt,
'searchText' => $query->searchText,
'brand' => $query->brand,
'sizes' => $query->sizes,
'priceMin' => $query->priceMin,
'priceMax' => $query->priceMax,
'usesHistoryContext' => $usesHistoryContext,
'exceptionClass' => $e::class,
'exceptionMessage' => $e->getMessage(),
'exceptionClass' => $safeException::class,
'exceptionMessage' => $safeException->getMessage(),
]);
return [];
return null;
}
}
/**
* @param array<mixed> $response
* @return ShopProductResult[]
*/
private function mapAndLogSearchResponse(
array $response,
CommerceSearchQuery $query,
string $commerceIntent,
string $originalPrompt,
bool $usesHistoryContext,
bool $usedSafeCriteria
): array {
$mappedProducts = $this->mapProducts($response);
$rankedProducts = $this->rerankProducts($mappedProducts, $query);
@@ -372,6 +472,7 @@ final readonly class ShopSearchService
'priceMin' => $query->priceMin,
'priceMax' => $query->priceMax,
'usesHistoryContext' => $usesHistoryContext,
'usedSafeCriteria' => $usedSafeCriteria,
'rawElementsCount' => is_array($response['elements'] ?? null) ? count($response['elements']) : 0,
'mappedProductsCount' => count($mappedProducts),
'rankedProductsCount' => count($rankedProducts),
@@ -380,6 +481,50 @@ final readonly class ShopSearchService
return $rankedProducts;
}
private function logShopSearchFailure(
CommerceSearchQuery $query,
string $commerceIntent,
string $originalPrompt,
bool $usesHistoryContext,
\Throwable $e
): void {
$this->logger->warning('Shop search request failed', [
'commerceIntent' => $commerceIntent,
'originalPrompt' => $originalPrompt,
'normalizedPrompt' => $query->normalizedPrompt,
'searchText' => $query->searchText,
'brand' => $query->brand,
'sizes' => $query->sizes,
'priceMin' => $query->priceMin,
'priceMax' => $query->priceMax,
'usesHistoryContext' => $usesHistoryContext,
'systemFailure' => $this->lastSearchHadSystemFailure,
'failureReason' => $this->lastSearchFailureReason,
'exceptionClass' => $e::class,
'exceptionMessage' => $e->getMessage(),
]);
}
private function resetLastSearchFailure(): void
{
$this->lastSearchHadSystemFailure = false;
$this->lastSearchFailureReason = null;
}
private function recordFailedSearch(\Throwable $e): void
{
$isSystemFailure = $e instanceof StoreApiException
? $e->isSystemFailure()
: $e instanceof ServerExceptionInterface || $e instanceof TransportExceptionInterface;
if (!$isSystemFailure) {
return;
}
$this->lastSearchHadSystemFailure = true;
$this->lastSearchFailureReason = $e->getMessage();
}
/**
* @param ShopProductResult[] $referenceProbeResults
* @param ShopProductResult[] $rankedProducts

View File

@@ -40,6 +40,10 @@ final readonly class AskSseController
return new StreamedResponse(
function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void {
@set_time_limit(0);
@ini_set('output_buffering', 'off');
@ini_set('zlib.output_compression', '0');
while (ob_get_level() > 0) {
ob_end_flush();
}
@@ -49,7 +53,7 @@ final readonly class AskSseController
}
echo "retry: 3000\n\n";
flush();
$this->sendComment('stream-open');
if ($prompt === '') {
$this->sendEvent('error', 'Empty prompt');
@@ -59,6 +63,10 @@ final readonly class AskSseController
try {
foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) {
if (connection_aborted() === 1) {
return;
}
$chunk = str_replace(["\r\n", "\r"], "\n", $chunk);
$this->sendData($chunk);
}
@@ -77,12 +85,18 @@ final readonly class AskSseController
'Cache-Control' => 'no-cache, no-store, must-revalidate',
'Connection' => 'keep-alive',
'X-Accel-Buffering' => 'no',
'X-Content-Type-Options' => 'nosniff',
]
);
}
private function sendData(string $data): void
{
if ($data === '') {
$this->sendComment('keepalive');
return;
}
$lines = explode("\n", $data);
foreach ($lines as $line) {
@@ -90,7 +104,7 @@ final readonly class AskSseController
}
echo "\n\n";
flush();
$this->flushOutput();
}
private function sendEvent(string $event, string $data): void
@@ -99,6 +113,23 @@ final readonly class AskSseController
echo "event: {$event}\n";
echo "data: {$safe}\n\n";
flush();
$this->flushOutput();
}
private function sendComment(string $comment): void
{
$safe = str_replace(["\r", "\n"], ' ', $comment);
echo ': ' . $safe . "\n\n";
$this->flushOutput();
}
private function flushOutput(): void
{
if (function_exists('ob_flush')) {
@ob_flush();
}
@flush();
}
}

View File

@@ -14,24 +14,61 @@ final class ShopwareCriteriaBuilder
?bool $grouping = true
): array
{
return $this->buildCriteria(
query: $query,
limit: $limit,
grouping: $grouping,
includeRichTextFields: true
);
}
/**
* Builds a smaller Store API criteria payload for retrying Shopware responses
* that fail while JSON-encoding product descriptions or custom fields.
*/
public function buildSafe(
CommerceSearchQuery $query,
?int $limit = 25,
?bool $grouping = true
): array
{
return $this->buildCriteria(
query: $query,
limit: $limit,
grouping: $grouping,
includeRichTextFields: false
);
}
private function buildCriteria(
CommerceSearchQuery $query,
?int $limit,
?bool $grouping,
bool $includeRichTextFields
): array {
$productIncludes = [
'id',
'name',
'productNumber',
'available',
'calculatedPrice',
'seoUrls',
'manufacturer',
'translated.name',
'cover',
];
if ($includeRichTextFields) {
$productIncludes[] = 'description';
$productIncludes[] = 'customFields';
}
$criteria = [
'page' => 1,
'limit' => max(1, $limit),
'total-count-mode' => 0,
'includes' => [
'product' => [
'id',
'name',
'description',
'productNumber',
'available',
'calculatedPrice',
'seoUrls',
'manufacturer',
'translated.name',
'cover',
'customFields'
],
'product' => $productIncludes,
'product_manufacturer' => [
'name',
],
@@ -64,7 +101,7 @@ final class ShopwareCriteriaBuilder
'associations' => [
'media' => [
'associations' => [
"thumbnails" => new \stdClass()
'thumbnails' => new \stdClass()
]
]
]
@@ -73,7 +110,7 @@ final class ShopwareCriteriaBuilder
];
if ($grouping) {
$criteria["grouping"] = ["parentId"];
$criteria['grouping'] = ['parentId'];
}
if ($query->searchText !== '') {
@@ -105,4 +142,4 @@ final class ShopwareCriteriaBuilder
return $criteria;
}
}
}

View File

@@ -26,6 +26,7 @@ final readonly class StoreApiClient
* @throws ServerExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ClientExceptionInterface
* @throws StoreApiException
*/
public function searchProducts(array $criteria): array
{
@@ -43,7 +44,7 @@ final readonly class StoreApiClient
$response = $this->httpClient->request('POST', $url, [
'headers' => [
'Content-Type' => 'application/json',
'Content-Type' => 'application/json; charset=utf-8',
'Accept' => 'application/json',
'sw-access-key' => $this->salesChannelAccessKey,
],
@@ -56,22 +57,54 @@ final readonly class StoreApiClient
$content = $this->sanitizeString($content);
if ($statusCode < 200 || $statusCode >= 300) {
throw new RuntimeException(sprintf(
'Shopware Store API request failed with status %d. Response: %s',
$statusCode,
mb_substr(trim($content), 0, 1000)
));
throw $this->buildHttpFailure($statusCode, $content);
}
$data = json_decode($content, true);
if (!is_array($data)) {
throw new RuntimeException('Shopware Store API returned invalid JSON.');
throw new StoreApiException(
'Shopware Store API returned invalid JSON.',
$statusCode,
true,
$this->containsUtf8FailureSignal($content),
true
);
}
return $data;
}
private function buildHttpFailure(int $statusCode, string $content): StoreApiException
{
$preview = mb_substr(trim($content), 0, 1000);
$utf8Failure = $this->containsUtf8FailureSignal($preview);
$serverFailure = $statusCode >= 500;
return new StoreApiException(
sprintf(
'Shopware Store API request failed with status %d. Response: %s',
$statusCode,
$preview
),
$statusCode,
$serverFailure,
$utf8Failure,
$serverFailure || $utf8Failure
);
}
private function containsUtf8FailureSignal(string $content): bool
{
$normalized = mb_strtolower($content, 'UTF-8');
return str_contains($normalized, 'malformed utf-8')
|| str_contains($normalized, 'malformed utf8')
|| str_contains($normalized, 'invalid utf-8')
|| str_contains($normalized, 'invalid utf8')
|| str_contains($normalized, 'possibly incorrectly encoded');
}
private function sanitizeValue(mixed $value): mixed
{
if (is_array($value)) {
@@ -115,4 +148,4 @@ final readonly class StoreApiClient
return '';
}
}
}

View File

@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
namespace App\Shopware;
use RuntimeException;
use Throwable;
final class StoreApiException extends RuntimeException
{
public function __construct(
string $message,
private readonly ?int $statusCode = null,
private readonly bool $serverFailure = false,
private readonly bool $utf8Failure = false,
private readonly bool $safeCriteriaRetryRecommended = false,
?Throwable $previous = null
) {
parent::__construct($message, 0, $previous);
}
public function getStatusCode(): ?int
{
return $this->statusCode;
}
public function isServerFailure(): bool
{
return $this->serverFailure;
}
public function isUtf8Failure(): bool
{
return $this->utf8Failure;
}
public function isSafeCriteriaRetryRecommended(): bool
{
return $this->safeCriteriaRetryRecommended;
}
public function isSystemFailure(): bool
{
return $this->serverFailure || $this->utf8Failure;
}
}