Coverage for sqlmodel/main.py: 93%

333 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 00:02 +0000

1import ipaddress 1facdeb

2import uuid 1facdeb

3import weakref 1facdeb

4from datetime import date, datetime, time, timedelta 1facdeb

5from decimal import Decimal 1facdeb

6from enum import Enum 1facdeb

7from pathlib import Path 1facdeb

8from typing import ( 1facdeb

9 TYPE_CHECKING, 

10 AbstractSet, 

11 Any, 

12 Callable, 

13 ClassVar, 

14 Dict, 

15 List, 

16 Mapping, 

17 Optional, 

18 Sequence, 

19 Set, 

20 Tuple, 

21 Type, 

22 TypeVar, 

23 Union, 

24 cast, 

25 overload, 

26) 

27 

28from pydantic import BaseModel, EmailStr 1facdeb

29from pydantic.fields import FieldInfo as PydanticFieldInfo 1facdeb

30from sqlalchemy import ( 1facdeb

31 Boolean, 

32 Column, 

33 Date, 

34 DateTime, 

35 Float, 

36 ForeignKey, 

37 Integer, 

38 Interval, 

39 Numeric, 

40 inspect, 

41) 

42from sqlalchemy import Enum as sa_Enum 1facdeb

43from sqlalchemy.orm import ( 1facdeb

44 Mapped, 

45 RelationshipProperty, 

46 declared_attr, 

47 registry, 

48 relationship, 

49) 

50from sqlalchemy.orm.attributes import set_attribute 1facdeb

51from sqlalchemy.orm.decl_api import DeclarativeMeta 1facdeb

52from sqlalchemy.orm.instrumentation import is_instrumented 1facdeb

53from sqlalchemy.sql.schema import MetaData 1facdeb

54from sqlalchemy.sql.sqltypes import LargeBinary, Time 1facdeb

55from typing_extensions import Literal, deprecated, get_origin 1facdeb

56 

57from ._compat import ( # type: ignore[attr-defined] 1facdeb

58 IS_PYDANTIC_V2, 

59 PYDANTIC_VERSION, 

60 BaseConfig, 

61 ModelField, 

62 ModelMetaclass, 

63 Representation, 

64 SQLModelConfig, 

65 Undefined, 

66 UndefinedType, 

67 _calculate_keys, 

68 finish_init, 

69 get_annotations, 

70 get_config_value, 

71 get_field_metadata, 

72 get_model_fields, 

73 get_relationship_to, 

74 get_type_from_field, 

75 init_pydantic_private_attrs, 

76 is_field_noneable, 

77 is_table_model_class, 

78 post_init_field_info, 

79 set_config_value, 

80 sqlmodel_init, 

81 sqlmodel_validate, 

82) 

83from .sql.sqltypes import GUID, AutoString 1facdeb

84 

85if TYPE_CHECKING: 1facdeb

86 from pydantic._internal._model_construction import ModelMetaclass as ModelMetaclass 

87 from pydantic._internal._repr import Representation as Representation 

88 from pydantic_core import PydanticUndefined as Undefined 

89 from pydantic_core import PydanticUndefinedType as UndefinedType 

90 

91_T = TypeVar("_T") 1facdeb

92NoArgAnyCallable = Callable[[], Any] 1facdeb

93IncEx = Union[Set[int], Set[str], Dict[int, Any], Dict[str, Any], None] 1facdeb

94 

95 

96def __dataclass_transform__( 1acdeb

97 *, 

98 eq_default: bool = True, 

99 order_default: bool = False, 

100 kw_only_default: bool = False, 

101 field_descriptors: Tuple[Union[type, Callable[..., Any]], ...] = (()), 

102) -> Callable[[_T], _T]: 

103 return lambda a: a 1facdeb

104 

105 

106class FieldInfo(PydanticFieldInfo): 1facdeb

107 def __init__(self, default: Any = Undefined, **kwargs: Any) -> None: 1facdeb

108 primary_key = kwargs.pop("primary_key", False) 1facdeb

109 nullable = kwargs.pop("nullable", Undefined) 1facdeb

110 foreign_key = kwargs.pop("foreign_key", Undefined) 1facdeb

111 unique = kwargs.pop("unique", False) 1facdeb

112 index = kwargs.pop("index", Undefined) 1facdeb

113 sa_type = kwargs.pop("sa_type", Undefined) 1facdeb

114 sa_column = kwargs.pop("sa_column", Undefined) 1facdeb

115 sa_column_args = kwargs.pop("sa_column_args", Undefined) 1facdeb

116 sa_column_kwargs = kwargs.pop("sa_column_kwargs", Undefined) 1facdeb

117 if sa_column is not Undefined: 1facdeb

118 if sa_column_args is not Undefined: 1facdeb

119 raise RuntimeError( 1facdeb

120 "Passing sa_column_args is not supported when " 

121 "also passing a sa_column" 

122 ) 

123 if sa_column_kwargs is not Undefined: 1facdeb

124 raise RuntimeError( 1facdeb

125 "Passing sa_column_kwargs is not supported when " 

126 "also passing a sa_column" 

127 ) 

