Coverage for pydantic/datetime_parse.py: 100.00%

129 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-15 13:26 +0000

1""" 

2Functions to parse datetime objects. 

3 

4We're using regular expressions rather than time.strptime because: 

5- They provide both validation and parsing. 

6- They're more flexible for datetimes. 

7- The date/datetime/time constructors produce friendlier error messages. 

8 

9Stolen from https://raw.githubusercontent.com/django/django/main/django/utils/dateparse.py at 

109718fa2e8abe430c3526a9278dd976443d4ae3c6 

11 

12Changed to: 

13* use standard python datetime types not django.utils.timezone 

14* raise ValueError when regex doesn't match rather than returning None 

15* support parsing unix timestamps for dates and datetimes 

16""" 

17import re 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

18from datetime import date, datetime, time, timedelta, timezone 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

19from typing import Dict, Optional, Type, Union 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

20 

21from pydantic import errors 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

22 

# Calendar date: a four-digit year followed by one- or two-digit month and day.
date_expr = r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'

# Time of day, assembled from its three parts. The trailing `$` anchors the end
# of the input, so anything after the optional offset makes the match fail.
time_expr = ''.join(
    (
        # hours and minutes are mandatory
        r'(?P<hour>\d{1,2}):(?P<minute>\d{1,2})',
        # optional seconds with an optional fraction: the first six fractional
        # digits are captured, up to six further digits are matched but dropped
        r'(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?',
        # optional offset: "Z", or a sign, two digits and optionally two more
        # digits with or without a separating colon
        r'(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$',
    )
)

date_re = re.compile(date_expr + '$')
time_re = re.compile(time_expr)
# Date and time are separated by either "T" or a single space.
datetime_re = re.compile(date_expr + '[T ]' + time_expr)

33 

# Django-style duration, e.g. "4 days, 15:30:05.123" or just "30".
# The pattern is assembled from one fragment per component.
standard_duration_re = re.compile(
    ''.join(
        (
            r'^',
            # optional day count, followed by a space and an optional "day, "/"days, "
            r'(?:(?P<days>-?\d+) (days?, )?)?',
            # hours are only recognised when two more colon-separated numbers follow
            r'((?:(?P<hours>-?\d+):)(?=\d+:\d+))?',
            # optional minutes
            r'(?:(?P<minutes>-?\d+):)?',
            # mandatory seconds
            r'(?P<seconds>-?\d+)',
            # optional fraction: six digits captured, up to six more matched but dropped
            r'(?:\.(?P<microseconds>\d{1,6})\d{0,6})?',
            r'$',
        )
    )
)

43 

# Support the sections of ISO 8601 duration representation that are accepted by timedelta
# (days, hours, minutes and seconds, each optionally with a decimal fraction).
# The decimal point is escaped so that only a literal "." is accepted as the separator;
# an unescaped "." would let inputs such as "P1x5D" match and then fail later in float().
iso8601_duration_re = re.compile(
    r'^(?P<sign>[-+]?)'
    r'P'
    r'(?:(?P<days>\d+(\.\d+)?)D)?'
    r'(?:T'
    r'(?:(?P<hours>\d+(\.\d+)?)H)?'
    r'(?:(?P<minutes>\d+(\.\d+)?)M)?'
    r'(?:(?P<seconds>\d+(\.\d+)?)S)?'
    r')?'
    r'$'
)

56 

# Reference point that unix timestamps are added to (no tzinfo attached here).
EPOCH = datetime(year=1970, month=1, day=1)
# if greater than this, the number is in ms, if less than or equal it's in seconds
# (in seconds this is 11th October 2603, in ms it's 20th August 1970)
MS_WATERSHED = 2 * 10**10
# slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9
MAX_NUMBER = 3 * 10**20
# Raw input types accepted by the parsers in addition to the native datetime types.
StrBytesIntFloat = Union[str, bytes, int, float]

64 

65 

