Coverage for pydantic/networks.py: 98.91%

371 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-15 13:26 +0000

1import re 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

2from ipaddress import ( 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

3 IPv4Address, 

4 IPv4Interface, 

5 IPv4Network, 

6 IPv6Address, 

7 IPv6Interface, 

8 IPv6Network, 

9 _BaseAddress, 

10 _BaseNetwork, 

11) 

12from typing import ( 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

13 TYPE_CHECKING, 

14 Any, 

15 Collection, 

16 Dict, 

17 Generator, 

18 List, 

19 Match, 

20 Optional, 

21 Pattern, 

22 Set, 

23 Tuple, 

24 Type, 

25 Union, 

26 cast, 

27 no_type_check, 

28) 

29 

30from pydantic import errors 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

31from pydantic.utils import Representation, update_not_none 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

32from pydantic.validators import constr_length_validator, str_validator 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL

33 

if TYPE_CHECKING:
    # Names in this branch exist for static type checkers only; at runtime
    # the ``else`` branch below provides lightweight stand-ins.
    import email_validator
    from typing_extensions import TypedDict

    from pydantic.config import BaseConfig
    from pydantic.fields import ModelField
    from pydantic.typing import AnyCallable

    CallableGenerator = Generator[AnyCallable, None, None]

    class Parts(TypedDict, total=False):
        # Keys mirror the named groups of the URL regular expressions.
        scheme: str
        user: Optional[str]
        password: Optional[str]
        ipv4: Optional[str]
        ipv6: Optional[str]
        domain: Optional[str]
        port: Optional[str]
        path: Optional[str]
        query: Optional[str]
        fragment: Optional[str]

    class HostParts(TypedDict, total=False):
        # Per-host record collected by ``MultiHostDsn._build_url``.
        host: str
        tld: Optional[str]
        host_type: Optional[str]
        port: Optional[str]
        rebuild: bool

else:
    # Placeholder rebound by ``import_email_validator`` on first use.
    email_validator = None

    class Parts(dict):
        """Runtime stand-in for the ``Parts`` TypedDict: a plain ``dict`` subclass."""

        pass

68 

69 

# Input shapes accepted by the IP interface validator: a bare
# str/bytes/int value, or a 2-tuple of such a value and a str/int.
NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]]

# Public names exported by ``from <module> import *``.
__all__ = [
    'AnyUrl',
    'AnyHttpUrl',
    'FileUrl',
    'HttpUrl',
    'stricturl',
    'EmailStr',
    'NameEmail',
    'IPvAnyAddress',
    'IPvAnyInterface',
    'IPvAnyNetwork',
    'PostgresDsn',
    'CockroachDsn',
    'AmqpDsn',
    'RedisDsn',
    'MongoDsn',
    'KafkaDsn',
    'validate_email',
]

91 

# Lazily-populated caches for the compiled patterns; each stays ``None``
# until the matching ``*_regex()`` accessor is first called.
_url_regex_cache = None
_multi_host_url_regex_cache = None
_ascii_domain_regex_cache = None
_int_domain_regex_cache = None
_host_regex_cache = None

# Pattern fragments. They are concatenated by ``url_regex`` and
# ``multi_host_url_regex``; ``_host_regex`` is also compiled on its own.
_host_regex = (
    r'(?:'
    r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|'  # ipv4
    r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|'  # ipv6
    r'(?P<domain>[^\s/:?#]+)'  # domain, validation occurs later
    r')?'
    r'(?::(?P<port>\d+))?'  # port
)
_scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?'  # scheme https://tools.ietf.org/html/rfc3986#appendix-A
_user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?'
_path_regex = r'(?P<path>/[^\s?#]*)?'
_query_regex = r'(?:\?(?P<query>[^\s#]*))?'
_fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?'

111 

112 

def url_regex() -> Pattern[str]:
    """
    Return the compiled single-host URL pattern, compiling it on first use.

    The pattern is the concatenation of the scheme, user-info, host, path,
    query and fragment fragments and is compiled case-insensitively. The
    compiled object is stored in a module-level cache and reused afterwards.
    """
    global _url_regex_cache
    if _url_regex_cache is None:
        _url_regex_cache = re.compile(
            rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}',
            re.IGNORECASE,
        )
    return _url_regex_cache

121 

122 

