from __future__ import annotations import uuid from datetime import datetime, timezone from pathlib import Path import openpyxl from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import cm from reportlab.platypus import ( PageBreak, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle, ) _PAGE_W, _ = landscape(A4) _MARGIN = 1.5 * cm _HEADER_BG = colors.HexColor("#1a56db") _ROW_BG = colors.HexColor("#eef2ff") def _normalise(rows: list[dict] | dict) -> list[dict]: if isinstance(rows, dict): return [rows] return rows or [] # --------------------------------------------------------------------------- # XLSX # --------------------------------------------------------------------------- def _save_xlsx(data: dict, path: str, report_id: str, generated_at: str) -> None: wb = openpyxl.Workbook() ws = wb.active ws.title = "Metadata" ws.append(["Field", "Value"]) ws.append(["Generated At (UTC)", generated_at]) ws.append(["Report ID", report_id]) sheets = [ ("AW Sales KPIs", _normalise(data.get("aw_sales_kpis", {}))), ("AW Sales History", _normalise(data.get("aw_sales_history", []))), ("AW Sales Forecast", _normalise(data.get("aw_sales_forecast", []))), ("AW Rep Scores", _normalise(data.get("aw_rep_scores", []))), ("AW Product Demand", _normalise(data.get("aw_product_demand", []))), ("WWI Sales KPIs", _normalise(data.get("wwi_sales_kpis", {}))), ("WWI Stock Recs", _normalise(data.get("wwi_stock_recommendations", []))), ("WWI Supplier Scores", _normalise(data.get("wwi_supplier_scores", []))), ("WWI Business Events", _normalise(data.get("wwi_business_events", []))), ] for sheet_name, rows in sheets: ws = wb.create_sheet(title=sheet_name) if rows: ws.append(list(rows[0].keys())) for row in rows: ws.append([str(v) if v is not None else "" for v in row.values()]) else: ws.append(["No data"]) wb.save(path) # --------------------------------------------------------------------------- # PDF # --------------------------------------------------------------------------- def _pdf_table(rows: list[dict] | dict) -> Table: data = _normalise(rows) if not data: table_data: list[list] = [["No data available"]] n_cols = 1 else: headers = list(data[0].keys()) n_cols = len(headers) table_data = [headers] + [ [str(row.get(h, "")) for h in headers] for row in data ] col_w = (_PAGE_W - 2 * _MARGIN) / n_cols t = Table(table_data, colWidths=[col_w] * n_cols, repeatRows=1) style: list = [ ("BACKGROUND", (0, 0), (-1, 0), _HEADER_BG), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), ("FONTSIZE", (0, 0), (-1, 0), 8), ("FONTNAME", (0, 1), (-1, -1), "Helvetica"), ("FONTSIZE", (0, 1), (-1, -1), 7), ("ALIGN", (0, 0), (-1, -1), "LEFT"), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#d1d5db")), ("TOPPADDING", (0, 0), (-1, -1), 3), ("BOTTOMPADDING", (0, 0), (-1, -1), 3), ("LEFTPADDING", (0, 0), (-1, -1), 5), ("RIGHTPADDING", (0, 0), (-1, -1), 5), ] for i in range(1, len(table_data)): bg = _ROW_BG if i % 2 == 1 else colors.white style.append(("BACKGROUND", (0, i), (-1, i), bg)) t.setStyle(TableStyle(style)) return t def _section(story: list, title: str, rows: list[dict] | dict, styles) -> None: story.append(Paragraph(title, styles["Heading2"])) story.append(Spacer(1, 0.25 * cm)) story.append(_pdf_table(rows)) story.append(Spacer(1, 0.5 * cm)) def _save_pdf(data: dict, path: str, report_id: str, generated_at: str) -> None: styles = getSampleStyleSheet() story: list = [] story.append(Paragraph("OTel BI Platform — Generated Report", styles["Title"])) story.append(Spacer(1, 0.2 * cm)) story.append(Paragraph( f"Report ID: {report_id}   |   Generated: {generated_at}", styles["Normal"], )) story.append(Spacer(1, 0.6 * cm)) story.append(Paragraph("AdventureWorks DW", styles["Heading1"])) story.append(Spacer(1, 0.3 * cm)) _section(story, "Sales KPIs", data.get("aw_sales_kpis", {}), styles) _section(story, "Sales History", data.get("aw_sales_history", []), styles) story.append(PageBreak()) _section(story, "Sales Forecast", data.get("aw_sales_forecast", []), styles) _section(story, "Rep Scores", data.get("aw_rep_scores", []), styles) _section(story, "Product Demand", data.get("aw_product_demand", []), styles) story.append(PageBreak()) story.append(Paragraph("WideWorldImporters DW", styles["Heading1"])) story.append(Spacer(1, 0.3 * cm)) _section(story, "Sales KPIs", data.get("wwi_sales_kpis", {}), styles) _section(story, "Stock Recommendations", data.get("wwi_stock_recommendations", []), styles) story.append(PageBreak()) _section(story, "Supplier Scores", data.get("wwi_supplier_scores", []), styles) _section(story, "Business Events", data.get("wwi_business_events", []), styles) doc = SimpleDocTemplate( path, pagesize=landscape(A4), leftMargin=_MARGIN, rightMargin=_MARGIN, topMargin=_MARGIN, bottomMargin=_MARGIN, ) doc.build(story) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def save_report(data: dict, output_dir: str) -> dict: """Generate XLSX and PDF reports from aggregated BI data and write both to *output_dir*.""" now = datetime.now(timezone.utc) ts = now.strftime("%Y%m%d_%H%M%S") uid = uuid.uuid4().hex[:6] report_id = f"{ts}_{uid}" generated_at = now.isoformat() out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) base = f"otel_bi_report_{report_id}" xlsx_path = str(out / f"{base}.xlsx") pdf_path = str(out / f"{base}.pdf") _save_xlsx(data, xlsx_path, report_id, generated_at) _save_pdf(data, pdf_path, report_id, generated_at) return { "report_id": report_id, "generated_at": generated_at, "xlsx": {"filename": f"{base}.xlsx", "path": xlsx_path}, "pdf": {"filename": f"{base}.pdf", "path": pdf_path}, }