optimize py control service

team2
2026-02-22 09:00:06 +01:00
parent 4dfed5a797
commit 06376e0fb4
3 changed files with 569 additions and 251 deletions

384
README.md

@@ -1,8 +1,8 @@
# mitho AI Agent Developer README # mitho AI Agent Developer README
Enterprise Hybrid RAG System (Symfony + NDJSON + FAISS) Enterprise Hybrid RAG System (Symfony + NDJSON + FAISS + Persistent Vector Service)
As of: February 2026 As of: February 2026
Status: Stable in production; job-based ingest architecture fully integrated Status: Stable in production; job-based ingest architecture + persistent vector service integrated
--- ---
@@ -14,6 +14,7 @@ Hybrid RAG architecture with:
- Symfony (PHP Backend) - Symfony (PHP Backend)
- NDJSON as single source of truth - NDJSON as single source of truth
- FAISS as vector index (always full rebuild) - FAISS as vector index (always full rebuild)
- Persistent Python vector service (FastAPI + Uvicorn)
- Hybrid retrieval (keyword + vector) - Hybrid retrieval (keyword + vector)
- Versioned document model - Versioned document model
- Job-based ingest pipeline - Job-based ingest pipeline
@@ -21,28 +22,34 @@ Hybrid RAG architecture with:
- SSE streaming in the frontend - SSE streaming in the frontend
Core principle: Core principle:
No incremental vector updates.
FAISS is always rebuilt in full from index.ndjson. - No incremental vector updates
- FAISS is always rebuilt in full from `index.ndjson`
- Retrieval runs through a persistent service (no Python spawn per request)
--- ---
# 2. Architecture Principles # 2. Architecture Principles
Determinism: ## 2.1 Determinism
- Same documents + same configuration → identical index.ndjson
- Same index.ndjson → identical FAISS - Same documents + same configuration → identical `index.ndjson`
- Same `index.ndjson` → identical FAISS
- Same query → identical retrieval result - Same query → identical retrieval result
Governance: ## 2.2 Governance
- One active version per document - One active version per document
- No implicit index changes - No implicit index changes
- Structural changes force a Global Reindex - Structural changes force a Global Reindex
- No self-modification by the AI - No self-modification by the AI
Scalability: ## 2.3 Scalability
- NDJSON (streamable) - NDJSON (streamable)
- No RAM-based JSON arrays - No RAM-based JSON array
- Target size > 200k chunks - Target size > 200k chunks
- FAISS full rebuild is deterministic
--- ---
@@ -54,10 +61,11 @@ Single Source of Truth.
- 1 JSON object per line - 1 JSON object per line
- Streaming append - Streaming append
- Deterministic compaction by document_id - Deterministic compaction by `document_id`
Example structure: Example structure:
```json
{ {
"chunk_id": "uuid", "chunk_id": "uuid",
"document_id": "uuid", "document_id": "uuid",
@@ -65,28 +73,32 @@ Example structure:
"text": "...", "text": "...",
"meta": { ... } "meta": { ... }
} }
```
No JSON array file. Rules:
No mutation of individual chunks.
Append only + deterministic removal by document_id. - No JSON array file
- No mutation of individual chunks
- Append only + deterministic removal by `document_id`
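The rules above (append only, plus deterministic removal by `document_id`) can be illustrated with a minimal streaming sketch. This is not the project's ingest code: the function name `replace_document_chunks` and the temp-file swap are assumptions made for illustration, and locking is left out.

```python
import json
import os
import tempfile
from typing import Iterable


def replace_document_chunks(index_path: str, document_id: str, new_chunks: Iterable[dict]) -> int:
    """Drop every chunk of document_id, append new_chunks, return the removed count."""
    removed = 0
    dir_name = os.path.dirname(os.path.abspath(index_path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as dst:
        if os.path.exists(index_path):
            with open(index_path, "r", encoding="utf-8") as src:
                # Stream line by line: one JSON object per line, never the whole file in RAM.
                for line in src:
                    if not line.strip():
                        continue
                    if json.loads(line).get("document_id") == document_id:
                        removed += 1
                        continue
                    dst.write(line if line.endswith("\n") else line + "\n")
        # Append the new chunks; sort_keys keeps the serialized form stable across runs.
        for chunk in new_chunks:
            dst.write(json.dumps(chunk, ensure_ascii=False, sort_keys=True) + "\n")
    # Swap the compacted file into place (atomic on the same filesystem).
    os.replace(tmp_path, index_path)
    return removed
```

Existing lines are copied through unchanged, so individual chunks are never mutated; only whole documents are removed and re-appended.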
--- ---
## 3.2 index_meta.json ## 3.2 index_meta.json
Contains structural parameters: Structural parameters:
- index_version - `index_version`
- embedding_model - `embedding_model`
- embedding_dimension - `embedding_dimension`
- chunk_size - `chunk_size`
- chunk_overlap - `chunk_overlap`
- scoring_version - `scoring_version`
- index_format - `index_format`
- vector_backend - `vector_backend`
If any of these values changes: If any of these values changes:
→ Global Reindex is mandatory.
**Global Reindex is mandatory**
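As a rough illustration of this rule, the check below compares a stored `index_meta.json` payload with the currently configured values. The key names come from the list above; the helper names `changed_structure_keys` and `needs_global_reindex` are hypothetical and not part of the codebase.

```python
from typing import List

# Structural parameters listed in index_meta.json
STRUCTURAL_KEYS = (
    "index_version",
    "embedding_model",
    "embedding_dimension",
    "chunk_size",
    "chunk_overlap",
    "scoring_version",
    "index_format",
    "vector_backend",
)


def changed_structure_keys(stored: dict, current: dict) -> List[str]:
    """Return the structural keys whose values differ (a missing key counts as a change)."""
    return [k for k in STRUCTURAL_KEYS if stored.get(k) != current.get(k)]


def needs_global_reindex(stored: dict, current: dict) -> bool:
    """True as soon as a single structural parameter differs."""
    return bool(changed_structure_keys(stored, current))
```

Returning the list of changed keys, rather than only a boolean, makes it easy to log why a Global Reindex was required.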
--- ---
@@ -94,31 +106,117 @@ If any of these values changes:
Files: Files:
- vector.index - `vector.index`
- vector_meta.json (chunk ID mapping) - `vector.index.meta.json`
- `vector_tags.index`
- `vector_tags.index.meta.json`
FAISS is ALWAYS built in full from `index.ndjson`.
FAISS is ALWAYS built in full from index.ndjson.
No partial updates. No partial updates.
No incremental vector append.
--- ---
# 4. Document & Version Model # 4. Persistent Vector Service
Retrieval no longer runs through:
- Symfony Process
- `exec()`
- `python vector_search.py` per request
Instead, it runs through:
**FastAPI + Uvicorn (persistent in RAM)**
## 4.1 Properties
On startup the service loads:
- Embedding model
- Chunk index
- Tag index
- ID mappings
These stay resident in RAM.
No model reload per request.
No disk reload per request.
No Python spawn per request.
## 4.2 Endpoints
- `GET /health`
- `POST /search-chunks`
- `POST /search-tags`
- `POST /reload`
## 4.3 Reload Mechanism
After a Global Reindex:
```bash
curl -X POST http://127.0.0.1:8090/reload
```
Reloads:
- the chunk index
- the tag index
- the model, only if `embedding_model` has changed
No restart required.
No downtime.
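For callers that prefer not to shell out to `curl`, the same reload request can be sketched with the Python standard library. The helper name `post_reload` is an assumption, and the shape of the response body is not specified in this README, so the sketch returns the status code and raw body only.

```python
import urllib.error
import urllib.request
from typing import Tuple


def post_reload(base_url: str = "http://127.0.0.1:8090", timeout: float = 5.0) -> Tuple[int, str]:
    """POST to /reload and return (http_status, body); status 0 means unreachable."""
    req = urllib.request.Request(base_url.rstrip("/") + "/reload", data=b"", method="POST")
    # The service is local, so bypass any proxy configured in the environment.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(req, timeout=timeout) as resp:
            return resp.status, resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # 4xx/5xx responses, e.g. 404 when an older service version has no /reload
        return exc.code, exc.read().decode("utf-8", errors="replace")
    except (urllib.error.URLError, OSError):
        return 0, ""
```

`HTTPError` is caught before `URLError` because it is a subclass of it; the order matters for telling "endpoint missing" apart from "service unreachable".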
---
# 5. Score Gates (Routing Safety)
## 5.1 Tag Gate
Tags control routing.
Recommended minimum score:
`MIN_SCORE ≈ 0.70`
Protects against:
- accidental semantic matches
- incorrect document routing
## 5.2 Chunk Gate
Chunks are context.
Softer gate:
`MIN_SCORE ≈ 0.50`
Optional: a score relative to the best hit.
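The two gates can be sketched as simple filters over `(id, score)` pairs. This assumes higher scores mean closer matches; the function names and the `relative_floor` parameter are hypothetical, and the thresholds are the approximate values recommended above.

```python
from typing import List, Optional, Tuple

Hit = Tuple[str, float]  # (id, similarity score); higher is assumed to be better

TAG_MIN_SCORE = 0.70    # strict gate: tags decide routing
CHUNK_MIN_SCORE = 0.50  # softer gate: chunks only provide context


def gate_tags(hits: List[Hit]) -> List[Hit]:
    """Keep only tag hits that are strong enough to route on."""
    return [hit for hit in hits if hit[1] >= TAG_MIN_SCORE]


def gate_chunks(hits: List[Hit], relative_floor: Optional[float] = None) -> List[Hit]:
    """Apply the absolute chunk gate, then optionally a floor relative to the best hit."""
    kept = [hit for hit in hits if hit[1] >= CHUNK_MIN_SCORE]
    if relative_floor is not None and kept:
        best = max(score for _, score in kept)
        kept = [hit for hit in kept if hit[1] >= best * relative_floor]
    return kept
```

With `relative_floor=0.8`, a chunk scoring 0.55 passes the absolute gate but is dropped when the best hit scores 0.90.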
---
# 6. Document & Version Model
Document Document
→ contains multiple DocumentVersion → contains multiple `DocumentVersion`
→ exactly one version is active → exactly one version is active
Rule: Rule:
Only one active version per document may exist at any time. Only one active version per document may exist at any time.
When a version is activated: When a version is activated:
- All other versions become inactive - All other versions become inactive
- IngestStatus → PENDING - `IngestStatus → PENDING`
- Re-ingest via job - Re-ingest via job
--- ---
# 5. Ingest Architecture (fully job-based) # 7. Ingest Architecture (fully job-based)
Ingest NEVER runs synchronously in the HTTP request. Ingest NEVER runs synchronously in the HTTP request.
@@ -126,126 +224,44 @@ Every mutation of the index goes through:
IngestJob → CLI Runner → IngestOrchestrator → IngestFlow IngestJob → CLI Runner → IngestOrchestrator → IngestFlow
--- ## 7.1 Job Types
## 5.1 Job Types - `DOCUMENT_VERSION_ACTIVATE`
- `DOCUMENT`
- `GLOBAL_REINDEX`
DOCUMENT_VERSION_ACTIVATE ## 7.2 Job Status
- Used for:
- Activating a version
- Uploading a new file (auto-ingest)
DOCUMENT - `QUEUED`
- Manual ingest of a version - `RUNNING`
- `COMPLETED`
- `FAILED`
- `ABORTED`
GLOBAL_REINDEX CLI execution:
- Structural changes
---
## 5.2 Job Status
- QUEUED
- RUNNING
- COMPLETED
- FAILED
- ABORTED
Jobs are executed via the CLI:
```bash
php bin/console mto:agent:ingest:run <jobId> php bin/console mto:agent:ingest:run <jobId>
```
The job is started asynchronously via exec() from the controller.
---
# 6. Admin Flows (current state)
## 6.1 Uploading a New File (NEW: Auto-Ingest)
On upload:
1. Save the file
2. Create Document + version 1
3. Set version 1 active
4. Create an IngestJob of type DOCUMENT_VERSION_ACTIVATE
5. Start the job asynchronously
6. Redirect to the job detail page
Result:
New documents are indexed automatically.
---
## 6.2 Activating a Version
1. Update the DB status
2. IngestStatus → PENDING
3. Create a DOCUMENT_VERSION_ACTIVATE job
4. Start the async runner
5. Redirect to the job page
---
## 6.3 Manual Ingest
1. Create a DOCUMENT job
2. Start the async runner
3. Redirect to the job page
---
## 6.4 Reset
Reset deletes:
- index.ndjson
- vector.index
- vector_meta.json
- Upload directory
- Tables:
- document
- document_version
- ingest_job
Only possible when exec() is enabled.
---
# 7. Ingest Flow Details
Local Ingest (one document):
1. Extract
2. Normalize
3. Chunk deterministically
4. Remove old chunks by document_id
5. Append new chunks
6. Full FAISS rebuild
Global Reindex:
1. Reprocess all active versions
2. Rewrite the entire index.ndjson
3. Rebuild FAISS
4. index_version++
--- ---
# 8. Hybrid Retrieval # 8. Hybrid Retrieval
Sequence: Flow:
User Query User Query
→ Keyword Retrieval → Keyword Retrieval
→ FAISS Vector Retrieval → Tag Vector Search
→ Document filter
→ Chunk Vector Retrieval
→ Score Fusion → Score Fusion
→ NDJSON Chunk Lookup → NDJSON Lookup
→ Context Builder → Context Builder
→ LLM → LLM
→ SSE Streaming → SSE Streaming
Keyword is the primary signal. Keyword remains the primary signal.
Vector adds semantics. Vector adds semantics.
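The README does not spell out the fusion formula. One way to keep keyword as the primary signal is a weighted sum in which the keyword weight dominates, sketched below. The weights, the function name `fuse_scores`, and the assumption that both score sets are normalized to [0, 1] are illustrative only.

```python
from typing import Dict, List, Tuple


def fuse_scores(
    keyword: Dict[str, float],
    vector: Dict[str, float],
    keyword_weight: float = 0.7,  # hypothetical weight: keyword dominates
    vector_weight: float = 0.3,   # hypothetical weight: vector refines
) -> List[Tuple[str, float]]:
    """Combine per-chunk scores and return (chunk_id, fused_score), best first."""
    chunk_ids = set(keyword) | set(vector)
    fused = {
        cid: keyword_weight * keyword.get(cid, 0.0) + vector_weight * vector.get(cid, 0.0)
        for cid in chunk_ids
    }
    # Tie-break on chunk_id so the same query always yields the same order.
    return sorted(fused.items(), key=lambda item: (-item[1], item[0]))
```

The explicit tie-break keeps the ranking in line with the determinism principle: same query, same retrieval result.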
--- ---
@@ -262,38 +278,101 @@ No concurrent mutations allowed.
--- ---
# 10. CLI Commands # 10. Vector Control (Production Safe)
mto:agent:ingest:run <jobId> One central command controls:
mto:agent:vector:ingest
mto:agent:vector:search - Dependency check
- Auto-install (opt-in)
- Service start
- Service stop
- Reload
- Status
- Health check
- PID management
## Command
`mto:agent:vector:control`
## Examples
Status:
```bash
bin/console mto:agent:vector:control
```
Install + Start:
```bash
bin/console mto:agent:vector:control --install --start
```
Stop:
```bash
bin/console mto:agent:vector:control --stop
```
Reload:
```bash
bin/console mto:agent:vector:control --reload
```
## Production-Safety
- PID file at `var/run/vector_service.pid`
- SIGTERM stop with timeout
- Optional SIGKILL (`--force`)
- Health check with retry mechanism
- No automatic install without a flag
---
# 11. CLI Commands
- `mto:agent:ingest:run <jobId>`
- `mto:agent:vector:control`
- `mto:agent:test`
- `mto:agent:chat`
All commands under: All commands under:
mto:agent:*
`mto:agent:*`
--- ---
# 11. Failure Modes # 12. Failure Modes
- Vector index missing → run vector ingest Vector service unreachable
- index_meta mismatch → Global Reindex → `vector:control --start`
- exec disabled → async start fails
- Lock active → parallel ingest blocked Reload endpoint missing
→ wrong service version
index_meta mismatch
→ Global Reindex
Lock active
→ parallel ingest blocked
--- ---
# 12. Non-Goals # 13. Non-Goals
- No online learning - No online learning
- No incremental FAISS updates - No incremental FAISS updates
- No self-modifying prompts - No self-modifying prompts
- No auto-merging of chunks - No auto-merging of chunks
- No vector append at runtime
Structural changes → explicit + reindex. Structural changes → explicit + Reindex.
--- ---
# 13. Summary # 14. Summary
This system is: This system is:
@@ -304,11 +383,14 @@ This system is:
- enterprise-ready - enterprise-ready
- job-based - job-based
- version-safe - version-safe
- persistent in retrieval
- free of spawn overhead
- reloadable without downtime
Key change: Key changes:
New documents now automatically trigger an IngestJob
(exactly the same mechanism as version activation).
No more HTTP ingest. - Persistent vector service replaces CLI spawn
No inline rebuilds. - Score gates prevent incorrect routing
Everything runs through the job system. - Reload endpoint avoids restarts
- Production-safe control command integrated
- Full separation of ingest and runtime


@@ -9,19 +9,26 @@ use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;
#[AsCommand( #[AsCommand(
name: 'mto:agent:vector:control', name: 'mto:agent:vector:control',
description: 'Vector environment control' description: 'Production-safe vector service control (deps/install/start/stop/reload/status)'
)] )]
final class VectorControlCommand extends Command final class VectorControlCommand extends Command
{ {
protected function configure(): void protected function configure(): void
{ {
$this $this
->addOption('install', null, InputOption::VALUE_NONE) ->addOption('install', null, InputOption::VALUE_NONE, 'Install missing python deps into .venv')
->addOption('start', null, InputOption::VALUE_NONE) ->addOption('start', null, InputOption::VALUE_NONE, 'Start service if not running')
->addOption('reload', null, InputOption::VALUE_NONE); ->addOption('stop', null, InputOption::VALUE_NONE, 'Stop service using PID file')
->addOption('force', null, InputOption::VALUE_NONE, 'Force stop (SIGKILL) if needed')
->addOption('reload', null, InputOption::VALUE_NONE, 'Trigger /reload')
->addOption('status', null, InputOption::VALUE_NONE, 'Print status')
->addOption('foreground', null, InputOption::VALUE_NONE, 'Start in foreground (rare)')
->addOption('port', null, InputOption::VALUE_OPTIONAL, 'Port (default 8090)', '8090')
->addOption('host', null, InputOption::VALUE_OPTIONAL, 'Host (default 0.0.0.0)', '0.0.0.0');
} }
protected function execute(InputInterface $input, OutputInterface $output): int protected function execute(InputInterface $input, OutputInterface $output): int
@@ -31,21 +38,37 @@ final class VectorControlCommand extends Command
if ($input->getOption('install')) { if ($input->getOption('install')) {
$cmd[] = '--install'; $cmd[] = '--install';
} }
if ($input->getOption('start')) { if ($input->getOption('start')) {
$cmd[] = '--start'; $cmd[] = '--start';
} }
if ($input->getOption('stop')) {
$cmd[] = '--stop';
}
if ($input->getOption('force')) {
$cmd[] = '--force';
}
if ($input->getOption('reload')) { if ($input->getOption('reload')) {
$cmd[] = '--reload'; $cmd[] = '--reload';
} }
if ($input->getOption('status')) {
$cmd[] = '--status';
}
if ($input->getOption('foreground')) {
$cmd[] = '--foreground';
}
$process = new \Symfony\Component\Process\Process($cmd); $cmd[] = '--port';
$cmd[] = (string)$input->getOption('port');
$cmd[] = '--host';
$cmd[] = (string)$input->getOption('host');
$process = new Process($cmd);
$process->setTimeout(300); $process->setTimeout(300);
$process->run(); $process->run();
$output->writeln($process->getOutput()); $output->writeln($process->getOutput());
return Command::SUCCESS; return $process->isSuccessful() ? Command::SUCCESS : Command::FAILURE;
} }
} }


@@ -4,13 +4,27 @@ import argparse
import importlib import importlib
import json import json
import os import os
import signal
import socket
import subprocess import subprocess
import sys import sys
import time import time
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple
BASE_PATH = Path(__file__).resolve().parents[2] BASE_PATH = Path(__file__).resolve().parents[2]
KNOWLEDGE_DIR = BASE_PATH / "var" / "knowledge" VENV_DIR = BASE_PATH / ".venv"
VENV_PY = VENV_DIR / "bin" / "python"
VENV_PIP = VENV_DIR / "bin" / "pip"
UVICORN_BIN = VENV_DIR / "bin" / "uvicorn"
PID_DIR = BASE_PATH / "var" / "run"
PID_FILE = PID_DIR / "vector_service.pid"
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8090
DEFAULT_HEALTH_URL = "http://127.0.0.1:{port}/health"
DEFAULT_RELOAD_URL = "http://127.0.0.1:{port}/reload"
REQUIRED_MODULES = [ REQUIRED_MODULES = [
"fastapi", "fastapi",
@@ -20,11 +34,78 @@ REQUIRED_MODULES = [
"numpy", "numpy",
] ]
VENV_PIP = BASE_PATH / ".venv" / "bin" / "pip" # If you want pinning later, do it here. For now keep simple.
UVICORN_BIN = BASE_PATH / ".venv" / "bin" / "uvicorn" INSTALL_PACKAGES = [
"fastapi",
"uvicorn",
"numpy",
"sentence-transformers",
# faiss: depending on your env, it might be "faiss-cpu" (pip) or system package.
# Don't force install unless missing import "faiss" and you opt-in via --install.
"faiss-cpu",
]
def check_modules(): def _now_ms() -> int:
    return int(time.time() * 1000)


def _read_pid() -> Optional[int]:
    try:
        if PID_FILE.exists():
            content = PID_FILE.read_text(encoding="utf-8").strip()
            if content.isdigit():
                return int(content)
    except Exception:
        return None
    return None


def _write_pid(pid: int) -> None:
    PID_DIR.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(pid), encoding="utf-8")


def _remove_pid() -> None:
    try:
        if PID_FILE.exists():
            PID_FILE.unlink()
    except Exception:
        pass


def _pid_is_running(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False


def _is_port_open(host: str, port: int, timeout: float = 0.3) -> bool:
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except Exception:
        return False


def _curl(url: str, timeout_seconds: int = 2) -> Tuple[int, str]:
    # We use curl because it's usually available in your container;
    # if you prefer, we can switch to urllib.
    cmd = ["curl", "-s", "-m", str(timeout_seconds), "-w", "\n%{http_code}", url]
    p = subprocess.run(cmd, capture_output=True, text=True)
    out = (p.stdout or "").rstrip("\n")
    if "\n" in out:
        body, code = out.rsplit("\n", 1)
        try:
            return int(code), body
        except Exception:
            return 0, body
    return 0, out
def check_modules() -> List[str]:
missing = [] missing = []
for module in REQUIRED_MODULES: for module in REQUIRED_MODULES:
try: try:
@@ -34,96 +115,228 @@ def check_modules():
return missing return missing
def install_modules(modules): def install_missing_modules(missing: List[str]) -> Dict[str, str]:
if not modules: # map missing module names to pip packages (faiss -> faiss-cpu, sentence_transformers -> sentence-transformers)
return mod_to_pkg = {
subprocess.call([str(VENV_PIP), "install", *modules]) "fastapi": "fastapi",
"uvicorn": "uvicorn",
"numpy": "numpy",
"sentence_transformers": "sentence-transformers",
"faiss": "faiss-cpu",
}
pkgs = []
for m in missing:
pkgs.append(mod_to_pkg.get(m, m))
if not VENV_PIP.exists():
return {"status": "error", "detail": "pip not found in .venv"}
cmd = [str(VENV_PIP), "install", *pkgs]
p = subprocess.run(cmd, capture_output=True, text=True)
if p.returncode != 0:
return {"status": "error", "detail": (p.stderr or p.stdout or "pip install failed").strip()}
return {"status": "ok", "detail": "installed: " + " ".join(pkgs)}
def service_running(): def service_status(port: int) -> Dict:
result = subprocess.run( pid = _read_pid()
["ps", "aux"], pid_running = bool(pid and _pid_is_running(pid))
capture_output=True, # if pid file is stale, remove it
text=True if pid and not pid_running:
) _remove_pid()
return "uvicorn src.Vector.vector_service:app" in result.stdout pid = None
health_code, health_body = _curl(DEFAULT_HEALTH_URL.format(port=port), timeout_seconds=2)
health_ok = health_code == 200 and health_body.strip() != ""
def start_service(): return {
subprocess.Popen([ "pid_file": str(PID_FILE),
str(UVICORN_BIN), "pid": pid,
"src.Vector.vector_service:app", "pid_running": pid_running,
"--host", "0.0.0.0", "health_code": health_code,
"--port", "8090" "health_body": health_body if len(health_body) <= 600 else health_body[:600] + "...",
]) "healthy": health_ok,
time.sleep(2) "port": port,
def reload_service():
subprocess.call([
"curl",
"-s",
"-X",
"POST",
"http://127.0.0.1:8090/reload"
])
def health_check():
try:
result = subprocess.run(
["curl", "-s", "http://127.0.0.1:8090/health"],
capture_output=True,
text=True
)
return result.stdout.strip()
except Exception:
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--install", action="store_true")
parser.add_argument("--start", action="store_true")
parser.add_argument("--reload", action="store_true")
args = parser.parse_args()
result = {
"modules_missing": [],
"service_running": False,
"health": None,
"actions": []
} }
# 1⃣ Check modules
def start_service(host: str, port: int, background: bool, health_retries: int, health_wait_ms: int) -> Dict:
    # already running?
    st = service_status(port)
    if st["pid_running"] and st["healthy"]:
        return {"status": "ok", "detail": "already running", "status_info": st}

    if not UVICORN_BIN.exists():
        return {"status": "error", "detail": "uvicorn not found in .venv/bin/uvicorn"}

    # If port already open but pidfile missing, we still consider it running; user can fix by stop with --force later
    if _is_port_open("127.0.0.1", port):
        # Try health anyway
        st2 = service_status(port)
        if st2["healthy"]:
            return {"status": "ok", "detail": "port already in use but service healthy", "status_info": st2}
        return {"status": "error", "detail": f"port {port} already in use, and /health not healthy"}

    cmd = [
        str(UVICORN_BIN),
        "src.Vector.vector_service:app",
        "--host", host,
        "--port", str(port),
    ]

    # production: no --reload
    # run in background by default
    if background:
        p = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd=str(BASE_PATH),
            start_new_session=True,  # detach from terminal
        )
        _write_pid(p.pid)
    else:
        # foreground start (rare in production)
        p = subprocess.Popen(cmd, cwd=str(BASE_PATH))
        _write_pid(p.pid)

    # wait for health
    last = None
    for _ in range(max(1, health_retries)):
        time.sleep(health_wait_ms / 1000.0)
        stx = service_status(port)
        last = stx
        if stx["healthy"]:
            return {"status": "ok", "detail": "started", "status_info": stx}

    return {"status": "error", "detail": "started but health not OK", "status_info": last}
def stop_service(port: int, force: bool = False, wait_seconds: int = 5) -> Dict:
    pid = _read_pid()
    if not pid:
        # nothing to stop via pid; still check health
        st = service_status(port)
        if st["healthy"]:
            return {"status": "error", "detail": "service healthy but no PID file (cannot stop safely)", "status_info": st}
        return {"status": "ok", "detail": "not running"}

    if not _pid_is_running(pid):
        _remove_pid()
        return {"status": "ok", "detail": "not running (stale pid removed)"}

    # SIGTERM
    try:
        os.kill(pid, signal.SIGTERM)
    except Exception as e:
        if force:
            try:
                os.kill(pid, signal.SIGKILL)
                _remove_pid()
                return {"status": "ok", "detail": "killed (SIGKILL)"}
            except Exception as e2:
                return {"status": "error", "detail": f"failed to kill: {e2}"}
        return {"status": "error", "detail": f"failed to stop: {e}"}

    # wait for exit
    end = time.time() + max(1, wait_seconds)
    while time.time() < end:
        if not _pid_is_running(pid):
            _remove_pid()
            return {"status": "ok", "detail": "stopped"}
        time.sleep(0.2)

    if force:
        try:
            os.kill(pid, signal.SIGKILL)
            _remove_pid()
            return {"status": "ok", "detail": "forced stop (SIGKILL)"}
        except Exception as e:
            return {"status": "error", "detail": f"failed to SIGKILL: {e}"}

    return {"status": "error", "detail": "timeout stopping (use --force)"}
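def reload_service(port: int) -> Dict:
    # /reload is a POST endpoint and _curl() issues a GET, so call curl with -X POST here.
    url = DEFAULT_RELOAD_URL.format(port=port)
    cmd = ["curl", "-s", "-m", "5", "-X", "POST", "-w", "\n%{http_code}", url]
    p = subprocess.run(cmd, capture_output=True, text=True)
    body, _, raw_code = (p.stdout or "").rstrip("\n").rpartition("\n")
    code = int(raw_code) if raw_code.isdigit() else 0
    if code == 200 and "reloaded" in body:
        return {"status": "ok", "detail": body}
    if code == 404:
        return {"status": "error", "detail": "reload endpoint not found (wrong service version?)"}
    return {"status": "error", "detail": f"reload failed (http {code}): {body}"}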
def main() -> int:
    parser = argparse.ArgumentParser(description="Production-safe vector service control")
    parser.add_argument("--install", action="store_true", help="Install missing python deps into .venv")
    parser.add_argument("--start", action="store_true", help="Start service if not running")
    parser.add_argument("--stop", action="store_true", help="Stop service using PID file")
    parser.add_argument("--force", action="store_true", help="Force stop (SIGKILL) if needed")
    parser.add_argument("--reload", action="store_true", help="Trigger /reload")
    parser.add_argument("--status", action="store_true", help="Print status (default if no action)")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    parser.add_argument("--host", type=str, default=DEFAULT_HOST)
    parser.add_argument("--foreground", action="store_true", help="Start in foreground (default background)")
    parser.add_argument("--health-retries", type=int, default=20)
    parser.add_argument("--health-wait-ms", type=int, default=250)
    args = parser.parse_args()

    started_ms = _now_ms()
    out: Dict = {
        "ts_ms": started_ms,
        "base_path": str(BASE_PATH),
        "venv_python": str(VENV_PY),
        "pid_file": str(PID_FILE),
        "actions": [],
        "results": {},
    }

    # sanity: venv exists
    if not VENV_PY.exists():
        out["results"]["venv"] = {"status": "error", "detail": ".venv/bin/python not found"}
        print(json.dumps(out, indent=2))
        return 2

    # 1) deps check
missing = check_modules() missing = check_modules()
result["modules_missing"] = missing out["results"]["modules_missing"] = missing
if missing and args.install: if missing and args.install:
install_modules(missing) out["actions"].append("install")
result["actions"].append("modules_installed") out["results"]["install"] = install_missing_modules(missing)
# re-check after install
missing2 = check_modules()
out["results"]["modules_missing_after"] = missing2
# 2️⃣ Service check # 2) service actions
running = service_running() if args.stop:
result["service_running"] = running out["actions"].append("stop")
out["results"]["stop"] = stop_service(args.port, force=args.force)
if not running and args.start: if args.start:
start_service() out["actions"].append("start")
result["actions"].append("service_started") out["results"]["start"] = start_service(
running = True host=args.host,
port=args.port,
background=not args.foreground,
health_retries=args.health_retries,
health_wait_ms=args.health_wait_ms,
)
# 3⃣ Reload
if args.reload: if args.reload:
reload_service() out["actions"].append("reload")
result["actions"].append("service_reloaded") out["results"]["reload"] = reload_service(args.port)
# 4⃣ Health # default: status (or if requested)
if running: if args.status or (not args.install and not args.start and not args.stop and not args.reload):
result["health"] = health_check() out["actions"].append("status")
out["results"]["status"] = service_status(args.port)
print(json.dumps(result, indent=2)) out["duration_ms"] = _now_ms() - started_ms
print(json.dumps(out, indent=2))
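    # Exit non-zero when any action reported an error, so the Symfony wrapper's
    # isSuccessful() check reflects the real outcome.
    failed = any(isinstance(r, dict) and r.get("status") == "error" for r in out["results"].values())
    return 1 if failed else 0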
if __name__ == "__main__": if __name__ == "__main__":
main() raise SystemExit(main())