def get_numeric(value: StrBytesIntFloat, native_expected_type: str) -> Union[None, int, float]:
    """
    Return ``value`` as a number, or None when it is not numeric.

    ints and floats are handed back untouched; anything else is passed to ``float()``.

    :param value: the raw input to interpret
    :param native_expected_type: name of the native type the caller wants, used only in the error message
    :return: the number, or None if ``float()`` rejects the value with a ValueError
    :raises TypeError: if ``float()`` cannot accept the type of ``value`` at all
    """
    if not isinstance(value, (int, float)):
        try:
            value = float(value)
        except ValueError:
            # not a number - the caller falls back to its string parsing
            value = None
        except TypeError:
            raise TypeError(f'invalid type; expected {native_expected_type}, string, bytes, int or float')
    return value

75 

76 

def from_unix_seconds(seconds: Union[int, float]) -> datetime:
    """
    Convert a unix timestamp to a datetime.

    Values beyond +/- MAX_NUMBER are clamped to ``datetime.max`` / ``datetime.min``; those
    two are returned as-is, without the UTC tzinfo that the normal path attaches.

    :param seconds: the timestamp; magnitudes above MS_WATERSHED are assumed to be in a
        finer unit and are divided by 1000 until they fall at or below it
    :return: EPOCH plus the (scaled) number of seconds, with ``tzinfo`` set to UTC
    """
    if seconds > MAX_NUMBER:
        return datetime.max
    if seconds < -MAX_NUMBER:
        return datetime.min

    scaled = seconds
    while abs(scaled) > MS_WATERSHED:
        scaled = scaled / 1000
    return (EPOCH + timedelta(seconds=scaled)).replace(tzinfo=timezone.utc)

87 

88 

89def _parse_timezone(value: Optional[str], error: Type[Exception]) -> Union[None, int, timezone]: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

90 if value == 'Z': 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

91 return timezone.utc 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

92 elif value is not None: 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

93 offset_mins = int(value[-2:]) if len(value) > 3 else 0 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

94 offset = 60 * int(value[1:3]) + offset_mins 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

95 if value[0] == '-': 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

96 offset = -offset 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

97 try: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

98 return timezone(timedelta(minutes=offset)) 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

99 except ValueError: 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

100 raise error() 1EFabcdefghijGHklmnopqrstIJKLMNOuvwxyzABCD

101 else: 

102 return None 1EFabcdefghijGHklmnopqrstIJKLPQRSTUMNOuvwxyzABCD

103 

104 

def parse_date(value: Union[date, StrBytesIntFloat]) -> date:
    """
    Parse a date/int/float/string and return a datetime.date.

    A datetime is reduced to its date part, any other date is returned unchanged, and
    numeric input is treated as a unix timestamp. Everything else must match
    ``YYYY-M[M]-D[D]``.

    Raise errors.DateError if the input is well formatted but not a valid date.
    Raise errors.DateError if the input isn't well formatted.
    """
    # datetime is checked first because it is itself a date
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value

    timestamp = get_numeric(value, 'date')
    if timestamp is not None:
        return from_unix_seconds(timestamp).date()

    text = value.decode() if isinstance(value, bytes) else value
    found = date_re.match(text)  # type: ignore
    if found is None:
        raise errors.DateError()

    parts = {name: int(digits) for name, digits in found.groupdict().items()}

    try:
        return date(**parts)
    except ValueError:
        raise errors.DateError()

135 

136 

def parse_time(value: Union[time, StrBytesIntFloat]) -> time:
    """
    Parse a time/int/float/string and return a datetime.time.

    Numeric input (including numeric strings and bytes) is treated as the number of
    seconds since midnight and must lie in the range [0, 86400). String input may end
    with ``Z`` or a ``+HH``/``+HHMM``/``+HH:MM`` style offset, which becomes the
    ``tzinfo`` of the result.

    Raise errors.TimeError if the input is well formatted but not a valid time.
    Raise errors.TimeError if the input isn't well formatted or is a number out of range.
    Raise TypeError (from get_numeric) if the input type is not supported.
    """
    if isinstance(value, time):
        return value

    number = get_numeric(value, 'time')
    if number is not None:
        # Only [0, 86400) maps onto a time of day: larger values would loop back around
        # to 0, and negative values would push datetime.min out of range (OverflowError).
        # Written as a chained comparison so that NaN is rejected as well.
        if not 0 <= number < 86400:
            raise errors.TimeError()
        return (datetime.min + timedelta(seconds=number)).time()

    if isinstance(value, bytes):
        value = value.decode()

    match = time_re.match(value)  # type: ignore
    if match is None:
        raise errors.TimeError()

    kw = match.groupdict()
    if kw['microsecond']:
        # right-pad the captured fraction to six digits so it reads as microseconds
        kw['microsecond'] = kw['microsecond'].ljust(6, '0')

    tzinfo = _parse_timezone(kw.pop('tzinfo'), errors.TimeError)
    kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None}
    kw_['tzinfo'] = tzinfo

    try:
        return time(**kw_)  # type: ignore
    except ValueError:
        raise errors.TimeError()

