Coverage for docs/docs_src/user_guide/adapters/fastapi/security/simple_client.py: 16%

27 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import asyncio 1abcd

2import json 1abcd

3from typing import Any 1abcd

4 

5import requests 1abcd

6import websockets 1abcd

7from asyncer import asyncify 1abcd

8 

9from fastagency.messages import AskingMessage, IOMessage, WorkflowCompleted 1abcd

10from fastagency.ui.console import ConsoleUI 1abcd

11 

12# API base URL 

FASTAGENCY_URL = "http://localhost:8008"

# Demo user credentials; they are posted as form data to the /token endpoint.
CREDENTIALS = {
    "username": "johndoe",
    "password": "secret",  # pragma: allowlist secret
}

20 

21 

22# Function to authenticate and get the OAuth token 

def get_oauth_token() -> str:
    """Authenticate the user and return the access token.

    Posts ``CREDENTIALS`` as form data to ``{FASTAGENCY_URL}/token`` and reads
    the ``access_token`` field from the JSON response.

    Returns:
        The access token string issued by the server.

    Raises:
        requests.HTTPError: If the server replies with an error status.
        requests.Timeout: If the server does not reply within the timeout.
        RuntimeError: If the response contains no ``access_token``.
    """
    # requests applies no timeout by default, so an unreachable server would
    # otherwise block this call forever.
    response = requests.post(
        f"{FASTAGENCY_URL}/token", data=CREDENTIALS, timeout=30
    )
    response.raise_for_status()  # Surface HTTP error statuses as exceptions

    token = response.json().get("access_token")
    if not token:
        # Fail here instead of sending "Bearer None" in later requests.
        raise RuntimeError("Token endpoint response did not include 'access_token'")
    return token  # type: ignore

28 

29 

30# Function to initiate the workflow 

def initiate_workflow(token: str) -> dict[str, Any]:
    """Initiate the workflow and return the initial payload.

    Sends a JSON request to ``{FASTAGENCY_URL}/fastagency/initiate_workflow``
    with the token in an ``Authorization: Bearer`` header.

    Args:
        token: Access token placed in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.HTTPError: If the server replies with an error status.
        requests.Timeout: If the server does not reply within the timeout.
    """
    payload = {
        "workflow_name": "simple_learning",
        "workflow_uuid": "1234",  # You can generate this dynamically
        "user_id": None,
        "params": {"message": "Hello"},
    }

    headers = {"Authorization": f"Bearer {token}"}
    # requests applies no timeout by default; bound the wait explicitly.
    response = requests.post(
        f"{FASTAGENCY_URL}/fastagency/initiate_workflow",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()  # Surface HTTP error statuses as exceptions
    return response.json()  # type: ignore

46 

47 

48# Function to handle WebSocket communication 

async def websocket_workflow(token: str, initial_payload: dict[str, Any]) -> None:
    """Establish a WebSocket connection and handle the workflow interaction.

    Args:
        token: Access token sent in the ``Authorization: Bearer`` header.
        initial_payload: Payload sent as the first message, JSON-encoded.

    The function returns once a ``WorkflowCompleted`` message is received.
    """
    # Dropping the leading "http" and prefixing "ws" maps http:// to ws://
    # (and https:// to wss://) while keeping host, port and scheme suffix.
    websocket_url = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
    ui = ConsoleUI()  # Initialize the UI for handling user interaction

    # NOTE(review): newer websockets releases appear to name this parameter
    # `additional_headers` - confirm against the pinned websockets version.
    async with websockets.connect(
        websocket_url, extra_headers={"Authorization": f"Bearer {token}"}
    ) as websocket:
        # Send the initial payload to start the workflow
        await websocket.send(json.dumps(initial_payload))

        while True:
            # Receive the next message from the WebSocket server
            response = await websocket.recv()
            message = IOMessage.create(**json.loads(response))

            # ui.process_message is wrapped with asyncify so it can be awaited
            result = await asyncify(ui.process_message)(message)

            # Respond if the message requires further input
            if isinstance(message, AskingMessage) and result is not None:
                await websocket.send(result)
            elif isinstance(message, WorkflowCompleted):
                # Exit the loop when the workflow is completed
                break

74 

75 

76# Main function to run the workflow 

async def main() -> None:
    """Main function to orchestrate the workflow.

    Runs the three steps in order: authenticate, initiate the workflow over
    HTTP, then hand the returned payload to the WebSocket interaction loop.
    """
    # Step 1: Authenticate to get the OAuth2 token
    token = get_oauth_token()

    # Step 2: Initiate the workflow and get the initial payload
    initial_payload = initiate_workflow(token)

    # Step 3: Handle WebSocket interaction
    await websocket_workflow(token, initial_payload)

87 

88 

if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())