Coverage for pydantic/datetime_parse.py: 100.00%
129 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-15 13:26 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-15 13:26 +0000
1"""
2Functions to parse datetime objects.
4We're using regular expressions rather than time.strptime because:
5- They provide both validation and parsing.
6- They're more flexible for datetimes.
7- The date/datetime/time constructors produce friendlier error messages.
9Stolen from https://raw.githubusercontent.com/django/django/main/django/utils/dateparse.py at
109718fa2e8abe430c3526a9278dd976443d4ae3c6
12Changed to:
13* use standard python datetime types not django.utils.timezone
14* raise ValueError when regex doesn't match rather than returning None
15* support parsing unix timestamps for dates and datetimes
16"""
17import re 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
18from datetime import date, datetime, time, timedelta, timezone 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
19from typing import Dict, Optional, Type, Union 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
21from pydantic import errors 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
23date_expr = r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})' 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
24time_expr = ( 1abcdefghijklmnopqrstPQRSTUuvwxyzABCD
25 r'(?P<hour>\d{1,2}):(?P<minute>\d{1,2})'
26 r'(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?'
27 r'(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$'
28)
30date_re = re.compile(f'{date_expr}$') 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
31time_re = re.compile(time_expr) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
32datetime_re = re.compile(f'{date_expr}[T ]{time_expr}') 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
34standard_duration_re = re.compile( 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
35 r'^'
36 r'(?:(?P<days>-?\d+) (days?, )?)?'
37 r'((?:(?P<hours>-?\d+):)(?=\d+:\d+))?'
38 r'(?:(?P<minutes>-?\d+):)?'
39 r'(?P<seconds>-?\d+)'
40 r'(?:\.(?P<microseconds>\d{1,6})\d{0,6})?'
41 r'$'
42)
44# Support the sections of ISO 8601 date representation that are accepted by timedelta
45iso8601_duration_re = re.compile( 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
46 r'^(?P<sign>[-+]?)'
47 r'P'
48 r'(?:(?P<days>\d+(.\d+)?)D)?'
49 r'(?:T'
50 r'(?:(?P<hours>\d+(.\d+)?)H)?'
51 r'(?:(?P<minutes>\d+(.\d+)?)M)?'
52 r'(?:(?P<seconds>\d+(.\d+)?)S)?'
53 r')?'
54 r'$'
55)
57EPOCH = datetime(1970, 1, 1) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
58# if greater than this, the number is in ms, if less than or equal it's in seconds
59# (in seconds this is 11th October 2603, in ms it's 20th August 1970)
60MS_WATERSHED = int(2e10) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
61# slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9
62MAX_NUMBER = int(3e20) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
63StrBytesIntFloat = Union[str, bytes, int, float] 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
66def get_numeric(value: StrBytesIntFloat, native_expected_type: str) -> Union[None, int, float]: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
67 if isinstance(value, (int, float)): 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
68 return value 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
69 try: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
70 return float(value) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
71 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
72 return None 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
73 except TypeError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
74 raise TypeError(f'invalid type; expected {native_expected_type}, string, bytes, int or float') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
77def from_unix_seconds(seconds: Union[int, float]) -> datetime: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
78 if seconds > MAX_NUMBER: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
79 return datetime.max 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
80 elif seconds < -MAX_NUMBER: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
81 return datetime.min 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
83 while abs(seconds) > MS_WATERSHED: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
84 seconds /= 1000 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
85 dt = EPOCH + timedelta(seconds=seconds) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
86 return dt.replace(tzinfo=timezone.utc) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
89def _parse_timezone(value: Optional[str], error: Type[Exception]) -> Union[None, int, timezone]: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
90 if value == 'Z': 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
91 return timezone.utc 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
92 elif value is not None: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
93 offset_mins = int(value[-2:]) if len(value) > 3 else 0 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
94 offset = 60 * int(value[1:3]) + offset_mins 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
95 if value[0] == '-': 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
96 offset = -offset 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
97 try: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
98 return timezone(timedelta(minutes=offset)) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
99 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
100 raise error() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
101 else:
102 return None 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
105def parse_date(value: Union[date, StrBytesIntFloat]) -> date: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
106 """
107 Parse a date/int/float/string and return a datetime.date.
109 Raise ValueError if the input is well formatted but not a valid date.
110 Raise ValueError if the input isn't well formatted.
111 """
112 if isinstance(value, date): 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
113 if isinstance(value, datetime): 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
114 return value.date() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
115 else:
116 return value 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
118 number = get_numeric(value, 'date') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
119 if number is not None: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
120 return from_unix_seconds(number).date() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
122 if isinstance(value, bytes): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
123 value = value.decode() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
125 match = date_re.match(value) # type: ignore 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
126 if match is None: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
127 raise errors.DateError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
129 kw = {k: int(v) for k, v in match.groupdict().items()} 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
131 try: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
132 return date(**kw) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
133 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
134 raise errors.DateError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
137def parse_time(value: Union[time, StrBytesIntFloat]) -> time: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
138 """
139 Parse a time/string and return a datetime.time.
141 Raise ValueError if the input is well formatted but not a valid time.
142 Raise ValueError if the input isn't well formatted, in particular if it contains an offset.
143 """
144 if isinstance(value, time): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
145 return value 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
147 number = get_numeric(value, 'time') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
148 if number is not None: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
149 if number >= 86400: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
150 # doesn't make sense since the time time loop back around to 0
151 raise errors.TimeError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
152 return (datetime.min + timedelta(seconds=number)).time() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
154 if isinstance(value, bytes): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
155 value = value.decode() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
157 match = time_re.match(value) # type: ignore 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
158 if match is None: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
159 raise errors.TimeError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
161 kw = match.groupdict() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
162 if kw['microsecond']: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
163 kw['microsecond'] = kw['microsecond'].ljust(6, '0') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
165 tzinfo = _parse_timezone(kw.pop('tzinfo'), errors.TimeError) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
166 kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None} 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
167 kw_['tzinfo'] = tzinfo 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
169 try: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
170 return time(**kw_) # type: ignore 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
171 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
172 raise errors.TimeError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
175def parse_datetime(value: Union[datetime, StrBytesIntFloat]) -> datetime: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
176 """
177 Parse a datetime/int/float/string and return a datetime.datetime.
179 This function supports time zone offsets. When the input contains one,
180 the output uses a timezone with a fixed offset from UTC.
182 Raise ValueError if the input is well formatted but not a valid datetime.
183 Raise ValueError if the input isn't well formatted.
184 """
185 if isinstance(value, datetime): 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
186 return value 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
188 number = get_numeric(value, 'datetime') 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
189 if number is not None: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
190 return from_unix_seconds(number) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
192 if isinstance(value, bytes): 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
193 value = value.decode() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
195 match = datetime_re.match(value) # type: ignore 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
196 if match is None: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
197 raise errors.DateTimeError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
199 kw = match.groupdict() 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
200 if kw['microsecond']: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
201 kw['microsecond'] = kw['microsecond'].ljust(6, '0') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
203 tzinfo = _parse_timezone(kw.pop('tzinfo'), errors.DateTimeError) 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
204 kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None} 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
205 kw_['tzinfo'] = tzinfo 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
207 try: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
208 return datetime(**kw_) # type: ignore 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
209 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
210 raise errors.DateTimeError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
213def parse_duration(value: StrBytesIntFloat) -> timedelta: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD
214 """
215 Parse a duration int/float/string and return a datetime.timedelta.
217 The preferred format for durations in Django is '%d %H:%M:%S.%f'.
219 Also supports ISO 8601 representation.
220 """
221 if isinstance(value, timedelta): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
222 return value 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
224 if isinstance(value, (int, float)): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
225 # below code requires a string
226 value = f'{value:f}' 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
227 elif isinstance(value, bytes): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
228 value = value.decode() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
230 try: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
231 match = standard_duration_re.match(value) or iso8601_duration_re.match(value) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
232 except TypeError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
233 raise TypeError('invalid type; expected timedelta, string, bytes, int or float') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
235 if not match: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
236 raise errors.DurationError() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
238 kw = match.groupdict() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
239 sign = -1 if kw.pop('sign', '+') == '-' else 1 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
240 if kw.get('microseconds'): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
241 kw['microseconds'] = kw['microseconds'].ljust(6, '0') 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
243 if kw.get('seconds') and kw.get('microseconds') and kw['seconds'].startswith('-'): 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
244 kw['microseconds'] = '-' + kw['microseconds'] 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
246 kw_ = {k: float(v) for k, v in kw.items() if v is not None} 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD
248 return sign * timedelta(**kw_) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD