Coverage for pydantic/networks.py: 98.91%
371 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-15 13:26 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-15 13:26 +0000
1import re 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
2from ipaddress import ( 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
3 IPv4Address,
4 IPv4Interface,
5 IPv4Network,
6 IPv6Address,
7 IPv6Interface,
8 IPv6Network,
9 _BaseAddress,
10 _BaseNetwork,
11)
12from typing import ( 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
13 TYPE_CHECKING,
14 Any,
15 Collection,
16 Dict,
17 Generator,
18 List,
19 Match,
20 Optional,
21 Pattern,
22 Set,
23 Tuple,
24 Type,
25 Union,
26 cast,
27 no_type_check,
28)
30from pydantic import errors 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
31from pydantic.utils import Representation, update_not_none 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
32from pydantic.validators import constr_length_validator, str_validator 1uMaxbyczdAeBvNfCgDhEiFjGpqrsPQRSTUtwOkHlImJnKoL
if TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers; none of it
    # is executed when the module is imported normally.
    import email_validator
    from typing_extensions import TypedDict

    from pydantic.config import BaseConfig
    from pydantic.fields import ModelField
    from pydantic.typing import AnyCallable

    CallableGenerator = Generator[AnyCallable, None, None]

    class Parts(TypedDict, total=False):
        # Named groups captured by the URL regexes; all keys are optional (total=False).
        scheme: str
        user: Optional[str]
        password: Optional[str]
        ipv4: Optional[str]
        ipv6: Optional[str]
        domain: Optional[str]
        port: Optional[str]
        path: Optional[str]
        query: Optional[str]
        fragment: Optional[str]

    class HostParts(TypedDict, total=False):
        # Per-host record collected by `MultiHostDsn._build_url`.
        host: str
        tld: Optional[str]
        host_type: Optional[str]
        port: Optional[str]
        rebuild: bool

else:
    # Placeholder until `import_email_validator()` rebinds it to the real module.
    email_validator = None

    # At runtime `Parts` is just a dict subclass so it can be instantiated with keywords.
    class Parts(dict):
        pass
# Input type annotated on the interface/network validators below: a single
# str/bytes/int, or a 2-tuple whose second item is a str or int
# (presumably the prefix length or mask - confirm against the `ipaddress` docs).
NetworkType = Union[
    str,
    bytes,
    int,
    Tuple[Union[str, bytes, int], Union[str, int]],
]
# Public names exported by `from pydantic.networks import *`.
__all__ = [
    # generic URL types
    'AnyUrl',
    'AnyHttpUrl',
    'FileUrl',
    'HttpUrl',
    'stricturl',
    # e-mail types
    'EmailStr',
    'NameEmail',
    # IP address types
    'IPvAnyAddress',
    'IPvAnyInterface',
    'IPvAnyNetwork',
    # service DSNs
    'PostgresDsn',
    'CockroachDsn',
    'AmqpDsn',
    'RedisDsn',
    'MongoDsn',
    'KafkaDsn',
    # helpers
    'validate_email',
]
# Lazily-filled slots for the compiled patterns. Each one stays `None` until the
# accessor function with the matching name (e.g. `url_regex()`) is first called.
_url_regex_cache = None
_multi_host_url_regex_cache = None
_ascii_domain_regex_cache = None
_int_domain_regex_cache = None
_host_regex_cache = None

# Host (one of ipv4 / ipv6 / domain, all optional) followed by an optional port.
_host_regex = (
    r'(?:'
    r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|'  # ipv4
    r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|'  # ipv6
    r'(?P<domain>[^\s/:?#]+)'  # domain, validation occurs later
    r')?'
    r'(?::(?P<port>\d+))?'  # port
)
# scheme https://tools.ietf.org/html/rfc3986#appendix-A
_scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?'
# optional "user[:password]@" prefix
_user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?'
# optional path starting with "/", up to "?" or "#"
_path_regex = r'(?P<path>/[^\s?#]*)?'
# optional "?query", up to "#"
_query_regex = r'(?:\?(?P<query>[^\s#]*))?'
# optional "#fragment"
_fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?'
def url_regex() -> Pattern[str]:
    """
    Return the compiled single-host URL pattern, compiling it on the first call.

    The pattern is the concatenation of the scheme, user-info, host, path, query and
    fragment fragments and is compiled case-insensitively.
    """
    global _url_regex_cache
    if _url_regex_cache is not None:
        return _url_regex_cache

    _url_regex_cache = re.compile(
        rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}',
        re.IGNORECASE,
    )
    return _url_regex_cache