128 if primary_key is not Undefined: 1facdeb

129 raise RuntimeError( 1facdeb

130 "Passing primary_key is not supported when " 

131 "also passing a sa_column" 

132 ) 

133 if nullable is not Undefined: 1facdeb

134 raise RuntimeError( 1facdeb

135 "Passing nullable is not supported when " "also passing a sa_column" 

136 ) 

137 if foreign_key is not Undefined: 1facdeb

138 raise RuntimeError( 1facdeb

139 "Passing foreign_key is not supported when " 

140 "also passing a sa_column" 

141 ) 

142 if unique is not Undefined: 1facdeb

143 raise RuntimeError( 1facdeb

144 "Passing unique is not supported when also passing a sa_column" 

145 ) 

146 if index is not Undefined: 1facdeb

147 raise RuntimeError( 1facdeb

148 "Passing index is not supported when also passing a sa_column" 

149 ) 

150 if sa_type is not Undefined: 1facdeb

151 raise RuntimeError( 1facdeb

152 "Passing sa_type is not supported when also passing a sa_column" 

153 ) 

154 super().__init__(default=default, **kwargs) 1facdeb

155 self.primary_key = primary_key 1facdeb

156 self.nullable = nullable 1facdeb

157 self.foreign_key = foreign_key 1facdeb

158 self.unique = unique 1facdeb

159 self.index = index 1facdeb

160 self.sa_type = sa_type 1facdeb

161 self.sa_column = sa_column 1facdeb

162 self.sa_column_args = sa_column_args 1facdeb

163 self.sa_column_kwargs = sa_column_kwargs 1facdeb

164 

165 

166class RelationshipInfo(Representation): 1facdeb

167 def __init__( 1acdeb

168 self, 

169 *, 

170 back_populates: Optional[str] = None, 

171 link_model: Optional[Any] = None, 

172 sa_relationship: Optional[RelationshipProperty] = None, # type: ignore 

173 sa_relationship_args: Optional[Sequence[Any]] = None, 

174 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None, 

175 ) -> None: 

176 if sa_relationship is not None: 1facdeb

177 if sa_relationship_args is not None: 1facdeb

178 raise RuntimeError( 1facdeb

179 "Passing sa_relationship_args is not supported when " 

180 "also passing a sa_relationship" 

181 ) 

182 if sa_relationship_kwargs is not None: 1facdeb

183 raise RuntimeError( 1facdeb

184 "Passing sa_relationship_kwargs is not supported when " 

185 "also passing a sa_relationship" 

186 ) 

187 self.back_populates = back_populates 1facdeb

188 self.link_model = link_model 1facdeb

189 self.sa_relationship = sa_relationship 1facdeb

190 self.sa_relationship_args = sa_relationship_args 1facdeb

191 self.sa_relationship_kwargs = sa_relationship_kwargs 1facdeb

192 

193 

194@overload 1facdeb

195def Field( 1acdeb

196 default: Any = Undefined, 1facdeb

197 *, 

198 default_factory: Optional[NoArgAnyCallable] = None, 1facdeb

199 alias: Optional[str] = None, 1facdeb

200 title: Optional[str] = None, 1facdeb

201 description: Optional[str] = None, 1facdeb

202 exclude: Union[ 1acdeb

203 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1acdeb

204 ] = None, 1facdeb

205 include: Union[ 1acdeb

206 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1acdeb

207 ] = None, 1facdeb

208 const: Optional[bool] = None, 1facdeb

209 gt: Optional[float] = None, 1facdeb

210 ge: Optional[float] = None, 1facdeb

211 lt: Optional[float] = None, 1facdeb

212 le: Optional[float] = None, 1facdeb

213 multiple_of: Optional[float] = None, 1facdeb

214 max_digits: Optional[int] = None, 1facdeb

215 decimal_places: Optional[int] = None, 1facdeb

216 min_items: Optional[int] = None, 1facdeb

217 max_items: Optional[int] = None, 1facdeb

218 unique_items: Optional[bool] = None, 1facdeb

219 min_length: Optional[int] = None, 1facdeb

220 max_length: Optional[int] = None, 1facdeb

221 allow_mutation: bool = True, 1facdeb

222 regex: Optional[str] = None, 1facdeb

223 discriminator: Optional[str] = None, 1facdeb

224 repr: bool = True, 1facdeb

225 primary_key: Union[bool, UndefinedType] = Undefined, 1facdeb

226 foreign_key: Any = Undefined, 1facdeb

227 unique: Union[bool, UndefinedType] = Undefined, 1facdeb

228 nullable: Union[bool, UndefinedType] = Undefined, 1facdeb

229 index: Union[bool, UndefinedType] = Undefined, 1facdeb

230 sa_type: Union[Type[Any], UndefinedType] = Undefined, 1facdeb

231 sa_column_args: Union[Sequence[Any], UndefinedType] = Undefined, 1facdeb

232 sa_column_kwargs: Union[Mapping[str, Any], UndefinedType] = Undefined, 1facdeb

233 schema_extra: Optional[Dict[str, Any]] = None, 1facdeb

234) -> Any: ... 1facdeb