173 

174 

def parse_datetime(value: Union[datetime, StrBytesIntFloat]) -> datetime:
    """
    Parse a datetime/int/float/string and return a datetime.datetime.

    Numeric input is treated as a unix timestamp. String input may carry a time zone
    offset; when it does, the result uses a timezone with a fixed offset from UTC.

    Raise errors.DateTimeError if the input is well formatted but not a valid datetime.
    Raise errors.DateTimeError if the input isn't well formatted.
    """
    if isinstance(value, datetime):
        return value

    timestamp = get_numeric(value, 'datetime')
    if timestamp is not None:
        return from_unix_seconds(timestamp)

    if isinstance(value, bytes):
        value = value.decode()

    found = datetime_re.match(value)  # type: ignore
    if found is None:
        raise errors.DateTimeError()

    fields = found.groupdict()
    fraction = fields['microsecond']
    if fraction:
        # right-pad the captured fraction to six digits so it reads as microseconds
        fields['microsecond'] = fraction.ljust(6, '0')

    tzinfo = _parse_timezone(fields.pop('tzinfo'), errors.DateTimeError)

    # groups that did not take part in the match are None and are left to the constructor defaults
    components: Dict[str, Union[None, int, timezone]] = {}
    for name, text in fields.items():
        if text is not None:
            components[name] = int(text)
    components['tzinfo'] = tzinfo

    try:
        return datetime(**components)  # type: ignore
    except ValueError:
        raise errors.DateTimeError()

211 

212 

def parse_duration(value: StrBytesIntFloat) -> timedelta:
    """
    Parse a duration int/float/string and return a datetime.timedelta.

    A timedelta is returned unchanged. ints and floats are formatted as a decimal
    string and then parsed like any other string, i.e. as a number of seconds.

    The preferred format for durations in Django is '%d %H:%M:%S.%f'.

    Also supports ISO 8601 representation.

    Raise errors.DurationError if the input isn't well formatted, or if it is well
    formatted but its components cannot be turned into a timedelta (e.g. out of range).
    Raise TypeError if the input type is not supported.
    """
    if isinstance(value, timedelta):
        return value

    if isinstance(value, (int, float)):
        # below code requires a string
        value = f'{value:f}'
    elif isinstance(value, bytes):
        value = value.decode()

    try:
        match = standard_duration_re.match(value) or iso8601_duration_re.match(value)
    except TypeError:
        raise TypeError('invalid type; expected timedelta, string, bytes, int or float')

    if not match:
        raise errors.DurationError()

    kw = match.groupdict()
    # only the ISO 8601 pattern has a "sign" group; the standard pattern signs each component
    sign = -1 if kw.pop('sign', '+') == '-' else 1
    if kw.get('microseconds'):
        # right-pad the captured fraction to six digits so it reads as microseconds
        kw['microseconds'] = kw['microseconds'].ljust(6, '0')

    if kw.get('seconds') and kw.get('microseconds') and kw['seconds'].startswith('-'):
        # the fraction belongs to a negative seconds value, so it must be negative too
        kw['microseconds'] = '-' + kw['microseconds']

    try:
        kw_ = {k: float(v) for k, v in kw.items() if v is not None}
        return sign * timedelta(**kw_)
    except (ValueError, OverflowError):
        # float() rejected a component, or the total is outside what timedelta can hold;
        # report it like any other invalid duration instead of leaking the raw exception
        raise errors.DurationError()