Files
2026-05-11 10:58:46 +02:00

141 lines
4.8 KiB
Go

package persistence
import (
"context"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"otel-bi-analytics/internal/analytics"
)
// PersistForecast stores one sales-forecast run in aw_sales_forecasts and, on
// success, increments the persistence write counter and appends an audit entry.
//
// The inserted row carries horizonDays, the number of points in data, source as
// the trigger source, the trace and span IDs of the span started here, and data
// serialised by mustJSON as the jsonb payload.
//
// Persistence is best-effort: nothing is returned to the caller. If the insert
// fails, the error is logged at warn level and recorded on the span, and neither
// the counter nor the audit entry is written.
func PersistForecast(ctx context.Context, pool *pgxpool.Pool, data []analytics.ForecastPoint, horizonDays int, source string) {
	pointCount := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_forecast",
		trace.WithAttributes(
			attribute.Int("horizon_days", horizonDays),
			attribute.Int("point_count", pointCount),
		),
	)
	defer span.End()

	traceID, spanID := spanContext(span)
	_, err := pool.Exec(ctx,
		`INSERT INTO aw_sales_forecasts
(id, created_at, horizon_days, point_count, trigger_source, trace_id, span_id, payload)
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7::jsonb)`,
		newUUID(), horizonDays, pointCount, source, traceID, spanID, mustJSON(data),
	)
	if err != nil {
		// WarnContext hands ctx (which carries the span started above) to the
		// slog handler, so context-aware handlers can tie the record to the trace.
		slog.WarnContext(ctx, "failed to persist AW forecast", "err", err)
		// NOTE(review): RecordError only adds an error event; the span status is
		// left unset. Consider span.SetStatus with codes.Error if failed writes
		// should show up as failed spans.
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_sales_forecast")))
	AppendAudit(ctx, pool, AuditEntry{
		Action:     "forecast.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "sales_forecast",
		Payload:    map[string]any{"horizon_days": horizonDays, "point_count": pointCount},
	})
}
// PersistRepScores stores one batch of sales-rep scores in aw_rep_scores and, on
// success, increments the persistence write counter and appends an audit entry.
//
// The inserted row carries the number of entries in data, source as the trigger
// source, the trace and span IDs of the span started here, and data serialised
// by mustJSON as the jsonb payload. topN is not written to the table; it is only
// included in the audit entry's payload.
//
// Persistence is best-effort: nothing is returned to the caller. If the insert
// fails, the error is logged at warn level and recorded on the span, and neither
// the counter nor the audit entry is written.
func PersistRepScores(ctx context.Context, pool *pgxpool.Pool, data []analytics.RepScore, topN int, source string) {
	repCount := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_rep_scores",
		trace.WithAttributes(attribute.Int("rep_count", repCount)),
	)
	defer span.End()

	traceID, spanID := spanContext(span)
	_, err := pool.Exec(ctx,
		`INSERT INTO aw_rep_scores
(id, computed_at, rep_count, trigger_source, trace_id, span_id, payload)
VALUES ($1, NOW(), $2, $3, $4, $5, $6::jsonb)`,
		newUUID(), repCount, source, traceID, spanID, mustJSON(data),
	)
	if err != nil {
		// WarnContext hands ctx (which carries the span started above) to the
		// slog handler, so context-aware handlers can tie the record to the trace.
		slog.WarnContext(ctx, "failed to persist AW rep scores", "err", err)
		// NOTE(review): RecordError only adds an error event; the span status is
		// left unset. Consider span.SetStatus with codes.Error if failed writes
		// should show up as failed spans.
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_rep_scores")))
	AppendAudit(ctx, pool, AuditEntry{
		Action:     "scores.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "rep_scores",
		Payload:    map[string]any{"rep_count": repCount, "top_n": topN},
	})
}
// PersistProductDemand stores one product-demand computation in
// aw_product_demand and, on success, increments the persistence write counter
// and appends an audit entry.
//
// The inserted row carries the number of entries in data, topN, source as the
// trigger source, the trace and span IDs of the span started here, and data
// serialised by mustJSON as the jsonb payload.
//
// Persistence is best-effort: nothing is returned to the caller. If the insert
// fails, the error is logged at warn level and recorded on the span, and neither
// the counter nor the audit entry is written.
func PersistProductDemand(ctx context.Context, pool *pgxpool.Pool, data []analytics.ProductDemand, topN int, source string) {
	productCount := len(data)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_product_demand",
		trace.WithAttributes(attribute.Int("product_count", productCount)),
	)
	defer span.End()

	traceID, spanID := spanContext(span)
	_, err := pool.Exec(ctx,
		`INSERT INTO aw_product_demand
(id, computed_at, product_count, top_n, trigger_source, trace_id, span_id, payload)
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7::jsonb)`,
		newUUID(), productCount, topN, source, traceID, spanID, mustJSON(data),
	)
	if err != nil {
		// WarnContext hands ctx (which carries the span started above) to the
		// slog handler, so context-aware handlers can tie the record to the trace.
		slog.WarnContext(ctx, "failed to persist AW product demand", "err", err)
		// NOTE(review): RecordError only adds an error event; the span status is
		// left unset. Consider span.SetStatus with codes.Error if failed writes
		// should show up as failed spans.
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_product_demand")))
	AppendAudit(ctx, pool, AuditEntry{
		// NOTE(review): this is the same action string PersistRepScores emits;
		// only EntityType tells the two apart. Possibly a copy-paste — confirm
		// with audit consumers before renaming, since they may filter on it.
		Action:     "scores.generated",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "product_demand",
		Payload:    map[string]any{"product_count": productCount, "top_n": topN},
	})
}
// PersistAnomalyRun stores one anomaly-detection run in aw_anomaly_runs and, on
// success, increments the persistence write counter and appends an audit entry.
//
// The number of points in data flagged IsAnomaly is counted first and set on the
// span together with the total number of points. The inserted row carries that
// count, the fixed detection settings declared below, source as the trigger
// source, the trace and span IDs of the span started here, and data serialised
// by mustJSON as the jsonb payload.
//
// Persistence is best-effort: nothing is returned to the caller. If the insert
// fails, the error is logged at warn level and recorded on the span, and neither
// the counter nor the audit entry is written.
func PersistAnomalyRun(ctx context.Context, pool *pgxpool.Pool, data []analytics.AnomalyPoint, source string) {
	// Detection settings recorded with every run. They are declared once and
	// bound as query parameters so the stored row and the audit payload cannot
	// drift apart.
	// NOTE(review): presumably these mirror the detector's own configuration;
	// they are not derived from data or from the caller — confirm.
	const (
		seriesDays     = 365
		windowDays     = 30
		thresholdSigma = 2.0
	)

	ctx, span := persistTracer.Start(ctx, "persistence.aw.persist_anomaly_run")
	defer span.End()

	anomalyCount := 0
	for i := range data {
		if data[i].IsAnomaly {
			anomalyCount++
		}
	}
	span.SetAttributes(
		attribute.Int("series_points", len(data)),
		attribute.Int("anomaly_count", anomalyCount),
	)

	traceID, spanID := spanContext(span)
	_, err := pool.Exec(ctx,
		`INSERT INTO aw_anomaly_runs
(id, detected_at, anomaly_count, series_days, window_days, threshold_sigma, trigger_source, trace_id, span_id, payload)
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7, $8, $9::jsonb)`,
		newUUID(), anomalyCount, seriesDays, windowDays, thresholdSigma, source, traceID, spanID, mustJSON(data),
	)
	if err != nil {
		// WarnContext hands ctx (which carries the span started above) to the
		// slog handler, so context-aware handlers can tie the record to the trace.
		slog.WarnContext(ctx, "failed to persist AW anomaly run", "err", err)
		// NOTE(review): RecordError only adds an error event; the span status is
		// left unset. Consider span.SetStatus with codes.Error if failed writes
		// should show up as failed spans.
		span.RecordError(err)
		return
	}

	persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "aw_anomaly_run")))
	AppendAudit(ctx, pool, AuditEntry{
		Action:     "anomaly_detection.ran",
		ActorType:  actorType(source),
		ActorID:    source,
		Domain:     "aw",
		Service:    "otel-bi-analytics",
		EntityType: "anomaly_detection",
		Payload:    map[string]any{"series_days": seriesDays, "window_days": windowDays, "anomaly_count": anomalyCount},
	})
}
// actorType classifies a trigger source for audit entries. A source that begins
// with "scheduler" yields "scheduler"; anything else, including the empty
// string, yields "api". The comparison is case-sensitive.
func actorType(source string) string {
	// The prefix length is taken from the literal itself, so the length check
	// and the compared text cannot get out of sync.
	const schedulerPrefix = "scheduler"

	if len(source) >= len(schedulerPrefix) && source[:len(schedulerPrefix)] == schedulerPrefix {
		return "scheduler"
	}
	return "api"
}