307 lines
9.6 KiB
Go
307 lines
9.6 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/robfig/cron/v3"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"otel-bi-analytics/internal/analytics"
|
|
"otel-bi-analytics/internal/persistence"
|
|
)
|
|
|
|
var (
|
|
schedTracer = otel.Tracer("otel-bi.scheduler")
|
|
schedMeter = otel.Meter("otel-bi.scheduler")
|
|
|
|
jobDurationSeconds, _ = schedMeter.Float64Histogram(
|
|
"scheduler.job.duration_seconds",
|
|
metric.WithDescription("Scheduler job execution duration"),
|
|
metric.WithUnit("s"),
|
|
)
|
|
jobSuccessTotal, _ = schedMeter.Int64Counter(
|
|
"scheduler.job.success_total",
|
|
metric.WithDescription("Scheduler jobs completed successfully"),
|
|
)
|
|
jobFailureTotal, _ = schedMeter.Int64Counter(
|
|
"scheduler.job.failure_total",
|
|
metric.WithDescription("Scheduler jobs that failed"),
|
|
)
|
|
jobRecordsProcessed, _ = schedMeter.Int64Counter(
|
|
"scheduler.job.records_processed_total",
|
|
metric.WithDescription("Records processed by scheduler jobs"),
|
|
)
|
|
)
|
|
|
|
// Scheduler wraps robfig/cron and owns all job implementations.
|
|
type Scheduler struct {
|
|
awDB *sql.DB
|
|
wwiDB *sql.DB
|
|
pgPool *pgxpool.Pool
|
|
topN int
|
|
cron *cron.Cron
|
|
}
|
|
|
|
func New(awDB, wwiDB *sql.DB, pgPool *pgxpool.Pool, defaultTopN int) *Scheduler {
|
|
return &Scheduler{
|
|
awDB: awDB,
|
|
wwiDB: wwiDB,
|
|
pgPool: pgPool,
|
|
topN: defaultTopN,
|
|
cron: cron.New(cron.WithLocation(time.UTC), cron.WithSeconds()),
|
|
}
|
|
}
|
|
|
|
// Start registers all jobs and starts the cron runner.
|
|
func (s *Scheduler) Start() {
|
|
s.cron.AddFunc("0 0 2 * * *", s.jobAWForecast)
|
|
s.cron.AddFunc("0 30 2 * * *", s.jobAWScores)
|
|
s.cron.AddFunc("0 0 3 * * *", s.jobAWDataQuality)
|
|
s.cron.AddFunc("0 30 3 * * *", s.jobAWAnomalyDetection)
|
|
s.cron.AddFunc("0 0 * * * *", s.jobWWIReorder)
|
|
s.cron.AddFunc("0 30 3 * * *", s.jobWWISupplierScores)
|
|
s.cron.AddFunc("0 30 * * * *", s.jobWWIEvents)
|
|
s.cron.AddFunc("0 0 4 * * *", s.jobWWIDataQuality)
|
|
s.cron.Start()
|
|
slog.Info("scheduler started", "jobs", len(s.cron.Entries()))
|
|
}
|
|
|
|
// Stop gracefully stops the cron runner.
|
|
func (s *Scheduler) Stop() {
|
|
ctx := s.cron.Stop()
|
|
<-ctx.Done()
|
|
}
|
|
|
|
// TriggerAWJob runs an AW job immediately in a goroutine.
|
|
func (s *Scheduler) TriggerAWJob(jobName string) error {
|
|
fns := map[string]func(){
|
|
"forecast": s.jobAWForecast,
|
|
"scores": s.jobAWScores,
|
|
"data_quality": s.jobAWDataQuality,
|
|
"anomaly_detection": s.jobAWAnomalyDetection,
|
|
}
|
|
fn, ok := fns[jobName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown AW job: %s", jobName)
|
|
}
|
|
go fn()
|
|
return nil
|
|
}
|
|
|
|
// TriggerWWIJob runs a WWI job immediately in a goroutine.
|
|
func (s *Scheduler) TriggerWWIJob(jobName string) error {
|
|
fns := map[string]func(){
|
|
"reorder": s.jobWWIReorder,
|
|
"supplier_scores": s.jobWWISupplierScores,
|
|
"events": s.jobWWIEvents,
|
|
"data_quality": s.jobWWIDataQuality,
|
|
}
|
|
fn, ok := fns[jobName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown WWI job: %s", jobName)
|
|
}
|
|
go fn()
|
|
return nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// runJob wraps a job function with OTel tracing, metrics, and audit logging.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (s *Scheduler) runJob(jobName, domain string, fn func(ctx context.Context) (int, error)) {
|
|
ctx := context.Background()
|
|
ctx, span := schedTracer.Start(ctx,
|
|
"scheduler."+jobName,
|
|
trace.WithNewRoot(),
|
|
trace.WithSpanKind(trace.SpanKindInternal),
|
|
trace.WithAttributes(
|
|
attribute.String("job.name", jobName),
|
|
attribute.String("job.domain", domain),
|
|
),
|
|
)
|
|
defer span.End()
|
|
|
|
traceID, spanID := spanCtx(span)
|
|
jobID := persistence.RecordJobStart(ctx, s.pgPool, jobName, domain, traceID, spanID)
|
|
startedAt := time.Now()
|
|
|
|
slog.Info("job started", "job", jobName)
|
|
|
|
records, err := fn(ctx)
|
|
|
|
duration := time.Since(startedAt).Seconds()
|
|
attrs := metric.WithAttributes(
|
|
attribute.String("job.name", jobName),
|
|
attribute.String("job.domain", domain),
|
|
)
|
|
jobDurationSeconds.Record(ctx, duration, attrs)
|
|
|
|
if err != nil {
|
|
slog.Error("job failed", "job", jobName, "err", err, "duration_s", duration)
|
|
span.RecordError(err)
|
|
span.SetAttributes(attribute.String("job.status", "failure"))
|
|
persistence.RecordJobFailure(ctx, s.pgPool, jobID, startedAt, err.Error())
|
|
persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{
|
|
Action: "job.failed", ActorType: "scheduler", ActorID: jobName,
|
|
Domain: domain, Service: "otel-bi-analytics", Status: "failure",
|
|
Payload: map[string]any{"job_name": jobName, "error": err.Error()},
|
|
})
|
|
jobFailureTotal.Add(ctx, 1, attrs)
|
|
return
|
|
}
|
|
|
|
slog.Info("job completed", "job", jobName, "records", records, "duration_s", duration)
|
|
span.SetAttributes(
|
|
attribute.String("job.status", "success"),
|
|
attribute.Int("job.records_processed", records),
|
|
)
|
|
persistence.RecordJobComplete(ctx, s.pgPool, jobID, startedAt, records)
|
|
persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{
|
|
Action: "job.completed", ActorType: "scheduler", ActorID: jobName,
|
|
Domain: domain, Service: "otel-bi-analytics",
|
|
Payload: map[string]any{"job_name": jobName, "records_processed": records},
|
|
})
|
|
jobSuccessTotal.Add(ctx, 1, attrs)
|
|
jobRecordsProcessed.Add(ctx, int64(records), attrs)
|
|
}
|
|
|
|
func spanCtx(span trace.Span) (traceID, spanID *string) {
|
|
sctx := span.SpanContext()
|
|
if !sctx.IsValid() {
|
|
return nil, nil
|
|
}
|
|
tid := sctx.TraceID().String()
|
|
sid := sctx.SpanID().String()
|
|
return &tid, &sid
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// AW jobs
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (s *Scheduler) jobAWForecast() {
|
|
s.runJob("aw.daily.forecast", "aw", func(ctx context.Context) (int, error) {
|
|
data, err := analytics.AWGetSalesForecast(ctx, s.awDB, 30)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.PersistForecast(ctx, s.pgPool, data, 30, "scheduler.aw.daily.forecast")
|
|
return len(data), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobAWScores() {
|
|
s.runJob("aw.daily.scores", "aw", func(ctx context.Context) (int, error) {
|
|
reps, err := analytics.AWGetRepScores(ctx, s.awDB, s.topN)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
products, err := analytics.AWGetProductDemand(ctx, s.awDB, s.topN)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.PersistRepScores(ctx, s.pgPool, reps, s.topN, "scheduler.aw.daily.scores")
|
|
persistence.PersistProductDemand(ctx, s.pgPool, products, s.topN, "scheduler.aw.daily.scores")
|
|
return len(reps) + len(products), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobAWDataQuality() {
|
|
s.runJob("aw.daily.data_quality", "aw", func(ctx context.Context) (int, error) {
|
|
report, err := analytics.AWRunDataQualityCheck(ctx, s.awDB)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{
|
|
Action: "job.completed", ActorType: "scheduler", ActorID: "aw.daily.data_quality",
|
|
Domain: "aw", Service: "otel-bi-analytics", EntityType: "data_quality",
|
|
Status: report.Status,
|
|
Payload: map[string]any{"status": report.Status, "failed_checks": report.FailedChecks},
|
|
})
|
|
return len(report.Checks), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobAWAnomalyDetection() {
|
|
s.runJob("aw.daily.anomaly_detection", "aw", func(ctx context.Context) (int, error) {
|
|
data, err := analytics.AWRunAnomalyDetection(ctx, s.awDB)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.PersistAnomalyRun(ctx, s.pgPool, data, "scheduler.aw.daily.anomaly_detection")
|
|
return len(data), nil
|
|
})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// WWI jobs
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (s *Scheduler) jobWWIReorder() {
|
|
s.runJob("wwi.hourly.reorder", "wwi", func(ctx context.Context) (int, error) {
|
|
data, err := analytics.WWIGetReorderRecommendations(ctx, s.wwiDB)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.PersistReorderRecommendations(ctx, s.pgPool, data, "scheduler.wwi.hourly.reorder")
|
|
if err := persistence.GenerateStockEvents(ctx, s.pgPool, data); err != nil {
|
|
slog.Warn("generate_stock_events failed", "err", err)
|
|
}
|
|
return len(data), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobWWISupplierScores() {
|
|
s.runJob("wwi.daily.supplier_scores", "wwi", func(ctx context.Context) (int, error) {
|
|
data, err := analytics.WWIGetSupplierScores(ctx, s.wwiDB, s.topN)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.PersistSupplierScores(ctx, s.pgPool, data, s.topN, "scheduler.wwi.daily.supplier_scores")
|
|
return len(data), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobWWIEvents() {
|
|
s.runJob("wwi.hourly.events", "wwi", func(ctx context.Context) (int, error) {
|
|
data, err := analytics.WWIGetReorderRecommendations(ctx, s.wwiDB)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
var highUrgency []analytics.ReorderRecommendation
|
|
for _, item := range data {
|
|
if item.Urgency == "HIGH" {
|
|
highUrgency = append(highUrgency, item)
|
|
}
|
|
}
|
|
if err := persistence.GenerateStockEvents(ctx, s.pgPool, highUrgency); err != nil {
|
|
slog.Warn("generate_stock_events (events job) failed", "err", err)
|
|
}
|
|
return len(highUrgency), nil
|
|
})
|
|
}
|
|
|
|
func (s *Scheduler) jobWWIDataQuality() {
|
|
s.runJob("wwi.daily.data_quality", "wwi", func(ctx context.Context) (int, error) {
|
|
report, err := analytics.WWIRunDataQualityCheck(ctx, s.wwiDB)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{
|
|
Action: "job.completed", ActorType: "scheduler", ActorID: "wwi.daily.data_quality",
|
|
Domain: "wwi", Service: "otel-bi-analytics", EntityType: "data_quality",
|
|
Status: report.Status,
|
|
Payload: map[string]any{"status": report.Status, "failed_checks": report.FailedChecks},
|
|
})
|
|
return len(report.Checks), nil
|
|
})
|
|
}
|