Coverage for bbconf/db_utils.py: 72% (204 statements) — coverage.py v7.6.0, created at 2024-07-17 04:01 +0000

1import datetime 

2import logging 

3from typing import List, Optional 

4import pandas as pd 

5 

6from sqlalchemy import TIMESTAMP, BigInteger, ForeignKey, Result, Select, event, select 

7from sqlalchemy.dialects.postgresql import JSON 

8from sqlalchemy.engine import URL, Engine, create_engine 

9from sqlalchemy.event import listens_for 

10from sqlalchemy.exc import ProgrammingError, IntegrityError 

11from sqlalchemy.ext.compiler import compiles 

12from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship 

13from sqlalchemy_schemadisplay import create_schema_graph 

14 

15from bbconf.const import PKG_NAME, LICENSES_CSV_URL 

16 

17_LOGGER = logging.getLogger(PKG_NAME) 

18 

19 

# SQLAlchemy dialect+driver string used when building engine URLs.
POSTGRES_DIALECT = "postgresql+psycopg"

# Names of tables that were created in this execution
# (populated by the 'after_create' listener on Base.metadata).
tables_initialized: list = []

24 

25 

class SchemaError(Exception):
    """Raised when the database does not have the schema bbconf expects."""

    _MESSAGE = "The database schema is incorrect, can't connect to the database!"

    def __init__(self):
        super().__init__(self._MESSAGE)

31 

32 

class BIGSERIAL(BigInteger):
    # Marker type: behaves as a plain BigInteger everywhere except on the
    # PostgreSQL dialect, where a @compiles hook renders it as BIGSERIAL.
    pass

35 

36 

@compiles(BIGSERIAL, POSTGRES_DIALECT)
def compile_bigserial_pg(type_, compiler, **kw):
    """Render the BIGSERIAL marker type as PostgreSQL BIGSERIAL DDL."""
    return "BIGSERIAL"

40 

41 

@compiles(JSON, POSTGRES_DIALECT)
def compile_jsonb_pg(type_, compiler, **kw):
    """Render JSON columns as JSONB when compiling DDL for PostgreSQL."""
    return "JSONB"

45 

46 

class Base(DeclarativeBase):
    """Declarative base for all models.

    Maps Python ``datetime.datetime`` columns to timezone-aware TIMESTAMP.
    """

    type_annotation_map = {datetime.datetime: TIMESTAMP(timezone=True)}

49 

50 

@event.listens_for(Base.metadata, "after_create")
def receive_after_create(target, connection, tables, **kw):
    """
    Record which tables were created by this 'after_create' event.

    Updates the module-level ``tables_initialized`` list with the full
    names of the tables that were just created.
    """
    global tables_initialized
    if not tables:
        _LOGGER.info("A table was not created")
        return
    _LOGGER.info("A table was created")
    tables_initialized = [table.fullname for table in tables]

62 

63 

def deliver_update_date(context):
    """Return the current UTC time; used as a column default/onupdate callable.

    :param context: SQLAlchemy execution context (unused)
    :return: timezone-aware ``datetime.datetime`` in UTC
    """
    return datetime.datetime.now(tz=datetime.timezone.utc)

66 

67 

class Bed(Base):
    """ORM model for a single BED file record and its relationships."""

    __tablename__ = "bed"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    name: Mapped[Optional[str]]
    # Genome assembly alias and digest for this BED file.
    genome_alias: Mapped[Optional[str]]
    genome_digest: Mapped[Optional[str]]
    description: Mapped[Optional[str]]
    bed_type: Mapped[str] = mapped_column(default="bed3")
    bed_format: Mapped[str] = mapped_column(default="bed")
    indexed: Mapped[bool] = mapped_column(
        default=False, comment="Whether sample was added to qdrant"
    )
    pephub: Mapped[bool] = mapped_column(
        default=False, comment="Whether sample was added to pephub"
    )

    # Timestamps default to UTC now; last_update_date also refreshes on UPDATE.
    submission_date: Mapped[datetime.datetime] = mapped_column(
        default=deliver_update_date
    )
    last_update_date: Mapped[Optional[datetime.datetime]] = mapped_column(
        default=deliver_update_date,
        onupdate=deliver_update_date,
    )
    # Maintained automatically by the Universes insert/update/delete listeners.
    is_universe: Mapped[Optional[bool]] = mapped_column(default=False)

    # Child rows are deleted together with the bed (delete-orphan cascade).
    files: Mapped[List["Files"]] = relationship(
        "Files", back_populates="bedfile", cascade="all, delete-orphan"
    )

    bedsets: Mapped[List["BedFileBedSetRelation"]] = relationship(
        "BedFileBedSetRelation", back_populates="bedfile", cascade="all, delete-orphan"
    )

    stats: Mapped["BedStats"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )

    universe: Mapped["Universes"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )
    tokenized: Mapped["TokenizedBed"] = relationship(
        back_populates="bed", cascade="all, delete-orphan"
    )
    # Nullable FK: a bed may exist without an assigned license.
    license_id: Mapped["str"] = mapped_column(
        ForeignKey("licenses.id", ondelete="CASCADE"), nullable=True, index=True
    )
    license_mapping: Mapped["License"] = relationship("License", back_populates="bed")