def multi_host_url_regex() -> Pattern[str]:
    """
    Compiled multi host url regex.

    Additionally to `url_regex` it allows to match multiple hosts.
    E.g. host1.db.net,host2.db.net

    The whole host section is captured as a single ``hosts`` group (everything
    up to the first ``/``). The compiled pattern is cached at module level.
    """
    global _multi_host_url_regex_cache
    if _multi_host_url_regex_cache is None:
        _multi_host_url_regex_cache = re.compile(
            rf'{_scheme_regex}{_user_info_regex}'
            r'(?P<hosts>([^/]*))'  # validation occurs later
            rf'{_path_regex}{_query_regex}{_fragment_regex}',
            re.IGNORECASE,
        )
    return _multi_host_url_regex_cache

139 

140 

def ascii_domain_regex() -> Pattern[str]:
    """
    Return the compiled ASCII domain pattern, compiling it on first use.

    A domain is one or more dot-separated chunks, optionally followed by a
    ``tld`` group (a dot plus 2-63 letters) and an optional trailing dot.
    The pattern is case-insensitive and cached at module level.
    """
    global _ascii_domain_regex_cache
    if _ascii_domain_regex_cache is None:
        # A chunk may not start or end with a hyphen.
        ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
        ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
        _ascii_domain_regex_cache = re.compile(
            fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE
        )
    return _ascii_domain_regex_cache

150 

151 

def int_domain_regex() -> Pattern[str]:
    """
    Return the compiled internationalised domain pattern, compiling it on first use.

    Same structure as the ASCII domain pattern, but chunk characters extend up
    to code point U+40000 and the ``tld`` group accepts either 2-63 letter
    characters or an ``xn--`` prefixed label. Cached at module level.
    """
    global _int_domain_regex_cache
    if _int_domain_regex_cache is None:
        int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
        int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?'
        _int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE)
    return _int_domain_regex_cache

159 

160 

def host_regex() -> Pattern[str]:
    """
    Return the compiled host pattern (ipv4 / ipv6 / domain plus optional port).

    Compiled case-insensitively from ``_host_regex`` on first use and cached
    at module level afterwards.
    """
    global _host_regex_cache
    if _host_regex_cache is None:
        _host_regex_cache = re.compile(
            _host_regex,
            re.IGNORECASE,
        )
    return _host_regex_cache

169 

170 

