from __future__ import annotations from collections import defaultdict from collections.abc import Callable from datetime import datetime from typing import Any from sqlalchemy import JSON, DateTime, Integer, String, func from sqlalchemy.orm import Mapped, Session, mapped_column from core.db import Base, SessionLocal EventHandler = Callable[[str, dict[str, Any], Session], None] class EventLog(Base): __tablename__ = "events" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) type: Mapped[str] = mapped_column(String(128), index=True) payload: Mapped[dict] = mapped_column(JSON, default=dict) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) class EventBus: def __init__(self) -> None: self._handlers: dict[str, list[EventHandler]] = defaultdict(list) self._wildcards: list[tuple[str, EventHandler]] = [] def subscribe(self, event_type: str, handler: EventHandler) -> None: """Subscribe to an event type. Supports 'namespace.*' wildcards.""" if event_type.endswith(".*"): self._wildcards.append((event_type[:-2], handler)) else: self._handlers[event_type].append(handler) def publish(self, event_type: str, payload: dict[str, Any], db: Session | None = None) -> None: """Publish event. Persists to events table and calls all handlers synchronously.""" own_db = db is None if own_db: db = SessionLocal() try: db.add(EventLog(type=event_type, payload=payload)) db.commit() for h in self._handlers.get(event_type, []): try: h(event_type, payload, db) except Exception as e: # noqa: BLE001 print(f"[event-handler-error] {event_type}: {e}") for prefix, h in self._wildcards: if event_type.startswith(prefix + ".") or event_type == prefix: try: h(event_type, payload, db) except Exception as e: # noqa: BLE001 print(f"[event-handler-error] {event_type}: {e}") finally: if own_db: db.close() event_bus = EventBus()