Coverage for docs/docs_src/user_guide/adapters/fastapi/security/simple_client.py: 16%
27 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import asyncio 1abcd
2import json 1abcd
3from typing import Any 1abcd
5import requests 1abcd
6import websockets 1abcd
7from asyncer import asyncify 1abcd
9from fastagency.messages import AskingMessage, IOMessage, WorkflowCompleted 1abcd
10from fastagency.ui.console import ConsoleUI 1abcd
# Root URL of the FastAgency server; every endpoint below is built from it.
FASTAGENCY_URL = "http://localhost:8008"

# Login form fields posted to the token endpoint.
CREDENTIALS = {
    "username": "johndoe",
    "password": "secret",  # pragma: allowlist secret
}
22# Function to authenticate and get the OAuth token
def get_oauth_token() -> str:
    """Authenticate the user and return the access token.

    Posts ``CREDENTIALS`` as form data to ``{FASTAGENCY_URL}/token`` and
    reads the ``access_token`` field from the JSON response.

    Returns:
        The access token string issued by the server.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within 30 seconds.
        ValueError: If the response contains no usable ``access_token``.
    """
    # Without a timeout, requests waits forever on an unresponsive server.
    response = requests.post(
        f"{FASTAGENCY_URL}/token", data=CREDENTIALS, timeout=30
    )
    response.raise_for_status()  # Surface 4xx/5xx responses as exceptions

    token = response.json().get("access_token")
    # Fail here rather than later sending a "Bearer None" Authorization header.
    if not isinstance(token, str) or not token:
        raise ValueError("Token endpoint response did not contain an access_token")
    return token
30# Function to initiate the workflow
def initiate_workflow(token: str) -> dict[str, Any]:
    """Initiate the workflow and return the initial payload.

    Sends a JSON request to ``{FASTAGENCY_URL}/fastagency/initiate_workflow``
    authenticated with a bearer token.

    Args:
        token: Access token placed in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within 30 seconds.
    """
    payload = {
        "workflow_name": "simple_learning",
        "workflow_uuid": "1234",  # You can generate this dynamically
        "user_id": None,
        "params": {"message": "Hello"},
    }

    headers = {"Authorization": f"Bearer {token}"}
    # Without a timeout, requests waits forever on an unresponsive server.
    response = requests.post(
        f"{FASTAGENCY_URL}/fastagency/initiate_workflow",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()  # Surface 4xx/5xx responses as exceptions
    return response.json()  # type: ignore
48# Function to handle WebSocket communication
async def websocket_workflow(token: str, initial_payload: dict[str, Any]) -> None:
    """Run the workflow conversation over a WebSocket until it completes.

    Connects to ``/fastagency/ws`` with a bearer-token header, sends
    ``initial_payload`` as JSON, then loops: each received frame is decoded
    into a message and handed to ``ConsoleUI.process_message``. A non-``None``
    result for an ``AskingMessage`` is sent back to the server; a
    ``WorkflowCompleted`` message ends the loop.

    Args:
        token: Access token placed in the ``Authorization`` header.
        initial_payload: JSON-serialisable payload that starts the workflow.
    """
    # Dropping the leading "http" and prefixing "ws" maps http -> ws, https -> wss.
    endpoint = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
    console = ConsoleUI()
    # process_message is synchronous; asyncify makes the call awaitable.
    handle_message = asyncify(console.process_message)
    auth_headers = {"Authorization": f"Bearer {token}"}

    async with websockets.connect(endpoint, extra_headers=auth_headers) as websocket:
        # Kick off the workflow.
        await websocket.send(json.dumps(initial_payload))

        while True:
            raw_frame = await websocket.recv()
            message = IOMessage.create(**json.loads(raw_frame))

            reply = await handle_message(message)

            # Only questions that actually produced an answer get a response.
            if reply is not None and isinstance(message, AskingMessage):
                await websocket.send(reply)
                continue

            if isinstance(message, WorkflowCompleted):
                # The server signalled the end of the workflow.
                return
76# Main function to run the workflow
async def main() -> None:
    """Authenticate, start the workflow, then run the WebSocket session."""
    # 1. Obtain the OAuth2 bearer token.
    access_token = get_oauth_token()

    # 2. Ask the server to start the workflow; its reply seeds the socket session.
    workflow_payload = initiate_workflow(access_token)

    # 3. Exchange messages over the WebSocket until the workflow completes.
    await websocket_workflow(access_token, workflow_payload)
if __name__ == "__main__":
    # Script entry point: run the async client on a fresh event loop.
    asyncio.run(main())