def multi_host_url_regex() -> Pattern[str]:
    """
    Compiled multi host url regex.

    Additionally to `url_regex` it allows to match multiple hosts.
    E.g. host1.db.net,host2.db.net

    The compiled pattern is cached in a module global after the first call.
    """
    global _multi_host_url_regex_cache
    if _multi_host_url_regex_cache is not None:
        return _multi_host_url_regex_cache

    _multi_host_url_regex_cache = re.compile(
        rf'{_scheme_regex}{_user_info_regex}'
        r'(?P<hosts>([^/]*))'  # validation occurs later
        rf'{_path_regex}{_query_regex}{_fragment_regex}',
        re.IGNORECASE,
    )
    return _multi_host_url_regex_cache
def ascii_domain_regex() -> Pattern[str]:
    """
    Return the compiled pattern for ASCII-only domain names, compiling it on first use.

    The optional trailing label is captured (including its leading dot) as group ``tld``.
    """
    global _ascii_domain_regex_cache
    if _ascii_domain_regex_cache is not None:
        return _ascii_domain_regex_cache

    # one dot-separated label: 1-63 chars, no leading/trailing hyphen
    ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
    # optional alphabetic TLD, then an optional trailing dot
    ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
    _ascii_domain_regex_cache = re.compile(
        fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE
    )
    return _ascii_domain_regex_cache
def int_domain_regex() -> Pattern[str]:
    """
    Return the compiled pattern for international (non-ASCII) domain names.

    Compiled on the first call and cached afterwards. The optional trailing label,
    either alphabetic or an ``xn--`` punycode label, is captured as group ``tld``.
    """
    global _int_domain_regex_cache
    if _int_domain_regex_cache is not None:
        return _int_domain_regex_cache

    # one label; the character range extends from "a" up to U+40000
    int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
    int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?'
    _int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE)
    return _int_domain_regex_cache
def host_regex() -> Pattern[str]:
    """
    Return the compiled host-and-port pattern (`_host_regex`), compiling it on first use.

    The pattern is compiled case-insensitively and cached in a module global.
    """
    global _host_regex_cache
    if _host_regex_cache is not None:
        return _host_regex_cache

    _host_regex_cache = re.compile(_host_regex, re.IGNORECASE)
    return _host_regex_cache
