package persistence

import (
	"context"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"

	"otel-bi-analytics/internal/analytics"
)

// PersistForecast stores one forecast run as a single row in
// aw_sales_forecasts. The points are serialized with mustJSON and cast to
// jsonb; the row also carries horizonDays, the number of points, source and
// the trace/span identifiers obtained from spanContext.
//
// The function is best-effort and returns nothing: when the insert fails it
// logs a warning, records the error on the span and stops, so neither the
// write counter nor the audit log is touched. On success it increments
// persistWritesTotal and appends a "forecast.generated" audit entry.
func PersistForecast(ctx context.Context, pool *pgxpool.Pool, data []analytics.ForecastPoint, horizonDays int, source string) {
	points := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_forecast",
		trace.WithAttributes(
			attribute.Int("horizon_days", horizonDays),
			attribute.Int("point_count", points),
		),
	)
	defer span.End()

	tid, sid := spanContext(span)

	const stmt = `INSERT INTO aw_sales_forecasts (id, created_at, horizon_days, point_count, trigger_source, trace_id, span_id, payload) VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7::jsonb)`
	if _, err := pool.Exec(ctx, stmt,
		newUUID(), horizonDays, points, source, tid, sid, mustJSON(data),
	); err != nil {
		slog.Warn("failed to persist AW forecast", "err", err)
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_sales_forecast")))

	entry := AuditEntry{
		Action:     "forecast.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "sales_forecast",
		Payload:    map[string]any{"horizon_days": horizonDays, "point_count": points},
	}
	AppendAudit(ctx, pool, entry)
}

// PersistRepScores stores one rep-scoring run as a single row in
// aw_rep_scores, with the scores serialized by mustJSON and cast to jsonb.
//
// topN is not written to the table; it only appears in the audit payload.
// Failure handling matches the other Persist* functions in this file: a
// failed insert is logged and recorded on the span, and the metric and audit
// steps are skipped. On success a "scores.generated" audit entry is appended.
func PersistRepScores(ctx context.Context, pool *pgxpool.Pool, data []analytics.RepScore, topN int, source string) {
	reps := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_rep_scores",
		trace.WithAttributes(attribute.Int("rep_count", reps)),
	)
	defer span.End()

	tid, sid := spanContext(span)

	const stmt = `INSERT INTO aw_rep_scores (id, computed_at, rep_count, trigger_source, trace_id, span_id, payload) VALUES ($1, NOW(), $2, $3, $4, $5, $6::jsonb)`
	if _, err := pool.Exec(ctx, stmt,
		newUUID(), reps, source, tid, sid, mustJSON(data),
	); err != nil {
		slog.Warn("failed to persist AW rep scores", "err", err)
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_rep_scores")))

	entry := AuditEntry{
		Action:     "scores.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "rep_scores",
		Payload:    map[string]any{"rep_count": reps, "top_n": topN},
	}
	AppendAudit(ctx, pool, entry)
}

// PersistProductDemand stores one product-demand run as a single row in
// aw_product_demand, including topN and the demand records serialized by
// mustJSON and cast to jsonb.
//
// A failed insert is logged and recorded on the span, and the metric and
// audit steps are skipped. On success the write counter is incremented and an
// audit entry is appended.
func PersistProductDemand(ctx context.Context, pool *pgxpool.Pool, data []analytics.ProductDemand, topN int, source string) {
	products := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_product_demand",
		trace.WithAttributes(attribute.Int("product_count", products)),
	)
	defer span.End()

	tid, sid := spanContext(span)

	const stmt = `INSERT INTO aw_product_demand (id, computed_at, product_count, top_n, trigger_source, trace_id, span_id, payload) VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7::jsonb)`
	if _, err := pool.Exec(ctx, stmt,
		newUUID(), products, topN, source, tid, sid, mustJSON(data),
	); err != nil {
		slog.Warn("failed to persist AW product demand", "err", err)
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_product_demand")))

	entry := AuditEntry{
		// NOTE(review): this is the same action string PersistRepScores
		// emits; only EntityType tells the two apart. Confirm with audit
		// consumers whether that is intended.
		Action:     "scores.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "product_demand",
		Payload:    map[string]any{"product_count": products, "top_n": topN},
	}
	AppendAudit(ctx, pool, entry)
}

// PersistAnomalyRun stores one anomaly-detection run as a single row in
// aw_anomaly_runs. It counts the points whose IsAnomaly flag is set, reports
// that count and len(data) as span attributes, and writes the full series as
// a jsonb payload.
//
// series_days (365), window_days (30) and threshold_sigma (2.0) are fixed
// literals in the SQL statement, and the first two are repeated in the audit
// payload; none of them is derived from data.
//
// A failed insert is logged and recorded on the span, and the metric and
// audit steps are skipped. On success an "anomaly_detection.ran" audit entry
// is appended.
func PersistAnomalyRun(ctx context.Context, pool *pgxpool.Pool, data []analytics.AnomalyPoint, source string) {
	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_anomaly_run")
	defer span.End()

	flagged := 0
	for i := range data {
		if data[i].IsAnomaly {
			flagged++
		}
	}
	span.SetAttributes(
		attribute.Int("series_points", len(data)),
		attribute.Int("anomaly_count", flagged),
	)

	tid, sid := spanContext(span)

	// The line break inside the literal is deliberate: it keeps the statement
	// text unchanged. Whitespace is not significant to the SQL itself.
	const stmt = `INSERT INTO aw_anomaly_runs (id, detected_at, anomaly_count, series_days, window_days, threshold_sigma, trigger_source, trace_id, 
span_id, payload) VALUES ($1, NOW(), $2, 365, 30, 2.0, $3, $4, $5, $6::jsonb)`
	if _, err := pool.Exec(ctx, stmt,
		newUUID(), flagged, source, tid, sid, mustJSON(data),
	); err != nil {
		slog.Warn("failed to persist AW anomaly run", "err", err)
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_anomaly_run")))

	entry := AuditEntry{
		Action:     "anomaly_detection.ran",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "anomaly_detection",
		Payload:    map[string]any{"series_days": 365, "window_days": 30, "anomaly_count": flagged},
	}
	AppendAudit(ctx, pool, entry)
}

// actorType classifies a trigger source for the audit log: any source that
// begins with "scheduler" is reported as "scheduler", everything else
// (including the empty string) as "api".
func actorType(source string) string {
	const prefix = "scheduler"
	if len(source) < len(prefix) || source[:len(prefix)] != prefix {
		return "api"
	}
	return prefix
}