Coverage for docs_src / websockets / tutorial003_py310.py: 100%
32 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
1from fastapi import FastAPI, WebSocket, WebSocketDisconnect 1abc
2from fastapi.responses import HTMLResponse 1abc
4app = FastAPI() 1abc
# Single-page chat client returned by the "/" route.
#
# The page opens a WebSocket to "/ws/<client_id>" (client_id is the browser's
# Date.now() timestamp), appends every received message to the <ul>, and sends
# the text box contents when the form is submitted.
#
# The socket URL is derived from window.location rather than a fixed
# "ws://localhost:8000", so the page keeps working when the app is served from
# another host/port, and uses "wss" when the page itself was loaded over HTTPS
# (browsers block plain "ws" from secure pages).
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <h2>Your ID: <span id="ws-id"></span></h2>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var client_id = Date.now()
            document.querySelector("#ws-id").textContent = client_id;
            var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws";
            var ws = new WebSocket(`${ws_scheme}://${window.location.host}/ws/${client_id}`);
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""
class ConnectionManager:
    """Keep track of open WebSocket connections and send text to them.

    Connections are stored in a plain list, in the order they were accepted.
    """

    def __init__(self):
        # Sockets that have been accepted and not yet disconnected.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*.

        Calling this for a socket that is not (or is no longer) tracked is a
        no-op, so a repeated disconnect does not raise ``ValueError``.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* to the single connection *websocket*."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send *message* to every connection that is still tracked."""
        # Walk a snapshot: every ``await`` hands control back to the event
        # loop, and another handler may call disconnect() in the meantime.
        # Iterating the live list would then shift its items and silently skip
        # a client that is still connected.
        for connection in tuple(self.active_connections):
            # Skip sockets that were removed while an earlier send was pending.
            if connection in self.active_connections:
                await connection.send_text(message)


# Shared instance used by the route handlers in this module.
manager = ConnectionManager()
@app.get("/")
async def get():
    """Return the chat page stored in ``html`` wrapped in an ``HTMLResponse``."""
    page = HTMLResponse(html)
    return page
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Serve one chat client on ``/ws/{client_id}``.

    The socket is registered with ``manager``; each text frame received is
    echoed back to the sender and then broadcast to all tracked connections.
    When ``WebSocketDisconnect`` is raised the socket is unregistered and the
    remaining connections are told that the client left.
    """
    await manager.connect(websocket)
    try:
        # Runs until receive_text() (or a send) raises WebSocketDisconnect.
        while True:
            data = await websocket.receive_text()

            echo = f"You wrote: {data}"
            await manager.send_personal_message(echo, websocket)

            announcement = f"Client #{client_id} says: {data}"
            await manager.broadcast(announcement)
    except WebSocketDisconnect:
        # Unregister first so the farewell is not sent to the closed socket.
        manager.disconnect(websocket)
        farewell = f"Client #{client_id} left the chat"
        await manager.broadcast(farewell)