Design Document¶
AI Workloads is a FastAPI-based platform that meters AI inference emissions and retires verified carbon credits. The system is structured as a polling-based telemetry pipeline with four distinct phases: Ingest, Calculate, Aggregate, Retire. Each phase is decoupled via an async job queue (ARQ/Redis), making the system horizontally scalable and resilient to provider API instability.
The architecture follows a functional core / imperative shell pattern: all emissions calculations are pure functions, all side effects (polling, persistence, signing, S3 uploads) happen at the boundary in service classes and job handlers.
Context¶
- This is a greenfield FastAPI service for Carbon Labs' AI Workloads product. No existing codebase or legacy constraints. The service connects to AI provider accounts, meters inference emissions, and retires verified carbon credits with cryptographically signed receipts.
- The system has two runtime components: an API server handling HTTP requests and an ARQ worker handling background jobs (polling, reconciliation, billing, audit packs). Both share the same codebase and database but run as separate processes.
Key constraints:
- Provider usage APIs are poll-only (no webhooks) with varying schemas
- Provider API keys are high-value credentials requiring encryption at rest
- Receipt signing must be independently verifiable by third parties
- PDF generation (WeasyPrint) is CPU-bound and must not block the API event loop
- Billing close must tolerate late-arriving provider data (T+48h window)
Goals / Non-Goals¶
Goals:
- End-to-end pipeline: connect → poll → calculate → display → bill → retire → sign → deliver
- Free tier with standalone analytics value (acquisition funnel)
- Paid tier with automated credit retirement and verifiable receipts (revenue path)
- Idempotent, self-healing background jobs that handle provider API failures gracefully
- Structured, auditable logging for compliance and debugging
Non-Goals:
- Real-time streaming telemetry (hourly polling is sufficient for the use case)
- Custom carbon factor uploads by users (admin-seeded only in v1)
- Multi-region deployment (single region in v1)
- Client-side SDK implementation (Phase 2, scaffold only)
- Dashboard frontend (separate Next.js repository consuming this API)
Technical Context¶
| Property | Value |
|---|---|
| Language | Python 3.12+ |
| Primary Dependencies | FastAPI, SQLAlchemy 2.0 (async), Pydantic v2, httpx (async), ARQ, PyNaCl, WeasyPrint, Jinja2, boto3/aioboto3, stripe, pyjwt, slowapi, structlog, orjson |
| Storage | PostgreSQL 16 via Supabase (asyncpg driver), Redis 7 (ARQ job queue + rate-limit counters) |
| Testing | pytest + pytest-asyncio + httpx (AsyncClient) + factory_boy + testcontainers |
| Target Platform | Linux containers on AWS ECS Fargate |
| Project Type | Web service (REST API + background workers) |
| Performance Goals | API p95 < 200ms, dashboard initial load < 3s, hourly poll cycle completes in < 5 min for 100 connections |
| Constraints | Receipt signing keys loaded from env/Secrets Manager at startup (not per-request); PDF generation offloaded to worker; provider API rate limits vary by tier |
| Initial Scale | 50 organizations, 200 connections, ~10k telemetry events/day |
| Growth Target | 1,000 orgs, 5k connections |
Architecture Diagrams¶
System Context¶
```mermaid
graph TB
    subgraph External
        OA[OpenAI Usage API]
        AN[Anthropic Usage API]
        OR[OpenRouter Activity API]
        CL[Clerk Auth]
        ST[Stripe Billing]
    end
    subgraph AWS
        ALB[ALB / HTTPS]
        subgraph ECS Fargate
            API[FastAPI API<br/>min 2 tasks]
            WRK[ARQ Workers<br/>auto-scale]
        end
        PG[(Supabase PostgreSQL)]
        RD[(ElastiCache Redis)]
        SM[Secrets Manager]
        S3[S3 Buckets<br/>receipts + audit-packs]
    end
    subgraph Frontend
        DASH[Next.js Dashboard<br/>CloudFront / Vercel]
    end
    DASH -->|REST API| ALB
    ALB --> API
    API --> PG
    API --> RD
    WRK --> PG
    WRK --> RD
    WRK --> SM
    WRK --> S3
    WRK -->|poll| OA
    WRK -->|poll| AN
    WRK -->|poll| OR
    API -->|JWT verify| CL
    API -->|Stripe API| ST
    CL -->|webhook| API
    ST -->|webhook| API
```
Telemetry Pipeline¶
```mermaid
sequenceDiagram
    participant Cron as ARQ Cron (Hourly)
    participant PollJob as poll_provider Job
    participant Provider as Provider API
    participant DB as PostgreSQL
    participant CalcJob as compute_emissions Job
    participant Factors as Carbon Factors
    Cron->>PollJob: enqueue per active connection
    PollJob->>DB: load connection + sync_cursor
    PollJob->>Provider: GET usage (cursor-based)
    Provider-->>PollJob: usage buckets
    PollJob->>PollJob: map to TelemetryEventCandidate
    PollJob->>PollJob: compute idempotency_hash (provider:org_id:model:bucket_start)
    PollJob->>DB: UPSERT telemetry_events ON CONFLICT (idempotency_hash)
    PollJob->>DB: update sync_cursor, last_polled_at
    PollJob->>CalcJob: enqueue per new/updated event_id
    CalcJob->>DB: load event
    CalcJob->>Factors: load current version
    CalcJob->>CalcJob: calculate_emissions() pure function
    CalcJob->>DB: UPSERT carbon_calculations
```
Billing & Receipt Pipeline¶
```mermaid
sequenceDiagram
    participant Stripe as Stripe Webhook
    participant API as FastAPI
    participant BillJob as close_billing_period Job
    participant DB as PostgreSQL
    participant Signer as ReceiptSigner (Ed25519)
    participant PDF as WeasyPrint
    participant S3 as S3
    Stripe->>API: invoice.payment_succeeded
    API->>DB: set billing_period.status = "closing"
    API->>BillJob: enqueue (deferred T+48h)
    Note over BillJob: Waits for reconciliation window
    BillJob->>DB: aggregate CO2 for period
    BillJob->>DB: allocate credit serial numbers
    BillJob->>Signer: sign canonical payload
    Signer-->>BillJob: (payload_hash, signature)
    BillJob->>DB: insert CarbonReceipt
    BillJob->>PDF: render receipt.html template
    PDF-->>BillJob: PDF bytes
    BillJob->>S3: upload PDF
    BillJob->>DB: set billing_period.status = "closed"
```
Entity Relationship¶
```mermaid
erDiagram
    Organization ||--o{ Project : "has many"
    Organization ||--o{ ProviderConnection : "has many"
    Organization ||--o{ BillingPeriod : "has many"
    Organization ||--o{ AuditPack : "has many"
    Project ||--o{ Workload : "has many"
    ProviderConnection ||--o{ Workload : "has many"
    Workload ||--o{ TelemetryEvent : "has many"
    TelemetryEvent ||--|| CarbonCalculation : "has one"
    BillingPeriod ||--o{ CarbonReceipt : "has many"
    BillingPeriod ||--o{ PriorPeriodAdjustment : "has many"
    CarbonFactors }o--o{ CarbonCalculation : "referenced by"
```
See Data Model for full entity definitions with columns, indexes, and constraints.
Component Design¶
1. Provider Connectors - app/connectors/¶
Each connector implements a Protocol defining two operations: validate and poll.
Design decisions:
- `idempotency_hash` is a method on the connector (not static) because the hash strategy varies by provider. OpenAI and OpenRouter use `provider:model:bucket_start_time` (coarse; handles retroactive bucket revisions). Anthropic uses the same plus cache token counts, since it provides stable breakdowns.
- `PollResult.has_more` drives re-enqueue logic. Connectors paginate internally.
- Each connector creates a fresh `httpx.AsyncClient` per poll call. No persistent connections: workers are short-lived.
2. Telemetry Service - app/services/telemetry.py¶
Handles ingestion, dedup, and query aggregation.
The upsert strategy uses PostgreSQL ON CONFLICT (idempotency_hash) DO UPDATE SET for token count fields and sync_timestamp. This handles the case where OpenAI revises a usage bucket retroactively - we take the latest values.
Modified immutability
The immutability trigger only blocks UPDATEs to the idempotency_hash, event_timestamp, and model fields. Token counts and raw_payload can be updated during reconciliation.
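A sketch of that upsert using SQLAlchemy's PostgreSQL `insert`; the table below is a minimal stand-in for the real `telemetry_events` model, and the column names are assumptions:

```python
# Compile-only demonstration of the ON CONFLICT ... DO UPDATE upsert.
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
# Minimal stand-in for the real telemetry_events table (columns assumed).
telemetry_events = Table(
    "telemetry_events", metadata,
    Column("idempotency_hash", String, primary_key=True),
    Column("input_tokens", Integer),
    Column("output_tokens", Integer),
    Column("sync_timestamp", String),
)

stmt = insert(telemetry_events).values(
    idempotency_hash="openai:gpt-4o:2024-01-01T00:00",
    input_tokens=100, output_tokens=50, sync_timestamp="2024-01-01T01:00",
)
# Take the latest values when a provider revises a bucket retroactively.
stmt = stmt.on_conflict_do_update(
    index_elements=["idempotency_hash"],
    set_={
        "input_tokens": stmt.excluded.input_tokens,
        "output_tokens": stmt.excluded.output_tokens,
        "sync_timestamp": stmt.excluded.sync_timestamp,
    },
)
sql = str(stmt.compile(dialect=postgresql.dialect()))
```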
3. Emissions Engine - app/services/emissions.py¶
Pure function. No database access, no side effects.
Model tier resolution uses fnmatch with first-match-wins ordering. The Carbon Factors JSON stores tiers in priority order - reasoning models first (since o1* would otherwise match nothing specific), then large, medium, small. Default is "medium."
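A sketch of first-match-wins resolution; the tier patterns below are illustrative, not the contents of the real Carbon Factors file:

```python
# First-match-wins tier resolution with fnmatch (patterns are examples).
from fnmatch import fnmatch

TIER_PATTERNS = [  # priority order: reasoning first, then by size
    ("reasoning", ["o1*", "o3*"]),
    ("large", ["gpt-4*", "claude-3-opus*"]),
    ("medium", ["gpt-3.5*", "claude-3-sonnet*"]),
    ("small", ["*mini*", "claude-3-haiku*"]),
]


def resolve_tier(model: str) -> str:
    """Return the first tier whose pattern list matches the model name."""
    for tier, patterns in TIER_PATTERNS:
        if any(fnmatch(model, p) for p in patterns):
            return tier
    return "medium"  # documented default
```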
4. Receipt & Signing - app/services/signing.py, app/services/receipts.py¶
Two-layer design:
- ReceiptSigner: Pure cryptographic operations. Takes private key bytes, produces signatures.
- ReceiptService: Orchestrates receipt creation (DB, signing, PDF, S3).
Signing flow:
- Build canonical payload dict (sorted keys, no whitespace)
- SHA-256 hash the canonical JSON
- Ed25519 sign the hash bytes
- Store `sha256:{hash}` and the base64-encoded signature on the receipt record
Key rotation: signing keys are versioned. Each receipt stores a key_version field. The verification endpoint fetches the correct public key for the receipt's key_version.
5. Billing Service - app/services/billing.py¶
Handles Stripe webhook events and billing period lifecycle.
The closing → closed transition is deferred: when we receive invoice.payment_succeeded, we enqueue close_billing_period with a 48-hour delay (_defer_by in ARQ). This ensures the reconciliation job has run at least once before we finalize.
Prior period adjustments: if telemetry arrives after a period is closed, the ingest_batch method checks the event's billing period status. If closed, it creates a PriorPeriodAdjustment record linked to the next open period. The next billing cycle includes these adjustments.
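The routing rule can be sketched as a pure function; class and table names here are assumptions, and the real logic lives in `ingest_batch` against the ORM models:

```python
# Late-arriving events in a closed period become adjustments, not edits.
from dataclasses import dataclass


@dataclass
class BillingPeriod:
    id: str
    status: str  # "open" | "closing" | "closed"


def route_event(event_period: BillingPeriod,
                next_open: BillingPeriod) -> tuple[str, str]:
    """Return (target_table, period_id) for a late-arriving event."""
    if event_period.status == "closed":
        # Closed periods are never amended; record an adjustment instead.
        return ("prior_period_adjustments", next_open.id)
    return ("telemetry_events", event_period.id)
```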
6. Workload Assignment - app/services/connections.py¶
When a connection is created:
- If `project_id` is provided, create a Workload under that project linked to this connection.
- If `project_id` is null, find-or-create a "Default" project for the org, then create a Workload under it.
When events are polled, they're assigned to the connection's active workload; a connection has exactly one active workload at a time. Users can re-map a connection to a different project: this creates a new workload under the new project, marks the old workload inactive, and routes future events to the new workload. Historical events stay where they are.
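A sketch of the re-mapping behavior, using dataclass stand-ins for the ORM models (names are assumptions):

```python
# Re-map a connection: deactivate the old workload, create a new active one.
from dataclasses import dataclass


@dataclass
class Workload:
    project_id: str
    connection_id: str
    active: bool = True


def remap_connection(workloads: list[Workload], connection_id: str,
                     new_project_id: str) -> Workload:
    """Deactivate the connection's active workload and create a new one
    under the new project. Historical events keep their old workload."""
    for w in workloads:
        if w.connection_id == connection_id and w.active:
            w.active = False
    new = Workload(project_id=new_project_id, connection_id=connection_id)
    workloads.append(new)
    return new
```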
7. Auth Dependency - app/core/deps.py¶
Every authenticated endpoint resolves the current organization via a FastAPI dependency. All downstream queries use org.id - there is no path for cross-org data access.
8. Background Jobs - app/workers.py, app/jobs/¶
ARQ worker with cron + event-driven jobs. Error classification distinguishes transient failures (429, 503, timeout) from permanent failures (401, 403, 404) for retry logic.
9. Health & Observability - app/api/v1/health.py, app/core/logging.py¶
/health endpoint (no auth) checks database, Redis, and last poll recency.
Structured logging via structlog: correlation ID generated per request (middleware) and propagated to background jobs. All log entries include timestamp, level, correlation_id, org_id (if authenticated), and event name. Sensitive fields (api_key, credentials) are redacted via a processor.
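The redaction processor can be sketched as a plain structlog-style processor, i.e. a callable taking `(logger, method_name, event_dict)` and returning the event dict (the field list here is an assumption):

```python
# Replace sensitive values before the JSON renderer sees them.
SENSITIVE_FIELDS = {"api_key", "credentials"}


def redact_sensitive(logger, method_name, event_dict):
    """structlog processor: mask sensitive keys in the event dict."""
    for key in SENSITIVE_FIELDS & event_dict.keys():
        event_dict[key] = "[REDACTED]"
    return event_dict
```

It would be registered ahead of the renderer, e.g. via `structlog.configure(processors=[..., redact_sensitive, renderer])`.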
Architecture Decisions¶
See Research for the full decision log with rationale and alternatives (R1-R9).
Implementation Decisions¶
| Decision | Choice | Rationale |
|---|---|---|
| Layered architecture | API → Service → Model with DI | Enables unit testing services without HTTP; fat controllers rejected (untestable), DDD rejected (over-engineered for 11 tables) |
| Relaxed immutability trigger | Protect hash/timestamp/model only | Token counts must be updatable during reconciliation upserts; full immutability would block recon |
| UUID as_uuid=True | Native PostgreSQL UUID type | 16 bytes vs 36-byte string; proper indexing; aligns with SQLAlchemy best practices |
| Default project auto-creation | Find-or-create "Default" | Eliminates the "polled data with no home" problem; users can re-organize later |
| Prior period adjustments | Separate table, rolled into next period | Cleaner than amending closed receipts; auditors can see the adjustment trail |
| Signing key versioning | key_version on each receipt | Enables annual rotation without invalidating old receipts |
| structlog for logging | Structured JSON + correlation IDs | Essential for debugging async job failures across API and worker processes |
| Single Redis instance | Separate databases (db 0 ARQ, db 1 rate limits) | Simpler ops; if Redis down, ARQ queues up, rate limits fail open, health endpoint detects |
| Denormalized org_id | Set at ingest time on TelemetryEvent | Avoids 3-table join (telemetry → workload → connection → org) on every dashboard query |
| WeasyPrint system deps | Pinned in Dockerfile (pango, cairo, gdk-pixbuf) | Pure Python PDF renderer; no headless browser; sufficient for receipt layouts |
Error Mapping¶
| Domain Exception | HTTP Status | Error Code |
|---|---|---|
| `ConnectionValidationError` | 400 | `connection_validation_failed` |
| `ConnectionNotFoundError` | 404 | `connection_not_found` |
| `RateLimitExceededError` | 429 | `rate_limit_exceeded` |
| `BillingPeriodNotOpenError` | 409 | `billing_period_not_open` |
| `ReceiptNotFoundError` | 404 | `receipt_not_found` |
| `InvalidSignatureError` | 422 | `receipt_signature_invalid` |
Constitution Check¶
All gates pass
| Gate | Status | Notes |
|---|---|---|
| No secrets in code or config files | PASS | API keys → Secrets Manager ARN only; signing keys → env vars |
| Structured logging everywhere | PASS | structlog with correlation IDs per requirement 7.9 |
| All external calls async | PASS | httpx.AsyncClient for providers, asyncpg for DB, aioboto3 for AWS |
| Idempotent background jobs | PASS | Upsert on idempotency hash; billing close uses status machine |
| Error classification | PASS | Transient (retry w/ backoff) vs permanent (stop + alert) per requirement 7.5 |
Project Structure¶
```
app/
├── __init__.py
├── main.py                      # FastAPI app, lifespan, middleware
├── core/
│   ├── config.py                # pydantic-settings (env vars)
│   ├── database.py              # async engine, session factory
│   ├── logging.py               # structlog setup, correlation IDs
│   ├── auth.py                  # Clerk JWT verification (JWKS)
│   ├── security.py              # Secrets Manager client, key encryption
│   └── errors.py                # Error classification, exception handlers
├── models/
│   ├── base.py                  # DeclarativeBase, TimestampMixin
│   ├── enums.py                 # Provider, ConnectionStatus, PlanTier, etc.
│   └── db.py                    # All ORM models (11 tables)
├── schemas/
│   ├── connections.py           # Pydantic request/response models
│   ├── telemetry.py
│   ├── receipts.py
│   ├── billing.py
│   ├── projects.py
│   └── common.py                # Pagination, error responses
├── services/
│   ├── connection_service.py    # CRUD + key validation + Secrets Manager
│   ├── telemetry_service.py     # Ingest, deduplicate, query
│   ├── emissions_engine.py      # Token → energy → CO2 calculation
│   ├── receipt_service.py       # Ed25519 signing, PDF generation, verification
│   ├── billing_service.py       # Period lifecycle, Stripe sync, credit retirement
│   ├── project_service.py       # Project/workload CRUD, re-mapping
│   ├── audit_service.py         # Audit pack generation
│   └── export_service.py        # CSV/JSON export
├── connectors/
│   ├── base.py                  # Connector Protocol definition
│   ├── openai.py                # OpenAI usage API connector
│   ├── anthropic.py             # Anthropic Messages API connector
│   └── openrouter.py            # OpenRouter usage connector
├── jobs/
│   ├── worker.py                # ARQ WorkerSettings
│   ├── poll_usage.py            # Hourly polling job
│   ├── reconciliation.py        # T+24h trailing reconciliation
│   ├── billing_close.py         # Period close + receipt generation
│   └── audit_pack.py            # Monthly audit pack generation
├── api/
│   ├── deps.py                  # Dependency injection (session, auth, org)
│   └── v1/
│       ├── __init__.py          # v1 router aggregation
│       ├── connections.py       # /v1/connections endpoints
│       ├── telemetry.py         # /v1/telemetry endpoints
│       ├── receipts.py          # /v1/receipts + /public/receipts/verify
│       ├── billing.py           # /v1/billing endpoints
│       ├── projects.py          # /v1/projects endpoints
│       ├── export.py            # /v1/export endpoints
│       └── health.py            # /health endpoint
└── templates/
    └── receipt.html             # Jinja2 template for PDF receipts
alembic/
├── alembic.ini
├── env.py
└── versions/
tests/
├── conftest.py                  # Fixtures: async session, test client, factories
├── unit/
│   ├── test_emissions_engine.py
│   ├── test_receipt_signing.py
│   └── test_connectors.py
├── integration/
│   ├── test_connection_flow.py
│   ├── test_telemetry_pipeline.py
│   ├── test_billing_lifecycle.py
│   └── test_receipt_verification.py
└── contract/
    └── test_api_contracts.py
docker-compose.yml
Dockerfile
pyproject.toml
env.example
```
Note
Single-project web service layout. The app/ package follows a layered architecture: api/ (HTTP layer) → services/ (business logic) → models/ (ORM/DB) with connectors/ as external adapters and jobs/ as background workers. The Next.js dashboard is a separate repository consuming this API.
Risks / Trade-offs¶
| Risk | Mitigation |
|---|---|
| Provider API changes | Each connector is isolated. API changes affect only one file. Monitor provider changelogs. |
| WeasyPrint system deps in Docker | Pin versions in Dockerfile. System deps (pango, cairo, gdk-pixbuf) are well-maintained. Test PDF output in CI. |
| Secrets Manager latency on every poll | Cache decrypted keys briefly (5 min TTL) in the worker process. Keys are fetched once per poll cycle, not per request. |
| Single Redis for both ARQ and rate limiting | Use separate Redis databases (db 0 for ARQ, db 1 for rate limits). If Redis goes down, ARQ jobs queue up and rate limits fail open. The health endpoint detects this. |
| Carbon factor accuracy | Published uncertainty bounds (30% default). Factors are versioned and immutable. Updated versions released as research improves. |
| Stripe webhook reliability | Stripe retries for up to 3 days. Handler is idempotent (checks period status before transitioning). Manual billing close trigger available for operations. |
Open Questions¶
| Question | Current Assumption |
|---|---|
| Credit inventory management - how are carbon credits sourced and loaded into inventory? | Manual database seeding by operations in v1, API in v2 |
| Email notifications - should the system send emails on connection errors, billing failures, and receipt availability? | Deferred to v2; logs and dashboard indicators in v1 |
| Rate limit configuration - should tier-based rate limits differ? | Uniform limits in v1, tier-based in v2 |
Complexity Tracking¶
| Violation | Why Needed | Simpler Alternative Rejected Because |
|---|---|---|
| Connector Protocol abstraction | Three providers with different APIs and token schemas need unified interface | Direct inline HTTP calls would duplicate error handling and token mapping logic across 3 implementations |
| ARQ worker separate from API | PDF generation (WeasyPrint) is CPU-bound and blocks event loop | In-process background tasks would degrade API response times under load |
| Secrets Manager for API keys | Provider API keys are high-value credentials that must not appear in DB dumps | Environment variables don't scale past a few keys and can't be rotated per-org |