class AnyUrl(str):
    """
    A ``str`` subclass that keeps the parsed components of a URL next to its text.

    The class attributes below are validation settings that subclasses override;
    the parsed components are stored in the slots named in ``__slots__``.
    """

    strip_whitespace = True
    min_length = 1
    max_length = 2**16
    allowed_schemes: Optional[Collection[str]] = None
    tld_required: bool = False
    user_required: bool = False
    host_required: bool = True
    # parts that `build` leaves out of the text when they equal the class default
    hidden_parts: Set[str] = set()

    __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')

    @no_type_check
    def __new__(cls, url: Optional[str], **kwargs) -> object:
        # When no text is supplied, assemble it from the keyword components.
        text = url if url is not None else cls.build(**kwargs)
        return str.__new__(cls, text)

    def __init__(
        self,
        url: str,
        *,
        scheme: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        host: Optional[str] = None,
        tld: Optional[str] = None,
        host_type: str = 'domain',
        port: Optional[str] = None,
        path: Optional[str] = None,
        query: Optional[str] = None,
        fragment: Optional[str] = None,
    ) -> None:
        """Store the already-parsed components; the string value itself is set in `__new__`."""
        str.__init__(url)
        # credentials
        self.scheme = scheme
        self.user = user
        self.password = password
        # host information
        self.host = host
        self.tld = tld
        self.host_type = host_type
        self.port = port
        # everything after the authority
        self.path = path
        self.query = query
        self.fragment = fragment

    @classmethod
    def build(
        cls,
        *,
        scheme: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        host: str,
        port: Optional[str] = None,
        path: Optional[str] = None,
        query: Optional[str] = None,
        fragment: Optional[str] = None,
        **_kwargs: str,
    ) -> str:
        """
        Assemble a URL string from its components.

        Falsy optional components are skipped. The port is also skipped when ``'port'``
        is listed in `hidden_parts` and equals the default port for these parts.
        """
        parts = Parts(
            scheme=scheme,
            user=user,
            password=password,
            host=host,
            port=port,
            path=path,
            query=query,
            fragment=fragment,
            **_kwargs,  # type: ignore[misc]
        )

        pieces = [scheme, '://']
        if user:
            pieces.append(user)
        if password:
            pieces.extend((':', password))
        if user or password:
            pieces.append('@')
        pieces.append(host)
        if port:
            port_is_hidden = 'port' in cls.hidden_parts and cls.get_default_parts(parts).get('port') == port
            if not port_is_hidden:
                pieces.extend((':', port))
        if path:
            pieces.append(path)
        if query:
            pieces.extend(('?', query))
        if fragment:
            pieces.extend(('#', fragment))
        return ''.join(pieces)

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Add the length limits and ``format='uri'`` to the field's JSON schema."""
        update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        """Yield the single validator used for this type."""
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: 'ModelField', config: 'BaseConfig') -> 'AnyUrl':
        """
        Validate ``value`` and return an instance of this class.

        An object whose class is exactly ``cls`` is returned untouched. Anything else is
        coerced to ``str``, optionally stripped, length-checked, matched against the URL
        regex and validated part by part. Text left over after the match raises
        `errors.UrlExtraError`.
        """
        if value.__class__ == cls:
            return value

        text = str_validator(value)
        if cls.strip_whitespace:
            text = text.strip()
        url: str = cast(str, constr_length_validator(text, field, config))

        match = cls._match_url(url)
        # the regex should always match, if it doesn't please report with details of the URL tried
        assert match, 'URL regex failed unexpectedly'

        captured = cast('Parts', match.groupdict())
        parts = cls.validate_parts(cls.apply_default_parts(captured))

        consumed = match.end()
        if consumed != len(url):
            raise errors.UrlExtraError(extra=url[consumed:])

        return cls._build_url(match, url, parts)

    @classmethod
    def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl':
        """
        Validate hosts and build the AnyUrl object. Split from `validate` so this method
        can be altered in `MultiHostDsn`.
        """
        host, tld, host_type, rebuild = cls.validate_host(parts)

        # Passing None as the text makes `__new__` regenerate it via `build`.
        text = None if rebuild else url
        return cls(
            text,
            scheme=parts['scheme'],
            user=parts['user'],
            password=parts['password'],
            host=host,
            tld=tld,
            host_type=host_type,
            port=parts['port'],
            path=parts['path'],
            query=parts['query'],
            fragment=parts['fragment'],
        )

    @staticmethod
    def _match_url(url: str) -> Optional[Match[str]]:
        """Match ``url`` from its start against the single-host URL regex."""
        return url_regex().match(url)

    @staticmethod
    def _validate_port(port: Optional[str]) -> None:
        """Raise `errors.UrlPortError` if ``port`` is given and exceeds 65535."""
        if port is None:
            return
        if int(port) > 65_535:
            raise errors.UrlPortError()

    @classmethod
    def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
        """
        A method used to validate parts of a URL.
        Could be overridden to set default values for parts if missing
        """
        scheme = parts['scheme']
        if scheme is None:
            raise errors.UrlSchemeError()

        permitted = cls.allowed_schemes
        if permitted and scheme.lower() not in permitted:
            raise errors.UrlSchemePermittedError(set(permitted))

        if validate_port:
            cls._validate_port(parts['port'])

        if cls.user_required and parts['user'] is None:
            raise errors.UrlUserInfoError()

        return parts

    @classmethod
    def validate_host(cls, parts: 'Parts') -> Tuple[str, Optional[str], str, bool]:
        """
        Pick the host out of ``parts`` and validate it.

        Returns ``(host, tld, host_type, rebuild)``. ``rebuild`` is True when the host
        was an international domain that got IDNA-encoded, in which case the caller
        rebuilds the URL text instead of reusing the input.
        """
        tld, host_type, rebuild = None, None, False

        # First non-empty candidate wins; if none is set, `host` keeps the last value read.
        for candidate in ('domain', 'ipv4', 'ipv6'):
            host = parts[candidate]  # type: ignore[literal-required]
            if host:
                host_type = candidate
                break

        if host is None:
            if cls.host_required:
                raise errors.UrlHostError()
            return host, tld, host_type, rebuild  # type: ignore

        if host_type != 'domain':
            # IP literals need no further checks here.
            return host, tld, host_type, rebuild  # type: ignore

        domain_match = ascii_domain_regex().fullmatch(host)
        is_international = domain_match is None
        if is_international:
            domain_match = int_domain_regex().fullmatch(host)
            if domain_match is None:
                raise errors.UrlHostError()

        tld = domain_match.group('tld')
        if tld is None and not is_international:
            # The ASCII pattern found no TLD; retry with the international pattern.
            domain_match = int_domain_regex().fullmatch(host)
            assert domain_match is not None
            tld = domain_match.group('tld')
            is_international = True

        if tld is not None:
            tld = tld[1:]  # drop the leading dot
        elif cls.tld_required:
            raise errors.UrlHostTldError()

        if is_international:
            host_type = 'int_domain'
            rebuild = True
            host = host.encode('idna').decode('ascii')
            if tld is not None:
                tld = tld.encode('idna').decode('ascii')

        return host, tld, host_type, rebuild  # type: ignore

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Return default values for missing parts; the base class defines none."""
        return {}

    @classmethod
    def apply_default_parts(cls, parts: 'Parts') -> 'Parts':
        """Fill every falsy entry of ``parts`` that has a default, in place, and return ``parts``."""
        defaults = cls.get_default_parts(parts)
        for key, value in defaults.items():
            if not parts[key]:  # type: ignore[literal-required]
                parts[key] = value  # type: ignore[literal-required]
        return parts

    def __repr__(self) -> str:
        shown = [f'{name}={getattr(self, name)!r}' for name in self.__slots__ if getattr(self, name) is not None]
        extra = ', '.join(shown)
        return f'{self.__class__.__name__}({super().__repr__()}, {extra})'
class AnyHttpUrl(AnyUrl):
    """`AnyUrl` restricted to the ``http`` and ``https`` schemes."""

    __slots__ = ()

    allowed_schemes = {'http', 'https'}
class HttpUrl(AnyHttpUrl):
    """
    HTTP(S) URL that requires a TLD, is capped at 2083 characters and hides the
    port in built URLs when it equals the scheme's default.
    """

    tld_required = True
    # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
    max_length = 2083
    hidden_parts = {'port'}

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """
        Return the default port for the URL's scheme: ``'80'`` for http, else ``'443'``.

        The scheme is compared case-insensitively because the URL regex is compiled with
        ``re.IGNORECASE`` and `validate_parts` lower-cases it too; without this,
        ``HTTP://host`` would be given port 443.
        """
        # The scheme may still be None here: defaults are applied before
        # `validate_parts` rejects a missing scheme.
        scheme = (parts['scheme'] or '').lower()
        return {'port': '80' if scheme == 'http' else '443'}
class FileUrl(AnyUrl):
    """`AnyUrl` for the ``file`` scheme only; a host is not required."""

    __slots__ = ()

    allowed_schemes = {'file'}
    host_required = False
class MultiHostDsn(AnyUrl):
    """
    `AnyUrl` variant whose authority may list several comma-separated hosts.

    With more than one host the per-host details are stored in ``hosts`` and the
    single-host attributes (``host``, ``tld``, ``port``) stay at their defaults; with
    exactly one host the instance is populated like a plain `AnyUrl`.
    """

    __slots__ = AnyUrl.__slots__ + ('hosts',)

    def __init__(self, *args: Any, hosts: Optional[List['HostParts']] = None, **kwargs: Any):
        super().__init__(*args, **kwargs)
        self.hosts = hosts

    @staticmethod
    def _match_url(url: str) -> Optional[Match[str]]:
        """Match ``url`` from its start against the multi-host URL regex."""
        return multi_host_url_regex().match(url)

    @classmethod
    def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
        """Validate the shared parts; ports are checked per host in `_build_url` instead."""
        return super().validate_parts(parts, validate_port=False)

    @classmethod
    def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn':
        """
        Split the ``hosts`` group on commas, validate each host and port, and build
        the instance.

        If any host was IDNA-encoded the URL text is regenerated from the validated
        components rather than reusing the input.
        """
        hosts_parts: List['HostParts'] = []
        host_re = host_regex()
        for host in m.groupdict()['hosts'].split(','):
            d: Parts = host_re.match(host).groupdict()  # type: ignore
            host, tld, host_type, rebuild = cls.validate_host(d)
            port = d.get('port')
            cls._validate_port(port)
            hosts_parts.append(
                {
                    'host': host,
                    'host_type': host_type,
                    'tld': tld,
                    'rebuild': rebuild,
                    'port': port,
                }
            )

        if len(hosts_parts) > 1:
            if any(hp['rebuild'] for hp in hosts_parts):
                # Passing None as the text would make `__new__` call `build` without the
                # required `host` keyword and fail with TypeError, so regenerate the
                # text here from the (possibly IDNA-encoded) hosts and their ports.
                netloc = ','.join(
                    hp['host'] + (':' + hp['port'] if hp['port'] else '') for hp in hosts_parts
                )
                url = cls.build(
                    scheme=parts['scheme'],
                    user=parts['user'],
                    password=parts['password'],
                    host=netloc,
                    path=parts['path'],
                    query=parts['query'],
                    fragment=parts['fragment'],
                )
            return cls(
                url,
                scheme=parts['scheme'],
                user=parts['user'],
                password=parts['password'],
                path=parts['path'],
                query=parts['query'],
                fragment=parts['fragment'],
                host_type=None,
                hosts=hosts_parts,
            )
        else:
            # backwards compatibility with single host
            host_part = hosts_parts[0]
            return cls(
                None if host_part['rebuild'] else url,
                scheme=parts['scheme'],
                user=parts['user'],
                password=parts['password'],
                host=host_part['host'],
                tld=host_part['tld'],
                host_type=host_part['host_type'],
                port=host_part.get('port'),
                path=parts['path'],
                query=parts['query'],
                fragment=parts['fragment'],
            )
class PostgresDsn(MultiHostDsn):
    """Multi-host DSN for PostgreSQL; a user is required."""

    __slots__ = ()

    user_required = True
    # plain schemes first, then the "postgresql+<driver>" variants
    allowed_schemes = {
        'postgres',
        'postgresql',
        'postgresql+asyncpg',
        'postgresql+pg8000',
        'postgresql+psycopg',
        'postgresql+psycopg2',
        'postgresql+psycopg2cffi',
        'postgresql+py-postgresql',
        'postgresql+pygresql',
    }
class CockroachDsn(AnyUrl):
    """DSN for CockroachDB; a user is required."""

    user_required = True
    allowed_schemes = {
        'cockroachdb',
        'cockroachdb+psycopg2',
        'cockroachdb+asyncpg',
    }
class AmqpDsn(AnyUrl):
    """DSN for the ``amqp`` / ``amqps`` schemes; a host is not required."""

    host_required = False
    allowed_schemes = {'amqp', 'amqps'}
class RedisDsn(AnyUrl):
    """DSN for the ``redis`` / ``rediss`` schemes with localhost:6379/0 defaults."""

    __slots__ = ()
    allowed_schemes = {'redis', 'rediss'}
    host_required = False

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Return defaults for a missing domain, port and path."""
        # Only fall back to localhost when no IP literal was given.
        has_ip_literal = parts['ipv4'] or parts['ipv6']
        default_domain = '' if has_ip_literal else 'localhost'
        return {
            'domain': default_domain,
            'port': '6379',
            'path': '/0',
        }