class AnyUrl(str):
    """
    A ``str`` subclass that also carries the parsed components of a URL.

    The class-level attributes below configure validation; subclasses override
    them (and ``get_default_parts``) to restrict schemes or supply defaults.
    """

    strip_whitespace = True
    min_length = 1
    max_length = 2**16
    allowed_schemes: Optional[Collection[str]] = None
    tld_required: bool = False
    user_required: bool = False
    host_required: bool = True
    # Parts omitted by ``build`` when they equal the class default.
    hidden_parts: Set[str] = set()

    __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')

    @no_type_check
    def __new__(cls, url: Optional[str], **kwargs) -> object:
        # When no URL text is supplied, assemble it from the keyword parts.
        return str.__new__(cls, cls.build(**kwargs) if url is None else url)

    def __init__(
        self,
        url: str,
        *,
        scheme: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        host: Optional[str] = None,
        tld: Optional[str] = None,
        host_type: str = 'domain',
        port: Optional[str] = None,
        path: Optional[str] = None,
        query: Optional[str] = None,
        fragment: Optional[str] = None,
    ) -> None:
        """Store the already-parsed URL components on the instance."""
        str.__init__(url)
        self.scheme = scheme
        self.user = user
        self.password = password
        self.host = host
        self.tld = tld
        self.host_type = host_type
        self.port = port
        self.path = path
        self.query = query
        self.fragment = fragment

    @classmethod
    def build(
        cls,
        *,
        scheme: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        host: str,
        port: Optional[str] = None,
        path: Optional[str] = None,
        query: Optional[str] = None,
        fragment: Optional[str] = None,
        **_kwargs: str,
    ) -> str:
        """
        Assemble a URL string from its components.

        Falsy optional components are omitted. The port is also omitted when
        ``'port'`` is listed in ``hidden_parts`` and it equals the default port
        reported by ``get_default_parts``.
        """
        parts = Parts(
            scheme=scheme,
            user=user,
            password=password,
            host=host,
            port=port,
            path=path,
            query=query,
            fragment=fragment,
            **_kwargs,  # type: ignore[misc]
        )

        url = scheme + '://'
        if user:
            url += user
        if password:
            url += ':' + password
        if user or password:
            url += '@'
        url += host
        if port and ('port' not in cls.hidden_parts or cls.get_default_parts(parts).get('port') != port):
            url += ':' + port
        if path:
            url += path
        if query:
            url += '?' + query
        if fragment:
            url += '#' + fragment
        return url

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Add the length limits and ``format='uri'`` to the field schema."""
        update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: 'ModelField', config: 'BaseConfig') -> 'AnyUrl':
        """
        Validate ``value`` and return an instance of ``cls``.

        Instances whose class is exactly ``cls`` are returned unchanged. Anything
        else is coerced to ``str``, optionally stripped, length-checked, matched
        against the URL pattern and then validated part by part.
        """
        if value.__class__ == cls:
            return value
        value = str_validator(value)
        if cls.strip_whitespace:
            value = value.strip()
        url: str = cast(str, constr_length_validator(value, field, config))

        m = cls._match_url(url)
        # the regex should always match, if it doesn't please report with details of the URL tried
        assert m, 'URL regex failed unexpectedly'

        original_parts = cast('Parts', m.groupdict())
        parts = cls.apply_default_parts(original_parts)
        parts = cls.validate_parts(parts)

        # Anything the pattern did not consume is reported as extra characters.
        if m.end() != len(url):
            raise errors.UrlExtraError(extra=url[m.end() :])

        return cls._build_url(m, url, parts)

    @classmethod
    def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl':
        """
        Validate hosts and build the AnyUrl object. Split from `validate` so this method
        can be altered in `MultiHostDsn`.
        """
        host, tld, host_type, rebuild = cls.validate_host(parts)

        return cls(
            # Passing ``None`` makes ``__new__`` regenerate the text via ``build``.
            None if rebuild else url,
            scheme=parts['scheme'],
            user=parts['user'],
            password=parts['password'],
            host=host,
            tld=tld,
            host_type=host_type,
            port=parts['port'],
            path=parts['path'],
            query=parts['query'],
            fragment=parts['fragment'],
        )

    @staticmethod
    def _match_url(url: str) -> Optional[Match[str]]:
        """Match ``url`` against the single-host URL pattern."""
        return url_regex().match(url)

    @staticmethod
    def _validate_port(port: Optional[str]) -> None:
        """Raise ``UrlPortError`` when ``port`` is given and exceeds 65535."""
        if port is not None and int(port) > 65_535:
            raise errors.UrlPortError()

    @classmethod
    def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
        """
        A method used to validate parts of a URL.
        Could be overridden to set default values for parts if missing
        """
        scheme = parts['scheme']
        if scheme is None:
            raise errors.UrlSchemeError()

        if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes:
            raise errors.UrlSchemePermittedError(set(cls.allowed_schemes))

        if validate_port:
            cls._validate_port(parts['port'])

        user = parts['user']
        if cls.user_required and user is None:
            raise errors.UrlUserInfoError()

        return parts

    @classmethod
    def validate_host(cls, parts: 'Parts') -> Tuple[str, Optional[str], str, bool]:
        """
        Pick the host out of ``parts`` and validate it.

        Returns ``(host, tld, host_type, rebuild)``. ``rebuild`` is ``True`` when
        the host was IDNA-encoded and therefore differs from the input text.
        """
        tld, host_type, rebuild = None, None, False
        # First non-empty candidate wins; ``host`` keeps the last value looked at.
        for f in ('domain', 'ipv4', 'ipv6'):
            host = parts[f]  # type: ignore[literal-required]
            if host:
                host_type = f
                break

        if host is None:
            if cls.host_required:
                raise errors.UrlHostError()
        elif host_type == 'domain':
            is_international = False
            d = ascii_domain_regex().fullmatch(host)
            if d is None:
                d = int_domain_regex().fullmatch(host)
                if d is None:
                    raise errors.UrlHostError()
                is_international = True

            tld = d.group('tld')
            if tld is None and not is_international:
                # The ASCII pattern found no tld; retry with the international one.
                d = int_domain_regex().fullmatch(host)
                assert d is not None
                tld = d.group('tld')
                is_international = True

            if tld is not None:
                tld = tld[1:]  # drop the leading dot
            elif cls.tld_required:
                raise errors.UrlHostTldError()

            if is_international:
                host_type = 'int_domain'
                rebuild = True
                host = host.encode('idna').decode('ascii')
                if tld is not None:
                    tld = tld.encode('idna').decode('ascii')

        return host, tld, host_type, rebuild  # type: ignore

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Return default values for missing parts; the base class has none."""
        return {}

    @classmethod
    def apply_default_parts(cls, parts: 'Parts') -> 'Parts':
        """Fill every falsy entry of ``parts`` that has a default, in place."""
        for key, value in cls.get_default_parts(parts).items():
            if not parts[key]:  # type: ignore[literal-required]
                parts[key] = value  # type: ignore[literal-required]
        return parts

    def __repr__(self) -> str:
        extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None)
        return f'{self.__class__.__name__}({super().__repr__()}, {extra})'

