package scheduler

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/robfig/cron/v3"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"

	"otel-bi-analytics/internal/analytics"
	"otel-bi-analytics/internal/persistence"
)

// Package-level OTel instruments shared by all scheduler jobs. Instrument
// creation errors are discarded with _; the otel API hands back usable
// no-op-style instruments alongside the error, so recording stays safe.
var (
	schedTracer = otel.Tracer("otel-bi.scheduler")
	schedMeter  = otel.Meter("otel-bi.scheduler")

	jobDurationSeconds, _ = schedMeter.Float64Histogram(
		"scheduler.job.duration_seconds",
		metric.WithDescription("Scheduler job execution duration"),
		metric.WithUnit("s"),
	)
	jobSuccessTotal, _ = schedMeter.Int64Counter(
		"scheduler.job.success_total",
		metric.WithDescription("Scheduler jobs completed successfully"),
	)
	jobFailureTotal, _ = schedMeter.Int64Counter(
		"scheduler.job.failure_total",
		metric.WithDescription("Scheduler jobs that failed"),
	)
	jobRecordsProcessed, _ = schedMeter.Int64Counter(
		"scheduler.job.records_processed_total",
		metric.WithDescription("Records processed by scheduler jobs"),
	)
)

// Scheduler wraps robfig/cron and owns all job implementations.
type Scheduler struct {
	awDB   *sql.DB       // source DB queried by the "aw"-domain jobs
	wwiDB  *sql.DB       // source DB queried by the "wwi"-domain jobs
	pgPool *pgxpool.Pool // Postgres pool for persistence and audit writes
	topN   int           // default top-N used by score/demand jobs
	cron   *cron.Cron    // UTC, seconds-granularity cron runner
}

// New builds a Scheduler over the given databases. defaultTopN is the top-N
// applied to score and demand jobs. The cron runner uses UTC and a
// seconds-field schedule format (6-field specs).
func New(awDB, wwiDB *sql.DB, pgPool *pgxpool.Pool, defaultTopN int) *Scheduler {
	return &Scheduler{
		awDB:   awDB,
		wwiDB:  wwiDB,
		pgPool: pgPool,
		topN:   defaultTopN,
		cron:   cron.New(cron.WithLocation(time.UTC), cron.WithSeconds()),
	}
}