235 

236 

237@overload 1facdeb

238def Field( 1acdeb

239 default: Any = Undefined, 1facdeb

240 *, 

241 default_factory: Optional[NoArgAnyCallable] = None, 1facdeb

242 alias: Optional[str] = None, 1facdeb

243 title: Optional[str] = None, 1facdeb

244 description: Optional[str] = None, 1facdeb

245 exclude: Union[ 1acdeb

246 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1acdeb

247 ] = None, 1facdeb

248 include: Union[ 1acdeb

249 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1acdeb

250 ] = None, 1facdeb

251 const: Optional[bool] = None, 1facdeb

252 gt: Optional[float] = None, 1facdeb

253 ge: Optional[float] = None, 1facdeb

254 lt: Optional[float] = None, 1facdeb

255 le: Optional[float] = None, 1facdeb

256 multiple_of: Optional[float] = None, 1facdeb

257 max_digits: Optional[int] = None, 1facdeb

258 decimal_places: Optional[int] = None, 1facdeb

259 min_items: Optional[int] = None, 1facdeb

260 max_items: Optional[int] = None, 1facdeb

261 unique_items: Optional[bool] = None, 1facdeb

262 min_length: Optional[int] = None, 1facdeb

263 max_length: Optional[int] = None, 1facdeb

264 allow_mutation: bool = True, 1facdeb

265 regex: Optional[str] = None, 1facdeb

266 discriminator: Optional[str] = None, 1facdeb

267 repr: bool = True, 1facdeb

268 sa_column: Union[Column, UndefinedType] = Undefined, # type: ignore 1facdeb

269 schema_extra: Optional[Dict[str, Any]] = None, 1facdeb

270) -> Any: ... 1facdeb

271 

272 

273def Field( 1acdeb

274 default: Any = Undefined, 

275 *, 

276 default_factory: Optional[NoArgAnyCallable] = None, 

277 alias: Optional[str] = None, 

278 title: Optional[str] = None, 

279 description: Optional[str] = None, 

280 exclude: Union[ 

281 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 

282 ] = None, 

283 include: Union[ 

284 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 

285 ] = None, 

286 const: Optional[bool] = None, 

287 gt: Optional[float] = None, 

288 ge: Optional[float] = None, 

289 lt: Optional[float] = None, 

290 le: Optional[float] = None, 

291 multiple_of: Optional[float] = None, 

292 max_digits: Optional[int] = None, 

293 decimal_places: Optional[int] = None, 

294 min_items: Optional[int] = None, 

295 max_items: Optional[int] = None, 

296 unique_items: Optional[bool] = None, 

297 min_length: Optional[int] = None, 

298 max_length: Optional[int] = None, 

299 allow_mutation: bool = True, 

300 regex: Optional[str] = None, 

301 discriminator: Optional[str] = None, 

302 repr: bool = True, 

303 primary_key: Union[bool, UndefinedType] = Undefined, 

304 foreign_key: Any = Undefined, 

305 unique: Union[bool, UndefinedType] = Undefined, 

306 nullable: Union[bool, UndefinedType] = Undefined, 

307 index: Union[bool, UndefinedType] = Undefined, 

308 sa_type: Union[Type[Any], UndefinedType] = Undefined, 

309 sa_column: Union[Column, UndefinedType] = Undefined, # type: ignore 

310 sa_column_args: Union[Sequence[Any], UndefinedType] = Undefined, 

311 sa_column_kwargs: Union[Mapping[str, Any], UndefinedType] = Undefined, 

312 schema_extra: Optional[Dict[str, Any]] = None, 

313) -> Any: 

314 current_schema_extra = schema_extra or {} 1facdeb

315 field_info = FieldInfo( 1facdeb

316 default, 

317 default_factory=default_factory, 

318 alias=alias, 

319 title=title, 

320 description=description, 

321 exclude=exclude, 

322 include=include, 

323 const=const, 

324 gt=gt, 

325 ge=ge, 

326 lt=lt, 

327 le=le, 

328 multiple_of=multiple_of, 

329 max_digits=max_digits, 

330 decimal_places=decimal_places, 

331 min_items=min_items, 

332 max_items=max_items, 

333 unique_items=unique_items, 

334 min_length=min_length, 

335 max_length=max_length, 

336 allow_mutation=allow_mutation, 

337 regex=regex, 

338 discriminator=discriminator, 

339 repr=repr, 

340 primary_key=primary_key, 

341 foreign_key=foreign_key, 

342 unique=unique, 

343 nullable=nullable, 

344 index=index, 

345 sa_type=sa_type, 

346 sa_column=sa_column, 

347 sa_column_args=sa_column_args, 

348 sa_column_kwargs=sa_column_kwargs, 

349 **current_schema_extra, 

350 ) 

351 post_init_field_info(field_info) 1facdeb

352 return field_info 1facdeb

353 

354 

355@overload 1facdeb

356def Relationship( 1acdeb

357 *, 

358 back_populates: Optional[str] = None, 1facdeb

359 link_model: Optional[Any] = None, 1facdeb

360 sa_relationship_args: Optional[Sequence[Any]] = None, 1facdeb

361 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None, 1facdeb

362) -> Any: ... 1facdeb