class MongoDsn(AnyUrl):
    """DSN for the ``mongodb`` scheme with a default port of 27017."""

    allowed_schemes = {'mongodb'}

    # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes
    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Return the default port used when the URL does not specify one."""
        defaults = {'port': '27017'}
        return defaults
class KafkaDsn(AnyUrl):
    """DSN for the ``kafka`` scheme with localhost:9092 defaults."""

    allowed_schemes = {'kafka'}

    @staticmethod
    def get_default_parts(parts: 'Parts') -> 'Parts':
        """Return the default domain and port used when the URL omits them."""
        defaults = {'domain': 'localhost', 'port': '9092'}
        return defaults
def stricturl(
    *,
    strip_whitespace: bool = True,
    min_length: int = 1,
    max_length: int = 2**16,
    tld_required: bool = True,
    host_required: bool = True,
    allowed_schemes: Optional[Collection[str]] = None,
) -> Type[AnyUrl]:
    """
    Create an `AnyUrl` subclass named ``UrlValue`` with the given validation settings.

    Each keyword becomes the class attribute of the same name on the new type.
    """
    # use kwargs then define conf in a dict to aid with IDE type hinting
    namespace = {
        'strip_whitespace': strip_whitespace,
        'min_length': min_length,
        'max_length': max_length,
        'tld_required': tld_required,
        'host_required': host_required,
        'allowed_schemes': allowed_schemes,
    }
    return type('UrlValue', (AnyUrl,), namespace)
def import_email_validator() -> None:
    """
    Import the optional ``email_validator`` package and bind it to the module global.

    Raises:
        ImportError: if the package is not installed; the message names the pip extra
            that provides it.
    """
    global email_validator
    try:
        import email_validator
    except ImportError as exc:
        message = 'email-validator is not installed, run `pip install pydantic[email]`'
        raise ImportError(message) from exc
class EmailStr(str):
    """String type whose validator returns the second item of `validate_email`'s result."""

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Mark the field as a string with ``format='email'`` in the JSON schema."""
        field_schema.update(type='string', format='email')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        # included here and below so the error happens straight away
        import_email_validator()

        yield from (str_validator, cls.validate)

    @classmethod
    def validate(cls, value: Union[str]) -> str:
        """Run ``value`` through `validate_email` and return element 1 of its result."""
        result = validate_email(value)
        return result[1]
class NameEmail(Representation):
    """A display name paired with an e-mail address; renders as ``name <email>``."""

    __slots__ = 'name', 'email'

    def __init__(self, name: str, email: str):
        self.name = name
        self.email = email

    def __eq__(self, other: Any) -> bool:
        # Only another NameEmail with the same name and address compares equal.
        if not isinstance(other, NameEmail):
            return False
        return (self.name, self.email) == (other.name, other.email)

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Mark the field as a string with ``format='name-email'`` in the JSON schema."""
        field_schema.update(type='string', format='name-email')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        # Fail early if the optional email-validator dependency is missing.
        import_email_validator()

        yield cls.validate

    @classmethod
    def validate(cls, value: Any) -> 'NameEmail':
        """Return ``value`` if its class is exactly ``cls``, else build one from `validate_email`'s result."""
        if value.__class__ == cls:
            return value
        text = str_validator(value)
        return cls(*validate_email(text))

    def __str__(self) -> str:
        return f'{self.name} <{self.email}>'