116 

117 

class BedStats(Base):
    """ORM model holding per-BED-file statistics (one-to-one with ``bed``)."""

    __tablename__ = "bed_stats"

    # Primary key doubles as a FK to bed.id; rows are removed with the bed.
    id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
    )
    # All statistics are optional floats; semantics follow the column names
    # (region counts, GC content, genomic-partition frequencies/percentages).
    number_of_regions: Mapped[Optional[float]]
    gc_content: Mapped[Optional[float]]
    median_tss_dist: Mapped[Optional[float]]
    mean_region_width: Mapped[Optional[float]]
    exon_frequency: Mapped[Optional[float]]
    intron_frequency: Mapped[Optional[float]]
    promoterprox_frequency: Mapped[Optional[float]]
    intergenic_frequency: Mapped[Optional[float]]
    promotercore_frequency: Mapped[Optional[float]]
    fiveutr_frequency: Mapped[Optional[float]]
    threeutr_frequency: Mapped[Optional[float]]
    fiveutr_percentage: Mapped[Optional[float]]
    threeutr_percentage: Mapped[Optional[float]]
    promoterprox_percentage: Mapped[Optional[float]]
    exon_percentage: Mapped[Optional[float]]
    intron_percentage: Mapped[Optional[float]]
    intergenic_percentage: Mapped[Optional[float]]
    promotercore_percentage: Mapped[Optional[float]]
    tssdist: Mapped[Optional[float]]

    bed: Mapped["Bed"] = relationship("Bed", back_populates="stats")

147 

148 

class Files(Base):
    """ORM model for a file/plot object attached to a bed or a bedset."""

    __tablename__ = "files"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(
        nullable=False, comment="Name of the file, e.g. bed, bigBed"
    )
    title: Mapped[Optional[str]]
    type: Mapped[str] = mapped_column(
        default="file", comment="Type of the object, e.g. file, plot, ..."
    )
    path: Mapped[str]
    # NOTE(review): annotated non-Optional but declared nullable=True —
    # the explicit nullable=True wins; consider Mapped[Optional[str]].
    path_thumbnail: Mapped[str] = mapped_column(
        nullable=True, comment="Thumbnail path of the file"
    )
    description: Mapped[Optional[str]]
    size: Mapped[Optional[int]] = mapped_column(default=0, comment="Size of the file")

    # Both FKs are nullable: a row belongs to a bed and/or a bedset
    # (presumably exactly one of the two — confirm with callers).
    bedfile_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"), nullable=True, index=True
    )
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"), nullable=True, index=True
    )

    bedfile: Mapped["Bed"] = relationship("Bed", back_populates="files")
    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="files")

176 

177 

class BedFileBedSetRelation(Base):
    """Association table for the many-to-many bed <-> bedset relationship."""

    __tablename__ = "bedfile_bedset_relation"

    # Composite primary key over both foreign keys.
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"), primary_key=True
    )
    bedfile_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"), primary_key=True
    )

    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="bedfiles")
    bedfile: Mapped["Bed"] = relationship("Bed", back_populates="bedsets")

190 

191 