// Start registers all jobs and starts the cron runner.
func (s *Scheduler) Start() { s.cron.AddFunc("0 0 2 * * *", s.jobAWForecast) s.cron.AddFunc("0 30 2 * * *", s.jobAWScores) s.cron.AddFunc("0 0 3 * * *", s.jobAWDataQuality) s.cron.AddFunc("0 30 3 * * *", s.jobAWAnomalyDetection) s.cron.AddFunc("0 0 * * * *", s.jobWWIReorder) s.cron.AddFunc("0 30 3 * * *", s.jobWWISupplierScores) s.cron.AddFunc("0 30 * * * *", s.jobWWIEvents) s.cron.AddFunc("0 0 4 * * *", s.jobWWIDataQuality) s.cron.Start() slog.Info("scheduler started", "jobs", len(s.cron.Entries())) } // Stop gracefully stops the cron runner. func (s *Scheduler) Stop() { ctx := s.cron.Stop() <-ctx.Done() } // TriggerAWJob runs an AW job immediately in a goroutine. func (s *Scheduler) TriggerAWJob(jobName string) error { fns := map[string]func(){ "forecast": s.jobAWForecast, "scores": s.jobAWScores, "data_quality": s.jobAWDataQuality, "anomaly_detection": s.jobAWAnomalyDetection, } fn, ok := fns[jobName] if !ok { return fmt.Errorf("unknown AW job: %s", jobName) } go fn() return nil } // TriggerWWIJob runs a WWI job immediately in a goroutine. func (s *Scheduler) TriggerWWIJob(jobName string) error { fns := map[string]func(){ "reorder": s.jobWWIReorder, "supplier_scores": s.jobWWISupplierScores, "events": s.jobWWIEvents, "data_quality": s.jobWWIDataQuality, } fn, ok := fns[jobName] if !ok { return fmt.Errorf("unknown WWI job: %s", jobName) } go fn() return nil } // --------------------------------------------------------------------------- // runJob wraps a job function with OTel tracing, metrics, and audit logging. 
// --------------------------------------------------------------------------- func (s *Scheduler) runJob(jobName, domain string, fn func(ctx context.Context) (int, error)) { ctx := context.Background() ctx, span := schedTracer.Start(ctx, "scheduler."+jobName, trace.WithNewRoot(), trace.WithSpanKind(trace.SpanKindInternal), trace.WithAttributes( attribute.String("job.name", jobName), attribute.String("job.domain", domain), ), ) defer span.End() traceID, spanID := spanCtx(span) jobID := persistence.RecordJobStart(ctx, s.pgPool, jobName, domain, traceID, spanID) startedAt := time.Now() slog.Info("job started", "job", jobName) records, err := fn(ctx) duration := time.Since(startedAt).Seconds() attrs := metric.WithAttributes( attribute.String("job.name", jobName), attribute.String("job.domain", domain), ) jobDurationSeconds.Record(ctx, duration, attrs) if err != nil { slog.Error("job failed", "job", jobName, "err", err, "duration_s", duration) span.RecordError(err) span.SetAttributes(attribute.String("job.status", "failure")) persistence.RecordJobFailure(ctx, s.pgPool, jobID, startedAt, err.Error()) persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{ Action: "job.failed", ActorType: "scheduler", ActorID: jobName, Domain: domain, Service: "otel-bi-analytics", Status: "failure", Payload: map[string]any{"job_name": jobName, "error": err.Error()}, }) jobFailureTotal.Add(ctx, 1, attrs) return } slog.Info("job completed", "job", jobName, "records", records, "duration_s", duration) span.SetAttributes( attribute.String("job.status", "success"), attribute.Int("job.records_processed", records), ) persistence.RecordJobComplete(ctx, s.pgPool, jobID, startedAt, records) persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{ Action: "job.completed", ActorType: "scheduler", ActorID: jobName, Domain: domain, Service: "otel-bi-analytics", Payload: map[string]any{"job_name": jobName, "records_processed": records}, }) jobSuccessTotal.Add(ctx, 1, attrs) 
jobRecordsProcessed.Add(ctx, int64(records), attrs) } func spanCtx(span trace.Span) (traceID, spanID *string) { sctx := span.SpanContext() if !sctx.IsValid() { return nil, nil } tid := sctx.TraceID().String() sid := sctx.SpanID().String() return &tid, &sid } // --------------------------------------------------------------------------- // AW jobs // --------------------------------------------------------------------------- func (s *Scheduler) jobAWForecast() { s.runJob("aw.daily.forecast", "aw", func(ctx context.Context) (int, error) { data, err := analytics.AWGetSalesForecast(ctx, s.awDB, 30) if err != nil { return 0, err } persistence.PersistForecast(ctx, s.pgPool, data, 30, "scheduler.aw.daily.forecast") return len(data), nil }) } func (s *Scheduler) jobAWScores() { s.runJob("aw.daily.scores", "aw", func(ctx context.Context) (int, error) { reps, err := analytics.AWGetRepScores(ctx, s.awDB, s.topN) if err != nil { return 0, err } products, err := analytics.AWGetProductDemand(ctx, s.awDB, s.topN) if err != nil { return 0, err } persistence.PersistRepScores(ctx, s.pgPool, reps, s.topN, "scheduler.aw.daily.scores") persistence.PersistProductDemand(ctx, s.pgPool, products, s.topN, "scheduler.aw.daily.scores") return len(reps) + len(products), nil }) } func (s *Scheduler) jobAWDataQuality() { s.runJob("aw.daily.data_quality", "aw", func(ctx context.Context) (int, error) { report, err := analytics.AWRunDataQualityCheck(ctx, s.awDB) if err != nil { return 0, err } persistence.AppendAudit(ctx, s.pgPool, persistence.AuditEntry{ Action: "job.completed", ActorType: "scheduler", ActorID: "aw.daily.data_quality", Domain: "aw", Service: "otel-bi-analytics", EntityType: "data_quality", Status: report.Status, Payload: map[string]any{"status": report.Status, "failed_checks": report.FailedChecks}, }) return len(report.Checks), nil }) } func (s *Scheduler) jobAWAnomalyDetection() { s.runJob("aw.daily.anomaly_detection", "aw", func(ctx context.Context) (int, error) { data, err := 
analytics.AWRunAnomalyDetection(ctx, s.awDB) if err != nil { return 0, err } persistence.PersistAnomalyRun(ctx, s.pgPool, data, "scheduler.aw.daily.anomaly_detection") return len(data), nil }) } // --------------------------------------------------------------------------- // WWI jobs // --------------------------------------------------------------------------- func (s *Scheduler) jobWWIReorder() { s.runJob("wwi.hourly.reorder", "wwi", func(ctx context.Context) (int, error) { data, err := analytics.WWIGetReorderRecommendations(ctx, s.wwiDB) if err != nil { return 0, err } persistence.PersistReorderRecommendations(ctx, s.pgPool, data, "scheduler.wwi.hourly.reorder") if err := persistence.GenerateStockEvents(ctx, s.pgPool, data); err != nil { slog.Warn("generate_stock_events failed", "err", err) } return len(data), nil }) } func (s *Scheduler) jobWWISupplierScores() { s.runJob("wwi.daily.supplier_scores", "wwi", func(ctx context.Context) (int, error) { data, err := analytics.WWIGetSupplierScores(ctx, s.wwiDB, s.topN) if err != nil { return 0, err } persistence.PersistSupplierScores(ctx, s.pgPool, data, s.topN, "scheduler.wwi.daily.supplier_scores") return len(data), nil }) } func (s *Scheduler) jobWWIEvents() { s.runJob("wwi.hourly.events", "wwi", func(ctx context.Context) (int, error) { data, err := analytics.WWIGetReorderRecommendations(ctx, s.wwiDB) if err != nil { return 0, err } var highUrgency []analytics.ReorderRecommendation for _, item := range data { if item.Urgency == "HIGH" { highUrgency = append(highUrgency, item) } } if err := persistence.GenerateStockEvents(ctx, s.pgPool, highUrgency); err != nil { slog.Warn("generate_stock_events (events job) failed", "err", err) } return len(highUrgency), nil }) } func (s *Scheduler) jobWWIDataQuality() { s.runJob("wwi.daily.data_quality", "wwi", func(ctx context.Context) (int, error) { report, err := analytics.WWIRunDataQualityCheck(ctx, s.wwiDB) if err != nil { return 0, err } persistence.AppendAudit(ctx, 
s.pgPool, persistence.AuditEntry{ Action: "job.completed", ActorType: "scheduler", ActorID: "wwi.daily.data_quality", Domain: "wwi", Service: "otel-bi-analytics", EntityType: "data_quality", Status: report.Status, Payload: map[string]any{"status": report.Status, "failed_checks": report.FailedChecks}, }) return len(report.Checks), nil }) }