class IPvAnyAddress(_BaseAddress):
    """Field type that accepts either an IPv4 or an IPv6 address."""

    __slots__ = ()

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Mark the field as a string with ``format='ipvanyaddress'`` in the JSON schema."""
        field_schema.update(type='string', format='ipvanyaddress')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        yield cls.validate

    @classmethod
    def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
        """
        Parse ``value`` as IPv4 first, then as IPv6.

        Raises `errors.IPvAnyAddressError` when neither constructor accepts it.
        """
        try:
            address_v4 = IPv4Address(value)
        except ValueError:
            pass  # not IPv4; try IPv6 next
        else:
            return address_v4

        try:
            address_v6 = IPv6Address(value)
        except ValueError:
            raise errors.IPvAnyAddressError()
        return address_v6
class IPvAnyInterface(_BaseAddress):
    """Field type that accepts either an IPv4 or an IPv6 interface."""

    __slots__ = ()

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Mark the field as a string with ``format='ipvanyinterface'`` in the JSON schema."""
        field_schema.update(type='string', format='ipvanyinterface')

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        yield cls.validate

    @classmethod
    def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
        """
        Parse ``value`` as an IPv4 interface first, then as an IPv6 interface.

        Raises `errors.IPvAnyInterfaceError` when neither constructor accepts it.
        """
        try:
            interface_v4 = IPv4Interface(value)
        except ValueError:
            pass  # not IPv4; try IPv6 next
        else:
            return interface_v4

        try:
            interface_v6 = IPv6Interface(value)
        except ValueError:
            raise errors.IPvAnyInterfaceError()
        return interface_v6