397 

398 

class AnyHttpUrl(AnyUrl):
    """``AnyUrl`` restricted to the ``http`` and ``https`` schemes."""

    allowed_schemes = {'http', 'https'}

    __slots__ = ()

403 

404 

class HttpUrl(AnyHttpUrl):
    """HTTP(S) URL that requires a TLD and hides the scheme's default port."""

    tld_required = True
    # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
    max_length = 2083
    hidden_parts = {'port'}

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """
        Return the default port: ``'80'`` for ``http``, ``'443'`` otherwise.

        The scheme is compared case-insensitively because the URL pattern is
        compiled with ``re.IGNORECASE`` and scheme validation lower-cases it
        too; an upper-case ``HTTP`` must not fall through to port 443. The
        scheme may still be ``None`` here, as defaults are applied before the
        parts are validated.
        """
        scheme = parts['scheme']
        is_plain_http = scheme is not None and scheme.lower() == 'http'
        return {'port': '80' if is_plain_http else '443'}

414 

415 

class FileUrl(AnyUrl):
    """``AnyUrl`` restricted to the ``file`` scheme; a host is optional."""

    allowed_schemes = {'file'}
    host_required = False

    __slots__ = ()

421 

422 

class MultiHostDsn(AnyUrl):
    """
    ``AnyUrl`` variant whose host section may list several comma-separated hosts.

    With more than one host the per-host details are stored in ``hosts``; a
    single host populates the regular ``host``/``tld``/``port`` attributes.
    """

    __slots__ = AnyUrl.__slots__ + ('hosts',)

    def __init__(self, *args: Any, hosts: Optional[List['HostParts']] = None, **kwargs: Any):
        super().__init__(*args, **kwargs)
        self.hosts = hosts

    @staticmethod
    def _match_url(url: str) -> Optional[Match[str]]:
        """Match ``url`` against the multi-host URL pattern."""
        return multi_host_url_regex().match(url)

    @classmethod
    def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
        # Ports are validated per host in ``_build_url``, so skip the check here.
        return super().validate_parts(parts, validate_port=False)

    @classmethod
    def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn':
        """Validate every listed host (and its port) and build the instance."""
        hosts_parts: List['HostParts'] = []
        host_re = host_regex()
        for host in m.groupdict()['hosts'].split(','):
            d: Parts = host_re.match(host).groupdict()  # type: ignore
            host, tld, host_type, rebuild = cls.validate_host(d)
            port = d.get('port')
            cls._validate_port(port)
            hosts_parts.append(
                {
                    'host': host,
                    'host_type': host_type,
                    'tld': tld,
                    'rebuild': rebuild,
                    'port': port,
                }
            )

        if len(hosts_parts) > 1:
            return cls(
                None if any(hp['rebuild'] for hp in hosts_parts) else url,
                scheme=parts['scheme'],
                user=parts['user'],
                password=parts['password'],
                path=parts['path'],
                query=parts['query'],
                fragment=parts['fragment'],
                host_type=None,
                hosts=hosts_parts,
            )
        else:
            # backwards compatibility with single host
            host_part = hosts_parts[0]
            return cls(
                None if host_part['rebuild'] else url,
                scheme=parts['scheme'],
                user=parts['user'],
                password=parts['password'],
                host=host_part['host'],
                tld=host_part['tld'],
                host_type=host_part['host_type'],
                port=host_part.get('port'),
                path=parts['path'],
                query=parts['query'],
                fragment=parts['fragment'],
            )

485 

486 

class PostgresDsn(MultiHostDsn):
    """Multi-host DSN for the listed PostgreSQL schemes; user info is required."""

    allowed_schemes = {
        'postgres',
        'postgresql',
        'postgresql+asyncpg',
        'postgresql+pg8000',
        'postgresql+psycopg',
        'postgresql+psycopg2',
        'postgresql+psycopg2cffi',
        'postgresql+py-postgresql',
        'postgresql+pygresql',
    }
    user_required = True

    __slots__ = ()

502 

503 

