152 lines
5.0 KiB
Go
152 lines
5.0 KiB
Go
package persistence
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"otel-bi-analytics/internal/analytics"
|
|
)
|
|
|
|
var (
|
|
businessEventsTotal, _ = persistMeter.Int64Counter(
|
|
"wwi.business_events_generated_total",
|
|
metric.WithDescription("Business events generated from reorder data"),
|
|
)
|
|
)
|
|
|
|
func PersistReorderRecommendations(ctx context.Context, pool *pgxpool.Pool, data []analytics.ReorderRecommendation, source string) {
|
|
ctx, span := persistTracer.Start(ctx, "persistence.wwi.persist_reorder_recommendations",
|
|
trace.WithAttributes(attribute.Int("item_count", len(data))),
|
|
)
|
|
defer span.End()
|
|
|
|
traceID, spanID := spanContext(span)
|
|
_, err := pool.Exec(ctx,
|
|
`INSERT INTO wwi_reorder_recommendations
|
|
(id, created_at, item_count, trigger_source, trace_id, span_id, payload)
|
|
VALUES ($1, NOW(), $2, $3, $4, $5, $6::jsonb)`,
|
|
newUUID(), len(data), source, traceID, spanID, mustJSON(data),
|
|
)
|
|
if err != nil {
|
|
slog.Warn("failed to persist WWI reorder recommendations", "err", err)
|
|
span.RecordError(err)
|
|
return
|
|
}
|
|
persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "wwi_reorder_recommendations")))
|
|
|
|
AppendAudit(ctx, pool, AuditEntry{
|
|
Action: "recommendations.generated", ActorType: actorType(source), ActorID: source,
|
|
Domain: "wwi", Service: "otel-bi-analytics", EntityType: "reorder_recommendations",
|
|
Payload: map[string]any{"item_count": len(data)},
|
|
})
|
|
}
|
|
|
|
func PersistSupplierScores(ctx context.Context, pool *pgxpool.Pool, data []analytics.SupplierScore, topN int, source string) {
|
|
ctx, span := persistTracer.Start(ctx, "persistence.wwi.persist_supplier_scores",
|
|
trace.WithAttributes(attribute.Int("supplier_count", len(data))),
|
|
)
|
|
defer span.End()
|
|
|
|
traceID, spanID := spanContext(span)
|
|
_, err := pool.Exec(ctx,
|
|
`INSERT INTO wwi_supplier_scores
|
|
(id, computed_at, supplier_count, top_n, trigger_source, trace_id, span_id, payload)
|
|
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7::jsonb)`,
|
|
newUUID(), len(data), topN, source, traceID, spanID, mustJSON(data),
|
|
)
|
|
if err != nil {
|
|
slog.Warn("failed to persist WWI supplier scores", "err", err)
|
|
span.RecordError(err)
|
|
return
|
|
}
|
|
persistWritesTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("entity", "wwi_supplier_scores")))
|
|
|
|
AppendAudit(ctx, pool, AuditEntry{
|
|
Action: "scores.generated", ActorType: actorType(source), ActorID: source,
|
|
Domain: "wwi", Service: "otel-bi-analytics", EntityType: "supplier_scores",
|
|
Payload: map[string]any{"supplier_count": len(data), "top_n": topN},
|
|
})
|
|
}
|
|
|
|
// GenerateStockEvents writes LOW_STOCK business events for HIGH-urgency items,
|
|
// deduplicating within a 24-hour window.
|
|
func GenerateStockEvents(ctx context.Context, pool *pgxpool.Pool, items []analytics.ReorderRecommendation) error {
|
|
ctx, span := persistTracer.Start(ctx, "persistence.wwi.generate_stock_events")
|
|
defer span.End()
|
|
|
|
cutoff := time.Now().UTC().Add(-24 * time.Hour)
|
|
|
|
tx, err := pool.Begin(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback(ctx) //nolint:errcheck
|
|
|
|
inserted := 0
|
|
for _, item := range items {
|
|
if item.Urgency != "HIGH" {
|
|
continue
|
|
}
|
|
entityKey := fmt.Sprintf("%d", item.StockItemKey)
|
|
|
|
var existingID string
|
|
err := tx.QueryRow(ctx,
|
|
`SELECT id FROM wwi_business_events
|
|
WHERE event_type = 'LOW_STOCK' AND entity_key = $1 AND occurred_at >= $2
|
|
LIMIT 1`,
|
|
entityKey, cutoff,
|
|
).Scan(&existingID)
|
|
if err == nil {
|
|
continue // already exists within 24h
|
|
}
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
slog.Warn("error checking existing business event", "err", err)
|
|
continue
|
|
}
|
|
|
|
daysStr := "immediately"
|
|
if item.DaysUntilStockout != nil {
|
|
daysStr = fmt.Sprintf("%.1f days", *item.DaysUntilStockout)
|
|
}
|
|
message := fmt.Sprintf(
|
|
"Stock for '%s' will be exhausted in %s. Current stock: %.0f units, daily demand: %.1f units.",
|
|
item.StockItemName, daysStr, item.CurrentStock, item.AvgDailyDemand,
|
|
)
|
|
|
|
traceID, spanID := spanContext(span)
|
|
details := mustJSON(map[string]any{
|
|
"current_stock": item.CurrentStock,
|
|
"avg_daily_demand": item.AvgDailyDemand,
|
|
"recommended_reorder_qty": item.RecommendedReorderQty,
|
|
})
|
|
|
|
_, err = tx.Exec(ctx,
|
|
`INSERT INTO wwi_business_events
|
|
(id, occurred_at, event_type, severity, entity_key, entity_name, message, trace_id, span_id, details)
|
|
VALUES ($1, NOW(), 'LOW_STOCK', 'HIGH', $2, $3, $4, $5, $6, $7::jsonb)`,
|
|
newUUID(), entityKey, item.StockItemName, message, traceID, spanID, details,
|
|
)
|
|
if err != nil {
|
|
slog.Warn("failed to insert business event", "item", item.StockItemKey, "err", err)
|
|
continue
|
|
}
|
|
inserted++
|
|
businessEventsTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("event_type", "LOW_STOCK")))
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("commit stock events: %w", err)
|
|
}
|
|
span.SetAttributes(attribute.Int("events_inserted", inserted))
|
|
return nil
|
|
}
|