class IPvAnyNetwork(_BaseNetwork):  # type: ignore
    """Field type whose validator accepts either an IPv4 or an IPv6 network."""

    @classmethod
    def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
        """Set ``type`` to ``string`` and ``format`` to ``ipvanynetwork`` in ``field_schema``."""
        field_schema.update({'type': 'string', 'format': 'ipvanynetwork'})

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        """Yield the validator callables for this type: a single entry, ``cls.validate``."""
        yield cls.validate

    @classmethod
    def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
        """
        Parse ``value`` as an IPv4 network first, then as an IPv6 network.

        Raises ``errors.IPvAnyNetworkError`` when both constructors reject it with ``ValueError``.
        """
        # The network classes are called without a ``strict`` argument, so their default applies.
        # Define your own class if you want to specify network address check strictness.
        try:
            ipv4 = IPv4Network(value)
        except ValueError:
            # Not a valid IPv4 network; fall through to the IPv6 attempt.
            pass
        else:
            return ipv4

        try:
            ipv6 = IPv6Network(value)
        except ValueError:
            raise errors.IPvAnyNetworkError()
        else:
            return ipv6
# Matches "pretty" addresses of the form ``Display Name <address>``:
# group 1 is the (possibly empty) display name, group 2 is the text between the angle brackets.
pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *')