class CockroachDsn(AnyUrl):
    """DSN for the listed CockroachDB schemes; user info is required."""

    allowed_schemes = {
        'cockroachdb',
        'cockroachdb+psycopg2',
        'cockroachdb+asyncpg',
    }
    user_required = True

511 

512 

class AmqpDsn(AnyUrl):
    """DSN for the ``amqp`` and ``amqps`` schemes; a host is optional."""

    allowed_schemes = {'amqp', 'amqps'}
    host_required = False

516 

517 

class RedisDsn(AnyUrl):
    """DSN for the ``redis`` and ``rediss`` schemes with host, port and path defaults."""

    __slots__ = ()
    allowed_schemes = {'redis', 'rediss'}
    host_required = False

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Default to ``localhost`` (unless an IP host is present), port 6379 and path ``/0``."""
        return {
            'domain': 'localhost' if not (parts['ipv4'] or parts['ipv6']) else '',
            'port': '6379',
            'path': '/0',
        }

530 

531 

class MongoDsn(AnyUrl):
    """DSN for the ``mongodb`` scheme with a default port of 27017."""

    allowed_schemes = {'mongodb'}

    # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes
    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        return {
            'port': '27017',
        }

541 

542 

class KafkaDsn(AnyUrl):
    """DSN for the ``kafka`` scheme defaulting to ``localhost`` and port 9092."""

    allowed_schemes = {'kafka'}

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        return {
            'domain': 'localhost',
            'port': '9092',
        }

552 

553 

def stricturl(
    *,
    strip_whitespace: bool = True,
    min_length: int = 1,
    max_length: int = 2**16,
    tld_required: bool = True,
    host_required: bool = True,
    allowed_schemes: Optional[Collection[str]] = None,
) -> Type[AnyUrl]:
    """
    Create an ``AnyUrl`` subclass named ``UrlValue`` with the given constraints.

    Each keyword argument becomes the class attribute of the same name on the
    new type. Note that ``tld_required`` defaults to ``True`` here.
    """
    # use kwargs then define conf in a dict to aid with IDE type hinting
    namespace = dict(
        strip_whitespace=strip_whitespace,
        min_length=min_length,
        max_length=max_length,
        tld_required=tld_required,
        host_required=host_required,
        allowed_schemes=allowed_schemes,
    )
    return type('UrlValue', (AnyUrl,), namespace)

573 

574 

def import_email_validator() -> None:
    """
    Import ``email_validator`` and bind it to the module-level name.

    Raises:
        ImportError: if the package is not installed; the message tells the
            user how to install it and the original error is chained.
    """
    global email_validator
    try:
        import email_validator
    except ImportError as e:
        raise ImportError('email-validator is not installed, run `pip install pydantic[email]`') from e

581 

582 

class EmailStr(str):
    """String type validated as an e-mail address via ``validate_email``."""

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        field_schema.update(type='string', format='email')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        # included here and below so the error happens straight away
        import_email_validator()

        yield str_validator
        yield cls.validate

    @classmethod
    def validate(cls, value: str) -> str:
        """Return the second element of the tuple produced by ``validate_email``."""
        return validate_email(value)[1]

599 

600 

class NameEmail(Representation):
    """A display name paired with an e-mail address; renders as ``name <email>``."""

    __slots__ = 'name', 'email'

    def __init__(self, name: str, email: str):
        self.name = name
        self.email = email

    def __eq__(self, other: Any) -> bool:
        return isinstance(other, NameEmail) and (self.name, self.email) == (other.name, other.email)

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        field_schema.update(type='string', format='name-email')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        # Import eagerly so a missing dependency is reported straight away.
        import_email_validator()

        yield cls.validate

    @classmethod
    def validate(cls, value: Any) -> 'NameEmail':
        """
        Return ``value`` unchanged when its class is exactly ``cls``; otherwise
        coerce it to ``str`` and build an instance from ``validate_email``'s result.
        """
        if value.__class__ == cls:
            return value
        value = str_validator(value)
        return cls(*validate_email(value))

    def __str__(self) -> str:
        return f'{self.name} <{self.email}>'

630 

631 

class IPvAnyAddress(_BaseAddress):
    """Field type accepting either an IPv4 or an IPv6 address."""

    __slots__ = ()

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        field_schema.update(type='string', format='ipvanyaddress')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        yield cls.validate

    @classmethod
    def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
        """
        Parse ``value`` as IPv4 first, then as IPv6.

        Raises:
            errors.IPvAnyAddressError: if neither constructor accepts the value.
        """
        try:
            return IPv4Address(value)
        except ValueError:
            pass

        try:
            return IPv6Address(value)
        except ValueError:
            raise errors.IPvAnyAddressError()

654 

655 

class IPvAnyInterface(_BaseAddress):
    """
    Field type accepting either an IPv4 or an IPv6 interface.

    ``validate`` returns a plain ``IPv4Interface`` or ``IPv6Interface``
    object, not an instance of this class.
    """

    __slots__ = ()

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Set ``type='string'`` and ``format='ipvanyinterface'`` on ``field_schema`` in place."""
        field_schema.update({'type': 'string', 'format': 'ipvanyinterface'})

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        """Yield the single validator for this type, ``cls.validate``."""
        yield cls.validate

    @classmethod
    def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
        """
        Parse ``value`` as an IPv4 interface, falling back to IPv6.

        Raises ``errors.IPvAnyInterfaceError`` when both constructors reject
        the value with ``ValueError``.
        """
        # IPv4 is attempted first; a ValueError only means "not IPv4".
        try:
            ipv4 = IPv4Interface(value)
        except ValueError:
            pass
        else:
            return ipv4

        try:
            ipv6 = IPv6Interface(value)
        except ValueError:
            raise errors.IPvAnyInterfaceError()
        else:
            return ipv6

