Coverage for pydantic/experimental/pipeline.py: 92.34%

355 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 10:05 +0000

1"""Experimental pipeline API functionality. Be careful with this API, it's subject to change.""" 

2 

3from __future__ import annotations 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

4 

5import datetime 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

6import operator 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

7import re 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

8import sys 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

9from collections import deque 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

10from collections.abc import Container 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

11from dataclasses import dataclass 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

12from decimal import Decimal 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

13from functools import cached_property, partial 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

14from re import Pattern 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

15from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, Protocol, TypeVar, Union, overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

16 

17import annotated_types 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

18 

19if TYPE_CHECKING: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

20 from pydantic_core import core_schema as cs 

21 

22 from pydantic import GetCoreSchemaHandler 

23 

24from pydantic._internal._internal_dataclass import slots_true as _slots_true 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

25 

26if sys.version_info < (3, 10): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

27 EllipsisType = type(Ellipsis) 1rxeyzsA

28else: 

29 from types import EllipsisType 1tBuCagbhifDEFGjklmnvHwIcodpq

30 

31__all__ = ['validate_as', 'validate_as_deferred', 'transform'] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

32 

33_slots_frozen = {**_slots_true, 'frozen': True} 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

34 

35 

36@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

37class _ValidateAs: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

38 tp: type[Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

39 strict: bool = False 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

40 

41 

42@dataclass 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

43class _ValidateAsDefer: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

44 func: Callable[[], type[Any]] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

45 

46 @cached_property 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

47 def tp(self) -> type[Any]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

48 return self.func() 

49 

50 

51@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

52class _Transform: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

53 func: Callable[[Any], Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

54 

55 

56@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

57class _PipelineOr: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

58 left: _Pipeline[Any, Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

59 right: _Pipeline[Any, Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

60 

61 

62@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

63class _PipelineAnd: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

64 left: _Pipeline[Any, Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

65 right: _Pipeline[Any, Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

66 

67 

68@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

69class _Eq: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

70 value: Any 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

71 

72 

73@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

74class _NotEq: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

75 value: Any 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

76 

77 

78@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

79class _In: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

80 values: Container[Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

81 

82 

83@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

84class _NotIn: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

85 values: Container[Any] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

86 

87 

88_ConstraintAnnotation = Union[ 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

89 annotated_types.Le, 

90 annotated_types.Ge, 

91 annotated_types.Lt, 

92 annotated_types.Gt, 

93 annotated_types.Len, 

94 annotated_types.MultipleOf, 

95 annotated_types.Timezone, 

96 annotated_types.Interval, 

97 annotated_types.Predicate, 

98 # common predicates not included in annotated_types 

99 _Eq, 

100 _NotEq, 

101 _In, 

102 _NotIn, 

103 # regular expressions 

104 Pattern[str], 

105] 

106 

107 

108@dataclass(**_slots_frozen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

109class _Constraint: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

110 constraint: _ConstraintAnnotation 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

111 

112 

113_Step = Union[_ValidateAs, _ValidateAsDefer, _Transform, _PipelineOr, _PipelineAnd, _Constraint] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

114 

115_InT = TypeVar('_InT') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

116_OutT = TypeVar('_OutT') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

117_NewOutT = TypeVar('_NewOutT') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

118 

119 

120class _FieldTypeMarker: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

121 pass 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

122 

123 

124# TODO: ultimately, make this public, see https://github.com/pydantic/pydantic/pull/9459#discussion_r1628197626 

125# Also, make this frozen eventually, but that doesn't work right now because of the generic base 

126# Which attempts to modify __orig_base__ and such. 

127# We could go with a manual freeze, but that seems overkill for now. 

128@dataclass(**_slots_true) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

129class _Pipeline(Generic[_InT, _OutT]): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

130 """Abstract representation of a chain of validation, transformation, and parsing steps.""" 

131 

132 _steps: tuple[_Step, ...] 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

133 

134 def transform( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

135 self, 

136 func: Callable[[_OutT], _NewOutT], 

137 ) -> _Pipeline[_InT, _NewOutT]: 

138 """Transform the output of the previous step. 

139 

140 If used as the first step in a pipeline, the type of the field is used. 

141 That is, the transformation is applied to after the value is parsed to the field's type. 

142 """ 

143 return _Pipeline[_InT, _NewOutT](self._steps + (_Transform(func),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

144 

145 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

146 def validate_as(self, tp: type[_NewOutT], *, strict: bool = ...) -> _Pipeline[_InT, _NewOutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

147 

148 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

149 def validate_as(self, tp: EllipsisType, *, strict: bool = ...) -> _Pipeline[_InT, Any]: # type: ignore 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

150 ... 

151 

152 def validate_as(self, tp: type[_NewOutT] | EllipsisType, *, strict: bool = False) -> _Pipeline[_InT, Any]: # type: ignore 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

153 """Validate / parse the input into a new type. 

154 

155 If no type is provided, the type of the field is used. 

156 

157 Types are parsed in Pydantic's `lax` mode by default, 

158 but you can enable `strict` mode by passing `strict=True`. 

159 """ 

160 if isinstance(tp, EllipsisType): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

161 return _Pipeline[_InT, Any](self._steps + (_ValidateAs(_FieldTypeMarker, strict=strict),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

162 return _Pipeline[_InT, _NewOutT](self._steps + (_ValidateAs(tp, strict=strict),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

163 

164 def validate_as_deferred(self, func: Callable[[], type[_NewOutT]]) -> _Pipeline[_InT, _NewOutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

165 """Parse the input into a new type, deferring resolution of the type until the current class 

166 is fully defined. 

167 

168 This is useful when you need to reference the class in it's own type annotations. 

169 """ 

170 return _Pipeline[_InT, _NewOutT](self._steps + (_ValidateAsDefer(func),)) 

171 

172 # constraints 

173 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

174 def constrain(self: _Pipeline[_InT, _NewOutGe], constraint: annotated_types.Ge) -> _Pipeline[_InT, _NewOutGe]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

175 

176 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

177 def constrain(self: _Pipeline[_InT, _NewOutGt], constraint: annotated_types.Gt) -> _Pipeline[_InT, _NewOutGt]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

178 

179 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

180 def constrain(self: _Pipeline[_InT, _NewOutLe], constraint: annotated_types.Le) -> _Pipeline[_InT, _NewOutLe]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

181 

182 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

183 def constrain(self: _Pipeline[_InT, _NewOutLt], constraint: annotated_types.Lt) -> _Pipeline[_InT, _NewOutLt]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

184 

185 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

186 def constrain( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

187 self: _Pipeline[_InT, _NewOutLen], constraint: annotated_types.Len 1agbhiefjklmncodpq

188 ) -> _Pipeline[_InT, _NewOutLen]: ... 1agbhiefjklmncodpq

189 

190 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

191 def constrain( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

192 self: _Pipeline[_InT, _NewOutT], constraint: annotated_types.MultipleOf 1agbhiefjklmncodpq

193 ) -> _Pipeline[_InT, _NewOutT]: ... 1agbhiefjklmncodpq

194 

195 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

196 def constrain( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

197 self: _Pipeline[_InT, _NewOutDatetime], constraint: annotated_types.Timezone 1agbhiefjklmncodpq

198 ) -> _Pipeline[_InT, _NewOutDatetime]: ... 1agbhiefjklmncodpq

199 

200 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

201 def constrain(self: _Pipeline[_InT, _OutT], constraint: annotated_types.Predicate) -> _Pipeline[_InT, _OutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

202 

203 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

204 def constrain( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

205 self: _Pipeline[_InT, _NewOutInterval], constraint: annotated_types.Interval 1agbhiefjklmncodpq

206 ) -> _Pipeline[_InT, _NewOutInterval]: ... 1agbhiefjklmncodpq

207 

208 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

209 def constrain(self: _Pipeline[_InT, _OutT], constraint: _Eq) -> _Pipeline[_InT, _OutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

210 

211 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

212 def constrain(self: _Pipeline[_InT, _OutT], constraint: _NotEq) -> _Pipeline[_InT, _OutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

213 

214 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

215 def constrain(self: _Pipeline[_InT, _OutT], constraint: _In) -> _Pipeline[_InT, _OutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

216 

217 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

218 def constrain(self: _Pipeline[_InT, _OutT], constraint: _NotIn) -> _Pipeline[_InT, _OutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

219 

220 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

221 def constrain(self: _Pipeline[_InT, _NewOutT], constraint: Pattern[str]) -> _Pipeline[_InT, _NewOutT]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

222 

223 def constrain(self, constraint: _ConstraintAnnotation) -> Any: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

224 """Constrain a value to meet a certain condition. 

225 

226 We support most conditions from `annotated_types`, as well as regular expressions. 

227 

228 Most of the time you'll be calling a shortcut method like `gt`, `lt`, `len`, etc 

229 so you don't need to call this directly. 

230 """ 

231 return _Pipeline[_InT, _OutT](self._steps + (_Constraint(constraint),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

232 

233 def predicate(self: _Pipeline[_InT, _NewOutT], func: Callable[[_NewOutT], bool]) -> _Pipeline[_InT, _NewOutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

234 """Constrain a value to meet a certain predicate.""" 

235 return self.constrain(annotated_types.Predicate(func)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

236 

237 def gt(self: _Pipeline[_InT, _NewOutGt], gt: _NewOutGt) -> _Pipeline[_InT, _NewOutGt]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

238 """Constrain a value to be greater than a certain value.""" 

239 return self.constrain(annotated_types.Gt(gt)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

240 

241 def lt(self: _Pipeline[_InT, _NewOutLt], lt: _NewOutLt) -> _Pipeline[_InT, _NewOutLt]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

242 """Constrain a value to be less than a certain value.""" 

243 return self.constrain(annotated_types.Lt(lt)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

244 

245 def ge(self: _Pipeline[_InT, _NewOutGe], ge: _NewOutGe) -> _Pipeline[_InT, _NewOutGe]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

246 """Constrain a value to be greater than or equal to a certain value.""" 

247 return self.constrain(annotated_types.Ge(ge)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

248 

249 def le(self: _Pipeline[_InT, _NewOutLe], le: _NewOutLe) -> _Pipeline[_InT, _NewOutLe]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

250 """Constrain a value to be less than or equal to a certain value.""" 

251 return self.constrain(annotated_types.Le(le)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

252 

253 def len(self: _Pipeline[_InT, _NewOutLen], min_len: int, max_len: int | None = None) -> _Pipeline[_InT, _NewOutLen]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

254 """Constrain a value to have a certain length.""" 

255 return self.constrain(annotated_types.Len(min_len, max_len)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

256 

257 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

258 def multiple_of(self: _Pipeline[_InT, _NewOutDiv], multiple_of: _NewOutDiv) -> _Pipeline[_InT, _NewOutDiv]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

259 

260 @overload 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

261 def multiple_of(self: _Pipeline[_InT, _NewOutMod], multiple_of: _NewOutMod) -> _Pipeline[_InT, _NewOutMod]: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

262 

263 def multiple_of(self: _Pipeline[_InT, Any], multiple_of: Any) -> _Pipeline[_InT, Any]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

264 """Constrain a value to be a multiple of a certain number.""" 

265 return self.constrain(annotated_types.MultipleOf(multiple_of)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

266 

267 def eq(self: _Pipeline[_InT, _OutT], value: _OutT) -> _Pipeline[_InT, _OutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

268 """Constrain a value to be equal to a certain value.""" 

269 return self.constrain(_Eq(value)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

270 

271 def not_eq(self: _Pipeline[_InT, _OutT], value: _OutT) -> _Pipeline[_InT, _OutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

272 """Constrain a value to not be equal to a certain value.""" 

273 return self.constrain(_NotEq(value)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

274 

275 def in_(self: _Pipeline[_InT, _OutT], values: Container[_OutT]) -> _Pipeline[_InT, _OutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

276 """Constrain a value to be in a certain set.""" 

277 return self.constrain(_In(values)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

278 

279 def not_in(self: _Pipeline[_InT, _OutT], values: Container[_OutT]) -> _Pipeline[_InT, _OutT]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

280 """Constrain a value to not be in a certain set.""" 

281 return self.constrain(_NotIn(values)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

282 

283 # timezone methods 

284 def datetime_tz_naive(self: _Pipeline[_InT, datetime.datetime]) -> _Pipeline[_InT, datetime.datetime]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

285 return self.constrain(annotated_types.Timezone(None)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

286 

287 def datetime_tz_aware(self: _Pipeline[_InT, datetime.datetime]) -> _Pipeline[_InT, datetime.datetime]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

288 return self.constrain(annotated_types.Timezone(...)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

289 

290 def datetime_tz( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

291 self: _Pipeline[_InT, datetime.datetime], tz: datetime.tzinfo 

292 ) -> _Pipeline[_InT, datetime.datetime]: 

293 return self.constrain(annotated_types.Timezone(tz)) # type: ignore 

294 

295 def datetime_with_tz( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

296 self: _Pipeline[_InT, datetime.datetime], tz: datetime.tzinfo | None 

297 ) -> _Pipeline[_InT, datetime.datetime]: 

298 return self.transform(partial(datetime.datetime.replace, tzinfo=tz)) 

299 

300 # string methods 

301 def str_lower(self: _Pipeline[_InT, str]) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

302 return self.transform(str.lower) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

303 

304 def str_upper(self: _Pipeline[_InT, str]) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

305 return self.transform(str.upper) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

306 

307 def str_title(self: _Pipeline[_InT, str]) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

308 return self.transform(str.title) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

309 

310 def str_strip(self: _Pipeline[_InT, str]) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

311 return self.transform(str.strip) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

312 

313 def str_pattern(self: _Pipeline[_InT, str], pattern: str) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

314 return self.constrain(re.compile(pattern)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

315 

316 def str_contains(self: _Pipeline[_InT, str], substring: str) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

317 return self.predicate(lambda v: substring in v) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

318 

319 def str_starts_with(self: _Pipeline[_InT, str], prefix: str) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

320 return self.predicate(lambda v: v.startswith(prefix)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

321 

322 def str_ends_with(self: _Pipeline[_InT, str], suffix: str) -> _Pipeline[_InT, str]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

323 return self.predicate(lambda v: v.endswith(suffix)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

324 

325 # operators 

326 def otherwise(self, other: _Pipeline[_OtherIn, _OtherOut]) -> _Pipeline[_InT | _OtherIn, _OutT | _OtherOut]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

327 """Combine two validation chains, returning the result of the first chain if it succeeds, and the second chain if it fails.""" 

328 return _Pipeline((_PipelineOr(self, other),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

329 

330 __or__ = otherwise 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

331 

332 def then(self, other: _Pipeline[_OutT, _OtherOut]) -> _Pipeline[_InT, _OtherOut]: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

333 """Pipe the result of one validation chain into another.""" 

334 return _Pipeline((_PipelineAnd(self, other),)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

335 

336 __and__ = then 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

337 

338 def __get_pydantic_core_schema__(self, source_type: Any, handler: GetCoreSchemaHandler) -> cs.CoreSchema: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

339 from pydantic_core import core_schema as cs 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

340 

341 queue = deque(self._steps) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

342 

343 s = None 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

344 

345 while queue: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

346 step = queue.popleft() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

347 s = _apply_step(step, s, handler, source_type) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

348 

349 s = s or cs.any_schema() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

350 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

351 

352 def __supports_type__(self, _: _OutT) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

353 raise NotImplementedError 

354 

355 

356validate_as = _Pipeline[Any, Any](()).validate_as 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

357validate_as_deferred = _Pipeline[Any, Any](()).validate_as_deferred 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

358transform = _Pipeline[Any, Any]((_ValidateAs(_FieldTypeMarker),)).transform 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

359 

360 

361def _check_func( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

362 func: Callable[[Any], bool], predicate_err: str | Callable[[], str], s: cs.CoreSchema | None 

363) -> cs.CoreSchema: 

364 from pydantic_core import core_schema as cs 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

365 

366 def handler(v: Any) -> Any: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

367 if func(v): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

368 return v 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

369 raise ValueError(f'Expected {predicate_err if isinstance(predicate_err, str) else predicate_err()}') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

370 

371 if s is None: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

372 return cs.no_info_plain_validator_function(handler) 

373 else: 

374 return cs.no_info_after_validator_function(handler, s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

375 

376 

377def _apply_step(step: _Step, s: cs.CoreSchema | None, handler: GetCoreSchemaHandler, source_type: Any) -> cs.CoreSchema: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

378 from pydantic_core import core_schema as cs 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

379 

380 if isinstance(step, _ValidateAs): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

381 s = _apply_parse(s, step.tp, step.strict, handler, source_type) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

382 elif isinstance(step, _ValidateAsDefer): 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

383 s = _apply_parse(s, step.tp, False, handler, source_type) 

384 elif isinstance(step, _Transform): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

385 s = _apply_transform(s, step.func, handler) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

386 elif isinstance(step, _Constraint): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

387 s = _apply_constraint(s, step.constraint) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

388 elif isinstance(step, _PipelineOr): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

389 s = cs.union_schema([handler(step.left), handler(step.right)]) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

390 else: 

391 assert isinstance(step, _PipelineAnd) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

392 s = cs.chain_schema([handler(step.left), handler(step.right)]) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

393 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

394 

395 

396def _apply_parse( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

397 s: cs.CoreSchema | None, 

398 tp: type[Any], 

399 strict: bool, 

400 handler: GetCoreSchemaHandler, 

401 source_type: Any, 

402) -> cs.CoreSchema: 

403 from pydantic_core import core_schema as cs 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

404 

405 from pydantic import Strict 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

406 

407 if tp is _FieldTypeMarker: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

408 return cs.chain_schema([s, handler(source_type)]) if s else handler(source_type) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

409 

410 if strict: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

411 tp = Annotated[tp, Strict()] # type: ignore 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

412 

413 if s and s['type'] == 'any': 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

414 return handler(tp) 

415 else: 

416 return cs.chain_schema([s, handler(tp)]) if s else handler(tp) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

417 

418 

419def _apply_transform( 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

420 s: cs.CoreSchema | None, func: Callable[[Any], Any], handler: GetCoreSchemaHandler 

421) -> cs.CoreSchema: 

422 from pydantic_core import core_schema as cs 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

423 

424 if s is None: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

425 return cs.no_info_plain_validator_function(func) 

426 

427 if s['type'] == 'str': 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

428 if func is str.strip: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

429 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

430 s['strip_whitespace'] = True 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

431 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

432 elif func is str.lower: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

433 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

434 s['to_lower'] = True 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

435 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

436 elif func is str.upper: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

437 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

438 s['to_upper'] = True 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

439 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

440 

441 return cs.no_info_after_validator_function(func, s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

442 

443 

444def _apply_constraint( # noqa: C901 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

445 s: cs.CoreSchema | None, constraint: _ConstraintAnnotation 

446) -> cs.CoreSchema: 

447 """Apply a single constraint to a schema.""" 

448 if isinstance(constraint, annotated_types.Gt): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

449 gt = constraint.gt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

450 if s and s['type'] in {'int', 'float', 'decimal'}: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

451 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

452 if s['type'] == 'int' and isinstance(gt, int): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

453 s['gt'] = gt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

454 elif s['type'] == 'float' and isinstance(gt, float): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

455 s['gt'] = gt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

456 elif s['type'] == 'decimal' and isinstance(gt, Decimal): 456 ↛ 646line 456 didn't jump to line 646 because the condition on line 456 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

457 s['gt'] = gt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

458 else: 

459 

460 def check_gt(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

461 return v > gt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

462 

463 s = _check_func(check_gt, f'> {gt}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

464 elif isinstance(constraint, annotated_types.Ge): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

465 ge = constraint.ge 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

466 if s and s['type'] in {'int', 'float', 'decimal'}: 466 ↛ 475line 466 didn't jump to line 475 because the condition on line 466 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

467 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

468 if s['type'] == 'int' and isinstance(ge, int): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

469 s['ge'] = ge 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

470 elif s['type'] == 'float' and isinstance(ge, float): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

471 s['ge'] = ge 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

472 elif s['type'] == 'decimal' and isinstance(ge, Decimal): 472 ↛ 475line 472 didn't jump to line 475 because the condition on line 472 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

473 s['ge'] = ge 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

474 

475 def check_ge(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

476 return v >= ge 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

477 

478 s = _check_func(check_ge, f'>= {ge}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

479 elif isinstance(constraint, annotated_types.Lt): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

480 lt = constraint.lt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

481 if s and s['type'] in {'int', 'float', 'decimal'}: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

482 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

483 if s['type'] == 'int' and isinstance(lt, int): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

484 s['lt'] = lt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

485 elif s['type'] == 'float' and isinstance(lt, float): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

486 s['lt'] = lt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

487 elif s['type'] == 'decimal' and isinstance(lt, Decimal): 487 ↛ 490line 487 didn't jump to line 490 because the condition on line 487 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

488 s['lt'] = lt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

489 

490 def check_lt(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

491 return v < lt 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

492 

493 s = _check_func(check_lt, f'< {lt}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

494 elif isinstance(constraint, annotated_types.Le): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

495 le = constraint.le 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

496 if s and s['type'] in {'int', 'float', 'decimal'}: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

497 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

498 if s['type'] == 'int' and isinstance(le, int): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

499 s['le'] = le 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

500 elif s['type'] == 'float' and isinstance(le, float): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

501 s['le'] = le 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

502 elif s['type'] == 'decimal' and isinstance(le, Decimal): 502 ↛ 505line 502 didn't jump to line 505 because the condition on line 502 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

503 s['le'] = le 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

504 

505 def check_le(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

506 return v <= le 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

507 

508 s = _check_func(check_le, f'<= {le}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

509 elif isinstance(constraint, annotated_types.Len): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

510 min_len = constraint.min_length 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

511 max_len = constraint.max_length 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

512 

513 if s and s['type'] in {'str', 'list', 'tuple', 'set', 'frozenset', 'dict'}: 513 ↛ 528line 513 didn't jump to line 528 because the condition on line 513 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

514 assert ( 1rxefyzsA

515 s['type'] == 'str' 

516 or s['type'] == 'list' 

517 or s['type'] == 'tuple' 

518 or s['type'] == 'set' 

519 or s['type'] == 'dict' 

520 or s['type'] == 'frozenset' 

521 ) 

522 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

523 if min_len != 0: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

524 s['min_length'] = min_len 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

525 if max_len is not None: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

526 s['max_length'] = max_len 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

527 

528 def check_len(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

529 if max_len is not None: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

530 return (min_len <= len(v)) and (len(v) <= max_len) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

531 return min_len <= len(v) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

532 

533 s = _check_func(check_len, f'length >= {min_len} and length <= {max_len}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

534 elif isinstance(constraint, annotated_types.MultipleOf): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

535 multiple_of = constraint.multiple_of 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

536 if s and s['type'] in {'int', 'float', 'decimal'}: 536 ↛ 545line 536 didn't jump to line 545 because the condition on line 536 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

537 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

538 if s['type'] == 'int' and isinstance(multiple_of, int): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

539 s['multiple_of'] = multiple_of 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

540 elif s['type'] == 'float' and isinstance(multiple_of, float): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

541 s['multiple_of'] = multiple_of 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

542 elif s['type'] == 'decimal' and isinstance(multiple_of, Decimal): 542 ↛ 545line 542 didn't jump to line 545 because the condition on line 542 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

543 s['multiple_of'] = multiple_of 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

544 

545 def check_multiple_of(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

546 return v % multiple_of == 0 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

547 

548 s = _check_func(check_multiple_of, f'% {multiple_of} == 0', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

549 elif isinstance(constraint, annotated_types.Timezone): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

550 tz = constraint.tz 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

551 

552 if tz is ...: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

553 if s and s['type'] == 'datetime': 553 ↛ 558line 553 didn't jump to line 558 because the condition on line 553 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

554 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

555 s['tz_constraint'] = 'aware' 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

556 else: 

557 

558 def check_tz_aware(v: object) -> bool: 

559 assert isinstance(v, datetime.datetime) 

560 return v.tzinfo is not None 

561 

562 s = _check_func(check_tz_aware, 'timezone aware', s) 

563 elif tz is None: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

564 if s and s['type'] == 'datetime': 564 ↛ 569line 564 didn't jump to line 569 because the condition on line 564 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

565 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

566 s['tz_constraint'] = 'naive' 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

567 else: 

568 

569 def check_tz_naive(v: object) -> bool: 

570 assert isinstance(v, datetime.datetime) 

571 return v.tzinfo is None 

572 

573 s = _check_func(check_tz_naive, 'timezone naive', s) 

574 else: 

575 raise NotImplementedError('Constraining to a specific timezone is not yet supported') 

576 elif isinstance(constraint, annotated_types.Interval): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

577 if constraint.ge: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

578 s = _apply_constraint(s, annotated_types.Ge(constraint.ge)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

579 if constraint.gt: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

580 s = _apply_constraint(s, annotated_types.Gt(constraint.gt)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

581 if constraint.le: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

582 s = _apply_constraint(s, annotated_types.Le(constraint.le)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

583 if constraint.lt: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

584 s = _apply_constraint(s, annotated_types.Lt(constraint.lt)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

585 assert s is not None 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

586 elif isinstance(constraint, annotated_types.Predicate): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

587 func = constraint.func 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

588 

589 if func.__name__ == '<lambda>': 589 ↛ 605line 589 didn't jump to line 605 because the condition on line 589 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

590 # attempt to extract the source code for a lambda function 

591 # to use as the function name in error messages 

592 # TODO: is there a better way? should we just not do this? 

593 import inspect 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

594 

595 try: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

596 source = inspect.getsource(func).strip() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

597 source = source.removesuffix(')') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

598 lambda_source_code = '`' + ''.join(''.join(source.split('lambda ')[1:]).split(':')[1:]).strip() + '`' 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

599 except OSError: 1rtuabsvwcd

600 # stringified annotations 

601 lambda_source_code = 'lambda' 1rtuabsvwcd

602 

603 s = _check_func(func, lambda_source_code, s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

604 else: 

605 s = _check_func(func, func.__name__, s) 

606 elif isinstance(constraint, _NotEq): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

607 value = constraint.value 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

608 

609 def check_not_eq(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

610 return operator.__ne__(v, value) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

611 

612 s = _check_func(check_not_eq, f'!= {value}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

613 elif isinstance(constraint, _Eq): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

614 value = constraint.value 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

615 

616 def check_eq(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

617 return operator.__eq__(v, value) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

618 

619 s = _check_func(check_eq, f'== {value}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

620 elif isinstance(constraint, _In): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

621 values = constraint.values 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

622 

623 def check_in(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

624 return operator.__contains__(values, v) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

625 

626 s = _check_func(check_in, f'in {values}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

627 elif isinstance(constraint, _NotIn): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

628 values = constraint.values 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

629 

630 def check_not_in(v: Any) -> bool: 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

631 return operator.__not__(operator.__contains__(values, v)) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

632 

633 s = _check_func(check_not_in, f'not in {values}', s) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

634 else: 

635 assert isinstance(constraint, Pattern) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

636 if s and s['type'] == 'str': 636 ↛ 641line 636 didn't jump to line 641 because the condition on line 636 was always true1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

637 s = s.copy() 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

638 s['pattern'] = constraint.pattern 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

639 else: 

640 

641 def check_pattern(v: object) -> bool: 

642 assert isinstance(v, str) 

643 return constraint.match(v) is not None 

644 

645 s = _check_func(check_pattern, f'~ {constraint.pattern}', s) 

646 return s 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

647 

648 

649class _SupportsRange(annotated_types.SupportsLe, annotated_types.SupportsGe, Protocol): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

650 pass 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

651 

652 

653class _SupportsLen(Protocol): 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

654 def __len__(self) -> int: ... 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

655 

656 

657_NewOutGt = TypeVar('_NewOutGt', bound=annotated_types.SupportsGt) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

658_NewOutGe = TypeVar('_NewOutGe', bound=annotated_types.SupportsGe) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

659_NewOutLt = TypeVar('_NewOutLt', bound=annotated_types.SupportsLt) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

660_NewOutLe = TypeVar('_NewOutLe', bound=annotated_types.SupportsLe) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

661_NewOutLen = TypeVar('_NewOutLen', bound=_SupportsLen) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

662_NewOutDiv = TypeVar('_NewOutDiv', bound=annotated_types.SupportsDiv) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

663_NewOutMod = TypeVar('_NewOutMod', bound=annotated_types.SupportsMod) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

664_NewOutDatetime = TypeVar('_NewOutDatetime', bound=datetime.datetime) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

665_NewOutInterval = TypeVar('_NewOutInterval', bound=_SupportsRange) 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

666_OtherIn = TypeVar('_OtherIn') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq

667_OtherOut = TypeVar('_OtherOut') 1rxtBuCagbhiefyzDEFGjklmnsAvHwIcodpq