MAX_EMAIL_LENGTH = 2048
"""Maximum length for an email.
A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
"""
def validate_email(value: str) -> Tuple[str, str]:
    """
    Email address validation using https://pypi.org/project/email-validator/

    Notes:
    * raw ip address (literal) domain parts are not allowed.
    * "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
    * spaces are stripped from the beginning and end of addresses but no error is raised

    :param value: the address to validate, either bare or in "pretty" form
    :return: a ``(name, email)`` tuple; ``name`` is the display name of a "pretty" address,
        or the local part of the address when no display name was given
    :raises errors.EmailError: if ``value`` is longer than ``MAX_EMAIL_LENGTH`` or is rejected
        by ``email_validator.validate_email``
    """
    if email_validator is None:
        # The optional dependency is not loaded yet; ``import_email_validator`` (defined elsewhere
        # in this module) is expected to make it available.
        import_email_validator()

    # Reject oversized input before running the regex or the validator on it.
    if len(value) > MAX_EMAIL_LENGTH:
        raise errors.EmailError()

    pretty_match = pretty_email_regex.fullmatch(value)
    name: Optional[str] = None
    if pretty_match:
        name, value = pretty_match.groups()
    email = value.strip()

    try:
        parts = email_validator.validate_email(email, check_deliverability=False)
    except email_validator.EmailNotValidError as e:
        raise errors.EmailError from e

    if hasattr(parts, 'normalized'):
        # email-validator >= 2
        email = parts.normalized
        assert email is not None
        name = name or parts.local_part
        return name, email

    # email-validator >1, <2
    # Split on the last '@': a domain cannot contain '@', so this is always the separator and the
    # local part is never partially lower-cased.
    at_index = email.rindex('@')
    local_part = email[:at_index]  # RFC 5321, local part must be case-sensitive.
    global_part = email[at_index:].lower()

    return name or local_part, local_part + global_part