Coverage for fastagency/ui/mesop/auth/firebase/firebase_auth.py: 52%

45 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import os 1abc

2import typing 1abc

3from typing import Any, Callable, Literal, Union 1abc

4 

5import firebase_admin 1abc

6import mesop as me 1abc

7import mesop.labs as mel 1abc

8from firebase_admin import auth 1abc

9 

10from ...data_model import State 1abc

11from ...styles import MesopHomePageStyles 1abc

12from .firebase_auth_component import FirebaseConfig, firebase_auth_component 1abc

13 

14__all__ = ["FirebaseAuth"] 1abc

15 

16if typing.TYPE_CHECKING: 1abc

17 from ..auth import AuthProtocol 

18 

19 

# Avoid re-initializing the firebase app (useful for avoiding the warning message
# caused by hot reloads): `initialize_app()` is only called when no app is
# registered under the default app name yet.
# NOTE(review): `_DEFAULT_APP_NAME` and `_apps` are private attributes of
# `firebase_admin` -- confirm they are still available when upgrading the SDK.
# `default_app` is only bound when the branch below runs.
if firebase_admin._DEFAULT_APP_NAME not in firebase_admin._apps:
    default_app = firebase_admin.initialize_app()

23 

24 

class FirebaseAuth:  # implements AuthProtocol
    """Firebase-based authentication provider for the Mesop UI.

    The class validates its configuration on construction, extends the Mesop
    security policy with the origins used by the Firebase sign-in widget,
    decides whether a decoded Firebase token is authorized, and renders the
    sign-in / sign-out component.
    """

    SIGN_IN_MESSAGE = "Sign in to your account"
    UN_AUTHORIZED_ERROR_MESSAGE = """You are not authorized to access this application. Please contact the application administrators for access."""

    def __init__(
        self,
        sign_in_methods: list[Literal["google"]],
        config: FirebaseConfig,
        allowed_users: Union[
            list[str], Callable[[dict[str, Any]], bool], Literal["all"]
        ],  # for callable -> pass the whole decoded token (dict)
    ) -> None:
        """Initialize the Firebase Auth provider.

        Args:
            sign_in_methods: List of authentication methods to enable.
                Currently only supports ["google"]. Duplicates are removed
                while keeping the order of first occurrence.
            config: Firebase configuration containing project settings.
            allowed_users: Specifies user access control (required, there is
                no default value):
                - List[str]: List of allowed email addresses
                - Callable: Function taking decoded token and returning boolean
                - "all": Allows all authenticated users
                A single email address given as a plain string is also
                accepted by ``is_authorized``.

        Raises:
            TypeError: If sign_in_methods is not a list
            ValueError: If no sign-in methods specified, unsupported methods provided,
                or GOOGLE_APPLICATION_CREDENTIALS environment variable is missing
        """
        # mypy check if self is AuthProtocol
        _self: AuthProtocol = self

        self.config = config
        self.allowed_users = allowed_users

        # 1. Validate sign_in_methods type
        if not isinstance(sign_in_methods, list):
            raise TypeError(
                "sign_in_methods must be a list. Example: sign_in_methods=['google']"
            )

        # 2. Remove duplicates. dict.fromkeys keeps insertion order, so the
        #    stored list and the error message below are deterministic.
        self.sign_in_methods = list(dict.fromkeys(sign_in_methods))

        # 3. Validate sign-in methods
        if not self.sign_in_methods:
            raise ValueError("At least one sign-in method must be specified")

        unsupported_methods = [
            method for method in self.sign_in_methods if method != "google"
        ]
        if unsupported_methods:
            raise ValueError(
                f"Unsupported sign-in method(s): {unsupported_methods}. Currently, only 'google' sign-in is supported."
            )

        # 4. The service account key path must be configured in the environment
        if not os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
            raise ValueError(
                "Error: A service account key is required. Please create one and set the JSON key file path in the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. For more information: https://firebase.google.com/docs/admin/setup#initialize_the_sdk_in_non-google_environments"
            )

    def create_security_policy(self, policy: me.SecurityPolicy) -> me.SecurityPolicy:
        """Build a security policy that admits the origins used for Firebase sign-in.

        Args:
            policy: Base policy. Its ``allowed_connect_srcs`` and
                ``allowed_script_srcs`` are carried over; a falsy value is
                treated as an empty list.

        Returns:
            A new ``me.SecurityPolicy`` with trusted types disabled,
            ``*.googleapis.com`` added to the connect sources and
            ``*.google.com``, ``https://www.gstatic.com`` and
            ``https://cdn.jsdelivr.net`` added to the script sources.
        """
        # NOTE(review): only the two source lists are read from `policy`; any
        # other settings on it are not forwarded -- confirm this is intended.
        return me.SecurityPolicy(
            dangerously_disable_trusted_types=True,
            allowed_connect_srcs=list(
                set(policy.allowed_connect_srcs or []) | {"*.googleapis.com"}
            ),
            allowed_script_srcs=list(
                set(policy.allowed_script_srcs or [])
                | {
                    "*.google.com",
                    "https://www.gstatic.com",
                    "https://cdn.jsdelivr.net",
                }
            ),
        )

    def is_authorized(self, token: dict[str, Any]) -> bool:
        """Check if the user is authorized based on the token and allowed_users configuration.

        Args:
            token: The decoded Firebase JWT token containing user information.
                Must include an 'email' field for validation.

        Returns:
            bool: True if the user is authorized, False otherwise.

        Raises:
            TypeError: If allowed_users is not of type str, list, or Callable.
            ValueError: If email field is missing in the Firebase token.
        """
        # Check if the email is present in token
        email = token.get("email")
        if not email:
            raise ValueError(
                "Invalid response from Firebase: `email` field is missing in the token"
            )

        allowed = self.allowed_users

        # Handle string-based configuration ("all" or single email)
        if isinstance(allowed, str):
            return allowed == "all" or email == allowed

        # Handle list of allowed email addresses. Entries are compared by
        # equality instead of being collected into a set, so a stray
        # unhashable entry cannot crash the check; string entries are
        # stripped of surrounding whitespace before comparison.
        if isinstance(allowed, list):
            return any(
                email == (entry.strip() if isinstance(entry, str) else entry)
                for entry in allowed
            )

        # Handle custom validation function; coerce so the declared bool
        # return type holds even for truthy/falsy non-bool results.
        if callable(allowed):
            return bool(allowed(token))

        raise TypeError(
            "allowed_users must be one of: "
            "str ('all' or email), "
            "list of emails, "
            "or callable taking token dict"
        )

    def on_auth_changed(self, e: mel.WebEvent) -> None:
        """Update the UI state after the Firebase component reports an auth change.

        Args:
            e: Web event whose ``value`` holds the Firebase ID token; a falsy
                value clears the authenticated user and any auth error.

        Raises:
            Exception: Whatever ``auth.verify_id_token`` or ``is_authorized``
                raises is propagated unchanged, after the authenticated user
                has been cleared.
        """
        state = me.state(State)
        firebase_auth_token = e.value

        if not firebase_auth_token:
            state.authenticated_user = ""
            state.auth_error = None
            return

        try:
            decoded_token = auth.verify_id_token(firebase_auth_token)
            authorized = self.is_authorized(decoded_token)
        except Exception:
            # Fail closed: a token that cannot be verified or evaluated must
            # not leave a previously authenticated user in the state. The
            # error itself is still propagated to the caller.
            state.authenticated_user = ""
            raise

        if authorized:
            state.authenticated_user = decoded_token["email"]
            state.auth_error = None
        else:
            state.authenticated_user = ""
            state.auth_error = FirebaseAuth.UN_AUTHORIZED_ERROR_MESSAGE

    # maybe me.Component is wrong
    def auth_component(self) -> me.component:
        """Render the Firebase auth widget for the current authentication state.

        When a user is authenticated the widget is placed inside the logout
        button container; otherwise a login box is rendered showing the
        stored auth error, or the sign-in message when there is none,
        followed by the widget.
        """
        styles = MesopHomePageStyles()
        state = me.state(State)
        if state.authenticated_user:
            with me.box(style=styles.logout_btn_container):
                firebase_auth_component(
                    on_auth_changed=self.on_auth_changed, config=self.config
                )
        else:
            with me.box(style=styles.login_box):  # noqa: SIM117
                with me.box(style=styles.login_btn_container):
                    message = state.auth_error or FirebaseAuth.SIGN_IN_MESSAGE
                    me.text(message, style=styles.header_text)
                    firebase_auth_component(
                        on_auth_changed=self.on_auth_changed, config=self.config
                    )

178 on_auth_changed=self.on_auth_changed, config=self.config 

179 )