678 

679 

class IPvAnyNetwork(_BaseNetwork):  # type: ignore
    """
    Field type accepting either an IPv4 or an IPv6 network.

    ``validate`` returns a plain ``IPv4Network`` or ``IPv6Network`` object,
    not an instance of this class.
    """

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Set ``type='string'`` and ``format='ipvanynetwork'`` on ``field_schema`` in place."""
        field_schema.update({'type': 'string', 'format': 'ipvanynetwork'})

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        """Yield the single validator for this type, ``cls.validate``."""
        yield cls.validate

    @classmethod
    def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
        """
        Parse ``value`` as an IPv4 network, falling back to IPv6.

        Raises ``errors.IPvAnyNetworkError`` when both constructors reject
        the value with ``ValueError``.
        """
        # Both constructors are called without ``strict``, so its default value applies.
        # Define your own class if you want to specify network address check strictness.
        try:
            ipv4 = IPv4Network(value)
        except ValueError:
            pass
        else:
            return ipv4

        try:
            ipv6 = IPv6Network(value)
        except ValueError:
            raise errors.IPvAnyNetworkError()
        else:
            return ipv6

702 

703 

# Pattern for "pretty" addresses of the form ``Name <address>``.
# Group 1 lazily captures a (possibly empty) run of word characters and spaces,
# group 2 captures the text between ``<`` and the closing ``>``; spaces before
# ``<`` and after ``>`` are tolerated. validate_email applies it with fullmatch.
pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *')
# Upper bound on the length of the raw string that validate_email accepts.
MAX_EMAIL_LENGTH = 2048
"""Maximum length for an email.
A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
"""

709 

710 

def validate_email(value: Union[str]) -> Tuple[str, str]:
    """
    Validate an email address with https://pypi.org/project/email-validator/
    and return it as a ``(name, email)`` tuple.

    Notes:
    * raw ip address (literal) domain parts are not allowed.
    * "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
    * spaces are stripped from the beginning and end of addresses but no error is raised

    When the input carries no display name, the local part of the address is
    used as the name. Raises ``errors.EmailError`` if the input is longer than
    ``MAX_EMAIL_LENGTH`` or is rejected by email-validator.
    """
    if email_validator is None:
        import_email_validator()

    # The length check comes first, before the regex and the validator run.
    if len(value) > MAX_EMAIL_LENGTH:
        raise errors.EmailError()

    display_name: Union[str, None] = None
    pretty_match = pretty_email_regex.fullmatch(value)
    if pretty_match:
        display_name, value = pretty_match.groups()
    address = value.strip()

    try:
        parts = email_validator.validate_email(address, check_deliverability=False)
    except email_validator.EmailNotValidError as exc:
        raise errors.EmailError from exc

    if not hasattr(parts, 'normalized'):
        # email-validator >1, <2
        split_at = address.index('@')
        local_part = address[:split_at]  # RFC 5321, local part must be case-sensitive.
        domain_part = address[split_at:].lower()
        return display_name or local_part, local_part + domain_part

    # email-validator >= 2
    normalized = parts.normalized
    assert normalized is not None
    return display_name or parts.local_part, normalized