Coverage for bbconf/db_utils.py: 72%
204 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-17 04:01 +0000
1import datetime
2import logging
3from typing import List, Optional
4import pandas as pd
6from sqlalchemy import TIMESTAMP, BigInteger, ForeignKey, Result, Select, event, select
7from sqlalchemy.dialects.postgresql import JSON
8from sqlalchemy.engine import URL, Engine, create_engine
9from sqlalchemy.event import listens_for
10from sqlalchemy.exc import ProgrammingError, IntegrityError
11from sqlalchemy.ext.compiler import compiles
12from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship
13from sqlalchemy_schemadisplay import create_schema_graph
15from bbconf.const import PKG_NAME, LICENSES_CSV_URL
# Package-scoped logger; PKG_NAME is defined in bbconf.const.
_LOGGER = logging.getLogger(PKG_NAME)

# Dialect string used by the @compiles hooks below for PostgreSQL-specific SQL.
POSTGRES_DIALECT = "postgresql+psycopg"

# Names of tables that were created in this execution; populated by the
# 'after_create' metadata event listener.
tables_initialized: list = []
class SchemaError(Exception):
    """Raised when the expected database schema is missing or malformed."""

    def __init__(self):
        message = (
            """The database schema is incorrect, can't connect to the database!"""
        )
        super().__init__(message)
class BIGSERIAL(BigInteger):
    """BigInteger subtype rendered as the BIGSERIAL keyword on PostgreSQL."""
@compiles(BIGSERIAL, POSTGRES_DIALECT)
def compile_bigserial_pg(type_, compiler, **kw):
    """Compile the BIGSERIAL marker type to the literal SQL keyword on PostgreSQL."""
    return "BIGSERIAL"
@compiles(JSON, POSTGRES_DIALECT)
def compile_jsonb_pg(type_, compiler, **kw):
    """Compile JSON columns to the binary JSONB type on PostgreSQL."""
    return "JSONB"
class Base(DeclarativeBase):
    # Map plain datetime annotations to timezone-aware TIMESTAMP columns.
    type_annotation_map = {datetime.datetime: TIMESTAMP(timezone=True)}
@event.listens_for(Base.metadata, "after_create")
def receive_after_create(target, connection, tables, **kw):
    """
    Record which tables were just created by Base.metadata.create_all.

    Stores the fullnames of the newly created tables in the module-level
    ``tables_initialized`` list.
    """
    global tables_initialized
    if not tables:
        _LOGGER.info("A table was not created")
    else:
        _LOGGER.info("A table was created")
        tables_initialized = [table.fullname for table in tables]
def deliver_update_date(context):
    """Return the current UTC time; used as default/onupdate for timestamp columns."""
    return datetime.datetime.now(tz=datetime.timezone.utc)
class Bed(Base):
    """ORM model for a single BED file record and its relationships."""

    __tablename__ = "bed"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    name: Mapped[Optional[str]]
    genome_alias: Mapped[Optional[str]]
    genome_digest: Mapped[Optional[str]]
    description: Mapped[Optional[str]]
    bed_type: Mapped[str] = mapped_column(default="bed3")
    bed_format: Mapped[str] = mapped_column(default="bed")
    indexed: Mapped[bool] = mapped_column(
        default=False, comment="Whether sample was added to qdrant"
    )
    pephub: Mapped[bool] = mapped_column(
        default=False, comment="Whether sample was added to pephub"
    )

    # Timestamps are UTC (see deliver_update_date); last_update_date refreshes on update.
    submission_date: Mapped[datetime.datetime] = mapped_column(
        default=deliver_update_date
    )
    last_update_date: Mapped[Optional[datetime.datetime]] = mapped_column(
        default=deliver_update_date,
        onupdate=deliver_update_date,
    )
    # Kept in sync by the Universes insert/update/delete event listeners.
    is_universe: Mapped[Optional[bool]] = mapped_column(default=False)

    files: Mapped[List["Files"]] = relationship(
        "Files", back_populates="bedfile", cascade="all, delete-orphan"
    )

    bedsets: Mapped[List["BedFileBedSetRelation"]] = relationship(
        "BedFileBedSetRelation", back_populates="bedfile", cascade="all, delete-orphan"
    )

    stats: Mapped["BedStats"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )

    universe: Mapped["Universes"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )
    tokenized: Mapped["TokenizedBed"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )
    # NOTE(review): annotated as Mapped["str"] but the column is nullable —
    # Optional[str] would match; nullable=True takes precedence either way.
    license_id: Mapped["str"] = mapped_column(
        ForeignKey("licenses.id", ondelete="CASCADE"), nullable=True, index=True
    )
    license_mapping: Mapped["License"] = relationship("License", back_populates="bed")
