By KFSys
System Administrator
Ever wanted to build a Slack-like chat system where users can join different rooms and chat in real-time? Let’s build exactly that using FastAPI’s WebSocket support!
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!
Accepted Answer
pip install fastapi "uvicorn[standard]" python-multipart
First, let’s create our FastAPI app and define our data structures:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List, Set
import json
from datetime import datetime
import uuid
# ASGI application object; the route decorators further down in this file
# (@app.websocket, @app.get, @app.post) register their handlers on it.
app = FastAPI(title="WebSocket Chat API")
# In-memory storage (use Redis in production)
class ConnectionManager:
    """Track WebSocket connections, per-connection user info and room rosters.

    All state is held in plain dictionaries on the instance, so it is only
    visible inside the current process. A room exists for as long as at least
    one connection is registered in it and is dropped when the last one leaves.
    """

    def __init__(self):
        # room_id -> sockets currently registered in that room
        self.active_connections: Dict[str, List[WebSocket]] = {}
        # socket -> {"user_id": ..., "username": ..., "room_id": ...}
        self.user_info: Dict[WebSocket, dict] = {}
        # room_id -> usernames currently present in that room
        self.room_members: Dict[str, Set[str]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, user_id: str, username: str):
        """Accept *websocket*, register it in *room_id* and announce the join.

        The room is created on first use. After registration a "user_joined"
        message carrying the current member list is broadcast to the room
        (including the socket that just joined).
        """
        await websocket.accept()

        # Create the room lazily and register the new connection.
        self.active_connections.setdefault(room_id, []).append(websocket)
        self.user_info[websocket] = {
            "user_id": user_id,
            "username": username,
            "room_id": room_id,
        }
        self.room_members.setdefault(room_id, set()).add(username)

        # Notify room about new user
        await self.broadcast_to_room(room_id, {
            "type": "user_joined",
            "username": username,
            "message": f"{username} joined the room",
            "timestamp": datetime.now().isoformat(),
            "room_members": list(self.room_members[room_id]),
        })

    def disconnect(self, websocket: WebSocket):
        """Forget *websocket* and return ``(room_id, username)`` for it.

        Returns ``(None, None)`` when the socket is not registered (for
        example because a failed broadcast already pruned it), so callers can
        always unpack the result. Safe to call more than once per socket.
        """
        info = self.user_info.pop(websocket, None)
        if info is None:
            return None, None

        room_id = info["room_id"]
        username = info["username"]

        connections = self.active_connections.get(room_id, [])
        if websocket in connections:
            connections.remove(websocket)

        if not connections:
            # Last connection left: drop the room so that rooms named by
            # arbitrary client-supplied ids do not accumulate forever.
            self.active_connections.pop(room_id, None)
            self.room_members.pop(room_id, None)
            return room_id, username

        # The roster is keyed by username, so keep the name while another
        # connection in the same room is still using it.
        name_still_used = any(
            self.user_info.get(other, {}).get("username") == username
            for other in connections
        )
        if not name_still_used and room_id in self.room_members:
            self.room_members[room_id].discard(username)

        return room_id, username

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send the text *message* to a single *websocket*."""
        await websocket.send_text(message)

    async def broadcast_to_room(self, room_id: str, message: dict):
        """JSON-encode *message* and send it to every connection in *room_id*.

        Unknown or empty rooms are ignored. Connections whose send fails are
        fully unregistered afterwards (connection list, user info and roster).
        """
        connections = self.active_connections.get(room_id)
        if not connections:
            return

        message_str = json.dumps(message)
        dead_connections = []

        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while this one is suspended in ``await``.
        for connection in list(connections):
            try:
                await connection.send_text(message_str)
            except Exception:
                # Best-effort delivery: a failed send marks the peer as gone.
                # ``Exception`` (not a bare except) lets task cancellation
                # propagate.
                dead_connections.append(connection)

        # Clean up dead connections everywhere, not only in the room list.
        for dead_conn in dead_connections:
            self.disconnect(dead_conn)
# Global connection manager: a single module-level instance shared by the
# WebSocket endpoint and the REST endpoints in this file.
manager = ConnectionManager()
Now let’s create the main WebSocket endpoint that handles all chat functionality:
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    """Serve one chat client for the lifetime of its WebSocket connection.

    The ``user_id`` and ``username`` query parameters identify the client; a
    random UUID and ``User_<first 8 chars of user_id>`` are used when they are
    absent. Incoming frames must be JSON objects with a ``type`` field:

    * ``chat_message`` - ``message`` is broadcast to the whole room.
    * ``typing`` - ``is_typing`` is relayed to everyone except the sender.

    Frames that are not JSON objects are answered with an ``error`` message
    and otherwise ignored. When the loop ends for any reason the connection is
    unregistered and the room is told that the user left.
    """
    # Get user info from query parameters
    user_id = websocket.query_params.get("user_id", str(uuid.uuid4()))
    username = websocket.query_params.get("username", f"User_{user_id[:8]}")

    await manager.connect(websocket, room_id, user_id, username)

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()

            # Client input is untrusted: reject anything that is not a JSON
            # object instead of letting the handler crash.
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None
            if not isinstance(message_data, dict):
                await manager.send_personal_message(
                    json.dumps({
                        "type": "error",
                        "message": "Messages must be JSON objects",
                    }),
                    websocket,
                )
                continue

            message_type = message_data.get("type")

            if message_type == "chat_message":
                # Broadcast chat message to room
                await manager.broadcast_to_room(room_id, {
                    "type": "chat_message",
                    "username": username,
                    "user_id": user_id,
                    "message": message_data.get("message", ""),
                    "timestamp": datetime.now().isoformat(),
                    "room_id": room_id,
                })
            elif message_type == "typing":
                # Broadcast typing indicator (exclude sender). Encode once and
                # iterate over a snapshot, since the room list can change
                # while this coroutine is suspended in ``await``.
                typing_payload = json.dumps({
                    "type": "typing",
                    "username": username,
                    "is_typing": message_data.get("is_typing", False),
                })
                for connection in list(manager.active_connections.get(room_id, [])):
                    if connection != websocket:  # Don't send to sender
                        try:
                            await connection.send_text(typing_payload)
                        except Exception:
                            # Typing indicators are best-effort; a peer whose
                            # send fails is skipped here.
                            continue
    except WebSocketDisconnect:
        # Normal end of the session; cleanup happens in ``finally``.
        pass
    finally:
        # Runs on disconnect and on unexpected errors alike, so the manager
        # never keeps a socket whose handler has already exited.
        result = manager.disconnect(websocket)
        left_room, left_user = result if result else (None, None)
        if left_room and left_user:
            # Notify room about user leaving
            await manager.broadcast_to_room(left_room, {
                "type": "user_left",
                "username": left_user,
                "message": f"{left_user} left the room",
                "timestamp": datetime.now().isoformat(),
                "room_members": list(manager.room_members.get(left_room, [])),
            })
Let’s add some useful REST endpoints for room management:
from fastapi import HTTPException
@app.get("/rooms")
async def get_active_rooms():
    """Get list of active rooms with member counts"""
    # Build one summary entry per room, skipping rooms whose roster is empty.
    occupied = [
        {
            "room_id": name,
            "member_count": len(roster),
            "members": list(roster),
        }
        for name, roster in manager.room_members.items()
        if roster
    ]
    return {"rooms": occupied}
@app.get("/rooms/{room_id}/members")
async def get_room_members(room_id: str):
    """Get members of a specific room"""
    # A room the manager has no roster for is reported as 404.
    roster = manager.room_members.get(room_id)
    if roster is None:
        raise HTTPException(status_code=404, detail="Room not found")

    names = list(roster)
    return {
        "room_id": room_id,
        "members": names,
        "member_count": len(names),
    }
@app.post("/rooms/{room_id}/broadcast")
async def broadcast_system_message(room_id: str, message: dict):
    """Send a system message to all users in a room"""
    # Only rooms the manager currently tracks connections for can be targeted.
    room_is_known = room_id in manager.active_connections
    if not room_is_known:
        raise HTTPException(status_code=404, detail="Room not found")

    # The request body's "text" field becomes the message shown to clients.
    payload = dict(
        type="system_message",
        message=message.get("text", ""),
        timestamp=datetime.now().isoformat(),
        room_id=room_id,
    )
    await manager.broadcast_to_room(room_id, payload)
    return {"status": "Message broadcasted successfully"}
Here’s a basic HTML client to test our chat:
@app.get("/")
async def get_chat_page():
    """Return the single-page HTML/JavaScript test client for the chat.

    The page connects to ``/ws/{room_id}`` on the host it was served from
    (``wss`` when loaded over HTTPS), URL-encodes the room id and username,
    and renders all server-supplied text through ``textContent`` / text nodes
    so that usernames and messages are never interpreted as HTML.
    """
    return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
    <title>FastAPI WebSocket Chat</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
        #messages { border: 1px solid #ccc; height: 400px; overflow-y: scroll; padding: 10px; margin: 10px 0; }
        #messageInput { width: 70%; padding: 10px; }
        #sendButton { width: 25%; padding: 10px; }
        .message { margin: 5px 0; padding: 5px; }
        .system { color: #666; font-style: italic; }
        .user-joined { color: green; }
        .user-left { color: red; }
        #members { background: #f5f5f5; padding: 10px; margin: 10px 0; }
    </style>
</head>
<body>
    <h1>FastAPI WebSocket Chat</h1>
    <div>
        <label>Username: <input type="text" id="username" value="User123"></label>
        <label>Room ID: <input type="text" id="roomId" value="general"></label>
        <button onclick="connect()">Connect</button>
        <button onclick="disconnect()">Disconnect</button>
    </div>
    <div id="members"></div>
    <div id="messages"></div>
    <div>
        <input type="text" id="messageInput" placeholder="Type your message..." onkeypress="handleKeyPress(event)">
        <button onclick="sendMessage()" id="sendButton">Send</button>
    </div>
    <script>
        let ws = null;
        let username = '';
        let roomId = '';

        function connect() {
            username = document.getElementById('username').value;
            roomId = document.getElementById('roomId').value;
            if (!username || !roomId) {
                alert('Please enter username and room ID');
                return;
            }
            // Connect back to whichever host served this page, and encode the
            // user-typed values so characters like '&', '#' or '/' cannot
            // change the URL structure.
            const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
            const path = `/ws/${encodeURIComponent(roomId)}`;
            const query = `username=${encodeURIComponent(username)}`;
            ws = new WebSocket(`${scheme}://${window.location.host}${path}?${query}`);
            ws.onopen = function(event) {
                addMessage('Connected to room: ' + roomId, 'system');
            };
            ws.onmessage = function(event) {
                const data = JSON.parse(event.data);
                handleMessage(data);
            };
            ws.onclose = function(event) {
                addMessage('Disconnected from chat', 'system');
            };
        }

        function disconnect() {
            if (ws) {
                ws.close();
                ws = null;
            }
        }

        function handleMessage(data) {
            switch(data.type) {
                case 'chat_message':
                    addMessage(`${data.username}: ${data.message}`, 'chat');
                    break;
                case 'user_joined':
                    addMessage(data.message, 'user-joined');
                    updateMembers(data.room_members);
                    break;
                case 'user_left':
                    addMessage(data.message, 'user-left');
                    updateMembers(data.room_members);
                    break;
                case 'system_message':
                    addMessage(`System: ${data.message}`, 'system');
                    break;
                case 'typing':
                    // Handle typing indicators here
                    break;
            }
        }

        function addMessage(message, className) {
            const messages = document.getElementById('messages');
            const messageElement = document.createElement('div');
            messageElement.className = `message ${className}`;
            messageElement.textContent = `[${new Date().toLocaleTimeString()}] ${message}`;
            messages.appendChild(messageElement);
            messages.scrollTop = messages.scrollHeight;
        }

        function updateMembers(members) {
            // Usernames are chosen by other clients, so build the roster from
            // DOM nodes instead of assigning an HTML string.
            const membersDiv = document.getElementById('members');
            membersDiv.textContent = '';
            const label = document.createElement('strong');
            label.textContent = `Room Members (${members.length}):`;
            membersDiv.appendChild(label);
            membersDiv.appendChild(document.createTextNode(' ' + members.join(', ')));
        }

        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();
            if (message && ws && ws.readyState === WebSocket.OPEN) {
                ws.send(JSON.stringify({
                    type: 'chat_message',
                    message: message
                }));
                input.value = '';
            }
        }

        function handleKeyPress(event) {
            if (event.key === 'Enter') {
                sendMessage();
            }
        }
    </script>
</body>
</html>
""")
Save everything in a file called chat_app.py and run:
uvicorn chat_app:app --reload --host 0.0.0.0 --port 8000
Visit http://localhost:8000 to test your chat application!
You can also test the REST endpoints:
# Get active rooms
curl http://localhost:8000/rooms
# Get members of a specific room
curl http://localhost:8000/rooms/general/members
# Send system message to a room
curl -X POST "http://localhost:8000/rooms/general/broadcast" \
-H "Content-Type: application/json" \
-d '{"text": "Server maintenance in 5 minutes!"}'
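For production deployment, consider replacing the in-memory connection storage with a shared store such as Redis, as noted in the code above.
What this tutorial covers: ✅ Real-time bidirectional communication ✅ Multiple chat rooms support ✅ User join/leave notifications ✅ Room member management ✅ System message broadcasting ✅ Connection cleanup on disconnect ✅ REST API for room management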
This gives you a solid foundation for building more complex real-time applications with FastAPI WebSockets!
How can we use FastAPI WebSockets to manage multiple chat rooms with real-time messaging and user notifications?
Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.
Full documentation for every DigitalOcean product.
The Wave has everything you need to know about building a business, from raising funding to marketing your product.
Stay up to date by signing up for DigitalOcean’s Infrastructure as a Newsletter.
New accounts only. By submitting your email you agree to our Privacy Policy
Scale up as you grow — whether you're running one virtual machine or ten thousand.
Sign up and get $200 in credit for your first 60 days with DigitalOcean.*
*This promotional offer applies to new accounts only.