363 

364 

365@overload 1facdeb

366def Relationship( 1acdeb

367 *, 

368 back_populates: Optional[str] = None, 1facdeb

369 link_model: Optional[Any] = None, 1facdeb

370 sa_relationship: Optional[RelationshipProperty[Any]] = None, 1facdeb

371) -> Any: ... 1facdeb

372 

373 

374def Relationship( 1acdeb

375 *, 

376 back_populates: Optional[str] = None, 

377 link_model: Optional[Any] = None, 

378 sa_relationship: Optional[RelationshipProperty[Any]] = None, 

379 sa_relationship_args: Optional[Sequence[Any]] = None, 

380 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None, 

381) -> Any: 

382 relationship_info = RelationshipInfo( 1facdeb

383 back_populates=back_populates, 

384 link_model=link_model, 

385 sa_relationship=sa_relationship, 

386 sa_relationship_args=sa_relationship_args, 

387 sa_relationship_kwargs=sa_relationship_kwargs, 

388 ) 

389 return relationship_info 1facdeb

390 

391 

392@__dataclass_transform__(kw_only_default=True, field_descriptors=(Field, FieldInfo)) 1facdeb

393class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta): 1facdeb

394 __sqlmodel_relationships__: Dict[str, RelationshipInfo] 1facdeb

395 model_config: SQLModelConfig 1facdeb

396 model_fields: Dict[str, FieldInfo] 1facdeb

397 __config__: Type[SQLModelConfig] 1facdeb

398 __fields__: Dict[str, ModelField] # type: ignore[assignment] 1facdeb

399 

400 # Replicate SQLAlchemy 

401 def __setattr__(cls, name: str, value: Any) -> None: 1facdeb

402 if is_table_model_class(cls): 1facdeb

403 DeclarativeMeta.__setattr__(cls, name, value) 1facdeb

404 else: 

405 super().__setattr__(name, value) 1facdeb

406 

407 def __delattr__(cls, name: str) -> None: 1facdeb

408 if is_table_model_class(cls): 1facdeb

409 DeclarativeMeta.__delattr__(cls, name) 1facdeb

410 else: 

411 super().__delattr__(name) 1fcde

412 

413 # From Pydantic 

414 def __new__( 1acdeb

415 cls, 

416 name: str, 

417 bases: Tuple[Type[Any], ...], 

418 class_dict: Dict[str, Any], 

419 **kwargs: Any, 

420 ) -> Any: 

421 relationships: Dict[str, RelationshipInfo] = {} 1facdeb

422 dict_for_pydantic = {} 1facdeb

423 original_annotations = get_annotations(class_dict) 1facdeb

424 pydantic_annotations = {} 1facdeb

425 relationship_annotations = {} 1facdeb

426 for k, v in class_dict.items(): 1facdeb

427 if isinstance(v, RelationshipInfo): 1facdeb

428 relationships[k] = v 1facdeb

429 else: 

430 dict_for_pydantic[k] = v 1facdeb

431 for k, v in original_annotations.items(): 1facdeb

432 if k in relationships: 1facdeb

433 relationship_annotations[k] = v 1facdeb

434 else: 

435 pydantic_annotations[k] = v 1facdeb

436 dict_used = { 1acdeb

437 **dict_for_pydantic, 

438 "__weakref__": None, 

439 "__sqlmodel_relationships__": relationships, 

440 "__annotations__": pydantic_annotations, 

441 } 

442 # Duplicate logic from Pydantic to filter config kwargs because if they are 

443 # passed directly including the registry Pydantic will pass them over to the 

444 # superclass causing an error 

445 allowed_config_kwargs: Set[str] = { 1facdeb

446 key 

447 for key in dir(BaseConfig) 

448 if not ( 

449 key.startswith("__") and key.endswith("__") 

450 ) # skip dunder methods and attributes 

451 } 

452 config_kwargs = { 1facdeb

453 key: kwargs[key] for key in kwargs.keys() & allowed_config_kwargs 

454 } 

455 new_cls = super().__new__(cls, name, bases, dict_used, **config_kwargs) 1facdeb

456 new_cls.__annotations__ = { 1acdeb

457 **relationship_annotations, 

458 **pydantic_annotations, 

459 **new_cls.__annotations__, 

460 } 

461 

462 def get_config(name: str) -> Any: 1facdeb

463 config_class_value = get_config_value( 1facdeb

464 model=new_cls, parameter=name, default=Undefined 

465 ) 

466 if config_class_value is not Undefined: 1facdeb

467 return config_class_value 1ab

468 kwarg_value = kwargs.get(name, Undefined) 1facdeb

469 if kwarg_value is not Undefined: 1facdeb

470 return kwarg_value 1facdeb

471 return Undefined 1facdeb

472 

473 config_table = get_config("table") 1facdeb

474 if config_table is True: 1facdeb

475 # If it was passed by kwargs, ensure it's also set in config 

476 set_config_value(model=new_cls, parameter="table", value=config_table) 1facdeb