class BedStats(Base):
    """Per-BED-file genomic statistics; one-to-one with Bed (shares its id)."""

    __tablename__ = "bed_stats"

    # Primary key doubles as FK to bed.id, so stats rows are deleted with the bed.
    id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
    )
    number_of_regions: Mapped[Optional[float]]
    gc_content: Mapped[Optional[float]]
    median_tss_dist: Mapped[Optional[float]]
    mean_region_width: Mapped[Optional[float]]
    exon_frequency: Mapped[Optional[float]]
    intron_frequency: Mapped[Optional[float]]
    promoterprox_frequency: Mapped[Optional[float]]
    intergenic_frequency: Mapped[Optional[float]]
    promotercore_frequency: Mapped[Optional[float]]
    fiveutr_frequency: Mapped[Optional[float]]
    threeutr_frequency: Mapped[Optional[float]]
    fiveutr_percentage: Mapped[Optional[float]]
    threeutr_percentage: Mapped[Optional[float]]
    promoterprox_percentage: Mapped[Optional[float]]
    exon_percentage: Mapped[Optional[float]]
    intron_percentage: Mapped[Optional[float]]
    intergenic_percentage: Mapped[Optional[float]]
    promotercore_percentage: Mapped[Optional[float]]
    tssdist: Mapped[Optional[float]]

    bed: Mapped["Bed"] = relationship("Bed", back_populates="stats")
class Files(Base):
    """Files/plots attached to either a Bed or a BedSets record."""

    __tablename__ = "files"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(
        nullable=False, comment="Name of the file, e.g. bed, bigBed"
    )
    title: Mapped[Optional[str]]
    type: Mapped[str] = mapped_column(
        default="file", comment="Type of the object, e.g. file, plot, ..."
    )
    path: Mapped[str]
    path_thumbnail: Mapped[str] = mapped_column(
        nullable=True, comment="Thumbnail path of the file"
    )
    description: Mapped[Optional[str]]
    size: Mapped[Optional[int]] = mapped_column(default=0, comment="Size of the file")

    # Both FKs are nullable: a row belongs to a bed file OR a bedset.
    bedfile_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"), nullable=True, index=True
    )
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"), nullable=True, index=True
    )

    bedfile: Mapped["Bed"] = relationship("Bed", back_populates="files")
    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="files")
class BedFileBedSetRelation(Base):
    """Association table for the many-to-many Bed <-> BedSets relationship."""

    __tablename__ = "bedfile_bedset_relation"

    # Composite primary key over both FKs.
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"), primary_key=True
    )
    bedfile_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"), primary_key=True
    )

    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="bedfiles")
    bedfile: Mapped["Bed"] = relationship("Bed", back_populates="bedsets")
