Coverage for sqlmodel/main.py: 93%
353 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 19:13 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 19:13 +0000
1import ipaddress 1feabcd
2import uuid 1feabcd
3import weakref 1feabcd
4from datetime import date, datetime, time, timedelta 1feabcd
5from decimal import Decimal 1feabcd
6from enum import Enum 1feabcd
7from pathlib import Path 1feabcd
8from typing import ( 1feabcd
9 TYPE_CHECKING,
10 AbstractSet,
11 Any,
12 Callable,
13 ClassVar,
14 Dict,
15 List,
16 Mapping,
17 Optional,
18 Sequence,
19 Set,
20 Tuple,
21 Type,
22 TypeVar,
23 Union,
24 cast,
25 overload,
26)
28from pydantic import BaseModel, EmailStr 1feabcd
29from pydantic.fields import FieldInfo as PydanticFieldInfo 1feabcd
30from sqlalchemy import ( 1feabcd
31 Boolean,
32 Column,
33 Date,
34 DateTime,
35 Float,
36 ForeignKey,
37 Integer,
38 Interval,
39 Numeric,
40 inspect,
41)
42from sqlalchemy import Enum as sa_Enum 1feabcd
43from sqlalchemy.orm import ( 1feabcd
44 Mapped,
45 RelationshipProperty,
46 declared_attr,
47 registry,
48 relationship,
49)
50from sqlalchemy.orm.attributes import set_attribute 1feabcd
51from sqlalchemy.orm.decl_api import DeclarativeMeta 1feabcd
52from sqlalchemy.orm.instrumentation import is_instrumented 1feabcd
53from sqlalchemy.sql.schema import MetaData 1feabcd
54from sqlalchemy.sql.sqltypes import LargeBinary, Time, Uuid 1feabcd
55from typing_extensions import Literal, deprecated, get_origin 1feabcd
57from ._compat import ( # type: ignore[attr-defined] 1feabcd
58 IS_PYDANTIC_V2,
59 PYDANTIC_VERSION,
60 BaseConfig,
61 ModelField,
62 ModelMetaclass,
63 Representation,
64 SQLModelConfig,
65 Undefined,
66 UndefinedType,
67 _calculate_keys,
68 finish_init,
69 get_annotations,
70 get_config_value,
71 get_field_metadata,
72 get_model_fields,
73 get_relationship_to,
74 get_type_from_field,
75 init_pydantic_private_attrs,
76 is_field_noneable,
77 is_table_model_class,
78 post_init_field_info,
79 set_config_value,
80 sqlmodel_init,
81 sqlmodel_validate,
82)
83from .sql.sqltypes import AutoString 1feabcd
85if TYPE_CHECKING: 1feabcd
86 from pydantic._internal._model_construction import ModelMetaclass as ModelMetaclass
87 from pydantic._internal._repr import Representation as Representation
88 from pydantic_core import PydanticUndefined as Undefined
89 from pydantic_core import PydanticUndefinedType as UndefinedType
91_T = TypeVar("_T") 1feabcd
92NoArgAnyCallable = Callable[[], Any] 1feabcd
93IncEx = Union[Set[int], Set[str], Dict[int, Any], Dict[str, Any], None] 1feabcd
94OnDeleteType = Literal["CASCADE", "SET NULL", "RESTRICT"] 1feabcd
97def __dataclass_transform__( 1eabcd
98 *,
99 eq_default: bool = True,
100 order_default: bool = False,
101 kw_only_default: bool = False,
102 field_descriptors: Tuple[Union[type, Callable[..., Any]], ...] = (()),
103) -> Callable[[_T], _T]:
104 return lambda a: a 1feabcd
107class FieldInfo(PydanticFieldInfo): 1feabcd
108 def __init__(self, default: Any = Undefined, **kwargs: Any) -> None: 1feabcd
109 primary_key = kwargs.pop("primary_key", False) 1feabcd
110 nullable = kwargs.pop("nullable", Undefined) 1feabcd
111 foreign_key = kwargs.pop("foreign_key", Undefined) 1feabcd
112 ondelete = kwargs.pop("ondelete", Undefined) 1feabcd
113 unique = kwargs.pop("unique", False) 1feabcd
114 index = kwargs.pop("index", Undefined) 1feabcd
115 sa_type = kwargs.pop("sa_type", Undefined) 1feabcd
116 sa_column = kwargs.pop("sa_column", Undefined) 1feabcd
117 sa_column_args = kwargs.pop("sa_column_args", Undefined) 1feabcd
118 sa_column_kwargs = kwargs.pop("sa_column_kwargs", Undefined) 1feabcd
119 if sa_column is not Undefined: 1feabcd
120 if sa_column_args is not Undefined: 1feabcd
121 raise RuntimeError( 1feabcd
122 "Passing sa_column_args is not supported when "
123 "also passing a sa_column"
124 )
125 if sa_column_kwargs is not Undefined: 1feabcd
126 raise RuntimeError( 1feabcd
127 "Passing sa_column_kwargs is not supported when "
128 "also passing a sa_column"
129 )
130 if primary_key is not Undefined: 1feabcd
131 raise RuntimeError( 1feabcd
132 "Passing primary_key is not supported when "
133 "also passing a sa_column"
134 )
135 if nullable is not Undefined: 1feabcd
136 raise RuntimeError( 1feabcd
137 "Passing nullable is not supported when also passing a sa_column"
138 )
139 if foreign_key is not Undefined: 1feabcd
140 raise RuntimeError( 1feabcd
141 "Passing foreign_key is not supported when "
142 "also passing a sa_column"
143 )
144 if ondelete is not Undefined: 1feabcd
145 raise RuntimeError( 1feabcd
146 "Passing ondelete is not supported when also passing a sa_column"
147 )
148 if unique is not Undefined: 1feabcd
149 raise RuntimeError( 1feabcd
150 "Passing unique is not supported when also passing a sa_column"
151 )
152 if index is not Undefined: 1feabcd
153 raise RuntimeError( 1feabcd
154 "Passing index is not supported when also passing a sa_column"
155 )
156 if sa_type is not Undefined: 1feabcd
157 raise RuntimeError( 1feabcd
158 "Passing sa_type is not supported when also passing a sa_column"
159 )
160 if ondelete is not Undefined: 1feabcd
161 if foreign_key is Undefined: 1feabcd
162 raise RuntimeError("ondelete can only be used with foreign_key") 1feabcd
163 super().__init__(default=default, **kwargs) 1feabcd
164 self.primary_key = primary_key 1feabcd
165 self.nullable = nullable 1feabcd
166 self.foreign_key = foreign_key 1feabcd
167 self.ondelete = ondelete 1feabcd
168 self.unique = unique 1feabcd
169 self.index = index 1feabcd
170 self.sa_type = sa_type 1feabcd
171 self.sa_column = sa_column 1feabcd
172 self.sa_column_args = sa_column_args 1feabcd
173 self.sa_column_kwargs = sa_column_kwargs 1feabcd
176class RelationshipInfo(Representation): 1feabcd
177 def __init__( 1eabcd
178 self,
179 *,
180 back_populates: Optional[str] = None,
181 cascade_delete: Optional[bool] = False,
182 passive_deletes: Optional[Union[bool, Literal["all"]]] = False,
183 link_model: Optional[Any] = None,
184 sa_relationship: Optional[RelationshipProperty] = None, # type: ignore
185 sa_relationship_args: Optional[Sequence[Any]] = None,
186 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None,
187 ) -> None:
188 if sa_relationship is not None: 1feabcd
189 if sa_relationship_args is not None: 1feabcd
190 raise RuntimeError( 1feabcd
191 "Passing sa_relationship_args is not supported when "
192 "also passing a sa_relationship"
193 )
194 if sa_relationship_kwargs is not None: 1feabcd
195 raise RuntimeError( 1feabcd
196 "Passing sa_relationship_kwargs is not supported when "
197 "also passing a sa_relationship"
198 )
199 self.back_populates = back_populates 1feabcd
200 self.cascade_delete = cascade_delete 1feabcd
201 self.passive_deletes = passive_deletes 1feabcd
202 self.link_model = link_model 1feabcd
203 self.sa_relationship = sa_relationship 1feabcd
204 self.sa_relationship_args = sa_relationship_args 1feabcd
205 self.sa_relationship_kwargs = sa_relationship_kwargs 1feabcd
208# include sa_type, sa_column_args, sa_column_kwargs
209@overload 1feabcd
210def Field( 1eabcd
211 default: Any = Undefined, 1feabcd
212 *,
213 default_factory: Optional[NoArgAnyCallable] = None, 1feabcd
214 alias: Optional[str] = None, 1feabcd
215 title: Optional[str] = None, 1feabcd
216 description: Optional[str] = None, 1feabcd
217 exclude: Union[ 1eabcd
218 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
219 ] = None, 1feabcd
220 include: Union[ 1eabcd
221 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
222 ] = None, 1feabcd
223 const: Optional[bool] = None, 1feabcd
224 gt: Optional[float] = None, 1feabcd
225 ge: Optional[float] = None, 1feabcd
226 lt: Optional[float] = None, 1feabcd
227 le: Optional[float] = None, 1feabcd
228 multiple_of: Optional[float] = None, 1feabcd
229 max_digits: Optional[int] = None, 1feabcd
230 decimal_places: Optional[int] = None, 1feabcd
231 min_items: Optional[int] = None, 1feabcd
232 max_items: Optional[int] = None, 1feabcd
233 unique_items: Optional[bool] = None, 1feabcd
234 min_length: Optional[int] = None, 1feabcd
235 max_length: Optional[int] = None, 1feabcd
236 allow_mutation: bool = True, 1feabcd
237 regex: Optional[str] = None, 1feabcd
238 discriminator: Optional[str] = None, 1feabcd
239 repr: bool = True, 1feabcd
240 primary_key: Union[bool, UndefinedType] = Undefined, 1feabcd
241 foreign_key: Any = Undefined, 1feabcd
242 unique: Union[bool, UndefinedType] = Undefined, 1feabcd
243 nullable: Union[bool, UndefinedType] = Undefined, 1feabcd
244 index: Union[bool, UndefinedType] = Undefined, 1feabcd
245 sa_type: Union[Type[Any], UndefinedType] = Undefined, 1feabcd
246 sa_column_args: Union[Sequence[Any], UndefinedType] = Undefined, 1feabcd
247 sa_column_kwargs: Union[Mapping[str, Any], UndefinedType] = Undefined, 1feabcd
248 schema_extra: Optional[Dict[str, Any]] = None, 1feabcd
249) -> Any: ... 1feabcd
252# When foreign_key is str, include ondelete
253# include sa_type, sa_column_args, sa_column_kwargs
254@overload 1feabcd
255def Field( 1eabcd
256 default: Any = Undefined, 1feabcd
257 *,
258 default_factory: Optional[NoArgAnyCallable] = None, 1feabcd
259 alias: Optional[str] = None, 1feabcd
260 title: Optional[str] = None, 1feabcd
261 description: Optional[str] = None, 1feabcd
262 exclude: Union[ 1eabcd
263 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
264 ] = None, 1feabcd
265 include: Union[ 1eabcd
266 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
267 ] = None, 1feabcd
268 const: Optional[bool] = None, 1feabcd
269 gt: Optional[float] = None, 1feabcd
270 ge: Optional[float] = None, 1feabcd
271 lt: Optional[float] = None, 1feabcd
272 le: Optional[float] = None, 1feabcd
273 multiple_of: Optional[float] = None, 1feabcd
274 max_digits: Optional[int] = None, 1feabcd
275 decimal_places: Optional[int] = None, 1feabcd
276 min_items: Optional[int] = None, 1feabcd
277 max_items: Optional[int] = None, 1feabcd
278 unique_items: Optional[bool] = None, 1feabcd
279 min_length: Optional[int] = None, 1feabcd
280 max_length: Optional[int] = None, 1feabcd
281 allow_mutation: bool = True, 1feabcd
282 regex: Optional[str] = None, 1feabcd
283 discriminator: Optional[str] = None, 1feabcd
284 repr: bool = True, 1feabcd
285 primary_key: Union[bool, UndefinedType] = Undefined, 1feabcd
286 foreign_key: str, 1eabcd
287 ondelete: Union[OnDeleteType, UndefinedType] = Undefined, 1feabcd
288 unique: Union[bool, UndefinedType] = Undefined, 1feabcd
289 nullable: Union[bool, UndefinedType] = Undefined, 1feabcd
290 index: Union[bool, UndefinedType] = Undefined, 1feabcd
291 sa_type: Union[Type[Any], UndefinedType] = Undefined, 1feabcd
292 sa_column_args: Union[Sequence[Any], UndefinedType] = Undefined, 1feabcd
293 sa_column_kwargs: Union[Mapping[str, Any], UndefinedType] = Undefined, 1feabcd
294 schema_extra: Optional[Dict[str, Any]] = None, 1feabcd
295) -> Any: ... 1feabcd
298# Include sa_column, don't include
299# primary_key
300# foreign_key
301# ondelete
302# unique
303# nullable
304# index
305# sa_type
306# sa_column_args
307# sa_column_kwargs
308@overload 1feabcd
309def Field( 1eabcd
310 default: Any = Undefined, 1feabcd
311 *,
312 default_factory: Optional[NoArgAnyCallable] = None, 1feabcd
313 alias: Optional[str] = None, 1feabcd
314 title: Optional[str] = None, 1feabcd
315 description: Optional[str] = None, 1feabcd
316 exclude: Union[ 1eabcd
317 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
318 ] = None, 1feabcd
319 include: Union[ 1eabcd
320 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any 1eabcd
321 ] = None, 1feabcd
322 const: Optional[bool] = None, 1feabcd
323 gt: Optional[float] = None, 1feabcd
324 ge: Optional[float] = None, 1feabcd
325 lt: Optional[float] = None, 1feabcd
326 le: Optional[float] = None, 1feabcd
327 multiple_of: Optional[float] = None, 1feabcd
328 max_digits: Optional[int] = None, 1feabcd
329 decimal_places: Optional[int] = None, 1feabcd
330 min_items: Optional[int] = None, 1feabcd
331 max_items: Optional[int] = None, 1feabcd
332 unique_items: Optional[bool] = None, 1feabcd
333 min_length: Optional[int] = None, 1feabcd
334 max_length: Optional[int] = None, 1feabcd
335 allow_mutation: bool = True, 1feabcd
336 regex: Optional[str] = None, 1feabcd
337 discriminator: Optional[str] = None, 1feabcd
338 repr: bool = True, 1feabcd
339 sa_column: Union[Column, UndefinedType] = Undefined, # type: ignore 1feabcd
340 schema_extra: Optional[Dict[str, Any]] = None, 1feabcd
341) -> Any: ... 1feabcd
344def Field( 1eabcd
345 default: Any = Undefined,
346 *,
347 default_factory: Optional[NoArgAnyCallable] = None,
348 alias: Optional[str] = None,
349 title: Optional[str] = None,
350 description: Optional[str] = None,
351 exclude: Union[
352 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any
353 ] = None,
354 include: Union[
355 AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any
356 ] = None,
357 const: Optional[bool] = None,
358 gt: Optional[float] = None,
359 ge: Optional[float] = None,
360 lt: Optional[float] = None,
361 le: Optional[float] = None,
362 multiple_of: Optional[float] = None,
363 max_digits: Optional[int] = None,
364 decimal_places: Optional[int] = None,
365 min_items: Optional[int] = None,
366 max_items: Optional[int] = None,
367 unique_items: Optional[bool] = None,
368 min_length: Optional[int] = None,
369 max_length: Optional[int] = None,
370 allow_mutation: bool = True,
371 regex: Optional[str] = None,
372 discriminator: Optional[str] = None,
373 repr: bool = True,
374 primary_key: Union[bool, UndefinedType] = Undefined,
375 foreign_key: Any = Undefined,
376 ondelete: Union[OnDeleteType, UndefinedType] = Undefined,
377 unique: Union[bool, UndefinedType] = Undefined,
378 nullable: Union[bool, UndefinedType] = Undefined,
379 index: Union[bool, UndefinedType] = Undefined,
380 sa_type: Union[Type[Any], UndefinedType] = Undefined,
381 sa_column: Union[Column, UndefinedType] = Undefined, # type: ignore
382 sa_column_args: Union[Sequence[Any], UndefinedType] = Undefined,
383 sa_column_kwargs: Union[Mapping[str, Any], UndefinedType] = Undefined,
384 schema_extra: Optional[Dict[str, Any]] = None,
385) -> Any:
386 current_schema_extra = schema_extra or {} 1feabcd
387 field_info = FieldInfo( 1feabcd
388 default,
389 default_factory=default_factory,
390 alias=alias,
391 title=title,
392 description=description,
393 exclude=exclude,
394 include=include,
395 const=const,
396 gt=gt,
397 ge=ge,
398 lt=lt,
399 le=le,
400 multiple_of=multiple_of,
401 max_digits=max_digits,
402 decimal_places=decimal_places,
403 min_items=min_items,
404 max_items=max_items,
405 unique_items=unique_items,
406 min_length=min_length,
407 max_length=max_length,
408 allow_mutation=allow_mutation,
409 regex=regex,
410 discriminator=discriminator,
411 repr=repr,
412 primary_key=primary_key,
413 foreign_key=foreign_key,
414 ondelete=ondelete,
415 unique=unique,
416 nullable=nullable,
417 index=index,
418 sa_type=sa_type,
419 sa_column=sa_column,
420 sa_column_args=sa_column_args,
421 sa_column_kwargs=sa_column_kwargs,
422 **current_schema_extra,
423 )
424 post_init_field_info(field_info) 1feabcd
425 return field_info 1feabcd
428@overload 1feabcd
429def Relationship( 1eabcd
430 *,
431 back_populates: Optional[str] = None, 1feabcd
432 cascade_delete: Optional[bool] = False, 1feabcd
433 passive_deletes: Optional[Union[bool, Literal["all"]]] = False, 1feabcd
434 link_model: Optional[Any] = None, 1feabcd
435 sa_relationship_args: Optional[Sequence[Any]] = None, 1feabcd
436 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None, 1feabcd
437) -> Any: ... 1feabcd
440@overload 1feabcd
441def Relationship( 1eabcd
442 *,
443 back_populates: Optional[str] = None, 1feabcd
444 cascade_delete: Optional[bool] = False, 1feabcd
445 passive_deletes: Optional[Union[bool, Literal["all"]]] = False, 1feabcd
446 link_model: Optional[Any] = None, 1feabcd
447 sa_relationship: Optional[RelationshipProperty[Any]] = None, 1feabcd
448) -> Any: ... 1feabcd
451def Relationship( 1eabcd
452 *,
453 back_populates: Optional[str] = None,
454 cascade_delete: Optional[bool] = False,
455 passive_deletes: Optional[Union[bool, Literal["all"]]] = False,
456 link_model: Optional[Any] = None,
457 sa_relationship: Optional[RelationshipProperty[Any]] = None,
458 sa_relationship_args: Optional[Sequence[Any]] = None,
459 sa_relationship_kwargs: Optional[Mapping[str, Any]] = None,
460) -> Any:
461 relationship_info = RelationshipInfo( 1feabcd
462 back_populates=back_populates,
463 cascade_delete=cascade_delete,
464 passive_deletes=passive_deletes,
465 link_model=link_model,
466 sa_relationship=sa_relationship,
467 sa_relationship_args=sa_relationship_args,
468 sa_relationship_kwargs=sa_relationship_kwargs,
469 )
470 return relationship_info 1feabcd
473@__dataclass_transform__(kw_only_default=True, field_descriptors=(Field, FieldInfo)) 1feabcd
474class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta): 1feabcd
475 __sqlmodel_relationships__: Dict[str, RelationshipInfo] 1feabcd
476 model_config: SQLModelConfig 1feabcd
477 model_fields: Dict[str, FieldInfo] 1feabcd
478 __config__: Type[SQLModelConfig] 1feabcd
479 __fields__: Dict[str, ModelField] # type: ignore[assignment] 1feabcd
481 # Replicate SQLAlchemy
482 def __setattr__(cls, name: str, value: Any) -> None: 1feabcd
483 if is_table_model_class(cls): 1feabcd
484 DeclarativeMeta.__setattr__(cls, name, value) 1feabcd
485 else:
486 super().__setattr__(name, value) 1feabcd
488 def __delattr__(cls, name: str) -> None: 1feabcd
489 if is_table_model_class(cls): 1feabcd
490 DeclarativeMeta.__delattr__(cls, name) 1feabcd
491 else:
492 super().__delattr__(name) 1fe
494 # From Pydantic
495 def __new__( 1eabcd
496 cls,
497 name: str,
498 bases: Tuple[Type[Any], ...],
499 class_dict: Dict[str, Any],
500 **kwargs: Any,
501 ) -> Any:
502 relationships: Dict[str, RelationshipInfo] = {} 1feabcd
503 dict_for_pydantic = {} 1feabcd
504 original_annotations = get_annotations(class_dict) 1feabcd
505 pydantic_annotations = {} 1feabcd
506 relationship_annotations = {} 1feabcd
507 for k, v in class_dict.items(): 1feabcd
508 if isinstance(v, RelationshipInfo): 1feabcd
509 relationships[k] = v 1feabcd
510 else:
511 dict_for_pydantic[k] = v 1feabcd
512 for k, v in original_annotations.items(): 1feabcd
513 if k in relationships: 1feabcd
514 relationship_annotations[k] = v 1feabcd
515 else:
516 pydantic_annotations[k] = v 1feabcd
517 dict_used = { 1eabcd
518 **dict_for_pydantic,
519 "__weakref__": None,
520 "__sqlmodel_relationships__": relationships,
521 "__annotations__": pydantic_annotations,
522 }
523 # Duplicate logic from Pydantic to filter config kwargs because if they are
524 # passed directly including the registry Pydantic will pass them over to the
525 # superclass causing an error
526 allowed_config_kwargs: Set[str] = { 1feabcd
527 key
528 for key in dir(BaseConfig)
529 if not (
530 key.startswith("__") and key.endswith("__")
531 ) # skip dunder methods and attributes
532 }
533 config_kwargs = { 1feabcd
534 key: kwargs[key] for key in kwargs.keys() & allowed_config_kwargs
535 }
536 new_cls = super().__new__(cls, name, bases, dict_used, **config_kwargs) 1feabcd
537 new_cls.__annotations__ = { 1eabcd
538 **relationship_annotations,
539 **pydantic_annotations,
540 **new_cls.__annotations__,
541 }
543 def get_config(name: str) -> Any: 1feabcd
544 config_class_value = get_config_value( 1feabcd
545 model=new_cls, parameter=name, default=Undefined
546 )
547 if config_class_value is not Undefined: 1feabcd
548 return config_class_value 1abcd
549 kwarg_value = kwargs.get(name, Undefined) 1feabcd
550 if kwarg_value is not Undefined: 1feabcd
551 return kwarg_value 1feabcd
552 return Undefined 1feabcd
554 config_table = get_config("table") 1feabcd
555 if config_table is True: 1feabcd
556 # If it was passed by kwargs, ensure it's also set in config
557 set_config_value(model=new_cls, parameter="table", value=config_table) 1feabcd
558 for k, v in get_model_fields(new_cls).items(): 1feabcd
559 col = get_column_from_field(v) 1feabcd
560 setattr(new_cls, k, col) 1feabcd
561 # Set a config flag to tell FastAPI that this should be read with a field
562 # in orm_mode instead of preemptively converting it to a dict.
563 # This could be done by reading new_cls.model_config['table'] in FastAPI, but
564 # that's very specific about SQLModel, so let's have another config that
565 # other future tools based on Pydantic can use.
566 set_config_value( 1feabcd
567 model=new_cls, parameter="read_from_attributes", value=True
568 )
569 # For compatibility with older versions
570 # TODO: remove this in the future
571 set_config_value(model=new_cls, parameter="read_with_orm_mode", value=True) 1feabcd
573 config_registry = get_config("registry") 1feabcd
574 if config_registry is not Undefined: 1feabcd
575 config_registry = cast(registry, config_registry) 1feabcd
576 # If it was passed by kwargs, ensure it's also set in config
577 set_config_value(model=new_cls, parameter="registry", value=config_table) 1feabcd
578 setattr(new_cls, "_sa_registry", config_registry) # noqa: B010 1feabcd
579 setattr(new_cls, "metadata", config_registry.metadata) # noqa: B010 1feabcd
580 setattr(new_cls, "__abstract__", True) # noqa: B010 1feabcd
581 return new_cls 1feabcd
583 # Override SQLAlchemy, allow both SQLAlchemy and plain Pydantic models
584 def __init__( 1eabcd
585 cls, classname: str, bases: Tuple[type, ...], dict_: Dict[str, Any], **kw: Any
586 ) -> None:
587 # Only one of the base classes (or the current one) should be a table model
588 # this allows FastAPI cloning a SQLModel for the response_model without
589 # trying to create a new SQLAlchemy, for a new table, with the same name, that
590 # triggers an error
591 base_is_table = any(is_table_model_class(base) for base in bases) 1feabcd
592 if is_table_model_class(cls) and not base_is_table: 1feabcd
593 for rel_name, rel_info in cls.__sqlmodel_relationships__.items(): 1feabcd
594 if rel_info.sa_relationship: 1feabcd
595 # There's a SQLAlchemy relationship declared, that takes precedence
596 # over anything else, use that and continue with the next attribute
597 setattr(cls, rel_name, rel_info.sa_relationship) # Fix #315 1feabcd
598 continue 1feabcd
599 raw_ann = cls.__annotations__[rel_name] 1feabcd
600 origin = get_origin(raw_ann) 1feabcd
601 if origin is Mapped: 1feabcd
602 ann = raw_ann.__args__[0]
603 else:
604 ann = raw_ann 1feabcd
605 # Plain forward references, for models not yet defined, are not
606 # handled well by SQLAlchemy without Mapped, so, wrap the
607 # annotations in Mapped here
608 cls.__annotations__[rel_name] = Mapped[ann] # type: ignore[valid-type] 1feabcd
609 relationship_to = get_relationship_to( 1feabcd
610 name=rel_name, rel_info=rel_info, annotation=ann
611 )
612 rel_kwargs: Dict[str, Any] = {} 1feabcd
613 if rel_info.back_populates: 1feabcd
614 rel_kwargs["back_populates"] = rel_info.back_populates 1feabcd
615 if rel_info.cascade_delete: 1feabcd
616 rel_kwargs["cascade"] = "all, delete-orphan" 1feabcd
617 if rel_info.passive_deletes: 1feabcd
618 rel_kwargs["passive_deletes"] = rel_info.passive_deletes 1feabcd
619 if rel_info.link_model: 1feabcd
620 ins = inspect(rel_info.link_model) 1feabcd
621 local_table = getattr(ins, "local_table") # noqa: B009 1feabcd
622 if local_table is None: 1feabcd
623 raise RuntimeError(
624 "Couldn't find the secondary table for "
625 f"model {rel_info.link_model}"
626 )
627 rel_kwargs["secondary"] = local_table 1feabcd
628 rel_args: List[Any] = [] 1feabcd
629 if rel_info.sa_relationship_args: 1feabcd
630 rel_args.extend(rel_info.sa_relationship_args)
631 if rel_info.sa_relationship_kwargs: 1feabcd
632 rel_kwargs.update(rel_info.sa_relationship_kwargs)
633 rel_value = relationship(relationship_to, *rel_args, **rel_kwargs) 1feabcd
634 setattr(cls, rel_name, rel_value) # Fix #315 1feabcd
635 # SQLAlchemy no longer uses dict_
636 # Ref: https://github.com/sqlalchemy/sqlalchemy/commit/428ea01f00a9cc7f85e435018565eb6da7af1b77
637 # Tag: 1.4.36
638 DeclarativeMeta.__init__(cls, classname, bases, dict_, **kw) 1feabcd
639 else:
640 ModelMetaclass.__init__(cls, classname, bases, dict_, **kw) 1feabcd
643def get_sqlalchemy_type(field: Any) -> Any: 1feabcd
644 if IS_PYDANTIC_V2: 1feabcd
645 field_info = field 1fe
646 else:
647 field_info = field.field_info 1abcd
648 sa_type = getattr(field_info, "sa_type", Undefined) # noqa: B009 1feabcd
649 if sa_type is not Undefined: 1feabcd
650 return sa_type
652 type_ = get_type_from_field(field) 1feabcd
653 metadata = get_field_metadata(field) 1feabcd
655 # Check enums first as an enum can also be a str, needed by Pydantic/FastAPI
656 if issubclass(type_, Enum): 1feabcd
657 return sa_Enum(type_) 1feabcd
658 if issubclass( 1feabcd
659 type_,
660 (
661 str,
662 ipaddress.IPv4Address,
663 ipaddress.IPv4Network,
664 ipaddress.IPv6Address,
665 ipaddress.IPv6Network,
666 Path,
667 EmailStr,
668 ),
669 ):
670 max_length = getattr(metadata, "max_length", None) 1feabcd
671 if max_length: 1feabcd
672 return AutoString(length=max_length)
673 return AutoString 1feabcd
674 if issubclass(type_, float): 1feabcd
675 return Float
676 if issubclass(type_, bool): 1feabcd
677 return Boolean 1feabcd
678 if issubclass(type_, int): 1feabcd
679 return Integer 1feabcd
680 if issubclass(type_, datetime): 1feabcd
681 return DateTime
682 if issubclass(type_, date): 1feabcd
683 return Date
684 if issubclass(type_, timedelta): 1feabcd
685 return Interval
686 if issubclass(type_, time): 1feabcd
687 return Time
688 if issubclass(type_, bytes): 1feabcd
689 return LargeBinary
690 if issubclass(type_, Decimal): 1feabcd
691 return Numeric( 1feabcd
692 precision=getattr(metadata, "max_digits", None),
693 scale=getattr(metadata, "decimal_places", None),
694 )
695 if issubclass(type_, uuid.UUID): 1feabcd
696 return Uuid 1feabcd
697 raise ValueError(f"{type_} has no matching SQLAlchemy type") 1feabcd
700def get_column_from_field(field: Any) -> Column: # type: ignore 1feabcd
701 if IS_PYDANTIC_V2: 1feabcd
702 field_info = field 1fe
703 else:
704 field_info = field.field_info 1abcd
705 sa_column = getattr(field_info, "sa_column", Undefined) 1feabcd
706 if isinstance(sa_column, Column): 1feabcd
707 return sa_column 1feabcd
708 sa_type = get_sqlalchemy_type(field) 1feabcd
709 primary_key = getattr(field_info, "primary_key", Undefined) 1feabcd
710 if primary_key is Undefined: 1feabcd
711 primary_key = False 1feabcd
712 index = getattr(field_info, "index", Undefined) 1feabcd
713 if index is Undefined: 1feabcd
714 index = False 1feabcd
715 nullable = not primary_key and is_field_noneable(field) 1feabcd
716 # Override derived nullability if the nullable property is set explicitly
717 # on the field
718 field_nullable = getattr(field_info, "nullable", Undefined) # noqa: B009 1feabcd
719 if field_nullable is not Undefined: 1feabcd
720 assert not isinstance(field_nullable, UndefinedType) 1feabcd
721 nullable = field_nullable 1feabcd
722 args = [] 1feabcd
723 foreign_key = getattr(field_info, "foreign_key", Undefined) 1feabcd
724 if foreign_key is Undefined: 1feabcd
725 foreign_key = None 1feabcd
726 unique = getattr(field_info, "unique", Undefined) 1feabcd
727 if unique is Undefined: 1feabcd
728 unique = False 1feabcd
729 if foreign_key: 1feabcd
730 if field_info.ondelete == "SET NULL" and not nullable: 1feabcd
731 raise RuntimeError('ondelete="SET NULL" requires nullable=True') 1feabcd
732 assert isinstance(foreign_key, str) 1feabcd
733 ondelete = getattr(field_info, "ondelete", Undefined) 1feabcd
734 if ondelete is Undefined: 1feabcd
735 ondelete = None 1feabcd
736 assert isinstance(ondelete, (str, type(None))) # for typing 1feabcd
737 args.append(ForeignKey(foreign_key, ondelete=ondelete)) 1feabcd
738 kwargs = { 1eabcd
739 "primary_key": primary_key,
740 "nullable": nullable,
741 "index": index,
742 "unique": unique,
743 }
744 sa_default = Undefined 1feabcd
745 if field_info.default_factory: 1feabcd
746 sa_default = field_info.default_factory 1feabcd
747 elif field_info.default is not Undefined: 1feabcd
748 sa_default = field_info.default 1feabcd
749 if sa_default is not Undefined: 1feabcd
750 kwargs["default"] = sa_default 1feabcd
751 sa_column_args = getattr(field_info, "sa_column_args", Undefined) 1feabcd
752 if sa_column_args is not Undefined: 1feabcd
753 args.extend(list(cast(Sequence[Any], sa_column_args))) 1feabcd
754 sa_column_kwargs = getattr(field_info, "sa_column_kwargs", Undefined) 1feabcd
755 if sa_column_kwargs is not Undefined: 1feabcd
756 kwargs.update(cast(Dict[Any, Any], sa_column_kwargs)) 1feabcd
757 return Column(sa_type, *args, **kwargs) # type: ignore 1feabcd
760class_registry = weakref.WeakValueDictionary() # type: ignore 1feabcd
762default_registry = registry() 1feabcd
764_TSQLModel = TypeVar("_TSQLModel", bound="SQLModel") 1feabcd
767class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry): 1feabcd
768 # SQLAlchemy needs to set weakref(s), Pydantic will set the other slots values
769 __slots__ = ("__weakref__",) 1feabcd
770 __tablename__: ClassVar[Union[str, Callable[..., str]]] 1feabcd
771 __sqlmodel_relationships__: ClassVar[Dict[str, RelationshipProperty[Any]]] 1feabcd
772 __name__: ClassVar[str] 1feabcd
773 metadata: ClassVar[MetaData] 1feabcd
774 __allow_unmapped__ = True # https://docs.sqlalchemy.org/en/20/changelog/migration_20.html#migration-20-step-six 1feabcd
776 if IS_PYDANTIC_V2: 1feabcd
777 model_config = SQLModelConfig(from_attributes=True) 1fe
778 else:
780 class Config: 1abcd
781 orm_mode = True 1abcd
783 def __new__(cls, *args: Any, **kwargs: Any) -> Any: 1feabcd
784 new_object = super().__new__(cls) 1feabcd
785 # SQLAlchemy doesn't call __init__ on the base class when querying from DB
786 # Ref: https://docs.sqlalchemy.org/en/14/orm/constructors.html
787 # Set __fields_set__ here, that would have been set when calling __init__
788 # in the Pydantic model so that when SQLAlchemy sets attributes that are
789 # added (e.g. when querying from DB) to the __fields_set__, this already exists
790 init_pydantic_private_attrs(new_object) 1feabcd
791 return new_object 1feabcd
793 def __init__(__pydantic_self__, **data: Any) -> None: 1feabcd
794 # Uses something other than `self` the first arg to allow "self" as a
795 # settable attribute
797 # SQLAlchemy does very dark black magic and modifies the __init__ method in
798 # sqlalchemy.orm.instrumentation._generate_init()
799 # so, to make SQLAlchemy work, it's needed to explicitly call __init__ to
800 # trigger all the SQLAlchemy logic, it doesn't work using cls.__new__, setting
801 # attributes obj.__dict__, etc. The __init__ method has to be called. But
802 # there are cases where calling all the default logic is not ideal, e.g.
803 # when calling Model.model_validate(), as the validation is done outside
804 # of instance creation.
805 # At the same time, __init__ is what users would normally call, by creating
806 # a new instance, which should have validation and all the default logic.
807 # So, to be able to set up the internal SQLAlchemy logic alone without
808 # executing the rest, and support things like Model.model_validate(), we
809 # use a contextvar to know if we should execute everything.
810 if finish_init.get(): 1feabcd
811 sqlmodel_init(self=__pydantic_self__, data=data) 1feabcd
813 def __setattr__(self, name: str, value: Any) -> None: 1feabcd
814 if name in {"_sa_instance_state"}: 1feabcd
815 self.__dict__[name] = value 1feabcd
816 return 1feabcd
817 else:
818 # Set in SQLAlchemy, before Pydantic to trigger events and updates
819 if is_table_model_class(self.__class__) and is_instrumented(self, name): # type: ignore[no-untyped-call] 1feabcd
820 set_attribute(self, name, value) 1feabcd
821 # Set in Pydantic model to trigger possible validation changes, only for
822 # non relationship values
823 if name not in self.__sqlmodel_relationships__: 1feabcd
824 super().__setattr__(name, value) 1feabcd
826 def __repr_args__(self) -> Sequence[Tuple[Optional[str], Any]]: 1feabcd
827 # Don't show SQLAlchemy private attributes
828 return [ 1eabcd
829 (k, v)
830 for k, v in super().__repr_args__()
831 if not (isinstance(k, str) and k.startswith("_sa_"))
832 ]
834 @declared_attr # type: ignore 1feabcd
835 def __tablename__(cls) -> str: 1feabcd
836 return cls.__name__.lower() 1feabcd
838 @classmethod 1feabcd
839 def model_validate( 1eabcd
840 cls: Type[_TSQLModel],
841 obj: Any,
842 *,
843 strict: Union[bool, None] = None,
844 from_attributes: Union[bool, None] = None,
845 context: Union[Dict[str, Any], None] = None,
846 update: Union[Dict[str, Any], None] = None,
847 ) -> _TSQLModel:
848 return sqlmodel_validate( 1feabcd
849 cls=cls,
850 obj=obj,
851 strict=strict,
852 from_attributes=from_attributes,
853 context=context,
854 update=update,
855 )
857 def model_dump( 1eabcd
858 self,
859 *,
860 mode: Union[Literal["json", "python"], str] = "python",
861 include: IncEx = None,
862 exclude: IncEx = None,
863 context: Union[Dict[str, Any], None] = None,
864 by_alias: bool = False,
865 exclude_unset: bool = False,
866 exclude_defaults: bool = False,
867 exclude_none: bool = False,
868 round_trip: bool = False,
869 warnings: Union[bool, Literal["none", "warn", "error"]] = True,
870 serialize_as_any: bool = False,
871 ) -> Dict[str, Any]:
872 if PYDANTIC_VERSION >= "2.7.0": 1feabcd
873 extra_kwargs: Dict[str, Any] = { 1e
874 "context": context,
875 "serialize_as_any": serialize_as_any,
876 }
877 else:
878 extra_kwargs = {} 1fabcd
879 if IS_PYDANTIC_V2: 1feabcd
880 return super().model_dump( 1fe
881 mode=mode,
882 include=include,
883 exclude=exclude,
884 by_alias=by_alias,
885 exclude_unset=exclude_unset,
886 exclude_defaults=exclude_defaults,
887 exclude_none=exclude_none,
888 round_trip=round_trip,
889 warnings=warnings,
890 **extra_kwargs,
891 )
892 else:
893 return super().dict( 1abcd
894 include=include,
895 exclude=exclude,
896 by_alias=by_alias,
897 exclude_unset=exclude_unset,
898 exclude_defaults=exclude_defaults,
899 exclude_none=exclude_none,
900 )
902 @deprecated( 1feabcd
903 """
904 🚨 `obj.dict()` was deprecated in SQLModel 0.0.14, you should
905 instead use `obj.model_dump()`.
906 """
907 )
908 def dict( 1eabcd
909 self,
910 *,
911 include: IncEx = None,
912 exclude: IncEx = None,
913 by_alias: bool = False,
914 exclude_unset: bool = False,
915 exclude_defaults: bool = False,
916 exclude_none: bool = False,
917 ) -> Dict[str, Any]:
918 return self.model_dump( 1feabcd
919 include=include,
920 exclude=exclude,
921 by_alias=by_alias,
922 exclude_unset=exclude_unset,
923 exclude_defaults=exclude_defaults,
924 exclude_none=exclude_none,
925 )
927 @classmethod 1feabcd
928 @deprecated( 1feabcd
929 """
930 🚨 `obj.from_orm(data)` was deprecated in SQLModel 0.0.14, you should
931 instead use `obj.model_validate(data)`.
932 """
933 )
934 def from_orm( 1eabcd
935 cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
936 ) -> _TSQLModel:
937 return cls.model_validate(obj, update=update) 1feabcd
939 @classmethod 1feabcd
940 @deprecated( 1feabcd
941 """
942 🚨 `obj.parse_obj(data)` was deprecated in SQLModel 0.0.14, you should
943 instead use `obj.model_validate(data)`.
944 """
945 )
946 def parse_obj( 1eabcd
947 cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
948 ) -> _TSQLModel:
949 if not IS_PYDANTIC_V2: 1feabcd
950 obj = cls._enforce_dict_if_root(obj) # type: ignore[attr-defined] # noqa 1abcd
951 return cls.model_validate(obj, update=update) 1feabcd
953 # From Pydantic, override to only show keys from fields, omit SQLAlchemy attributes
954 @deprecated( 1feabcd
955 """
956 🚨 You should not access `obj._calculate_keys()` directly.
958 It is only useful for Pydantic v1.X, you should probably upgrade to
959 Pydantic v2.X.
960 """,
961 category=None,
962 )
963 def _calculate_keys( 1eabcd
964 self,
965 include: Optional[Mapping[Union[int, str], Any]],
966 exclude: Optional[Mapping[Union[int, str], Any]],
967 exclude_unset: bool,
968 update: Optional[Dict[str, Any]] = None,
969 ) -> Optional[AbstractSet[str]]:
970 return _calculate_keys( 1abcd
971 self,
972 include=include,
973 exclude=exclude,
974 exclude_unset=exclude_unset,
975 update=update,
976 )
978 def sqlmodel_update( 1eabcd
979 self: _TSQLModel,
980 obj: Union[Dict[str, Any], BaseModel],
981 *,
982 update: Union[Dict[str, Any], None] = None,
983 ) -> _TSQLModel:
984 use_update = (update or {}).copy() 1feabcd
985 if isinstance(obj, dict): 1feabcd
986 for key, value in {**obj, **use_update}.items(): 1feabcd
987 if key in get_model_fields(self): 1feabcd
988 setattr(self, key, value) 1feabcd
989 elif isinstance(obj, BaseModel):
990 for key in get_model_fields(obj):
991 if key in use_update:
992 value = use_update.pop(key)
993 else:
994 value = getattr(obj, key)
995 setattr(self, key, value)
996 for remaining_key in use_update:
997 if remaining_key in get_model_fields(self):
998 value = use_update.pop(remaining_key)
999 setattr(self, remaining_key, value)
1000 else:
1001 raise ValueError(
1002 "Can't use sqlmodel_update() with something that "
1003 f"is not a dict or SQLModel or Pydantic model: {obj}"
1004 )
1005 return self 1feabcd