from __future__ import annotations import logging from opentelemetry import metrics, trace from sqlalchemy.orm import sessionmaker, Session from app.core.audit import append_audit from app.domain.aw.models import AWSalesForecast, AWRepScore, AWProductDemand, AWAnomalyRun LOGGER = logging.getLogger(__name__) tracer = trace.get_tracer("otel-bi.domain.aw") meter = metrics.get_meter("otel-bi.domain.aw") _persist_counter = meter.create_counter( "aw_persist_writes_total", description="Number of AW PostgreSQL write operations", ) def _current_span_context() -> tuple[str | None, str | None]: ctx = trace.get_current_span().get_span_context() if not ctx.is_valid: return None, None return f"{ctx.trace_id:032x}", f"{ctx.span_id:016x}" def _actor_type(trigger_source: str) -> str: return "scheduler" if trigger_source.startswith("scheduler") else "api" # --------------------------------------------------------------------------- # Persist functions — called after Go service returns data # --------------------------------------------------------------------------- def persist_forecast( factory: sessionmaker[Session], data: list[dict], horizon_days: int, trigger_source: str, ) -> None: trace_id, span_id = _current_span_context() try: with factory() as session: session.add(AWSalesForecast( horizon_days=horizon_days, point_count=len(data), trigger_source=trigger_source, trace_id=trace_id, span_id=span_id, payload=data, )) session.commit() _persist_counter.add(1, {"entity": "sales_forecast"}) except Exception as exc: # noqa: BLE001 LOGGER.warning("Failed to persist AW forecast: %s", exc) append_audit( factory, action="forecast.generated", actor_type=_actor_type(trigger_source), actor_id=trigger_source, domain="aw", service="otel-bi-backend", entity_type="sales_forecast", payload={"horizon_days": horizon_days, "point_count": len(data)}, ) def persist_rep_scores( factory: sessionmaker[Session], data: list[dict], top_n: int, trigger_source: str, ) -> None: trace_id, span_id = _current_span_context() try: with factory() as session: session.add(AWRepScore( rep_count=len(data), trigger_source=trigger_source, trace_id=trace_id, span_id=span_id, payload=data, )) session.commit() _persist_counter.add(1, {"entity": "rep_scores"}) except Exception as exc: # noqa: BLE001 LOGGER.warning("Failed to persist AW rep scores: %s", exc) append_audit( factory, action="scores.generated", actor_type=_actor_type(trigger_source), actor_id=trigger_source, domain="aw", service="otel-bi-backend", entity_type="rep_scores", payload={"rep_count": len(data), "top_n": top_n}, ) def persist_product_demand( factory: sessionmaker[Session], data: list[dict], top_n: int, trigger_source: str, ) -> None: trace_id, span_id = _current_span_context() try: with factory() as session: session.add(AWProductDemand( product_count=len(data), top_n=top_n, trigger_source=trigger_source, trace_id=trace_id, span_id=span_id, payload=data, )) session.commit() _persist_counter.add(1, {"entity": "product_demand"}) except Exception as exc: # noqa: BLE001 LOGGER.warning("Failed to persist AW product demand: %s", exc) append_audit( factory, action="scores.generated", actor_type=_actor_type(trigger_source), actor_id=trigger_source, domain="aw", service="otel-bi-backend", entity_type="product_demand", payload={"product_count": len(data), "top_n": top_n}, ) def persist_anomaly_run( factory: sessionmaker[Session], data: list[dict], trigger_source: str, ) -> None: anomaly_count = sum(1 for p in data if p.get("is_anomaly")) trace_id, span_id = _current_span_context() try: with factory() as session: session.add(AWAnomalyRun( anomaly_count=anomaly_count, series_days=365, window_days=30, threshold_sigma=2.0, trigger_source=trigger_source, trace_id=trace_id, span_id=span_id, payload=data, )) session.commit() _persist_counter.add(1, {"entity": "anomaly_run"}) except Exception as exc: # noqa: BLE001 LOGGER.warning("Failed to persist AW anomaly run: %s", exc) append_audit( factory, action="anomaly_detection.ran", actor_type=_actor_type(trigger_source), actor_id=trigger_source, domain="aw", service="otel-bi-backend", entity_type="anomaly_detection", payload={"series_days": 365, "window_days": 30, "anomaly_count": anomaly_count}, ) # --------------------------------------------------------------------------- # Read functions — query PostgreSQL for stored results # --------------------------------------------------------------------------- def list_forecasts(factory: sessionmaker[Session], limit: int = 50) -> list[dict]: with factory() as session: rows = ( session.query(AWSalesForecast) .order_by(AWSalesForecast.created_at.desc()) .limit(limit) .all() ) return [ { "id": r.id, "created_at": r.created_at.isoformat(), "horizon_days": r.horizon_days, "point_count": r.point_count, "trigger_source": r.trigger_source, "trace_id": r.trace_id, } for r in rows ] def list_rep_scores(factory: sessionmaker[Session], limit: int = 50) -> list[dict]: with factory() as session: rows = ( session.query(AWRepScore) .order_by(AWRepScore.computed_at.desc()) .limit(limit) .all() ) return [ { "id": r.id, "computed_at": r.computed_at.isoformat(), "rep_count": r.rep_count, "trigger_source": r.trigger_source, "trace_id": r.trace_id, "payload": r.payload, } for r in rows ] def list_product_demand(factory: sessionmaker[Session], limit: int = 50) -> list[dict]: with factory() as session: rows = ( session.query(AWProductDemand) .order_by(AWProductDemand.computed_at.desc()) .limit(limit) .all() ) return [ { "id": r.id, "computed_at": r.computed_at.isoformat(), "product_count": r.product_count, "top_n": r.top_n, "trigger_source": r.trigger_source, "trace_id": r.trace_id, "payload": r.payload, } for r in rows ] def list_anomaly_runs(factory: sessionmaker[Session], limit: int = 20) -> list[dict]: with factory() as session: rows = ( session.query(AWAnomalyRun) .order_by(AWAnomalyRun.detected_at.desc()) .limit(limit) .all() ) return [ { "id": r.id, "detected_at": r.detected_at.isoformat(), "anomaly_count": r.anomaly_count, "series_days": r.series_days, "window_days": r.window_days, "threshold_sigma": r.threshold_sigma, "trigger_source": r.trigger_source, "trace_id": r.trace_id, } for r in rows ]