class BedSets(Base):
    """ORM model for a bedset: a named collection of BED files."""

    __tablename__ = "bedsets"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(nullable=False, comment="Name of the bedset")
    description: Mapped[Optional[str]] = mapped_column(
        comment="Description of the bedset"
    )
    # Timestamps are UTC (see deliver_update_date).
    submission_date: Mapped[datetime.datetime] = mapped_column(
        default=deliver_update_date
    )
    last_update_date: Mapped[Optional[datetime.datetime]] = mapped_column(
        default=deliver_update_date,
        onupdate=deliver_update_date,
    )
    md5sum: Mapped[Optional[str]] = mapped_column(comment="MD5 sum of the bedset")

    bedset_means: Mapped[Optional[dict]] = mapped_column(
        JSON, comment="Mean values of the bedset"
    )
    # Fixed column comment: previously (incorrectly) said "Median values".
    bedset_standard_deviation: Mapped[Optional[dict]] = mapped_column(
        JSON, comment="Standard deviation values of the bedset"
    )

    bedfiles: Mapped[List["BedFileBedSetRelation"]] = relationship(
        "BedFileBedSetRelation", back_populates="bedset", cascade="all, delete-orphan"
    )
    files: Mapped[List["Files"]] = relationship("Files", back_populates="bedset")
    universe: Mapped["Universes"] = relationship("Universes", back_populates="bedset")
class Universes(Base):
    """Marks a Bed record as a universe; one-to-one with Bed (shares its id)."""

    __tablename__ = "universes"

    # Primary key doubles as FK to bed.id.
    id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
    )
    method: Mapped[str] = mapped_column(
        nullable=True, comment="Method used to create the universe"
    )
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"),
        index=True,
        nullable=True,
    )

    bed: Mapped["Bed"] = relationship("Bed", back_populates="universe")
    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="universe")
    tokenized: Mapped["TokenizedBed"] = relationship(
        "TokenizedBed",
        back_populates="universe",
    )