477 for k, v in get_model_fields(new_cls).items(): 1facdeb

478 col = get_column_from_field(v) 1facdeb

479 setattr(new_cls, k, col) 1facdeb

480 # Set a config flag to tell FastAPI that this should be read with a field 

481 # in orm_mode instead of preemptively converting it to a dict. 

482 # This could be done by reading new_cls.model_config['table'] in FastAPI, but 

483 # that's very specific about SQLModel, so let's have another config that 

484 # other future tools based on Pydantic can use. 

485 set_config_value( 1facdeb

486 model=new_cls, parameter="read_from_attributes", value=True 

487 ) 

488 # For compatibility with older versions 

489 # TODO: remove this in the future 

490 set_config_value(model=new_cls, parameter="read_with_orm_mode", value=True) 1facdeb

491 

492 config_registry = get_config("registry") 1facdeb

493 if config_registry is not Undefined: 1facdeb

494 config_registry = cast(registry, config_registry) 1facdeb

495 # If it was passed by kwargs, ensure it's also set in config 

496 set_config_value(model=new_cls, parameter="registry", value=config_table) 1facdeb

497 setattr(new_cls, "_sa_registry", config_registry) # noqa: B010 1facdeb

498 setattr(new_cls, "metadata", config_registry.metadata) # noqa: B010 1facdeb

499 setattr(new_cls, "__abstract__", True) # noqa: B010 1facdeb

500 return new_cls 1facdeb

501 

502 # Override SQLAlchemy, allow both SQLAlchemy and plain Pydantic models 

503 def __init__( 1acdeb

504 cls, classname: str, bases: Tuple[type, ...], dict_: Dict[str, Any], **kw: Any 

505 ) -> None: 

506 # Only one of the base classes (or the current one) should be a table model 

507 # this allows FastAPI cloning a SQLModel for the response_model without 

508 # trying to create a new SQLAlchemy, for a new table, with the same name, that 

509 # triggers an error 

510 base_is_table = any(is_table_model_class(base) for base in bases) 1facdeb

511 if is_table_model_class(cls) and not base_is_table: 1facdeb

512 for rel_name, rel_info in cls.__sqlmodel_relationships__.items(): 1facdeb

513 if rel_info.sa_relationship: 1facdeb

514 # There's a SQLAlchemy relationship declared, that takes precedence 

515 # over anything else, use that and continue with the next attribute 

516 setattr(cls, rel_name, rel_info.sa_relationship) # Fix #315 1facdeb

517 continue 1facdeb

518 raw_ann = cls.__annotations__[rel_name] 1facdeb

519 origin = get_origin(raw_ann) 1facdeb

520 if origin is Mapped: 1facdeb

521 ann = raw_ann.__args__[0] 

522 else: 

523 ann = raw_ann 1facdeb

524 # Plain forward references, for models not yet defined, are not 

525 # handled well by SQLAlchemy without Mapped, so, wrap the 

526 # annotations in Mapped here 

527 cls.__annotations__[rel_name] = Mapped[ann] # type: ignore[valid-type] 1facdeb

528 relationship_to = get_relationship_to( 1facdeb

529 name=rel_name, rel_info=rel_info, annotation=ann 

530 ) 

531 rel_kwargs: Dict[str, Any] = {} 1facdeb

532 if rel_info.back_populates: 1facdeb

533 rel_kwargs["back_populates"] = rel_info.back_populates 1facdeb

534 if rel_info.link_model: 1facdeb

535 ins = inspect(rel_info.link_model) 1facdeb

536 local_table = getattr(ins, "local_table") # noqa: B009 1facdeb

537 if local_table is None: 1facdeb

538 raise RuntimeError( 

539 "Couldn't find the secondary table for " 

540 f"model {rel_info.link_model}" 

541 ) 

542 rel_kwargs["secondary"] = local_table 1facdeb

543 rel_args: List[Any] = [] 1facdeb

544 if rel_info.sa_relationship_args: 1facdeb

545 rel_args.extend(rel_info.sa_relationship_args) 

546 if rel_info.sa_relationship_kwargs: 1facdeb

547 rel_kwargs.update(rel_info.sa_relationship_kwargs) 

548 rel_value = relationship(relationship_to, *rel_args, **rel_kwargs) 1facdeb

549 setattr(cls, rel_name, rel_value) # Fix #315 1facdeb

550 # SQLAlchemy no longer uses dict_ 

551 # Ref: https://github.com/sqlalchemy/sqlalchemy/commit/428ea01f00a9cc7f85e435018565eb6da7af1b77 

552 # Tag: 1.4.36 

553 DeclarativeMeta.__init__(cls, classname, bases, dict_, **kw) 1facdeb

554 else: 

555 ModelMetaclass.__init__(cls, classname, bases, dict_, **kw) 1facdeb

556 

557 

558def get_sqlalchemy_type(field: Any) -> Any: 1facdeb

559 if IS_PYDANTIC_V2: 1facdeb

560 field_info = field 1fcde

561 else: 

562 field_info = field.field_info 1ab

563 sa_type = getattr(field_info, "sa_type", Undefined) # noqa: B009 1facdeb