class BedSets(Base):
    """ORM model for a bedset: a named collection of BED files."""

    __tablename__ = "bedsets"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(nullable=False, comment="Name of the bedset")
    description: Mapped[Optional[str]] = mapped_column(
        comment="Description of the bedset"
    )
    # Timestamps default to UTC now; last_update_date also refreshes on UPDATE.
    submission_date: Mapped[datetime.datetime] = mapped_column(
        default=deliver_update_date
    )
    last_update_date: Mapped[Optional[datetime.datetime]] = mapped_column(
        default=deliver_update_date,
        onupdate=deliver_update_date,
    )
    md5sum: Mapped[Optional[str]] = mapped_column(comment="MD5 sum of the bedset")

    bedset_means: Mapped[Optional[dict]] = mapped_column(
        JSON, comment="Mean values of the bedset"
    )
    # FIX: column comment previously said "Median values of the bedset",
    # which contradicted the column name; it stores standard deviations.
    bedset_standard_deviation: Mapped[Optional[dict]] = mapped_column(
        JSON, comment="Standard deviation values of the bedset"
    )

    # Association rows linking this bedset to its member BED files.
    bedfiles: Mapped[List["BedFileBedSetRelation"]] = relationship(
        "BedFileBedSetRelation", back_populates="bedset", cascade="all, delete-orphan"
    )
    files: Mapped[List["Files"]] = relationship("Files", back_populates="bedset")
    universe: Mapped["Universes"] = relationship("Universes", back_populates="bedset")

221 

222 

class Universes(Base):
    """ORM model marking a bed as a universe; shares its PK with ``bed``."""

    __tablename__ = "universes"

    # Primary key doubles as a FK to bed.id; deleted together with the bed.
    id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
    )
    method: Mapped[str] = mapped_column(
        nullable=True, comment="Method used to create the universe"
    )
    # Optional link to the bedset the universe was derived from.
    bedset_id: Mapped[str] = mapped_column(
        ForeignKey("bedsets.id", ondelete="CASCADE"),
        index=True,
        nullable=True,
    )

    bed: Mapped["Bed"] = relationship("Bed", back_populates="universe")
    bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="universe")
    tokenized: Mapped["TokenizedBed"] = relationship(
        "TokenizedBed",
        back_populates="universe",
    )

246 

247 