class TokenizedBed(Base):
    """A BED file tokenized against a specific universe; keyed by (bed, universe)."""

    __tablename__ = "tokenized_bed"

    bed_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
        nullable=False,
    )
    universe_id: Mapped[str] = mapped_column(
        ForeignKey("universes.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
        nullable=False,
    )
    path: Mapped[str] = mapped_column(
        nullable=False, comment="Path to the tokenized bed file"
    )

    bed: Mapped["Bed"] = relationship("Bed", back_populates="tokenized")
    universe: Mapped["Universes"] = relationship(
        "Universes", back_populates="tokenized"
    )
class License(Base):
    """Reference table of licenses; seeded from LICENSES_CSV_URL on schema creation."""

    __tablename__ = "licenses"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    shorthand: Mapped[str] = mapped_column(nullable=True, comment="License shorthand")
    label: Mapped[str] = mapped_column(nullable=False, comment="License label")
    description: Mapped[str] = mapped_column(
        nullable=False, comment="License description"
    )

    bed: Mapped[List["Bed"]] = relationship("Bed", back_populates="license_mapping")
@listens_for(Universes, "after_insert")
@listens_for(Universes, "after_update")
def add_bed_universe(mapper, connection, target):
    """
    Flag the referenced Bed row as a universe when a Universes row is inserted/updated.

    :param mapper: mapper that is the target of the event (unused)
    :param connection: connection the flush is running on
    :param target: the Universes instance being flushed
    """
    with Session(connection) as session:
        bed = session.scalar(select(Bed).where(Bed.id == target.id))
        # Guard: the bed row could be missing; previously this raised AttributeError.
        if bed is None:
            _LOGGER.warning(f"Bed '{target.id}' not found; cannot mark as universe")
            return
        bed.is_universe = True
        session.commit()
@listens_for(Universes, "after_delete")
def delete_bed_universe(mapper, connection, target):
    """
    Clear the universe flag on the referenced Bed row when a Universes row is deleted.

    :param mapper: mapper that is the target of the event (unused)
    :param connection: connection the flush is running on
    :param target: the Universes instance being deleted
    """
    with Session(connection) as session:
        bed = session.scalar(select(Bed).where(Bed.id == target.id))
        # Guard: the bed row may already be gone (e.g. cascade delete);
        # previously this raised AttributeError on None.
        if bed is None:
            return
        bed.is_universe = False
        session.commit()
class BaseEngine:
    """
    A class with base methods, that are used in several classes.
    """

    def __init__(
        self,
        *,
        host: str = "localhost",
        port: int = 5432,
        database: str = "bedbase",
        user: str = None,
        password: str = None,
        drivername: str = POSTGRES_DIALECT,
        dsn: str = None,
        echo: bool = False,
    ):
        """
        Initialize connection to the bedbase database. You can use the basic
        connection parameters or a libpq connection string.

        :param host: database server address e.g., localhost or an IP address.
        :param port: the port number that defaults to 5432 if it is not provided.
        :param database: the name of the database that you want to connect.
        :param user: the username used to authenticate.
        :param password: password used to authenticate.
        :param drivername: SQLAlchemy driver name [Default: postgresql+psycopg]
        :param dsn: libpq connection string using the dsn parameter
            (e.g. 'postgresql://user_name:password@host_name:port/db_name')
        :param echo: if True, log all SQL emitted by the engine
        """
        if not dsn:
            dsn = URL.create(
                host=host,
                port=port,
                database=database,
                username=user,
                password=password,
                drivername=drivername,
            )

        self._engine = create_engine(dsn, echo=echo)
        self.create_schema(self._engine)
        self.check_db_connection()

    def create_schema(self, engine=None):
        """
        Create sql schema in the database.

        :param engine: sqlalchemy engine [Default: None]
        :return: None
        """
        if not engine:
            engine = self._engine
        Base.metadata.create_all(engine)

        # Seed the licenses table only if it was created in this run.
        if License.__tablename__ in tables_initialized:
            try:
                # It is weird, but tables are sometimes reported as initialized
                # twice, so a duplicate insert is possible; the resulting
                # IntegrityError is deliberately ignored.
                self._upload_licenses()
            except IntegrityError:
                pass

    def delete_schema(self, engine=None) -> None:
        """
        Delete sql schema in the database.

        :param engine: sqlalchemy engine [Default: None]
        :return: None
        """
        if not engine:
            engine = self._engine
        Base.metadata.drop_all(engine)
        return None

    def session_execute(self, statement: Select) -> Result:
        """
        Execute statement using sqlalchemy statement

        :param statement: SQL query or a SQL expression that is constructed using
            SQLAlchemy's SQL expression language
        :return: query result represented with declarative base
        """
        _LOGGER.debug(f"Executing statement: {statement}")
        # NOTE(review): the Session is closed before the Result is returned;
        # callers must consume buffered rows only — confirm this is intended.
        with Session(self._engine) as session:
            query_result = session.execute(statement)

        return query_result

    @property
    def session(self):
        """
        :return: started sqlalchemy session
        """
        return self._start_session()

    @property
    def engine(self) -> Engine:
        """
        :return: sqlalchemy engine
        """
        return self._engine

    def _start_session(self):
        """
        Open a new session and verify the schema is queryable.

        :return: started sqlalchemy session
        :raises SchemaError: if the 'bed' table cannot be queried
        """
        session = Session(self.engine)
        try:
            session.execute(select(Bed).limit(1))
        except ProgrammingError as e:
            # Chain the original error so the underlying SQL failure is visible.
            raise SchemaError() from e

        return session

    def check_db_connection(self):
        """
        Verify the database is reachable and the schema is correct.

        :raises SchemaError: if the 'bed' table cannot be queried
        """
        try:
            self.session_execute(select(Bed).limit(1))
        except ProgrammingError as e:
            raise SchemaError() from e

    def create_schema_graph(self, output_file: str = "schema.svg"):
        """
        Create schema graph of the database.

        :param output_file: path to the output file
        :return: None
        """
        graph = create_schema_graph(engine=self.engine, metadata=Base.metadata)
        graph.write(output_file, format="svg", prog="dot")
        return None

    def _upload_licenses(self):
        """
        Upload licenses to the database.

        Reads the licenses CSV from LICENSES_CSV_URL and appends its rows to the
        licenses table. pandas' to_sql manages its own transaction on the engine,
        so no explicit Session/commit is needed (the previous Session wrapper
        committed nothing).
        """
        _LOGGER.info("Uploading licenses to the database...")
        df = pd.read_csv(LICENSES_CSV_URL)
        df.to_sql(License.__tablename__, self.engine, if_exists="append", index=False)
        _LOGGER.info("Licenses uploaded successfully!")