564 if sa_type is not Undefined: 1facdeb

565 return sa_type 

566 

567 type_ = get_type_from_field(field) 1facdeb

568 metadata = get_field_metadata(field) 1facdeb

569 

570 # Check enums first as an enum can also be a str, needed by Pydantic/FastAPI 

571 if issubclass(type_, Enum): 1facdeb

572 return sa_Enum(type_) 1facdeb

573 if issubclass( 1facdeb

574 type_, 

575 ( 

576 str, 

577 ipaddress.IPv4Address, 

578 ipaddress.IPv4Network, 

579 ipaddress.IPv6Address, 

580 ipaddress.IPv6Network, 

581 Path, 

582 EmailStr, 

583 ), 

584 ): 

585 max_length = getattr(metadata, "max_length", None) 1facdeb

586 if max_length: 1facdeb

587 return AutoString(length=max_length) 

588 return AutoString 1facdeb

589 if issubclass(type_, float): 1facdeb

590 return Float 

591 if issubclass(type_, bool): 1facdeb

592 return Boolean 1facdeb

593 if issubclass(type_, int): 1facdeb

594 return Integer 1facdeb

595 if issubclass(type_, datetime): 1facdeb

596 return DateTime 

597 if issubclass(type_, date): 1facdeb

598 return Date 

599 if issubclass(type_, timedelta): 1facdeb

600 return Interval 

601 if issubclass(type_, time): 1facdeb

602 return Time 

603 if issubclass(type_, bytes): 1facdeb

604 return LargeBinary 

605 if issubclass(type_, Decimal): 1facdeb

606 return Numeric( 1facdeb

607 precision=getattr(metadata, "max_digits", None), 

608 scale=getattr(metadata, "decimal_places", None), 

609 ) 

610 if issubclass(type_, uuid.UUID): 1facdeb

611 return GUID 1facdeb

612 raise ValueError(f"{type_} has no matching SQLAlchemy type") 1facdeb

613 

614 

615def get_column_from_field(field: Any) -> Column: # type: ignore 1facdeb

616 if IS_PYDANTIC_V2: 1facdeb

617 field_info = field 1fcde

618 else: 

619 field_info = field.field_info 1ab

620 sa_column = getattr(field_info, "sa_column", Undefined) 1facdeb

621 if isinstance(sa_column, Column): 1facdeb

622 return sa_column 1facdeb

623 sa_type = get_sqlalchemy_type(field) 1facdeb

624 primary_key = getattr(field_info, "primary_key", Undefined) 1facdeb

625 if primary_key is Undefined: 1facdeb

626 primary_key = False 1facdeb

627 index = getattr(field_info, "index", Undefined) 1facdeb

628 if index is Undefined: 1facdeb

629 index = False 1facdeb

630 nullable = not primary_key and is_field_noneable(field) 1facdeb

631 # Override derived nullability if the nullable property is set explicitly 

632 # on the field 

633 field_nullable = getattr(field_info, "nullable", Undefined) # noqa: B009 1facdeb

634 if field_nullable is not Undefined: 1facdeb

635 assert not isinstance(field_nullable, UndefinedType) 1facdeb

636 nullable = field_nullable 1facdeb

637 args = [] 1facdeb

638 foreign_key = getattr(field_info, "foreign_key", Undefined) 1facdeb

639 if foreign_key is Undefined: 1facdeb

640 foreign_key = None 1facdeb

641 unique = getattr(field_info, "unique", Undefined) 1facdeb

642 if unique is Undefined: 1facdeb

643 unique = False 1facdeb

644 if foreign_key: 1facdeb

645 assert isinstance(foreign_key, str) 1facdeb

646 args.append(ForeignKey(foreign_key)) 1facdeb

647 kwargs = { 1acdeb

648 "primary_key": primary_key, 

649 "nullable": nullable, 

650 "index": index, 

651 "unique": unique, 

652 } 

653 sa_default = Undefined 1facdeb

654 if field_info.default_factory: 1facdeb

655 sa_default = field_info.default_factory 

656 elif field_info.default is not Undefined: 1facdeb

657 sa_default = field_info.default 1facdeb

658 if sa_default is not Undefined: 1facdeb

659 kwargs["default"] = sa_default 1facdeb

660 sa_column_args = getattr(field_info, "sa_column_args", Undefined) 1facdeb

661 if sa_column_args is not Undefined: 1facdeb

662 args.extend(list(cast(Sequence[Any], sa_column_args))) 1facdeb

663 sa_column_kwargs = getattr(field_info, "sa_column_kwargs", Undefined) 1facdeb

664 if sa_column_kwargs is not Undefined: 1facdeb

665 kwargs.update(cast(Dict[Any, Any], sa_column_kwargs)) 1facdeb

666 return Column(sa_type, *args, **kwargs) # type: ignore 1facdeb

667 

668 

669class_registry = weakref.WeakValueDictionary() # type: ignore 1facdeb

670 

671default_registry = registry() 1facdeb

672 

673_TSQLModel = TypeVar("_TSQLModel", bound="SQLModel") 1facdeb

674 

675 

676class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry): 1facdeb

