Building Real-Time Chat with Room Management in FastAPI

Posted on September 18, 2025
By KFSys

System Administrator

Ever wanted to build a Slack-like chat system where users can join different rooms and chat in real-time? Let’s build exactly that using FastAPI’s WebSocket support!

What We’re Building

  • Real-time chat with WebSocket connections
  • Multiple chat rooms support
  • User join/leave notifications
  • Message broadcasting to room members
  • Connection management and cleanup


Prerequisites

# uvicorn[standard] pulls in the websockets library that WebSocket routes need
pip install fastapi "uvicorn[standard]"

Step 1: Basic Setup and Models

First, let’s create our FastAPI app and define our data structures:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List, Set
import json
from datetime import datetime
import uuid

app = FastAPI(title="WebSocket Chat API")

# In-memory storage (use Redis in production)
class ConnectionManager:
    def __init__(self):
        # Store active connections by room
        self.active_connections: Dict[str, List[WebSocket]] = {}
        # Store user info for each connection
        self.user_info: Dict[WebSocket, dict] = {}
        # Store room members
        self.room_members: Dict[str, Set[str]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, user_id: str, username: str):
        await websocket.accept()

        # Initialize room if it doesn't exist
        if room_id not in self.active_connections:
            self.active_connections[room_id] = []
            self.room_members[room_id] = set()

        # Add connection to room
        self.active_connections[room_id].append(websocket)
        self.user_info[websocket] = {
            "user_id": user_id,
            "username": username,
            "room_id": room_id
        }
        self.room_members[room_id].add(username)

        # Notify room about new user
        await self.broadcast_to_room(room_id, {
            "type": "user_joined",
            "username": username,
            "message": f"{username} joined the room",
            "timestamp": datetime.now().isoformat(),
            "room_members": list(self.room_members[room_id])
        })

    def disconnect(self, websocket: WebSocket):
        # pop() makes a second call for the same socket a harmless no-op
        user_info = self.user_info.pop(websocket, None)
        if not user_info:
            # Return a pair so callers can always unpack the result
            return None, None

        room_id = user_info["room_id"]
        username = user_info["username"]

        # Remove from connections (broadcast_to_room may already have dropped it)
        connections = self.active_connections.get(room_id, [])
        if websocket in connections:
            connections.remove(websocket)

        # Remove from room members
        if room_id in self.room_members:
            self.room_members[room_id].discard(username)

        return room_id, username

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

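    async def broadcast_to_room(self, room_id: str, message: dict):
        if room_id not in self.active_connections:
            return

        message_str = json.dumps(message)
        dead_connections = []

        # Iterate over a copy: the list can change while we await send_text()
        for connection in list(self.active_connections[room_id]):
            try:
                await connection.send_text(message_str)
            except Exception:
                # Avoid a bare except, which would also swallow task cancellation
                dead_connections.append(connection)

        # Clean up dead connections (skip any that were removed in the meantime)
        for dead_conn in dead_connections:
            if dead_conn in self.active_connections[room_id]:
                self.active_connections[room_id].remove(dead_conn)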

# Global connection manager
manager = ConnectionManager()
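
One caveat of the design above: `room_members` stores usernames in a set, so if two connections join the same room under the same username, the first one to leave removes the name for both. Here is a minimal sketch of one way around this, assuming a hypothetical `RoomRoster` helper (not part of the original code) that counts connections per username:

```python
from collections import Counter, defaultdict
from typing import Dict, List


class RoomRoster:
    """Hypothetical helper: counts open connections per username in each room."""

    def __init__(self) -> None:
        # room_id -> Counter({username: number of open connections})
        self._rooms: Dict[str, Counter] = defaultdict(Counter)

    def join(self, room_id: str, username: str) -> None:
        self._rooms[room_id][username] += 1

    def leave(self, room_id: str, username: str) -> None:
        room = self._rooms.get(room_id)
        if room is None or room[username] == 0:
            return  # unknown room or user: nothing to do
        room[username] -= 1
        if room[username] == 0:
            del room[username]  # last connection gone, drop the name

    def members(self, room_id: str) -> List[str]:
        room = self._rooms.get(room_id)
        return sorted(room) if room else []


roster = RoomRoster()
roster.join("general", "alice")
roster.join("general", "alice")  # second tab, same username
roster.join("general", "bob")
roster.leave("general", "alice")
print(roster.members("general"))  # prints ['alice', 'bob']
```

The manager could call `join` in `connect` and `leave` in `disconnect` in place of the set operations; keying members by `user_id` instead of `username` would be another option.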

Step 2: WebSocket Endpoint

Now let’s create the main WebSocket endpoint that handles all chat functionality:

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    # Get user info from query parameters
    user_id = websocket.query_params.get("user_id", str(uuid.uuid4()))
    username = websocket.query_params.get("username", f"User_{user_id[:8]}")

    await manager.connect(websocket, room_id, user_id, username)

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
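            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                # Ignore malformed frames instead of crashing the handler
                continue
            if not isinstance(message_data, dict):
                # Valid JSON, but not an object we can call .get() on
                continue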

            # Handle different message types
            if message_data.get("type") == "chat_message":
                # Broadcast chat message to room
                await manager.broadcast_to_room(room_id, {
                    "type": "chat_message",
                    "username": username,
                    "user_id": user_id,
                    "message": message_data.get("message", ""),
                    "timestamp": datetime.now().isoformat(),
                    "room_id": room_id
                })

            elif message_data.get("type") == "typing":
                # Broadcast typing indicator (exclude sender)
                typing_message = {
                    "type": "typing",
                    "username": username,
                    "is_typing": message_data.get("is_typing", False)
                }

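                # Iterate over a copy: the list can change while we await
                for connection in list(manager.active_connections.get(room_id, [])):
                    if connection != websocket:  # Don't send to sender
                        try:
                            await connection.send_text(json.dumps(typing_message))
                        except Exception:
                            # Best effort: a failed typing update is safe to drop
                            pass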

    except WebSocketDisconnect:
        room_id, username = manager.disconnect(websocket)
        if room_id and username:
            # Notify room about user leaving
            await manager.broadcast_to_room(room_id, {
                "type": "user_left",
                "username": username,
                "message": f"{username} left the room",
                "timestamp": datetime.now().isoformat(),
                "room_members": list(manager.room_members.get(room_id, []))
            })
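
The loop above accepts whatever the client sends and reads fields from it directly. Here is a minimal sketch of validating a frame before acting on it, assuming a hypothetical `parse_client_message` helper and a 2,000-character cap (both are illustrations, not part of the original code):

```python
import json
from typing import Optional

MAX_MESSAGE_LENGTH = 2000  # assumed limit, tune for your application
ALLOWED_TYPES = {"chat_message", "typing"}


def parse_client_message(raw: str) -> Optional[dict]:
    """Return a cleaned message dict, or None if the frame should be ignored."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not JSON at all

    msg_type = data.get("type") if isinstance(data, dict) else None
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None  # wrong shape or unknown message type

    if msg_type == "chat_message":
        text = data.get("message")
        if not isinstance(text, str) or not text.strip():
            return None  # empty or non-string message
        return {"type": "chat_message", "message": text.strip()[:MAX_MESSAGE_LENGTH]}

    # Typing indicator: coerce the flag to a plain bool
    return {"type": "typing", "is_typing": bool(data.get("is_typing", False))}


print(parse_client_message('{"type": "chat_message", "message": " hi "}'))
print(parse_client_message("not json"))
```

Inside the endpoint, a `None` result would simply `continue` to the next frame.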

Step 3: REST API Endpoints

Let’s add some useful REST endpoints for room management:

from fastapi import HTTPException

@app.get("/rooms")
async def get_active_rooms():
    """Get list of active rooms with member counts"""
    rooms = []
    for room_id, members in manager.room_members.items():
        if members:  # Only include rooms with active members
            rooms.append({
                "room_id": room_id,
                "member_count": len(members),
                "members": list(members)
            })
    return {"rooms": rooms}

@app.get("/rooms/{room_id}/members")
async def get_room_members(room_id: str):
    """Get members of a specific room"""
    if room_id not in manager.room_members:
        raise HTTPException(status_code=404, detail="Room not found")

    return {
        "room_id": room_id,
        "members": list(manager.room_members[room_id]),
        "member_count": len(manager.room_members[room_id])
    }

@app.post("/rooms/{room_id}/broadcast")
async def broadcast_system_message(room_id: str, message: dict):
    """Send a system message to all users in a room"""
    if room_id not in manager.active_connections:
        raise HTTPException(status_code=404, detail="Room not found")

    system_message = {
        "type": "system_message",
        "message": message.get("text", ""),
        "timestamp": datetime.now().isoformat(),
        "room_id": room_id
    }

    await manager.broadcast_to_room(room_id, system_message)
    return {"status": "Message broadcasted successfully"}
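
One thing these endpoints reveal: a room is created on first join but never deleted, so `/rooms/{room_id}/members` keeps returning an empty list for a room everyone has left instead of a 404. Here is a minimal sketch of a cleanup step, assuming a hypothetical `prune_empty_rooms` function that could be called from `disconnect` with the manager's two dictionaries:

```python
from typing import Dict, List, Set


def prune_empty_rooms(
    active_connections: Dict[str, List[object]],
    room_members: Dict[str, Set[str]],
) -> List[str]:
    """Delete rooms with no open connections; return the removed room ids."""
    # Collect first, then delete, so the dict is not mutated while iterating
    empty = [room_id for room_id, conns in active_connections.items() if not conns]
    for room_id in empty:
        del active_connections[room_id]
        room_members.pop(room_id, None)
    return empty


connections = {"general": ["ws-1"], "old-room": []}
members = {"general": {"alice"}, "old-room": set()}
print(prune_empty_rooms(connections, members))  # prints ['old-room']
print(sorted(connections))                      # prints ['general']
```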

Step 4: Simple Frontend for Testing

Here’s a basic HTML client to test our chat:

@app.get("/")
async def get_chat_page():
    return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
    <title>FastAPI WebSocket Chat</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
        #messages { border: 1px solid #ccc; height: 400px; overflow-y: scroll; padding: 10px; margin: 10px 0; }
        #messageInput { width: 70%; padding: 10px; }
        #sendButton { width: 25%; padding: 10px; }
        .message { margin: 5px 0; padding: 5px; }
        .system { color: #666; font-style: italic; }
        .user-joined { color: green; }
        .user-left { color: red; }
        #members { background: #f5f5f5; padding: 10px; margin: 10px 0; }
    </style>
</head>
<body>
    <h1>FastAPI WebSocket Chat</h1>

    <div>
        <label>Username: <input type="text" id="username" value="User123"></label>
        <label>Room ID: <input type="text" id="roomId" value="general"></label>
        <button onclick="connect()">Connect</button>
        <button onclick="disconnect()">Disconnect</button>
    </div>

    <div id="members"></div>
    <div id="messages"></div>

    <div>
        <input type="text" id="messageInput" placeholder="Type your message..." onkeypress="handleKeyPress(event)">
        <button onclick="sendMessage()" id="sendButton">Send</button>
    </div>

    <script>
        let ws = null;
        let username = '';
        let roomId = '';

        function connect() {
            username = document.getElementById('username').value;
            roomId = document.getElementById('roomId').value;

            if (!username || !roomId) {
                alert('Please enter username and room ID');
                return;
            }

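            // Encode user input so spaces, '&' or '#' cannot break the URL
            ws = new WebSocket(`ws://localhost:8000/ws/${encodeURIComponent(roomId)}?username=${encodeURIComponent(username)}`);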

            ws.onopen = function(event) {
                addMessage('Connected to room: ' + roomId, 'system');
            };

            ws.onmessage = function(event) {
                const data = JSON.parse(event.data);
                handleMessage(data);
            };

            ws.onclose = function(event) {
                addMessage('Disconnected from chat', 'system');
            };
        }

        function disconnect() {
            if (ws) {
                ws.close();
                ws = null;
            }
        }

        function handleMessage(data) {
            switch(data.type) {
                case 'chat_message':
                    addMessage(`${data.username}: ${data.message}`, 'chat');
                    break;
                case 'user_joined':
                    addMessage(data.message, 'user-joined');
                    updateMembers(data.room_members);
                    break;
                case 'user_left':
                    addMessage(data.message, 'user-left');
                    updateMembers(data.room_members);
                    break;
                case 'system_message':
                    addMessage(`System: ${data.message}`, 'system');
                    break;
                case 'typing':
                    // Handle typing indicators here
                    break;
            }
        }

        function addMessage(message, className) {
            const messages = document.getElementById('messages');
            const messageElement = document.createElement('div');
            messageElement.className = `message ${className}`;
            messageElement.textContent = `[${new Date().toLocaleTimeString()}] ${message}`;
            messages.appendChild(messageElement);
            messages.scrollTop = messages.scrollHeight;
        }

        function updateMembers(members) {
            const membersDiv = document.getElementById('members');
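            // Build the list with textContent so usernames are never parsed as HTML
            membersDiv.textContent = '';
            const label = document.createElement('strong');
            label.textContent = `Room Members (${members.length}):`;
            membersDiv.appendChild(label);
            membersDiv.appendChild(document.createTextNode(' ' + members.join(', ')));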
        }

        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();

            if (message && ws && ws.readyState === WebSocket.OPEN) {
                ws.send(JSON.stringify({
                    type: 'chat_message',
                    message: message
                }));
                input.value = '';
            }
        }

        function handleKeyPress(event) {
            if (event.key === 'Enter') {
                sendMessage();
            }
        }
    </script>
</body>
</html>
    """)

Step 5: Running the Application

Save everything in a file called chat_app.py and run:

uvicorn chat_app:app --reload --host 0.0.0.0 --port 8000

Visit http://localhost:8000 to test your chat application!
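
Note that with `--reload`, every code change restarts the server and drops open sockets, and the test page does not reconnect on its own. The delay schedule a client could follow is language-independent, so here is a minimal sketch in Python, assuming exponential backoff with full jitter and illustrative defaults (1 s base, 30 s cap, 6 attempts); the `backoff_delays` name and these values are not from the original code:

```python
import random
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 30.0, attempts: int = 6) -> Iterator[float]:
    """Yield exponentially growing reconnect delays (seconds) with full jitter."""
    for attempt in range(attempts):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick a random delay up to the current ceiling so many
        # clients reconnecting at once do not retry in lockstep
        yield random.uniform(0, ceiling)


for delay in backoff_delays():
    print(f"retry in {delay:.2f}s")
```

A browser client would apply the same schedule with `setTimeout` inside `ws.onclose`, resetting the attempt counter once `onopen` fires.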

Testing the API

You can also test the REST endpoints:

# Get active rooms
curl http://localhost:8000/rooms

# Get members of a specific room
curl http://localhost:8000/rooms/general/members

# Send system message to a room
curl -X POST "http://localhost:8000/rooms/general/broadcast" \
  -H "Content-Type: application/json" \
  -d '{"text": "Server maintenance in 5 minutes!"}'
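
The same broadcast call can be made from Python with only the standard library. This is a minimal sketch, assuming the server runs on `localhost:8000` as above; `build_broadcast_request` and `send` are hypothetical helper names:

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # matches the uvicorn command above


def build_broadcast_request(room_id: str, text: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for a system message."""
    url = f"{BASE_URL}/rooms/{urllib.parse.quote(room_id, safe='')}/broadcast"
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request; needs the chat server from this tutorial running."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


request = build_broadcast_request("general", "Server maintenance in 5 minutes!")
print(request.get_method(), request.full_url)
# send(request) returns the endpoint's JSON reply once the server is up;
# a room nobody has joined yet makes urlopen raise HTTPError (404)
```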

Production Considerations

For production deployment, consider:

  1. Use Redis (for example its pub/sub) to share room state and relay messages between server processes; the in-memory manager only works within a single worker
  2. Add authentication and authorization
  3. Implement rate limiting to prevent spam
  4. Add message persistence with a database
  5. Use proper logging and monitoring
  6. Add input validation and sanitization
  7. Implement reconnection logic on the client side
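
As an illustration of the rate limiting item above, here is a minimal sketch of a per-user sliding-window limiter. The `SlidingWindowLimiter` class and its defaults (5 messages per 10 seconds) are assumptions for this example, and it keeps state in memory, so it shares the single-process limitation of the connection manager:

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowLimiter:
    """Hypothetical helper: allow at most `limit` messages per `window` seconds."""

    def __init__(self, limit: int = 5, window: float = 10.0) -> None:
        self.limit = limit
        self.window = window
        # user_id -> timestamps of that user's recently accepted messages
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, user_id: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        events = self._events[user_id]
        # Drop timestamps that have fallen out of the window
        while events and now - events[0] >= self.window:
            events.popleft()
        if len(events) >= self.limit:
            return False
        events.append(now)
        return True


limiter = SlidingWindowLimiter(limit=2, window=10.0)
print(limiter.allow("alice", now=0.0))   # True
print(limiter.allow("alice", now=1.0))   # True
print(limiter.allow("alice", now=2.0))   # False, third message inside 10 s
print(limiter.allow("alice", now=10.5))  # True, the first timestamp expired
```

In the WebSocket loop, a `chat_message` would be broadcast only when `limiter.allow(user_id)` returns `True`.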

Key Features Implemented

  ✅ Real-time bidirectional communication
  ✅ Multiple chat rooms support
  ✅ User join/leave notifications
  ✅ Room member management
  ✅ System message broadcasting
  ✅ Connection cleanup on disconnect
  ✅ REST API for room management

This gives you a solid foundation for building more complex real-time applications with FastAPI WebSockets!

How can we use FastAPI WebSockets to manage multiple chat rooms with real-time messaging and user notifications?
