System Design: Building a Production-Ready AI Chatbot (End-to-End)
The Challenge
"Build a chatbot" is the new "Build a URL shortener." It sounds like a junior interview question, but in 2024, it is a distributed systems minefield.
If you just write a Python script that hits the OpenAI API, you are building a toy. A production AI chatbot needs to handle:
- Streaming: No one waits 10 seconds for a spinner.
- Memory: "Wait, who am I?" (State Management).
- Concurrency: 1,000 users chatting simultaneously without blocking threads.
- Reliability: What happens when the LLM hallucinates or times out?
Discussion: How do you design a system that feels instant, remembers context, scales to thousands of concurrent users, and gracefully handles failures?
This post breaks down the end-to-end architecture of a scalable, enterprise-grade AI chatbot.
1. The High-Level Architecture: Why Not Just a Script?
Think of building a chatbot like building a restaurant. You could just set up a food truck (a simple script), but if you want to serve thousands of customers, you need a proper restaurant with different stations: the front door (entry point), the kitchen (processing), the storage (memory), and the delivery system (streaming).
We are not building a monolithic script. We are building an Event-Driven, Asynchronous System — meaning different parts work independently and communicate through events.
graph TD
subgraph ClientSide["Client Side"]
Browser[React / Next.js UI]
end
subgraph EdgeLayer["The Edge Entry Point"]
LB[Load Balancer / Cloudflare]
API[API Gateway]
end
subgraph AppLayer["The Application Layer FastAPI"]
ChatService[Chat Service Async]
end
subgraph AILayer["The Brain AI Layer"]
Orchestrator[Agent Orchestrator LangGraph]
RAG[Vector Search]
end
subgraph PersistenceLayer["The Memory Persistence"]
Postgres[(Postgres Chat History)]
Redis[(Redis Cache Rate Limits)]
VectorDB[(Qdrant/Pinecone)]
end
Browser -->|"1. POST /message"| LB
LB --> API
API --> ChatService
ChatService -->|"2. Retrieve Context"| Postgres
ChatService -->|"3. Check Limits"| Redis
ChatService -->|"4. Invoke Agent"| Orchestrator
Orchestrator -->|"5. Search Knowledge"| RAG
RAG --> VectorDB
Orchestrator -->|"6. Stream Response"| ChatService
ChatService -->|"7. SSE Stream"| Browser
What Each Layer Does (In Simple Terms)
- Client Side (Browser): The chat window you see. It shows messages instantly and handles errors gracefully.
- Edge Layer (Load Balancer): The front door. Routes requests, checks authentication, and protects against attacks.
- Application Layer (Chat Service): The brain that coordinates everything. Manages state and orchestrates the flow.
- AI Layer (Orchestrator + RAG): The actual AI. Decides what to do and searches knowledge when needed.
- Persistence Layer (Databases): The memory. Stores chat history, caches data, and tracks rate limits.
Why This Matters: If one part breaks or gets slow, the others keep working. It's like having separate stations in a restaurant — if the dessert station is slow, the main course can still be served.
2. The Tech Stack: Choosing Your Tools
When building a chatbot, you need to make choices about what technologies to use. Let's understand why we pick each tool, not just what it is.
Frontend: React + Tailwind + Vercel AI SDK
Why React? React lets you build reusable components. Think of it like LEGO blocks — you build a message bubble once, then reuse it everywhere.
Why Tailwind? Tailwind is like having a design system built-in. Instead of writing custom CSS, you use pre-made classes. It's faster and more consistent.
The Secret Weapon: Vercel AI SDK
Here's the thing: streaming text from a server is tricky. You need to:
- Parse incoming chunks
- Handle reconnections if the connection drops
- Manage message state
- Show errors gracefully
The Vercel AI SDK does all of this for you. Don't write your own stream parser — it's like building your own HTTP client when you could just use fetch().
import { useChat } from 'ai/react';
function ChatComponent() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: '/api/chat',
onError: (error) => {
// Handle errors gracefully
console.error('Chat error:', error);
},
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>{message.content}</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
</form>
</div>
);
}
Why not build your own? The SDK handles:
- SSE event parsing
- Automatic reconnection
- Message state management
- Error recovery
- Token accumulation
Communication: Server-Sent Events (SSE) vs WebSockets
Imagine you're at a restaurant. There are two ways the waiter could bring you food:
- SSE (Server-Sent Events): Like a conveyor-belt sushi restaurant. Food comes to you automatically, one piece at a time. You can't send food back, but you get a steady stream.
- WebSockets: Like a regular restaurant where you can call the waiter back anytime. Two-way communication, but more complex to set up.
For chatbots, SSE is usually better. Here's why:
| Feature | WebSockets | SSE |
|---|---|---|
| Direction | Both ways (you can send messages back) | One way (server → you) |
| Complexity | More complex, harder to debug | Simple, uses regular HTTP |
| Firewall | Sometimes blocked | Works everywhere |
| Reconnection | You handle it manually | Automatic |
| Best For | Gaming, video calls, real-time collaboration | Chat, notifications, streaming text |
Think About It: ChatGPT uses SSE. If it's good enough for them, it's probably good enough for you.
When you'd use WebSockets instead:
- You need the user to send control messages (like "stop generating")
- Multiple people editing the same document
- Voice or video streaming
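Even if you let a library handle it, it helps to know what SSE looks like on the wire: a plain-text HTTP response made of `event:` and `data:` lines, with a blank line ending each frame. Here is a minimal parser sketch — the `parse_sse` helper is our own illustration, not a real library API:

```python
def parse_sse(raw: str):
    """Parse a raw SSE stream into (event, data) pairs.

    Frames are separated by blank lines; each frame has an optional
    `event:` line and one or more `data:` lines. The default event
    name per the SSE spec is "message".
    """
    events = []
    event_type = "message"
    data_lines = []
    for line in raw.split("\n"):
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "":  # blank line = end of frame
            if data_lines:
                events.append((event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
    return events

# What a chatbot's stream might look like on the wire
raw = (
    'event: message\n'
    'data: {"text": "Hel"}\n'
    '\n'
    'event: message\n'
    'data: {"text": "lo"}\n'
    '\n'
    'event: done\n'
    'data: {"status": "complete"}\n'
    '\n'
)
print(parse_sse(raw))
```

This is exactly the kind of bookkeeping (plus reconnection and state) the Vercel AI SDK handles for you.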
Backend: Python (FastAPI)
Why Python? The entire AI ecosystem lives in Python. LangChain, LlamaIndex, PyTorch — they're all Python. You could use Go or Node.js, but you'd be fighting against the ecosystem.
Why FastAPI? FastAPI is modern Python. It has:
- Built-in async support (critical for chatbots)
- Automatic API documentation
- Type checking with Pydantic
- Great performance for I/O-bound tasks (like waiting for LLM responses)
The Critical Rule: Always Use async
Here's what happens if you don't:
# ❌ BAD: This blocks everything
@app.post("/chat")
def chat_sync(request: ChatRequest):
response = openai.ChatCompletion.create(...) # Waits 20 seconds
return response
# While waiting, NO other users can chat!
If one user's request takes 20 seconds, every other user has to wait. That's terrible.
# ✅ GOOD: This handles multiple users
@app.post("/chat")
async def chat_async(request: ChatRequest):
async with aiohttp.ClientSession() as session:
async with session.post(...) as resp:
async for chunk in resp.content.iter_chunked(1024):
yield chunk # Stream immediately
# Other users can chat while this one is waiting!
With async, your server can handle hundreds of users chatting simultaneously.
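A toy demonstration of why this matters, using `asyncio.sleep` as a stand-in for a slow LLM call (illustrative only — no real API involved):

```python
import asyncio
import time

async def fake_llm_call(user: str, delay: float = 0.1) -> str:
    # Stand-in for an awaitable LLM request: while this one "waits",
    # the event loop is free to serve other users.
    await asyncio.sleep(delay)
    return f"reply for {user}"

async def main() -> float:
    start = time.perf_counter()
    # Five users "chatting" at the same time
    await asyncio.gather(*(fake_llm_call(f"user{i}") for i in range(5)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Five overlapping 0.1s calls finish in roughly 0.1s total, not 0.5s
print(f"elapsed: {elapsed:.2f}s")
```

With blocking calls, those five requests would run back-to-back; with async they overlap on a single event loop.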
Database: Postgres + Redis (Two Tools, Two Jobs)
Think of databases like different types of storage:
- Postgres = Your filing cabinet (permanent storage, organized)
- Redis = Your sticky notes (temporary, super fast)
Postgres: The Filing Cabinet
Postgres stores your chat history permanently. It's like a filing cabinet where you can:
- Find all messages from a specific conversation
- Search through old chats
- Keep everything organized with relationships
Here's a simple schema to get started:
-- Each conversation gets a session
CREATE TABLE chat_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
thread_id VARCHAR(255) NOT NULL, -- Unique ID for this conversation
created_at TIMESTAMP DEFAULT NOW(),
metadata JSONB -- Flexible storage for extra data
);
-- Each message in a conversation
CREATE TABLE chat_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID REFERENCES chat_sessions(id),
role VARCHAR(20) NOT NULL, -- 'user' or 'assistant'
content TEXT NOT NULL,
tokens_used INTEGER, -- Track costs
created_at TIMESTAMP DEFAULT NOW()
);
-- Indexes make queries fast
CREATE INDEX idx_thread_id ON chat_sessions(thread_id);
CREATE INDEX idx_session_messages ON chat_messages(session_id, created_at);
Redis: The Sticky Notes
Redis is in-memory (super fast) but temporary. We use it for:
- Rate limiting: "Has this user sent too many messages?"
- Caching: "We just looked this up, don't look it up again"
- Session data: "What's the current state of this conversation?"
Why Two Databases? Postgres is slow for "how many requests did this user make in the last hour?" Redis is perfect for that. Use the right tool for the job.
import redis
from datetime import timedelta
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def check_rate_limit(user_id: str, limit: int = 50, window: int = 3600) -> bool:
key = f"rate_limit:{user_id}"
current = redis_client.incr(key)
if current == 1:
redis_client.expire(key, window)
return current <= limit
3. Making It Feel Instant: The Streaming Flow
Have you noticed how ChatGPT starts typing immediately, even before it finishes thinking? That's not magic — it's streaming.
The Problem: Waiting Feels Terrible
Imagine you ask a question and wait 10 seconds with a spinner. That feels broken. Users expect to see something happen immediately.
The Solution: Show Progress As It Happens
We use a Dual-Path Strategy:
- Optimistic UI: Show the user's message instantly (don't wait for the server)
- Streaming Response: Show the bot's response word-by-word as it's generated
Think of it like a live sports broadcast vs. a recorded game. Live feels more engaging, even if the quality is the same.
sequenceDiagram
participant User
participant Server
participant LLM
participant DB
User->>Server: POST /chat (msg="Hello")
par Async Save
Server->>DB: INSERT User Message
and Async Generation
Server->>LLM: Stream Completion
end
loop Every Chunk
LLM-->>Server: "H" ... "e" ... "l" ... "l" ... "o"
Server-->>User: SSE Event: data: {"text": "H"}
end
Server->>DB: UPDATE Bot Message (Full Text)
Server-->>User: SSE Event: [DONE]
Key Insight: Notice the Async Save — we don't wait for the database to finish saving before calling the LLM. We do both at the same time. Every millisecond counts when you want to feel instant.
How to Implement Streaming (The Code)
Here's a simple FastAPI endpoint that streams responses:
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
app = FastAPI()
@app.post("/chat")
async def chat_stream(request: ChatRequest):
async def event_generator():
# Save user message in background (don't wait)
asyncio.create_task(save_user_message(request.message, request.thread_id))
# Start streaming immediately
async for chunk in stream_llm_response(request.message, request.thread_id):
yield {
"event": "message",
"data": json.dumps({"text": chunk})
}
# Tell the client we're done
yield {
"event": "done",
"data": json.dumps({"status": "complete"})
}
return EventSourceResponse(event_generator())
Making It Even Faster: Time To First Token
The Goal: User sees the first word in less than 300ms.
How to Achieve It:
- Don't wait for the database — save messages in the background
- Start the LLM call immediately — don't wait for anything
- Send tokens as soon as you get them — don't buffer too much
- Keep connections alive — reuse HTTP connections
Think of it like a race. Every millisecond you save makes the experience feel faster.
async def stream_llm_response(message: str, thread_id: str):
    # Retrieve context in parallel with LLM initialization
    context_task = asyncio.create_task(get_context(thread_id))
    # Start the LLM stream immediately (await returns an async generator)
    stream = await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=await context_task,  # Wait only when needed
        stream=True
    )
    buffer = []
    async for chunk in stream:
        token = chunk.choices[0].delta.get("content", "")
        if token:
            buffer.append(token)
            # Flush the buffer every 3 tokens to balance latency and overhead
            if len(buffer) >= 3:
                yield "".join(buffer)
                buffer = []
    # Flush remaining tokens
    if buffer:
        yield "".join(buffer)
4. Memory: Making the Bot Remember
A chatbot without memory is like talking to someone with amnesia. Every message, they forget everything you said before.
The Problem: Context Windows Are Limited
LLMs have a limit on how much text they can process at once (called a "context window"). GPT-4 can handle about 8,000 tokens (roughly 6,000 words). If your conversation is longer, you need a strategy.
The Solution: Thread IDs + Smart Context Management
Instead of sending the entire conversation history every time, we use a Thread ID — a unique identifier for each conversation.
How It Works:
- Each conversation gets a unique thread_id (like "conv_abc123")
- When the user sends a message, we look up the last N messages for that thread
- We send only those recent messages to the LLM
For Short Conversations: Send everything. Simple.
For Long Conversations: Use a "sliding window" — keep the last 20 messages, and summarize everything older.
Think of it like a conversation summary: "Earlier, the user mentioned they're a Python developer and asked about APIs. Now they're asking about authentication."
Here's how you'd implement this in code:
async def get_conversation_context(thread_id: str, max_tokens: int = 4000):
# Step 1: Get the last 20 messages for this conversation
recent_messages = await db.fetch(
"""
SELECT role, content
FROM chat_messages
WHERE session_id = (
SELECT id FROM chat_sessions WHERE thread_id = $1
)
ORDER BY created_at DESC
LIMIT 20
""",
thread_id
)
# Step 2: Count how many tokens this would use
total_tokens = sum(count_tokens(msg['content']) for msg in recent_messages)
# Step 3: If it fits, use it. If not, summarize.
if total_tokens <= max_tokens:
# Fits! Return all messages
return [{"role": m['role'], "content": m['content']} for m in reversed(recent_messages)]
else:
# Too long! Summarize old messages, keep recent ones
return await get_summarized_context(thread_id, recent_messages)
async def get_summarized_context(thread_id: str, messages: list):
# Get existing summary (if we've summarized before)
summary = await db.fetchval(
"SELECT summary FROM conversation_summaries WHERE thread_id = $1",
thread_id
)
# If no summary exists, create one from old messages
if not summary:
old_messages = messages[:-10] # Everything except last 10
summary = await summarize_messages(old_messages) # Use LLM to summarize
await db.execute(
"INSERT INTO conversation_summaries (thread_id, summary) VALUES ($1, $2)",
thread_id, summary
)
# Combine: summary of old stuff + last 10 actual messages
recent = messages[-10:]
return [
{"role": "system", "content": f"Previous conversation: {summary}"},
*[{"role": m['role'], "content": m['content']} for m in recent]
]
In Plain English: If the conversation is short, send everything. If it's long, summarize the old parts and keep the recent messages.
Sliding Window Strategy
graph LR
A[Full History] -->|"Check Token Count"| B{Under Limit?}
B -->|Yes| C[Use All Messages]
B -->|No| D[Summarize Old Messages]
D --> E[Combine Summary + Recent N Messages]
E --> F[Send to LLM]
C --> F
Implementation Details:
- Window Size: Last 20 messages (configurable)
- Token Budget: 4000 tokens for context (leaves room for response)
- Summarization Trigger: When context exceeds 3500 tokens
- Summary Storage: Cache summaries in Postgres with TTL
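The window-plus-budget rules above can be sketched as a pure function. The token counter is injected, so a real tokenizer (e.g. tiktoken) could be dropped in later — the helper name and the word-count stand-in are illustrative:

```python
from typing import Callable

def trim_to_budget(messages: list, max_tokens: int,
                   count_tokens: Callable[[str], int]) -> list:
    """Keep the most recent messages whose combined size fits the budget.

    `messages` is oldest-first; we walk from the newest backwards and
    stop as soon as adding another message would exceed the budget.
    """
    kept, used = [], 0
    for msg in reversed(messages):
        cost = count_tokens(msg["content"])
        if used + cost > max_tokens:
            break
        kept.append(msg)
        used += cost
    return list(reversed(kept))  # restore chronological order

# Crude stand-in tokenizer: roughly one token per word
fake_count = lambda text: len(text.split())

history = [
    {"role": "user", "content": "one two three four"},    # 4 "tokens"
    {"role": "assistant", "content": "five six"},         # 2
    {"role": "user", "content": "seven eight nine"},      # 3
]
print(trim_to_budget(history, 5, fake_count))
```

Anything that gets trimmed out is exactly what the summarization step should fold into the running summary.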
5. Giving the Bot "Hands": RAG and Tools
An LLM by itself is like a smart person locked in a room with no access to the outside world. They can talk, but they can't look things up or do things.
The Problem: LLMs Hallucinate
If you ask an LLM "What's the weather?" it might make something up because it doesn't have access to real-time data. We need to give it tools to access real information.
The Solution: Let the LLM Choose When to Use Tools
Instead of always searching your knowledge base (which is slow and expensive), we let the LLM decide: "Do I need to look something up, or can I answer this directly?"
Example Tools:
- search_knowledge_base: Search your company docs/PDFs
- get_user_data: Look up the user's account info
- calculator: Do math (LLMs are bad at math)
How It Works:
- User: "Hi" → Bot: "Hello!" (no tools needed)
- User: "Where is my order?" → Bot: Uses get_user_data → "Your order #123 is shipping"
- User: "How does your API work?" → Bot: Uses search_knowledge_base → Finds docs → Explains
The Smart Part: The bot doesn't search your knowledge base for "Thanks" — it just responds directly. This saves time and money.
Tool Selection Flow
flowchart TD
A[User Query] --> B[LLM with Tool Definitions]
B --> C{Tool Needed?}
C -->|No| D[Direct Response]
C -->|Yes| E[Select Tool]
E --> F{Tool Type?}
F -->|RAG| G[Vector Search]
F -->|API| H[External API Call]
F -->|Function| I[Execute Function]
G --> J[Combine Results]
H --> J
I --> J
J --> K[LLM Generates Final Response]
D --> L[Stream to User]
K --> L
How Tools Work (Simple Example)
Here's a simplified version of how tool calling works:
# Step 1: Define what tools are available
tools = [
{
"name": "search_knowledge_base",
"description": "Search company documentation",
"parameters": {"query": "string"}
},
{
"name": "get_user_data",
"description": "Get user account info",
"parameters": {"user_id": "string"}
}
]
# Step 2: Send user message + tool definitions to LLM
response = await llm.chat(
messages=[{"role": "user", "content": "Where is my order?"}],
tools=tools
)
# Step 3: Check if the LLM wants to use a tool
if response.tool_calls:
    # The LLM said: "I need to call get_user_data"
    for tool_call in response.tool_calls:
        if tool_call["name"] == "get_user_data":
            # Actually call the tool
            user_data = db_query(tool_call["args"]["user_id"])
            # Send the tool result back to the LLM
            final_response = await llm.chat(
                messages=[
                    {"role": "user", "content": "Where is my order?"},
                    {"role": "assistant", "content": None,
                     "tool_calls": response.tool_calls},
                    {"role": "tool", "content": user_data},
                ]
            )
            return final_response
else:
    # The LLM can answer directly, no tools needed
    return response
The Flow: User asks → LLM decides if it needs tools → If yes, call tool → Send result back to LLM → LLM generates final answer.
Tool Definition Schema
tools = [
{
"type": "function",
"function": {
"name": "search_knowledge_base",
"description": "Search company documentation and knowledge base",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"top_k": {
"type": "integer",
"description": "Number of results",
"default": 5
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "get_user_data",
"description": "Get user account information",
"parameters": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID"
}
},
"required": ["user_id"]
}
}
}
]
6. Production Reality: What Will Break
When you launch, things will go wrong. Here's what to expect and how to handle it:
1. Rate Limits & Cost: Protecting Your Budget
The Problem: One user could write a script that sends 10,000 messages per minute, draining your API budget in hours.
The Solution: Rate limiting — like a bouncer at a club, you limit how many requests each user can make.
Simple Approach: "Each user gets 50 requests per hour. After that, they wait."
Here's a simple rate limiter:
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def check_rate_limit(user_id: str, limit: int = 50, window: int = 3600):
"""
Check if user has exceeded their rate limit.
limit = max requests (e.g., 50)
window = time window in seconds (e.g., 3600 = 1 hour)
"""
key = f"rate_limit:{user_id}"
# Increment counter (starts at 0, becomes 1, 2, 3...)
current = redis_client.incr(key)
# On first request, set expiration (delete key after 1 hour)
if current == 1:
redis_client.expire(key, window)
# Check if over limit
if current > limit:
return False, 0 # Blocked, 0 remaining
remaining = limit - current
return True, remaining # Allowed, with remaining count
# Usage in your API endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
allowed, remaining = check_rate_limit(request.user_id, limit=50, window=3600)
if not allowed:
raise HTTPException(
status_code=429,
detail="Too many requests. Try again in an hour."
)
# Continue with chat...
How It Works:
- User makes request → increment counter in Redis
- If counter > limit → block the request
- After 1 hour → counter resets (key expires)
Advanced: Track Token Usage Too
You can also limit by tokens (not just requests):
from datetime import date

def check_token_budget(user_id: str, tokens_used: int, daily_budget: int = 100000):
    """Limit total tokens per day, not just requests"""
    key = f"token_budget:{user_id}:{date.today()}"
    current = redis_client.incrby(key, tokens_used)
    if current == tokens_used:  # First request today
        redis_client.expire(key, 86400)  # Expire after 24 hours
    return current <= daily_budget
This prevents one user from using all your tokens in a single day.
2. When OpenAI Goes Down: Circuit Breakers
The Problem: OpenAI (or any external service) can go down. If you keep trying to call it, you waste time and frustrate users.
The Solution: Circuit Breaker Pattern
Think of it like a fuse in your house. If something keeps failing, you "blow the fuse" and stop trying for a while.
How It Works:
- Normal (CLOSED): Everything works, calls go through
- Failing (OPEN): After 3 failures, stop trying for 60 seconds
- Testing (HALF_OPEN): After 60 seconds, try once to see if it's fixed
Simple Implementation:
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=3, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.state = "CLOSED"  # CLOSED, OPEN, or HALF_OPEN
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        # If we're in the OPEN state, don't even try
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"  # Probe: allow one trial request
            else:
                raise Exception("Service is down, try later")
        try:
            result = func(*args, **kwargs)
            # Success! Close the circuit and reset the counter
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
            self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"  # Stop trying
            raise e
With Fallback:
async def call_llm_with_fallback(prompt: str):
try:
return await breaker.call(openai_call, prompt)
except Exception:
# Try backup provider
return await anthropic_call(prompt)
# Or show a friendly error: "I'm having trouble. Try again in a minute."
Think About It: What's better — showing an error immediately, or making the user wait 30 seconds for a timeout?
3. Observability: Seeing What's Actually Happening
The Problem: The bot gives a weird answer. Why? Did it retrieve the wrong document? Did the LLM misunderstand? You have no idea.
The Solution: Tracing
Tracing is like having a security camera for your chatbot. You can see:
- What the user asked
- What documents were retrieved
- What the LLM was thinking
- How long each step took
- What went wrong
Why This Matters: When a user complains "the bot gave me wrong information," you can trace back and see: "Oh, it retrieved document #5 instead of document #2. Let me fix the search."
Simple Approach: Log Everything
At minimum, log these things for every request:
import time
import uuid
async def handle_chat_request(message: str, thread_id: str):
request_id = str(uuid.uuid4())
start_time = time.time()
# Log the request
logger.info(f"[{request_id}] User message: {message}")
# Step 1: Get context
context_start = time.time()
context = await get_conversation_context(thread_id)
context_time = (time.time() - context_start) * 1000 # milliseconds
# Step 2: Search if needed
rag_start = time.time()
rag_results = await vector_search(message)
rag_time = (time.time() - rag_start) * 1000
# Step 3: Generate response
llm_start = time.time()
response = await llm.generate(messages=context + [{"role": "user", "content": message}])
llm_time = (time.time() - llm_start) * 1000
total_time = (time.time() - start_time) * 1000
# Log everything
logger.info(f"""
[{request_id}] Request complete
- Context retrieval: {context_time}ms
- RAG search: {rag_time}ms
- LLM generation: {llm_time}ms
- Total: {total_time}ms
- Tokens used: {response.usage.total_tokens}
""")
return response
What to Track:
- Request ID: So you can trace a specific conversation
- Timing: How long each step takes (helps find bottlenecks)
- Tokens: Track costs
- Errors: What failed and why
Better Approach: Use a Tool
Tools like LangSmith or Arize Phoenix make this easier — they automatically track everything and give you a nice dashboard. But you can start with simple logging and upgrade later.
Think About It: If a user complains about a bad answer, can you find their request ID and see exactly what happened? If not, you need better observability.
7. Summary: What You Need for Production
Here's the checklist for a production-ready chatbot:
- Async Backend: Use async/await so multiple users can chat simultaneously
- Streaming (SSE): Show responses word-by-word, not all at once
- Memory: Store chat history and retrieve it efficiently
- Tools/RAG: Give the bot access to real data when needed
- Rate Limiting: Protect your budget from abuse
- Circuit Breakers: Don't keep trying when services are down
- Observability: Know what's happening so you can fix issues
- Error Handling: Fail gracefully, don't crash
Production Readiness Checklist
- Async/await throughout (no blocking I/O)
- SSE streaming implemented with proper event formatting
- Thread-based conversation context management
- Sliding window or summarization for long conversations
- Tool/function calling for RAG and external APIs
- Rate limiting per user (requests and tokens)
- Circuit breakers for external LLM providers
- Fallback models or graceful error messages
- Distributed tracing (LangSmith/Phoenix)
- Cost tracking per request/user
- Error logging with request IDs
- Health checks and monitoring endpoints
- Load testing completed (1000+ concurrent users)
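One checklist item worth making concrete: health checks. A liveness probe can just return 200, but a readiness check should probe the dependencies (Postgres, Redis, the LLM provider) and report which ones are down. A minimal sketch with stand-in probes — the probe names and response shape are our own assumptions:

```python
import asyncio

async def check_health(probes: dict) -> dict:
    """Run dependency probes concurrently and aggregate into one report.

    `probes` maps a name to an async callable returning True on success
    (e.g. a Postgres SELECT 1 or a Redis PING).
    """
    names = list(probes)
    results = await asyncio.gather(*(probes[n]() for n in names),
                                   return_exceptions=True)
    # A probe that raised (or returned anything but True) counts as down
    detail = {n: (r is True) for n, r in zip(names, results)}
    return {"status": "ok" if all(detail.values()) else "degraded",
            "checks": detail}

# Stand-in probes for the sketch
async def fake_postgres_ping(): return True
async def fake_redis_ping(): raise ConnectionError("redis down")

report = asyncio.run(check_health({
    "postgres": fake_postgres_ping,
    "redis": fake_redis_ping,
}))
print(report)
```

A FastAPI route would simply return this dict (with a 503 status when degraded), so your load balancer can pull an unhealthy instance out of rotation.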
Challenge for You
Scenario: A user uploads a 50-page PDF and asks questions about it. Constraint: The PDF parsing takes 10 seconds. You cannot block the chat window. Question: How do you modify the architecture to handle "Document Ingestion" asynchronously while keeping the user updated in the chat?
Hint: Think about Task Queues like Celery/BullMQ.
Solution Approach
sequenceDiagram
participant User
participant API
participant Queue
participant Worker
participant VectorDB
participant Chat
User->>API: Upload PDF + Question
API->>Queue: Enqueue Ingestion Task
API->>User: "Processing your document..."
Queue->>Worker: Process PDF Task
Worker->>Worker: Parse & Chunk PDF (10s)
Worker->>VectorDB: Store Embeddings
User->>Chat: "Is my document ready?"
Chat->>Queue: Check Task Status
Queue-->>Chat: "Processing... 60%"
Chat-->>User: "Almost done! 60% processed"
Worker->>Queue: Task Complete
Queue->>Chat: Notify Completion
Chat-->>User: "Document ready! Ask me anything."
Implementation Pattern:
- Task Queue: Use Celery (Python) or BullMQ (Node.js)
- Status Updates: Store task status in Redis with progress percentage
- SSE Notifications: Stream status updates to the user
- Polling Fallback: Client polls for status if SSE disconnects
from celery import Celery
celery_app = Celery('chatbot', broker='redis://localhost:6379')
@celery_app.task(bind=True)
def process_document(self, file_path: str, user_id: str, thread_id: str):
# Update progress
self.update_state(state='PROGRESS', meta={'progress': 0})
# Parse PDF
chunks = parse_pdf(file_path)
self.update_state(state='PROGRESS', meta={'progress': 50})
# Generate embeddings
embeddings = generate_embeddings(chunks)
self.update_state(state='PROGRESS', meta={'progress': 80})
# Store in vector DB
store_in_vector_db(embeddings, thread_id)
self.update_state(state='PROGRESS', meta={'progress': 100})
return {'status': 'complete', 'chunks': len(chunks)}
Discussion Prompts for Engineers
- How would you design rate limiting for a freemium model (different limits for free vs. paid users)?
- What's your strategy for handling context window limits when conversations exceed 100 messages?
- How do you ensure idempotency when a user's request times out but the LLM call succeeds?
- Would you use a single Redis instance or shard rate limiting by user_id? Why?
- How do you balance between streaming speed (smaller chunks) and network efficiency (larger chunks)?
- What's your fallback strategy when both primary and backup LLM providers are down?
- How would you implement A/B testing for different prompt strategies without affecting user experience?
- What metrics would you alert on in production? (e.g., p95 latency > 5s, error rate > 1%)
Takeaway
Building a production AI chatbot isn't about calling an API. It's about designing a system that:
- Feels fast — users see responses immediately through streaming
- Remembers — context management so conversations make sense
- Scales — handles thousands of users without breaking
- Fails gracefully — when things go wrong, users get helpful errors
- Is observable — you can see what's happening and fix issues
The difference between a demo and a product? Architecture. Not the AI model, not the prompts — the system design around it.
Start simple, add complexity as you need it. You don't need all of this on day one, but you should know where you're heading.
For more on building production AI systems, check out our AI Bootcamp for Software Engineers.