677 # SQLAlchemy needs to set weakref(s), Pydantic will set the other slots values 

678 __slots__ = ("__weakref__",) 1facdeb

679 __tablename__: ClassVar[Union[str, Callable[..., str]]] 1facdeb

680 __sqlmodel_relationships__: ClassVar[Dict[str, RelationshipProperty[Any]]] 1facdeb

681 __name__: ClassVar[str] 1facdeb

682 metadata: ClassVar[MetaData] 1facdeb

683 __allow_unmapped__ = True # https://docs.sqlalchemy.org/en/20/changelog/migration_20.html#migration-20-step-six 1facdeb

684 

685 if IS_PYDANTIC_V2: 1facdeb

686 model_config = SQLModelConfig(from_attributes=True) 1fcde

687 else: 

688 

689 class Config: 1ab

690 orm_mode = True 1ab

691 

692 def __new__(cls, *args: Any, **kwargs: Any) -> Any: 1facdeb

693 new_object = super().__new__(cls) 1facdeb

694 # SQLAlchemy doesn't call __init__ on the base class when querying from DB 

695 # Ref: https://docs.sqlalchemy.org/en/14/orm/constructors.html 

696 # Set __fields_set__ here, that would have been set when calling __init__ 

697 # in the Pydantic model so that when SQLAlchemy sets attributes that are 

698 # added (e.g. when querying from DB) to the __fields_set__, this already exists 

699 init_pydantic_private_attrs(new_object) 1facdeb

700 return new_object 1facdeb

701 

702 def __init__(__pydantic_self__, **data: Any) -> None: 1facdeb

703 # Uses something other than `self` the first arg to allow "self" as a 

704 # settable attribute 

705 

706 # SQLAlchemy does very dark black magic and modifies the __init__ method in 

707 # sqlalchemy.orm.instrumentation._generate_init() 

708 # so, to make SQLAlchemy work, it's needed to explicitly call __init__ to 

709 # trigger all the SQLAlchemy logic, it doesn't work using cls.__new__, setting 

710 # attributes obj.__dict__, etc. The __init__ method has to be called. But 

711 # there are cases where calling all the default logic is not ideal, e.g. 

712 # when calling Model.model_validate(), as the validation is done outside 

713 # of instance creation. 

714 # At the same time, __init__ is what users would normally call, by creating 

715 # a new instance, which should have validation and all the default logic. 

716 # So, to be able to set up the internal SQLAlchemy logic alone without 

717 # executing the rest, and support things like Model.model_validate(), we 

718 # use a contextvar to know if we should execute everything. 

719 if finish_init.get(): 1facdeb

720 sqlmodel_init(self=__pydantic_self__, data=data) 1facdeb

721 

722 def __setattr__(self, name: str, value: Any) -> None: 1facdeb

723 if name in {"_sa_instance_state"}: 1facdeb

724 self.__dict__[name] = value 1facdeb

725 return 1facdeb

726 else: 

727 # Set in SQLAlchemy, before Pydantic to trigger events and updates 

728 if is_table_model_class(self.__class__) and is_instrumented(self, name): # type: ignore[no-untyped-call] 1facdeb

729 set_attribute(self, name, value) 1facdeb

730 # Set in Pydantic model to trigger possible validation changes, only for 

731 # non relationship values 

732 if name not in self.__sqlmodel_relationships__: 1facdeb

733 super().__setattr__(name, value) 1facdeb

734 

735 def __repr_args__(self) -> Sequence[Tuple[Optional[str], Any]]: 1facdeb

736 # Don't show SQLAlchemy private attributes 

737 return [ 1acdeb

738 (k, v) 

739 for k, v in super().__repr_args__() 

740 if not (isinstance(k, str) and k.startswith("_sa_")) 

741 ] 

742 

743 @declared_attr # type: ignore 1facdeb

744 def __tablename__(cls) -> str: 1facdeb

745 return cls.__name__.lower() 1facdeb

746 

747 @classmethod 1facdeb

748 def model_validate( 1acdeb

749 cls: Type[_TSQLModel], 

750 obj: Any, 

751 *, 

752 strict: Union[bool, None] = None, 

753 from_attributes: Union[bool, None] = None, 

754 context: Union[Dict[str, Any], None] = None, 

755 update: Union[Dict[str, Any], None] = None, 

756 ) -> _TSQLModel: 

757 return sqlmodel_validate( 1facdeb

758 cls=cls, 

759 obj=obj, 

760 strict=strict, 

761 from_attributes=from_attributes, 

762 context=context, 

763 update=update, 

764 ) 

765 

766 def model_dump( 1acdeb

767 self, 

768 *, 

769 mode: Union[Literal["json", "python"], str] = "python", 

770 include: IncEx = None, 

771 exclude: IncEx = None, 

772 context: Union[Dict[str, Any], None] = None, 

773 by_alias: bool = False, 

774 exclude_unset: bool = False, 

775 exclude_defaults: bool = False, 

776 exclude_none: bool = False, 

777 round_trip: bool = False, 

778 warnings: Union[bool, Literal["none", "warn", "error"]] = True, 

779 serialize_as_any: bool = False, 

780 ) -> Dict[str, Any]: 

