Deploying an LLM API in production requires more than wrapping an API call in a function. This guide covers building a robust, cost-conscious LLM API service.


Architecture Overview

Client → API Gateway → FastAPI → Rate Limiter → LLM Provider (Anthropic/OpenAI)

                         Request Logger → Database

Project Structure

llm-api/
├── app/
│   ├── main.py          # FastAPI app entry point
│   ├── routers/
│   │   └── chat.py      # Chat endpoints
│   ├── middleware/
│   │   ├── auth.py      # API key validation
│   │   └── rate_limit.py # Rate limiting
│   ├── services/
│   │   └── llm.py       # LLM provider abstraction
│   └── models/
│       └── schemas.py   # Pydantic models
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

Core API Implementation

# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum

class Role(str, Enum):
    user = "user"
    assistant = "assistant"
    system = "system"

class Message(BaseModel):
    role: Role
    content: str

class ChatRequest(BaseModel):
    messages: list[Message]
    model: str = "claude-opus-4-7"
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0, le=1)
    stream: bool = False
    system: Optional[str] = None

class Usage(BaseModel):
    input_tokens: int
    output_tokens: int
    cost_usd: float

class ChatResponse(BaseModel):
    id: str
    content: str
    usage: Usage
    model: str
# app/services/llm.py
import anthropic
from ..models.schemas import ChatRequest, ChatResponse, Usage
import uuid
import time

# Pricing per million tokens (update as needed)
PRICING = {
    "claude-opus-4-7": {"input": 15.0, "output": 75.0},
    "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
    "claude-haiku-4-5-20251001": {"input": 0.25, "output": 1.25},
}

client = anthropic.Anthropic()

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    if model not in PRICING:
        return 0.0
    price = PRICING[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000

async def chat_completion(request: ChatRequest) -> ChatResponse:
    kwargs = {
        "model": request.model,
        "max_tokens": request.max_tokens,
        "messages": [{"role": m.role.value, "content": m.content} for m in request.messages],
    }
    
    if request.system:
        kwargs["system"] = request.system
    
    response = client.messages.create(**kwargs)
    
    usage = Usage(
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        cost_usd=calculate_cost(
            request.model,
            response.usage.input_tokens,
            response.usage.output_tokens,
        ),
    )
    
    return ChatResponse(
        id=str(uuid.uuid4()),
        content=response.content[0].text,
        usage=usage,
        model=request.model,
    )

FastAPI App with Streaming

# app/routers/chat.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from ..models.schemas import ChatRequest, ChatResponse
from ..services.llm import chat_completion, client
from ..middleware.auth import verify_api_key
import anthropic
import json

router = APIRouter(prefix="/v1", tags=["chat"])

@router.post("/chat/completions", response_model=ChatResponse)
async def create_chat_completion(
    request: ChatRequest,
    api_key: str = Depends(verify_api_key),
):
    if request.stream:
        return StreamingResponse(
            stream_response(request),
            media_type="text/event-stream",
        )
    
    try:
        response = await chat_completion(request)
        return response
    except anthropic.APIError as e:
        raise HTTPException(status_code=e.status_code, detail=str(e))

async def stream_response(request: ChatRequest):
    stream = client.messages.stream(
        model=request.model,
        max_tokens=request.max_tokens,
        messages=[{"role": m.role.value, "content": m.content} for m in request.messages],
        system=request.system,
    )
    
    with stream as s:
        for text in s.text_stream:
            chunk = {"type": "delta", "text": text}
            yield f"data: {json.dumps(chunk)}\n\n"
    
    final = stream.get_final_message()
    done_data = {
        "type": "done",
        "usage": {
            "input_tokens": final.usage.input_tokens,
            "output_tokens": final.usage.output_tokens,
        },
    }
    yield f"data: {json.dumps(done_data)}\n\n"
    yield "data: [DONE]\n\n"

Authentication Middleware

# app/middleware/auth.py
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
import hashlib

security = HTTPBearer()

# In production, load from database
VALID_API_KEYS = {
    hashlib.sha256(key.encode()).hexdigest(): {"name": "default", "tier": "standard"}
    for key in os.environ.get("API_KEYS", "").split(",")
    if key
}

def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    key_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest()
    
    if key_hash not in VALID_API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    
    return credentials.credentials

Rate Limiting with Redis

# app/middleware/rate_limit.py
import redis
import time
from fastapi import HTTPException

redis_client = redis.from_url("redis://redis:6379")

def rate_limit(api_key: str, limit: int = 60, window: int = 60) -> None:
    key = f"rate_limit:{api_key}"
    pipe = redis_client.pipeline()
    now = time.time()
    
    pipe.zremrangebyscore(key, 0, now - window)
    pipe.zadd(key, {str(now): now})
    pipe.zcard(key)
    pipe.expire(key, window)
    
    results = pipe.execute()
    request_count = results[2]
    
    if request_count > limit:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(window)},
        )

Main App

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import chat

app = FastAPI(title="LLM API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)

app.include_router(chat.router)

@app.get("/health")
def health():
    return {"status": "ok"}

Docker Setup

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ ./app/

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - API_KEYS=${API_KEYS}
    depends_on:
      - redis
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  redis_data:

Cost Tracking

Track spending per API key:

# In chat endpoint, after response
async def log_usage(api_key: str, request: ChatRequest, response: ChatResponse):
    redis_client.incrby(f"cost:{api_key}:tokens_in", response.usage.input_tokens)
    redis_client.incrby(f"cost:{api_key}:tokens_out", response.usage.output_tokens)
    
    cost_cents = int(response.usage.cost_usd * 100)
    redis_client.incrbyfloat(f"cost:{api_key}:usd", response.usage.cost_usd)
    
    # Daily spend tracking
    today = time.strftime("%Y-%m-%d")
    redis_client.incrbyfloat(f"cost:daily:{today}", response.usage.cost_usd)

Requirements

# requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.0
anthropic==0.28.0
redis==5.0.0
pydantic==2.7.0
python-multipart==0.0.9

Production Checklist

  • API keys stored in environment variables, not code
  • Rate limiting configured per tier
  • Request/response logging to persistent storage
  • Cost alerts configured
  • Health check endpoint working
  • Graceful error handling (don’t expose internal errors)
  • HTTPS/TLS termination at load balancer
  • Timeout configured for LLM calls (avoid hanging requests)
  • Retry logic with exponential backoff for transient errors