Streaming AI responses dramatically improves user experience — users see text appearing in real time instead of waiting 10-30 seconds for a complete response. This guide covers implementing streaming from API to frontend.
Why Streaming Matters
Without streaming: User submits message → waits 10-30 seconds → sees complete response
With streaming: User submits message → sees first token in 1-2 seconds → reads as text generates
Streaming cuts perceived latency (the time until the first visible text) by roughly 10x, even though the total generation time is unchanged.
Streaming with Claude API
Python
import anthropic
client = anthropic.Anthropic()

# Method 1: the high-level streaming helper, used as a context manager
prompt = [{"role": "user", "content": "Explain quantum computing in detail."}]

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=2048,
    messages=prompt,
) as stream:
    # Echo every text fragment the moment it arrives.
    for fragment in stream.text_stream:
        print(fragment, end="", flush=True)

    # With the text fully consumed, fetch the assembled message so its
    # usage statistics can be reported.
    final_message = stream.get_final_message()

print(f"\n\nInput tokens: {final_message.usage.input_tokens}")
# Method 2: Raw event stream
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            # Only text deltas carry a `.text` attribute. Other delta kinds
            # (for example `input_json_delta`, emitted for tool use) do not,
            # so check the delta type before reading it.
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print("\n[Stream complete]")
Python with FastAPI (Server-Side)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.Anthropic()

# Coroutines must not drive the synchronous client: every network wait would
# block the event loop and stall all other requests served by this process.
# The async client exposes the same streaming helper with `async with` /
# `async for`.
async_client = anthropic.AsyncAnthropic()