781 if PYDANTIC_VERSION >= "2.7.0": 1facdeb

782 extra_kwargs: Dict[str, Any] = { 1cde

783 "context": context, 

784 "serialize_as_any": serialize_as_any, 

785 } 

786 else: 

787 extra_kwargs = {} 1fab

788 if IS_PYDANTIC_V2: 1facdeb

789 return super().model_dump( 1fcde

790 mode=mode, 

791 include=include, 

792 exclude=exclude, 

793 by_alias=by_alias, 

794 exclude_unset=exclude_unset, 

795 exclude_defaults=exclude_defaults, 

796 exclude_none=exclude_none, 

797 round_trip=round_trip, 

798 warnings=warnings, 

799 **extra_kwargs, 

800 ) 

801 else: 

802 return super().dict( 1ab

803 include=include, 

804 exclude=exclude, 

805 by_alias=by_alias, 

806 exclude_unset=exclude_unset, 

807 exclude_defaults=exclude_defaults, 

808 exclude_none=exclude_none, 

809 ) 

810 

811 @deprecated( 1facdeb

812 """ 

813 🚨 `obj.dict()` was deprecated in SQLModel 0.0.14, you should 

814 instead use `obj.model_dump()`. 

815 """ 

816 ) 

817 def dict( 1acdeb

818 self, 

819 *, 

820 include: IncEx = None, 

821 exclude: IncEx = None, 

822 by_alias: bool = False, 

823 exclude_unset: bool = False, 

824 exclude_defaults: bool = False, 

825 exclude_none: bool = False, 

826 ) -> Dict[str, Any]: 

827 return self.model_dump( 1facdeb

828 include=include, 

829 exclude=exclude, 

830 by_alias=by_alias, 

831 exclude_unset=exclude_unset, 

832 exclude_defaults=exclude_defaults, 

833 exclude_none=exclude_none, 

834 ) 

835 

836 @classmethod 1facdeb

837 @deprecated( 1facdeb

838 """ 

839 🚨 `obj.from_orm(data)` was deprecated in SQLModel 0.0.14, you should 

840 instead use `obj.model_validate(data)`. 

841 """ 

842 ) 

843 def from_orm( 1acdeb

844 cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None 

845 ) -> _TSQLModel: 

846 return cls.model_validate(obj, update=update) 1facdeb

847 

848 @classmethod 1facdeb

849 @deprecated( 1facdeb

850 """ 

851 🚨 `obj.parse_obj(data)` was deprecated in SQLModel 0.0.14, you should 

852 instead use `obj.model_validate(data)`. 

853 """ 

854 ) 

855 def parse_obj( 1acdeb

856 cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None 

857 ) -> _TSQLModel: 

858 if not IS_PYDANTIC_V2: 1facdeb

859 obj = cls._enforce_dict_if_root(obj) # type: ignore[attr-defined] # noqa 1ab

860 return cls.model_validate(obj, update=update) 1facdeb

861 

862 # From Pydantic, override to only show keys from fields, omit SQLAlchemy attributes 

863 @deprecated( 1facdeb

864 """ 

865 🚨 You should not access `obj._calculate_keys()` directly. 

866 

867 It is only useful for Pydantic v1.X, you should probably upgrade to 

868 Pydantic v2.X. 

869 """, 

870 category=None, 

871 ) 

872 def _calculate_keys( 1acdeb

873 self, 

874 include: Optional[Mapping[Union[int, str], Any]], 

875 exclude: Optional[Mapping[Union[int, str], Any]], 

876 exclude_unset: bool, 

877 update: Optional[Dict[str, Any]] = None, 

878 ) -> Optional[AbstractSet[str]]: 

879 return _calculate_keys( 1ab

880 self, 

881 include=include, 

882 exclude=exclude, 

883 exclude_unset=exclude_unset, 

884 update=update, 

885 ) 

886 

887 def sqlmodel_update( 1acdeb

888 self: _TSQLModel, 

889 obj: Union[Dict[str, Any], BaseModel], 

890 *, 

891 update: Union[Dict[str, Any], None] = None, 

892 ) -> _TSQLModel: 

893 use_update = (update or {}).copy() 1facdeb

894 if isinstance(obj, dict): 1facdeb

895 for key, value in {**obj, **use_update}.items(): 1facdeb

896 if key in get_model_fields(self): 1facdeb

897 setattr(self, key, value) 1facdeb

898 elif isinstance(obj, BaseModel): 

899 for key in get_model_fields(obj): 

900 if key in use_update: 

901 value = use_update.pop(key) 

902 else: 

903 value = getattr(obj, key) 

904 setattr(self, key, value) 

905 for remaining_key in use_update: 

906 if remaining_key in get_model_fields(self): 

907 value = use_update.pop(remaining_key) 

908 setattr(self, remaining_key, value) 

909 else: 

910 raise ValueError( 

911 "Can't use sqlmodel_update() with something that " 

912 f"is not a dict or SQLModel or Pydantic model: {obj}" 

913 ) 

914 return self 1facdeb