from __future__ import annotations import asyncio import logging from fastapi import APIRouter, Depends, Query, Request, Response from opentelemetry import propagate, trace from app.core.audit import AuditLog, ExportRecord, append_audit from app.core.config import settings from app.core.executor import get_executor from app.core.reports import save_report from app.core.security import FrontendPrincipal, require_frontend_principal from app.domain.wwi import analytics as wwi_analytics LOGGER = logging.getLogger(__name__) router = APIRouter(tags=["platform"]) def _trace_headers() -> dict[str, str]: ctx = trace.get_current_span().get_span_context() if not ctx.is_valid: return {} return {"x-trace-id": f"{ctx.trace_id:032x}", "x-span-id": f"{ctx.span_id:016x}"} # --------------------------------------------------------------------------- # System # --------------------------------------------------------------------------- @router.get("/api/config") def frontend_config() -> dict: return { "oidc_enabled": settings.require_frontend_auth, "oidc_authority": settings.frontend_jwt_issuer_url, "oidc_client_id": settings.frontend_oidc_client_id, "oidc_scope": settings.frontend_oidc_scope, } @router.get("/api/health") def health(response: Response) -> dict: response.headers.update(_trace_headers()) return {"status": "ok", "service": "otel-bi-backend"} @router.get("/api/telemetry/status") def telemetry_status( response: Response, principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> dict: response.headers.update(_trace_headers()) return { "status": "instrumented", "service": "otel-bi-backend", "collector_endpoint": settings.otel_collector_endpoint, "subject": principal.subject, **_trace_headers(), } # --------------------------------------------------------------------------- # Cross-domain report generation # --------------------------------------------------------------------------- def _propagation_headers() -> dict[str, str]: headers: dict[str, str] = {} propagate.inject(headers) return headers @router.post("/api/reports/generate") async def generate_report( request: Request, response: Response, principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> dict: response.headers.update(_trace_headers()) client = request.app.state.analytics_client pg_factory = request.app.state.pg_factory actor_id = principal.subject loop = asyncio.get_running_loop() executor = get_executor() import httpx as _httpx async def _fetch(path: str, params: dict | None = None): try: r = await client.get(path, params=params, headers=_propagation_headers()) r.raise_for_status() return r.json() except (_httpx.HTTPStatusError, _httpx.RequestError): return {} ( aw_kpis, aw_history, aw_forecast, aw_reps, aw_products, wwi_kpis, wwi_stock, wwi_suppliers, ) = await asyncio.gather( _fetch("/aw/sales/kpis"), _fetch("/aw/sales/history", {"days_back": settings.default_history_days}), _fetch("/aw/sales/forecast", {"horizon_days": settings.forecast_horizon_days}), _fetch("/aw/reps/scores", {"top_n": settings.ranking_default_top_n}), _fetch("/aw/products/demand", {"top_n": settings.ranking_default_top_n}), _fetch("/wwi/sales/kpis"), _fetch("/wwi/stock/recommendations"), _fetch("/wwi/suppliers/scores", {"top_n": settings.ranking_default_top_n}), ) wwi_events = await loop.run_in_executor( executor, lambda: wwi_analytics.get_business_events(pg_factory, 200) ) data = { "aw_sales_kpis": aw_kpis, "aw_sales_history": aw_history, "aw_sales_forecast": aw_forecast, "aw_rep_scores": aw_reps, "aw_product_demand": aw_products, "wwi_sales_kpis": wwi_kpis, "wwi_stock_recommendations": wwi_stock, "wwi_supplier_scores": wwi_suppliers, "wwi_business_events": wwi_events, } report = await loop.run_in_executor( executor, lambda: save_report(data, settings.report_output_dir) ) append_audit( pg_factory, action="report.generated", actor_type="user", actor_id=actor_id, domain="platform", service="otel-bi-backend", entity_type="full_report", payload={ "report_id": report["report_id"], "xlsx": report["xlsx"]["filename"], "pdf": report["pdf"]["filename"], }, ) return {**report, "output_dir": settings.report_output_dir, **_trace_headers()} # --------------------------------------------------------------------------- # Audit log # --------------------------------------------------------------------------- @router.get("/api/audit") async def audit_log( response: Response, request: Request, limit: int = Query(default=100, ge=1, le=500), domain: str | None = Query(default=None), principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> list[dict]: response.headers.update(_trace_headers()) pg_factory = request.app.state.pg_factory def _query(): with pg_factory() as session: q = session.query(AuditLog).order_by(AuditLog.occurred_at.desc()) if domain: q = q.filter_by(domain=domain) rows = q.limit(limit).all() return [ { "id": r.id, "occurred_at": r.occurred_at.isoformat(), "action": r.action, "status": r.status, "actor_type": r.actor_type, "actor_id": r.actor_id, "domain": r.domain, "service": r.service, "entity_type": r.entity_type, "trace_id": r.trace_id, "payload": r.payload, } for r in rows ] return await asyncio.get_running_loop().run_in_executor(get_executor(), _query) # --------------------------------------------------------------------------- # Export history # --------------------------------------------------------------------------- @router.get("/api/exports") async def export_history( response: Response, request: Request, limit: int = Query(default=100, ge=1, le=500), domain: str | None = Query(default=None), principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> list[dict]: response.headers.update(_trace_headers()) pg_factory = request.app.state.pg_factory def _query(): with pg_factory() as session: q = session.query(ExportRecord).order_by(ExportRecord.created_at.desc()) if domain: q = q.filter_by(domain=domain) rows = q.limit(limit).all() return [ { "id": r.id, "exported_at": r.created_at.isoformat(), "domain": r.domain, "service": r.service, "source_view": r.source_view, "format": r.format, "filters_applied": r.filters_applied, "row_count": r.row_count, "file_size_bytes": r.file_size_bytes, "actor_id": r.actor_id, "trace_id": r.trace_id, } for r in rows ] return await asyncio.get_running_loop().run_in_executor(get_executor(), _query) # --------------------------------------------------------------------------- # Job history (platform-level — both domains in one response) # --------------------------------------------------------------------------- @router.get("/api/jobs/aw") async def jobs_aw( response: Response, request: Request, limit: int = Query(default=50, ge=1, le=200), principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> list[dict]: response.headers.update(_trace_headers()) from app.routers.aw import _list_jobs pg_factory = request.app.state.pg_factory return await asyncio.get_running_loop().run_in_executor( get_executor(), lambda: _list_jobs(pg_factory, "aw", limit) ) @router.get("/api/jobs/wwi") async def jobs_wwi( response: Response, request: Request, limit: int = Query(default=50, ge=1, le=200), principal: FrontendPrincipal = Depends(require_frontend_principal), ) -> list[dict]: response.headers.update(_trace_headers()) from app.routers.wwi import _list_jobs pg_factory = request.app.state.pg_factory return await asyncio.get_running_loop().run_in_executor( get_executor(), lambda: _list_jobs(pg_factory, "wwi", limit) )