class TokenizedBed(Base):
    """ORM model for a bed file tokenized against a specific universe."""

    __tablename__ = "tokenized_bed"

    # Composite primary key: one tokenization per (bed, universe) pair.
    bed_id: Mapped[str] = mapped_column(
        ForeignKey("bed.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
        nullable=False,
    )
    universe_id: Mapped[str] = mapped_column(
        ForeignKey("universes.id", ondelete="CASCADE"),
        primary_key=True,
        index=True,
        nullable=False,
    )
    path: Mapped[str] = mapped_column(
        nullable=False, comment="Path to the tokenized bed file"
    )

    bed: Mapped["Bed"] = relationship("Bed", back_populates="tokenized")
    universe: Mapped["Universes"] = relationship(
        "Universes", back_populates="tokenized"
    )

271 

272 

class License(Base):
    """ORM model for a data license that bed records may reference."""

    __tablename__ = "licenses"

    id: Mapped[str] = mapped_column(primary_key=True, index=True)
    shorthand: Mapped[str] = mapped_column(nullable=True, comment="License shorthand")
    label: Mapped[str] = mapped_column(nullable=False, comment="License label")
    description: Mapped[str] = mapped_column(
        nullable=False, comment="License description"
    )

    # One-to-many: all beds carrying this license (singular name kept for
    # backward compatibility with existing callers).
    bed: Mapped[List["Bed"]] = relationship("Bed", back_populates="license_mapping")

284 

285 

@listens_for(Universes, "after_insert")
@listens_for(Universes, "after_update")
def add_bed_universe(mapper, connection, target):
    """
    Flag the corresponding Bed row as a universe whenever a Universes row
    is inserted or updated.

    :param mapper: mapper that fired the event
    :param connection: connection used for the current flush
    :param target: the Universes instance; its ``id`` is the Bed primary key
    """
    with Session(connection) as session:
        bed = session.scalar(select(Bed).where(Bed.id == target.id))
        # Guard against a universe row with no matching bed record, which
        # previously caused an AttributeError on None.
        if bed is None:
            _LOGGER.warning("Bed '%s' not found; cannot mark it as a universe", target.id)
            return
        bed.is_universe = True
        session.commit()

293 

294 

@listens_for(Universes, "after_delete")
def delete_bed_universe(mapper, connection, target):
    """
    Clear the universe flag on the corresponding Bed row when its
    Universes row is deleted.

    :param mapper: mapper that fired the event
    :param connection: connection used for the current flush
    :param target: the deleted Universes instance; ``id`` is the Bed primary key
    """
    with Session(connection) as session:
        bed = session.scalar(select(Bed).where(Bed.id == target.id))
        # The bed may already be gone (e.g. cascade delete of the bed itself);
        # previously this dereferenced None and raised AttributeError.
        if bed is None:
            _LOGGER.warning("Bed '%s' not found; cannot unset universe flag", target.id)
            return
        bed.is_universe = False
        session.commit()

301 

302 

class BaseEngine:
    """
    A class with base methods, that are used in several classes.
    """

    def __init__(
        self,
        *,
        host: str = "localhost",
        port: int = 5432,
        database: str = "bedbase",
        user: str = None,
        password: str = None,
        drivername: str = POSTGRES_DIALECT,
        dsn: str = None,
        echo: bool = False,
    ):
        """
        Initialize connection to the bedbase database. You can use the basic
        connection parameters or a libpq connection string.

        :param host: database server address e.g., localhost or an IP address.
        :param port: the port number that defaults to 5432 if it is not provided.
        :param database: the name of the database that you want to connect.
        :param user: the username used to authenticate.
        :param password: password used to authenticate.
        :param drivername: SQLAlchemy driver name, e.g. 'postgresql+psycopg'
        :param dsn: libpq connection string using the dsn parameter
            (e.g. 'postgresql://user_name:password@host_name:port/db_name')
        :param echo: if True, log every emitted SQL statement
        """
        if not dsn:
            dsn = URL.create(
                host=host,
                port=port,
                database=database,
                username=user,
                password=password,
                drivername=drivername,
            )

        self._engine = create_engine(dsn, echo=echo)
        self.create_schema(self._engine)
        self.check_db_connection()

    def create_schema(self, engine=None) -> None:
        """
        Create sql schema in the database.

        :param engine: sqlalchemy engine [Default: None]
        :return: None
        """
        if not engine:
            engine = self._engine
        Base.metadata.create_all(engine)

        # tables_initialized is populated by the 'after_create' listener;
        # only seed the licenses table when it was created in this run.
        if License.__tablename__ in tables_initialized:
            try:
                # Table creation is sometimes reported twice, so a second
                # upload attempt can violate the primary key constraint.
                self._upload_licenses()
            except IntegrityError:
                # Licenses are already present; seeding is best-effort.
                pass

    def delete_schema(self, engine=None) -> None:
        """
        Delete sql schema in the database.

        :param engine: sqlalchemy engine [Default: None]
        :return: None
        """
        if not engine:
            engine = self._engine
        Base.metadata.drop_all(engine)
        return None

    def session_execute(self, statement: Select) -> Result:
        """
        Execute statement using sqlalchemy statement

        :param statement: SQL query or a SQL expression that is constructed using
            SQLAlchemy's SQL expression language
        :return: query result represented with declarative base
        """
        _LOGGER.debug(f"Executing statement: {statement}")
        with Session(self._engine) as session:
            # NOTE(review): the Result is returned after the session closes;
            # this relies on the result being fully buffered by the driver.
            query_result = session.execute(statement)

        return query_result

    @property
    def session(self):
        """
        :return: started sqlalchemy session
        """
        return self._start_session()

    @property
    def engine(self) -> Engine:
        """
        :return: sqlalchemy engine
        """
        return self._engine

    def _start_session(self) -> Session:
        """
        Open a new session, verifying the schema by probing the bed table.

        :return: started sqlalchemy session
        :raises SchemaError: if the probe query fails
        """
        session = Session(self.engine)
        try:
            session.execute(select(Bed).limit(1))
        except ProgrammingError as err:
            # Close the session before raising so it is not leaked,
            # and chain the cause for easier debugging.
            session.close()
            raise SchemaError() from err

        return session

    def check_db_connection(self) -> None:
        """
        Verify that the database is reachable and has the expected schema.

        :raises SchemaError: if the probe query fails
        """
        try:
            self.session_execute(select(Bed).limit(1))
        except ProgrammingError as err:
            raise SchemaError() from err

    def create_schema_graph(self, output_file: str = "schema.svg"):
        """
        Create schema graph of the database.

        :param output_file: path to the output file
        :return: None
        """
        graph = create_schema_graph(engine=self.engine, metadata=Base.metadata)
        graph.write(output_file, format="svg", prog="dot")
        return None

    def _upload_licenses(self) -> None:
        """
        Upload licenses to the database, fetched from LICENSES_CSV_URL.
        """
        _LOGGER.info("Uploading licenses to the database...")
        df = pd.read_csv(LICENSES_CSV_URL)

        # DataFrame.to_sql runs and commits its own transaction on the engine;
        # the Session/commit that previously wrapped this call was a no-op.
        df.to_sql(License.__tablename__, self.engine, if_exists="append", index=False)

        _LOGGER.info("Licenses uploaded successfully!")