Design Document

AI Workloads is a FastAPI-based platform that meters AI inference emissions and retires verified carbon credits. The system is structured as a polling-based telemetry pipeline with four distinct phases: Ingest, Calculate, Aggregate, Retire. Each phase is decoupled via an async job queue (ARQ/Redis), making the system horizontally scalable and resilient to provider API instability.

The architecture follows a functional core / imperative shell pattern: all emissions calculations are pure functions, all side effects (polling, persistence, signing, S3 uploads) happen at the boundary in service classes and job handlers.

Context

  • This is a greenfield FastAPI service for Carbon Labs' AI Workloads product. No existing codebase or legacy constraints. The service connects to AI provider accounts, meters inference emissions, and retires verified carbon credits with cryptographically signed receipts.
  • The system has two runtime components: an API server handling HTTP requests and an ARQ worker handling background jobs (polling, reconciliation, billing, audit packs). Both share the same codebase and database but run as separate processes.

Key constraints:

  • Provider usage APIs are poll-only (no webhooks) with varying schemas
  • Provider API keys are high-value credentials requiring encryption at rest
  • Receipt signing must be independently verifiable by third parties
  • PDF generation (WeasyPrint) is CPU-bound and must not block the API event loop
  • Billing close must tolerate late-arriving provider data (T+48h window)

Goals / Non-Goals

Goals:

  • End-to-end pipeline: connect → poll → calculate → display → bill → retire → sign → deliver
  • Free tier with standalone analytics value (acquisition funnel)
  • Paid tier with automated credit retirement and verifiable receipts (revenue path)
  • Idempotent, self-healing background jobs that handle provider API failures gracefully
  • Structured, auditable logging for compliance and debugging

Non-Goals:

  • Real-time streaming telemetry (hourly polling is sufficient for the use case)
  • Custom carbon factor uploads by users (admin-seeded only in v1)
  • Multi-region deployment (single region in v1)
  • Client-side SDK implementation (Phase 2, scaffold only)
  • Dashboard frontend (separate Next.js repository consuming this API)

Technical Context

| Property | Value |
| --- | --- |
| Language | Python 3.12+ |
| Primary Dependencies | FastAPI, SQLAlchemy 2.0 (async), Pydantic v2, httpx (async), ARQ, PyNaCl, WeasyPrint, Jinja2, boto3/aioboto3, stripe, pyjwt, slowapi, structlog, orjson |
| Storage | PostgreSQL 16 via Supabase (asyncpg driver), Redis 7 (ARQ job queue + rate-limit counters) |
| Testing | pytest + pytest-asyncio + httpx (AsyncClient) + factory_boy + testcontainers |
| Target Platform | Linux containers on AWS ECS Fargate |
| Project Type | Web service (REST API + background workers) |
| Performance Goals | API p95 < 200ms, dashboard initial load < 3s, hourly poll cycle completes in < 5 min for 100 connections |
| Constraints | Receipt signing keys loaded from env/Secrets Manager at startup (not per-request); PDF generation offloaded to worker; provider API rate limits vary by tier |
| Initial Scale | 50 organizations, 200 connections, ~10k telemetry events/day |
| Growth Target | 1,000 orgs, 5k connections |

Architecture Diagrams

System Context

graph TB
    subgraph External
        OA[OpenAI Usage API]
        AN[Anthropic Usage API]
        OR[OpenRouter Activity API]
        CL[Clerk Auth]
        ST[Stripe Billing]
    end

    subgraph AWS
        ALB[ALB / HTTPS]
        subgraph ECS Fargate
            API[FastAPI API<br/>min 2 tasks]
            WRK[ARQ Workers<br/>auto-scale]
        end
        PG[(Supabase PostgreSQL)]
        RD[(ElastiCache Redis)]
        SM[Secrets Manager]
        S3[S3 Buckets<br/>receipts + audit-packs]
    end

    subgraph Frontend
        DASH[Next.js Dashboard<br/>CloudFront / Vercel]
    end

    DASH -->|REST API| ALB
    ALB --> API
    API --> PG
    API --> RD
    WRK --> PG
    WRK --> RD
    WRK --> SM
    WRK --> S3
    WRK -->|poll| OA
    WRK -->|poll| AN
    WRK -->|poll| OR
    API -->|JWT verify| CL
    API -->|Stripe API| ST
    CL -->|webhook| API
    ST -->|webhook| API

Telemetry Pipeline

sequenceDiagram
    participant Cron as ARQ Cron (Hourly)
    participant PollJob as poll_provider Job
    participant Provider as Provider API
    participant DB as PostgreSQL
    participant CalcJob as compute_emissions Job
    participant Factors as Carbon Factors

    Cron->>PollJob: enqueue per active connection
    PollJob->>DB: load connection + sync_cursor
    PollJob->>Provider: GET usage (cursor-based)
    Provider-->>PollJob: usage buckets
    PollJob->>PollJob: map to TelemetryEventCandidate
    PollJob->>PollJob: compute idempotency_hash (provider:org_id:model:bucket_start)
    PollJob->>DB: UPSERT telemetry_events ON CONFLICT (idempotency_hash)
    PollJob->>DB: update sync_cursor, last_polled_at
    PollJob->>CalcJob: enqueue per new/updated event_id
    CalcJob->>DB: load event
    CalcJob->>Factors: load current version
    CalcJob->>CalcJob: calculate_emissions() pure function
    CalcJob->>DB: UPSERT carbon_calculations

Billing & Receipt Pipeline

sequenceDiagram
    participant Stripe as Stripe Webhook
    participant API as FastAPI
    participant BillJob as close_billing_period Job
    participant DB as PostgreSQL
    participant Signer as ReceiptSigner (Ed25519)
    participant PDF as WeasyPrint
    participant S3 as S3

    Stripe->>API: invoice.payment_succeeded
    API->>DB: set billing_period.status = "closing"
    API->>BillJob: enqueue (deferred T+48h)
    Note over BillJob: Waits for reconciliation window
    BillJob->>DB: aggregate CO2 for period
    BillJob->>DB: allocate credit serial numbers
    BillJob->>Signer: sign canonical payload
    Signer-->>BillJob: (payload_hash, signature)
    BillJob->>DB: insert CarbonReceipt
    BillJob->>PDF: render receipt.html template
    PDF-->>BillJob: PDF bytes
    BillJob->>S3: upload PDF
    BillJob->>DB: set billing_period.status = "closed"

Entity Relationship

erDiagram
    Organization ||--o{ Project : "has many"
    Organization ||--o{ ProviderConnection : "has many"
    Organization ||--o{ BillingPeriod : "has many"
    Organization ||--o{ AuditPack : "has many"
    Project ||--o{ Workload : "has many"
    ProviderConnection ||--o{ Workload : "has many"
    Workload ||--o{ TelemetryEvent : "has many"
    TelemetryEvent ||--|| CarbonCalculation : "has one"
    BillingPeriod ||--o{ CarbonReceipt : "has many"
    BillingPeriod ||--o{ PriorPeriodAdjustment : "has many"
    CarbonFactors }o--o{ CarbonCalculation : "referenced by"

See Data Model for full entity definitions with columns, indexes, and constraints.

Component Design

1. Provider Connectors - app/connectors/

Each connector implements a Protocol defining two operations: validate and poll.

Design decisions:

  • idempotency_hash is a method on the connector (not static) because the hash strategy varies by provider. OpenAI and OpenRouter hash provider:org_id:model:bucket_start (coarse, handles retroactive bucket revisions). Anthropic uses the same components plus cache token counts, since it provides stable breakdowns.
  • PollResult.has_more drives re-enqueue logic. Connectors paginate internally.
  • Each connector creates a fresh httpx.AsyncClient per poll call. No persistent connections - workers are short-lived.

2. Telemetry Service - app/services/telemetry.py

Handles ingestion, dedup, and query aggregation.

The upsert strategy uses PostgreSQL ON CONFLICT (idempotency_hash) DO UPDATE SET for token count fields and sync_timestamp. This handles the case where OpenAI revises a usage bucket retroactively - we take the latest values.
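The dedup key from the pipeline diagram (provider:org_id:model:bucket_start, hashed with SHA-256) can be sketched in pure Python. The function name and exact canonical string below are illustrative; this shows the coarse OpenAI/OpenRouter variant, while Anthropic's would fold cache token counts into the canonical string:

```python
import hashlib
from datetime import datetime, timezone


def idempotency_hash(provider: str, org_id: str, model: str,
                     bucket_start: datetime) -> str:
    """Deterministic dedup key: the same usage bucket always maps to the same
    row, so re-polls and retroactive revisions UPSERT instead of duplicating."""
    canonical = (
        f"{provider}:{org_id}:{model}:"
        f"{bucket_start.astimezone(timezone.utc).isoformat()}"
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

The key property is determinism: the hash depends only on the bucket's identity, never on its (revisable) token counts, which is what lets ON CONFLICT take the latest values.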

Modified immutability

The immutability trigger only blocks UPDATEs to the idempotency_hash, event_timestamp, and model fields. Token counts and raw_payload can be updated during reconciliation.

3. Emissions Engine - app/services/emissions.py

Pure function. No database access, no side effects.

Model tier resolution uses fnmatch with first-match-wins ordering. The Carbon Factors JSON stores tiers in priority order - reasoning models first (so a pattern like o1* is matched before a broader tier can claim it), then large, medium, small. The default tier is "medium."
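A minimal sketch of first-match-wins resolution with fnmatch. The pattern table here is an assumption for illustration — the real tiers live in the versioned Carbon Factors JSON:

```python
from fnmatch import fnmatch

# Illustrative tier table; real patterns come from the Carbon Factors JSON,
# stored in priority order (reasoning first, then large, medium, small).
MODEL_TIERS: list[tuple[str, str]] = [
    ("o1*", "reasoning"),
    ("gpt-4*", "large"),
    ("gpt-3.5*", "medium"),
    ("*-mini", "small"),
]


def resolve_tier(model: str,
                 tiers: list[tuple[str, str]] = MODEL_TIERS) -> str:
    """First match wins over the priority-ordered pattern list;
    unmatched models fall through to the 'medium' default."""
    for pattern, tier in tiers:
        if fnmatch(model, pattern):
            return tier
    return "medium"
```

Because resolution is order-dependent, the factors file is the single source of truth for priority — the engine never re-sorts the patterns.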

4. Receipt & Signing - app/services/signing.py, app/services/receipts.py

Two-layer design:

  • ReceiptSigner: Pure cryptographic operations. Takes private key bytes, produces signatures.
  • ReceiptService: Orchestrates receipt creation (DB, signing, PDF, S3).

Signing flow:

  1. Build canonical payload dict (sorted keys, no whitespace)
  2. SHA-256 hash the canonical JSON
  3. Ed25519 sign the hash bytes
  4. Store sha256:{hash} and base64-encoded signature on the receipt record
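The four steps above can be sketched as follows. Canonicalization and hashing (steps 1-2) are pure stdlib; the Ed25519 step uses PyNaCl's `SigningKey`, imported lazily here so the hash portion stands alone. Function names are illustrative:

```python
import base64
import hashlib
import json
from typing import Any


def canonical_hash(payload: dict[str, Any]) -> str:
    """Steps 1-2: canonical JSON (sorted keys, no whitespace), then SHA-256."""
    canonical = json.dumps(
        payload, sort_keys=True, separators=(",", ":")
    ).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


def sign_receipt(payload: dict[str, Any],
                 private_key: bytes) -> tuple[str, str]:
    """Steps 3-4: Ed25519-sign the hash bytes and return the pair stored on
    the receipt record: ('sha256:{hash}', base64 signature)."""
    from nacl.signing import SigningKey  # third-party (PyNaCl)

    digest = canonical_hash(payload)
    signature = SigningKey(private_key).sign(bytes.fromhex(digest)).signature
    return f"sha256:{digest}", base64.b64encode(signature).decode("ascii")
```

Sorting keys and stripping whitespace is what makes third-party verification possible: any verifier that rebuilds the payload the same way gets byte-identical canonical JSON, hence the same hash.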

Key rotation: signing keys are versioned. Each receipt stores a key_version field. The verification endpoint fetches the correct public key for the receipt's key_version.

5. Billing Service - app/services/billing.py

Handles Stripe webhook events and billing period lifecycle.

The closing → closed transition is deferred: when we receive invoice.payment_succeeded, we enqueue close_billing_period with a 48-hour delay (_defer_by in ARQ). This ensures the reconciliation job has run at least once before we finalize.

Prior period adjustments: if telemetry arrives after a period is closed, the ingest_batch method checks the event's billing period status. If closed, it creates a PriorPeriodAdjustment record linked to the next open period. The next billing cycle includes these adjustments.
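The late-arrival check can be sketched as a pure routing decision; the dataclass shape and function names below are assumptions, not the actual `ingest_batch` signature:

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class BillingPeriod:
    start: date
    end: date
    status: str  # "open" | "closing" | "closed"


def route_event(event_date: date, periods: list[BillingPeriod]) -> str:
    """Decide where a telemetry event lands: its own billing period if still
    open, otherwise a PriorPeriodAdjustment rolled into the next open period."""
    for period in periods:
        if period.start <= event_date <= period.end:
            if period.status == "closed":
                return "prior_period_adjustment"
            return "current_period"
    # Assumption: events outside any known period bill in the open period.
    return "current_period"
```

Keeping adjustments in their own table (rather than reopening a closed period) preserves the invariant that a signed receipt's totals never change after the fact.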

6. Workload Assignment - app/services/connections.py

When a connection is created:

  1. If project_id is provided, create a Workload under that project linked to this connection.
  2. If project_id is null, find-or-create a "Default" project for the org, then create a Workload under it.

When events are polled, they're assigned to the connection's active workload; a connection has exactly one active workload at a time. Users can re-map a connection to a different project. Re-mapping creates a new workload under the new project, marks the old workload inactive, and routes future events to the new workload; historical events stay where they are.
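The re-mapping invariant can be sketched with plain dataclasses (the real service works against ORM models; names here are illustrative):

```python
from dataclasses import dataclass, field
from itertools import count

_ids = count(1)


@dataclass
class Workload:
    project_id: int
    id: int = field(default_factory=lambda: next(_ids))
    active: bool = True


def remap_connection(workloads: list[Workload],
                     new_project_id: int) -> Workload:
    """Deactivate the connection's current active workload and create a new
    one under the target project. Future events route to the returned
    workload; historical events stay attached to the old, inactive one."""
    for wl in workloads:
        if wl.active:
            wl.active = False
    new_wl = Workload(project_id=new_project_id)
    workloads.append(new_wl)
    return new_wl
```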

7. Auth Dependency - app/core/deps.py

Every authenticated endpoint resolves the current organization via a FastAPI dependency. All downstream queries use org.id - there is no path for cross-org data access.

8. Background Jobs - app/workers.py, app/jobs/

ARQ worker with cron + event-driven jobs. Error classification distinguishes transient failures (429, 503, timeout) from permanent failures (401, 403, 404) for retry logic.

9. Health & Observability - app/api/v1/health.py, app/core/logging.py

/health endpoint (no auth) checks database, Redis, and last poll recency.

Structured logging via structlog: correlation ID generated per request (middleware) and propagated to background jobs. All log entries include timestamp, level, correlation_id, org_id (if authenticated), and event name. Sensitive fields (api_key, credentials) are redacted via a processor.
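A structlog processor is just a callable `(logger, method_name, event_dict) -> event_dict`, so the redaction step can be sketched (and tested) without structlog itself. The key list is illustrative:

```python
SENSITIVE_KEYS = {"api_key", "credentials", "authorization", "secret"}


def redact_sensitive(logger, method_name, event_dict):
    """structlog-style processor: replaces the values of sensitive keys
    before the event dict is rendered to JSON."""
    for key in list(event_dict):
        if key.lower() in SENSITIVE_KEYS:
            event_dict[key] = "[REDACTED]"
    return event_dict
```

In the real setup this would sit in the processor chain in app/core/logging.py, ahead of the JSON renderer, so redaction applies uniformly to API and worker logs.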

Architecture Decisions

See Research for the full decision log with rationale and alternatives (R1-R9).

Implementation Decisions

| Decision | Choice | Rationale |
| --- | --- | --- |
| Layered architecture | API → Service → Model with DI | Enables unit testing services without HTTP; fat controllers rejected (untestable), DDD rejected (over-engineered for 11 tables) |
| Relaxed immutability trigger | Protect hash/timestamp/model only | Token counts must be updatable during reconciliation upserts; full immutability would block recon |
| UUID as_uuid=True | Native PostgreSQL UUID type | 16 bytes vs 36-byte string; proper indexing; aligns with SQLAlchemy best practices |
| Default project auto-creation | Find-or-create "Default" | Eliminates the "polled data with no home" problem; users can re-organize later |
| Prior period adjustments | Separate table, rolled into next period | Cleaner than amending closed receipts; auditors can see the adjustment trail |
| Signing key versioning | key_version on each receipt | Enables annual rotation without invalidating old receipts |
| structlog for logging | Structured JSON + correlation IDs | Essential for debugging async job failures across API and worker processes |
| Single Redis instance | Separate databases (db 0 ARQ, db 1 rate limits) | Simpler ops; if Redis down, ARQ queues up, rate limits fail open, health endpoint detects |
| Denormalized org_id | Set at ingest time on TelemetryEvent | Avoids 3-table join (telemetry → workload → connection → org) on every dashboard query |
| WeasyPrint system deps | Pinned in Dockerfile (pango, cairo, gdk-pixbuf) | Pure Python PDF renderer; no headless browser; sufficient for receipt layouts |

Error Mapping

| Domain Exception | HTTP Status | Error Code |
| --- | --- | --- |
| ConnectionValidationError | 400 | connection_validation_failed |
| ConnectionNotFoundError | 404 | connection_not_found |
| RateLimitExceededError | 429 | rate_limit_exceeded |
| BillingPeriodNotOpenError | 409 | billing_period_not_open |
| ReceiptNotFoundError | 404 | receipt_not_found |
| InvalidSignatureError | 422 | receipt_signature_invalid |

Constitution Check

All gates pass

| Gate | Status | Notes |
| --- | --- | --- |
| No secrets in code or config files | PASS | API keys → Secrets Manager ARN only; signing keys → env vars |
| Structured logging everywhere | PASS | structlog with correlation IDs per requirement 7.9 |
| All external calls async | PASS | httpx.AsyncClient for providers, asyncpg for DB, aioboto3 for AWS |
| Idempotent background jobs | PASS | Upsert on idempotency hash; billing close uses status machine |
| Error classification | PASS | Transient (retry w/ backoff) vs permanent (stop + alert) per requirement 7.5 |

Project Structure

app/
├── __init__.py
├── main.py                    # FastAPI app, lifespan, middleware
├── core/
│   ├── config.py              # pydantic-settings (env vars)
│   ├── database.py            # async engine, session factory
│   ├── logging.py             # structlog setup, correlation IDs
│   ├── auth.py                # Clerk JWT verification (JWKS)
│   ├── security.py            # Secrets Manager client, key encryption
│   └── errors.py              # Error classification, exception handlers
├── models/
│   ├── base.py                # DeclarativeBase, TimestampMixin
│   ├── enums.py               # Provider, ConnectionStatus, PlanTier, etc.
│   └── db.py                  # All ORM models (11 tables)
├── schemas/
│   ├── connections.py         # Pydantic request/response models
│   ├── telemetry.py
│   ├── receipts.py
│   ├── billing.py
│   ├── projects.py
│   └── common.py              # Pagination, error responses
├── services/
│   ├── connection_service.py  # CRUD + key validation + Secrets Manager
│   ├── telemetry_service.py   # Ingest, deduplicate, query
│   ├── emissions_engine.py    # Token → energy → CO2 calculation
│   ├── receipt_service.py     # Ed25519 signing, PDF generation, verification
│   ├── billing_service.py     # Period lifecycle, Stripe sync, credit retirement
│   ├── project_service.py     # Project/workload CRUD, re-mapping
│   ├── audit_service.py       # Audit pack generation
│   └── export_service.py      # CSV/JSON export
├── connectors/
│   ├── base.py                # Connector Protocol definition
│   ├── openai.py              # OpenAI usage API connector
│   ├── anthropic.py           # Anthropic Messages API connector
│   └── openrouter.py          # OpenRouter usage connector
├── jobs/
│   ├── worker.py              # ARQ WorkerSettings
│   ├── poll_usage.py          # Hourly polling job
│   ├── reconciliation.py      # T+24h trailing reconciliation
│   ├── billing_close.py       # Period close + receipt generation
│   └── audit_pack.py          # Monthly audit pack generation
├── api/
│   ├── deps.py                # Dependency injection (session, auth, org)
│   └── v1/
│       ├── __init__.py        # v1 router aggregation
│       ├── connections.py     # /v1/connections endpoints
│       ├── telemetry.py       # /v1/telemetry endpoints
│       ├── receipts.py        # /v1/receipts + /public/receipts/verify
│       ├── billing.py         # /v1/billing endpoints
│       ├── projects.py        # /v1/projects endpoints
│       ├── export.py          # /v1/export endpoints
│       └── health.py          # /health endpoint
└── templates/
    └── receipt.html           # Jinja2 template for PDF receipts

alembic/
├── alembic.ini
├── env.py
└── versions/

tests/
├── conftest.py                # Fixtures: async session, test client, factories
├── unit/
│   ├── test_emissions_engine.py
│   ├── test_receipt_signing.py
│   └── test_connectors.py
├── integration/
│   ├── test_connection_flow.py
│   ├── test_telemetry_pipeline.py
│   ├── test_billing_lifecycle.py
│   └── test_receipt_verification.py
└── contract/
    └── test_api_contracts.py

docker-compose.yml
Dockerfile
pyproject.toml
env.example

Note

Single-project web service layout. The app/ package follows a layered architecture: api/ (HTTP layer) → services/ (business logic) → models/ (ORM/DB) with connectors/ as external adapters and jobs/ as background workers. The Next.js dashboard is a separate repository consuming this API.

Risks / Trade-offs

| Risk | Mitigation |
| --- | --- |
| Provider API changes | Each connector is isolated. API changes affect only one file. Monitor provider changelogs. |
| WeasyPrint system deps in Docker | Pin versions in Dockerfile. System deps (pango, cairo, gdk-pixbuf) are well-maintained. Test PDF output in CI. |
| Secrets Manager latency on every poll | Cache decrypted keys briefly (5 min TTL) in the worker process. Keys are fetched once per poll cycle, not per request. |
| Single Redis for both ARQ and rate limiting | Use separate Redis databases (db 0 for ARQ, db 1 for rate limits). If Redis goes down, ARQ jobs queue up and rate limits fail open. The health endpoint detects this. |
| Carbon factor accuracy | Published uncertainty bounds (30% default). Factors are versioned and immutable. Updated versions released as research improves. |
| Stripe webhook reliability | Stripe retries for up to 3 days. Handler is idempotent (checks period status before transitioning). Manual billing close trigger available for operations. |

Open Questions

| Question | Current Assumption |
| --- | --- |
| Credit inventory management - how are carbon credits sourced and loaded into inventory? | Manual database seeding by operations in v1, API in v2 |
| Email notifications - should the system send emails on connection errors, billing failures, and receipt availability? | Deferred to v2; logs and dashboard indicators in v1 |
| Rate limit configuration - should tier-based rate limits differ? | Uniform limits in v1, tier-based in v2 |

Complexity Tracking

| Violation | Why Needed | Simpler Alternative Rejected Because |
| --- | --- | --- |
| Connector Protocol abstraction | Three providers with different APIs and token schemas need unified interface | Direct inline HTTP calls would duplicate error handling and token mapping logic across 3 implementations |
| ARQ worker separate from API | PDF generation (WeasyPrint) is CPU-bound and blocks event loop | In-process background tasks would degrade API response times under load |
| Secrets Manager for API keys | Provider API keys are high-value credentials that must not appear in DB dumps | Environment variables don't scale past a few keys and can't be rotated per-org |