async def generate_stream(message: str):
    """Yield the model's reply to *message* as Server-Sent Events.

    Each text fragment is emitted as ``data: {"text": ...}`` followed by a
    blank line; a final ``data: [DONE]`` event marks the end of the stream.
    """
    async with async_client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            # Format as Server-Sent Events; JSON-encoding keeps newlines in
            # the text from breaking the line-based SSE framing.
            yield f"data: {json.dumps({'text': text})}\n\n"
    yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream the reply to ``request.message`` as a ``text/event-stream`` response."""
    # NOTE(review): ChatRequest is not defined in this snippet; presumably a
    # request model exposing a `message: str` field. Confirm against the app.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",  # Required for nginx
    }
    event_source = generate_stream(request.message)
    return StreamingResponse(
        event_source,
        media_type="text/event-stream",
        headers=sse_headers,
    )
Streaming with OpenAI API
from openai import OpenAI
client = OpenAI()

# Streaming
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
)
for chunk in stream:
    # A chunk may arrive with an empty `choices` list (for example a
    # usage-only chunk), so guard before indexing to avoid an IndexError.
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    # `content` is None/empty on role-only and terminal chunks.
    if content:
        print(content, end="", flush=True)
OpenAI with Node.js
import OpenAI from 'openai';
const client = new OpenAI();

// Ask for a streamed chat completion instead of a single response object.
const request = {
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Hello' }],
  stream: true,
};
const stream = await client.chat.completions.create(request);

// Write each delta to stdout as it arrives; chunks without text write ''.
for await (const { choices } of stream) {
  const piece = choices[0]?.delta?.content || '';
  process.stdout.write(piece);
}
Server-Sent Events (SSE) in Next.js
Next.js App Router API route with streaming:
// app/api/chat/route.ts
import { anthropic } from '@ai-sdk/anthropic';
import { streamText } from 'ai';
// Route setting exported alongside the handler.
// NOTE(review): presumably the maximum execution time in seconds — confirm against the Next.js docs.
export const maxDuration = 30;

/**
 * Handle a chat request: read `messages` from the JSON body, start a
 * streamed generation, and return it as a data-stream response.
 */
export async function POST(req: Request) {
  const body = await req.json();
  const model = anthropic('claude-3-5-sonnet-20241022');
  const result = streamText({ model, messages: body.messages });

  return result.toDataStreamResponse();
}
Frontend: React Streaming Hook
Using the Vercel AI SDK (simplest approach):
// components/Chat.tsx
'use client';
import { useChat } from 'ai/react';
/**
 * Minimal chat UI: lists the conversation and provides a form to send a
 * message. All request and streaming state comes from `useChat`.
 */
export function Chat() {
  // The hook talks to /api/chat and updates `messages` as text streams in.
  const chat = useChat({ api: '/api/chat' });

  // Each message re-renders as its `content` grows during streaming.
  const renderedMessages = chat.messages.map(({ id, role, content }) => (
    <div key={id}>
      <strong>{role}:</strong>
      {content}
    </div>
  ));

  return (
    <div>
      {renderedMessages}
      <form onSubmit={chat.handleSubmit}>
        <input value={chat.input} onChange={chat.handleInputChange} />
        <button type="submit" disabled={chat.isLoading}>Send</button>
      </form>
    </div>
  );
}
Manual SSE Implementation (No Vercel AI SDK)
For more control:
// Frontend: consuming SSE
/**
 * POST `message` to /api/chat/stream and yield each text fragment from the
 * Server-Sent Events response as it arrives.
 *
 * Stops when the server sends `data: [DONE]` or closes the stream.
 * Throws if the request fails or the response has no body.
 */
async function* streamChat(message: string) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  // Surface HTTP errors instead of silently yielding nothing.
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  if (!response.body) throw new Error('No response body');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  // Network chunks do not align with SSE lines, so keep the trailing
  // partial line here until the rest of it arrives.
  let pending = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // `stream: true` makes the decoder hold back an incomplete multi-byte
      // character at the end of a chunk instead of emitting U+FFFD.
      pending += decoder.decode(value, { stream: true });

      const lines = pending.split('\n');
      // The last element is either '' (chunk ended on a newline) or an
      // unfinished line; carry it over to the next read.
      pending = lines.pop() ?? '';

      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;

        const data = line.slice(6);
        if (data === '[DONE]') return;

        let parsed;
        try {
          parsed = JSON.parse(data);
        } catch {
          // Skip malformed events
          continue;
        }
        yield parsed.text;
      }
    }
  } finally {
    // Runs on normal completion, on [DONE], and when the consumer stops
    // iterating early. Cancelling releases the underlying connection; it is
    // best-effort, so a rejection (e.g. already-errored stream) is ignored.
    reader.cancel().catch(() => {});
  }
}
// React component using the generator
/**
 * Demo component: sends a fixed message and renders the reply as it
 * streams in, showing a cursor while the stream is active.
 */
function StreamingChat() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  async function sendMessage(message: string) {
    setContent('');
    setIsStreaming(true);
    try {
      for await (const text of streamChat(message)) {
        setContent(prev => prev + text);
      }
    } finally {
      // Always clear the streaming flag, even if the request or stream
      // throws; otherwise the cursor would stay on screen forever.
      setIsStreaming(false);
    }
  }

  return (
    <div>
      <p>{content}{isStreaming && <span className="cursor">|</span>}</p>
      <button onClick={() => sendMessage('Hello')}>
        Send
      </button>
    </div>
  );
}
WebSocket Alternative
For real-time bidirectional communication:
# Backend: FastAPI WebSocket
from fastapi import WebSocket, WebSocketDisconnect

# The handler is a coroutine, so it needs the async client: the synchronous
# one would block the event loop (and every other connection) while a reply
# is being generated.
async_client = anthropic.AsyncAnthropic()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """Serve a chat session over a WebSocket.

    For every text message received, stream the model's reply back as JSON
    frames of the form ``{"type": "chunk", "text": ...}``, followed by one
    ``{"type": "done"}`` frame. The loop ends when the client disconnects.
    """
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            async with async_client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                messages=[{"role": "user", "content": message}],
            ) as stream:
                async for text in stream.text_stream:
                    await websocket.send_text(
                        json.dumps({"type": "chunk", "text": text})
                    )
            await websocket.send_text(json.dumps({"type": "done"}))
    except WebSocketDisconnect:
        # The client closed the connection: a normal end of session, not an
        # error worth a traceback.
        return
Error Handling for Streaming
import anthropic
# An async generator must not drive the synchronous client: each network wait
# would block the event loop. The async client offers the same streaming
# helper via `async with` / `async for`.
async_client = anthropic.AsyncAnthropic()


async def safe_stream(message: str):
    """Stream the reply to *message* as dict events, reporting API errors in-band.

    Yields ``{"type": "chunk", "text": ...}`` for each text fragment, then
    ``{"type": "done"}``. If the API call fails, a single
    ``{"type": "error", "message": ...}`` event is yielded instead of
    raising, so the consumer can show the failure to the user.
    """
    try:
        async with async_client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                yield {"type": "chunk", "text": text}
        yield {"type": "done"}
    except anthropic.RateLimitError:
        # Listed before APIError so rate limiting gets its own friendlier
        # message rather than the generic one.
        yield {"type": "error", "message": "Rate limited. Please wait a moment."}
    except anthropic.APIError as e:
        yield {"type": "error", "message": f"API error: {e}"}
Performance Tips
Buffer small tokens: Rendering each token individually triggers a re-render per token. Accumulate a small buffer (about 20 characters in the example below) before updating React state:
let buffer = '';
for await (const text of streamChat(message)) {
  buffer += text;
  if (buffer.length > 20) { // Flush every 20 chars
    // Snapshot the text before resetting the buffer. React may run the
    // updater function later (during the next render); if it closed over
    // `buffer` directly it would see the already-cleared value and the
    // text would be lost.
    const flushed = buffer;
    buffer = '';
    setContent(prev => prev + flushed);
  }
}
// Final flush of whatever is left below the threshold.
const remainder = buffer;
setContent(prev => prev + remainder);
Abort controller: Let users cancel in-flight requests:
// One controller per request; its signal ties the fetch to the cancel button.
const controller = new AbortController();
const { signal } = controller;

const response = await fetch('/api/chat', {
  method: 'POST',
  signal,
  // ...
});

// Cancel from UI
cancelButton.onclick = function () {
  controller.abort();
};