Coverage for bbconf/modules/bedsets.py: 16%
176 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-17 04:01 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-17 04:01 +0000
1import logging
2from typing import Dict, List
4from geniml.io.utils import compute_md5sum_bedset
5from sqlalchemy import Float, Numeric, func, or_, select
6from sqlalchemy.orm import Session
8from bbconf.config_parser import BedBaseConfig
9from bbconf.const import PKG_NAME
10from bbconf.db_utils import BedFileBedSetRelation, BedSets, BedStats, Files, Bed
11from bbconf.exceptions import BedSetExistsError, BedSetNotFoundError
12from bbconf.models.bed_models import BedStatsModel
13from bbconf.models.bedset_models import (
14 BedMetadataBasic,
15 BedSetBedFiles,
16 BedSetListResult,
17 BedSetMetadata,
18 BedSetPlots,
19 BedSetStats,
20 FileModel,
21)
# Module-level logger, registered under the package name PKG_NAME.
23_LOGGER = logging.getLogger(PKG_NAME)
class BedAgentBedSet:
    """
    Agent that manages bedset records in the database.

    Provides methods to create, read and delete bedsets together with their
    statistics, plot files and bed file relations.
    """

    def __init__(self, config: BedBaseConfig):
        """
        :param config: config object; its ``db_engine`` attribute is reused for
            every database session opened by this agent
        """
        self.config = config
        self._db_engine = self.config.db_engine

    @staticmethod
    def _stats_from_record(bedset_record) -> BedSetStats:
        """
        Build a statistics model from a bedset database record.

        :param bedset_record: BedSets row object
        :return: bedset statistics (mean and standard deviation)
        """
        # create() stores None in both columns when statistics=False, and
        # unpacking None with ** raises TypeError, hence the `or {}` guard.
        # NOTE(review): assumes BedStatsModel can be built without arguments
        # (all fields optional) -- confirm against bed_models.
        return BedSetStats(
            mean=BedStatsModel(**(bedset_record.bedset_means or {})),
            sd=BedStatsModel(**(bedset_record.bedset_standard_deviation or {})),
        )

    def _to_file_model(self, identifier: str, file_record) -> FileModel:
        """
        Convert a file database record of a bedset into a FileModel.

        :param identifier: bedset identifier, used to build the object id
        :param file_record: Files row object attached to the bedset
        :return: file model with object id and access methods filled in
        """
        return FileModel(
            **file_record.__dict__,
            object_id=f"bed.{identifier}.{file_record.name}",
            access_methods=self.config.construct_access_method_list(
                file_record.path
            ),
        )

    def get(self, identifier: str, full: bool = False) -> BedSetMetadata:
        """
        Get bedset metadata by identifier.

        :param identifier: bedset identifier
        :param full: additionally return statistics and plots; when False both
            are set to None
        :return: bedset metadata
        :raises BedSetNotFoundError: if the bedset is not in the database
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_obj = session.scalar(statement)
            if not bedset_obj:
                raise BedSetNotFoundError(identifier)

            bed_ids = [relation.bedfile_id for relation in bedset_obj.bedfiles]

            if full:
                plots = BedSetPlots()
                for plot in bedset_obj.files:
                    # Same guard as get_plots(): only names declared on
                    # BedSetPlots are assigned.
                    if plot.name in BedSetPlots.model_fields:
                        setattr(plots, plot.name, FileModel(**plot.__dict__))
                stats = self._stats_from_record(bedset_obj).model_dump()
            else:
                plots = None
                stats = None

            bedset_metadata = BedSetMetadata(
                id=bedset_obj.id,
                name=bedset_obj.name,
                description=bedset_obj.description,
                md5sum=bedset_obj.md5sum,
                statistics=stats,
                plots=plots,
                bed_ids=bed_ids,
            )

        return bedset_metadata

    def get_plots(self, identifier: str) -> BedSetPlots:
        """
        Get plots for bedset by identifier.

        :param identifier: bedset identifier
        :return: bedset plots
        :raises BedSetNotFoundError: if the bedset is not in the database
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")

            bedset_files = BedSetPlots()
            for result in bedset_object.files:
                # Files whose name is not a declared plot field are skipped.
                if result.name in BedSetPlots.model_fields:
                    setattr(
                        bedset_files,
                        result.name,
                        self._to_file_model(identifier, result),
                    )
        return bedset_files

    def get_objects(self, identifier: str) -> Dict[str, FileModel]:
        """
        Get objects for bedset by identifier.

        :param identifier: bedset identifier
        :return: dictionary mapping file name to its file model
        :raises BedSetNotFoundError: if the bedset is not in the database
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")
            return_dict = {
                result.name: self._to_file_model(identifier, result)
                for result in bedset_object.files
            }

        return return_dict

    def get_statistics(self, identifier: str) -> BedSetStats:
        """
        Get statistics for bedset by identifier.

        :param identifier: bedset identifier
        :return: bedset statistics
        :raises BedSetNotFoundError: if the bedset is not in the database
        """
        statement = select(BedSets).where(BedSets.id == identifier)
        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")
            return self._stats_from_record(bedset_object)

    def create(
        self,
        identifier: str,
        name: str,
        bedid_list: List[str],
        description: str = None,
        statistics: bool = False,
        plots: dict = None,
        upload_pephub: bool = False,
        upload_s3: bool = False,
        local_path: str = "",
        no_fail: bool = False,
        overwrite: bool = False,
    ) -> None:
        """
        Create bedset in the database.

        If a bedset with the same identifier already exists and either
        ``overwrite`` or ``no_fail`` is set, the existing bedset is deleted
        and created again.

        :param identifier: bedset identifier
        :param name: bedset name
        :param description: bedset description
        :param bedid_list: list of bed file identifiers
        :param statistics: calculate statistics for bedset
        :param plots: dictionary with plots (only used when upload_s3 is True)
        :param upload_pephub: upload bedset to pephub (create view in pephub)
        :param upload_s3: upload bedset to s3
        :param local_path: local path to the output files
        :param no_fail: do not raise an error if bedset already exists or if
            creating the pephub view / database record fails; bed ids are
            de-duplicated in this mode
        :param overwrite: overwrite the record in the database
        :return: None
        :raises BedSetExistsError: if the bedset exists and neither overwrite
            nor no_fail is set
        """
        _LOGGER.info(f"Creating bedset '{identifier}'")

        # Statistics are computed before the existing record is removed, so a
        # failure here leaves the old bedset untouched.
        stats = self._calculate_statistics(bedid_list) if statistics else None

        if self.exists(identifier):
            if not overwrite and not no_fail:
                raise BedSetExistsError(identifier)
            self.delete(identifier)

        if upload_pephub:
            # _create_pephub_view already logs the failure and honours the
            # nofail flag, so no extra try/except (and no duplicate log) here.
            self._create_pephub_view(identifier, description, bedid_list, no_fail)

        new_bedset = BedSets(
            id=identifier,
            name=name,
            description=description,
            bedset_means=stats.mean.model_dump() if stats else None,
            bedset_standard_deviation=stats.sd.model_dump() if stats else None,
            md5sum=compute_md5sum_bedset(bedid_list),
        )

        uploaded_plots = None
        if upload_s3:
            plot_files = BedSetPlots(**plots) if plots else BedSetPlots()
            uploaded_plots = self.config.upload_files_s3(
                identifier, files=plot_files, base_path=local_path, type="bedsets"
            )

        try:
            with Session(self._db_engine.engine) as session:
                session.add(new_bedset)

                if no_fail:
                    # Drop duplicated ids so repeated relations are not added.
                    bedid_list = list(set(bedid_list))
                for bedfile in bedid_list:
                    session.add(
                        BedFileBedSetRelation(bedset_id=identifier, bedfile_id=bedfile)
                    )
                if upload_s3:
                    # Iterating the uploaded plots yields (name, file) pairs;
                    # only plots that are set get a Files record.
                    for _, plot_file in uploaded_plots:
                        if plot_file:
                            new_file = Files(
                                **plot_file.model_dump(
                                    exclude_none=True, exclude_unset=True
                                ),
                                bedset_id=identifier,
                                type="plot",
                            )
                            session.add(new_file)

                session.commit()
        except Exception as e:
            _LOGGER.error(f"Failed to create bedset: {e}")
            if not no_fail:
                raise
            # Failure was tolerated: return without reporting success.
            return None

        _LOGGER.info(f"Bedset '{identifier}' was created successfully")
        return None

    def _calculate_statistics(self, bed_ids: List[str]) -> BedSetStats:
        """
        Calculate statistics for bedset.

        For every field of BedStatsModel the average and standard deviation
        over the given bed files are computed in the database and rounded to
        4 decimal places.

        :param bed_ids: list of bed file identifiers
        :return: statistics
        """

        _LOGGER.info("Calculating bedset statistics")
        numeric_columns = BedStatsModel.model_fields

        bedset_sd = {}
        bedset_mean = {}
        with Session(self._db_engine.engine) as session:
            for column_name in numeric_columns:
                column = getattr(BedStats, column_name)
                in_bedset = BedStats.id.in_(bed_ids)

                # The aggregate is cast to Numeric before rounding and back to
                # Float afterwards, as in the original statements.
                mean_bedset_statement = select(
                    func.round(func.avg(column).cast(Numeric), 4).cast(Float)
                ).where(in_bedset)
                sd_bedset_statement = select(
                    func.round(func.stddev(column).cast(Numeric), 4).cast(Float)
                ).where(in_bedset)

                # Each statement returns a single row with a single value.
                bedset_sd[column_name] = session.scalar(sd_bedset_statement)
                bedset_mean[column_name] = session.scalar(mean_bedset_statement)

        bedset_stats = BedSetStats(
            mean=bedset_mean,
            sd=bedset_sd,
        )

        _LOGGER.info("Bedset statistics were calculated successfully")
        return bedset_stats

    def _create_pephub_view(
        self,
        bedset_id: str,
        description: str = None,
        bed_ids: list = None,
        nofail: bool = False,
    ) -> None:
        """
        Create view in pephub for bedset.

        :param bedset_id: bedset identifier, used as the view name
        :param description: bedset description (currently not sent to pephub)
        :param bed_ids: list of bed file identifiers
        :param nofail: log the failure instead of raising it

        :return: None
        """

        _LOGGER.info(f"Creating view in pephub for bedset '{bedset_id}'")
        try:
            self.config.phc.view.create(
                namespace=self.config.config.phc.namespace,
                name=self.config.config.phc.name,
                tag=self.config.config.phc.tag,
                view_name=bedset_id,
                sample_list=bed_ids,
            )
        except Exception as e:
            _LOGGER.error(f"Failed to create view in pephub: {e}")
            if not nofail:
                raise
        return None

    def get_ids_list(
        self, query: str = None, limit: int = 10, offset: int = 0
    ) -> BedSetListResult:
        """
        Get list of bedsets from the database.

        :param query: search query; matched case-insensitively as a substring
            of the bedset name or description
        :param limit: limit of results
        :param offset: offset of results
        :return: list of bedsets with the total number of matches
        """
        statement = select(BedSets.id)
        count_statement = select(func.count(BedSets.id))
        if query:
            # NOTE(review): '%' and '_' inside query act as LIKE wildcards.
            sql_search_str = f"%{query}%"
            search_filter = or_(
                BedSets.name.ilike(sql_search_str),
                BedSets.description.ilike(sql_search_str),
            )
            statement = statement.where(search_filter)
            count_statement = count_statement.where(search_filter)

        # NOTE(review): no ORDER BY is applied, so page contents depend on the
        # order in which the database returns rows.
        with Session(self._db_engine.engine) as session:
            bedset_ids = session.scalars(statement.limit(limit).offset(offset)).all()
            bedset_count = session.scalar(count_statement)

        # get() opens its own session, so it is called after this one closed.
        result_list = [self.get(bedset_id) for bedset_id in bedset_ids]
        return BedSetListResult(
            count=bedset_count,
            limit=limit,
            offset=offset,
            results=result_list,
        )

    def get_bedset_bedfiles(self, identifier: str) -> BedSetBedFiles:
        """
        Get list of bedfiles in bedset.

        :param identifier: bedset identifier

        :return: list of bedfiles with their count; empty if no relation
            rows exist for the identifier
        """
        sub_statement = select(BedFileBedSetRelation.bedfile_id).where(
            BedFileBedSetRelation.bedset_id == identifier
        )
        statement = select(Bed).where(Bed.id.in_(sub_statement))

        with Session(self._db_engine.engine) as session:
            results = [
                BedMetadataBasic(**bedfile_obj.__dict__)
                for bedfile_obj in session.scalars(statement)
            ]

        return BedSetBedFiles(
            count=len(results),
            results=results,
        )

    def delete(self, identifier: str) -> None:
        """
        Delete bedset from the database.

        After the record is removed, the pephub view is deleted (failures are
        only logged) and the files of the bedset are deleted from s3.

        :param identifier: bedset identifier
        :return: None
        :raises BedSetNotFoundError: if the bedset is not in the database
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            # Single lookup: a separate exists() call followed by a second
            # query could hand back None here.
            bedset_obj = session.scalar(statement)
            if bedset_obj is None:
                raise BedSetNotFoundError(identifier)

            _LOGGER.info(f"Deleting bedset '{identifier}'")

            # Collect file info before the record is removed.
            files = [FileModel(**k.__dict__) for k in bedset_obj.files]

            session.delete(bedset_obj)
            session.commit()

        self.delete_phc_view(identifier, nofail=True)
        if files:
            self.config.delete_files_s3(files)

    def delete_phc_view(self, identifier: str, nofail: bool = False) -> None:
        """
        Delete view in pephub.

        :param identifier: bedset identifier (name of the view)
        :param nofail: log the failure instead of raising it
        :return: None
        """
        _LOGGER.info(f"Deleting view in pephub for bedset '{identifier}'")
        try:
            self.config.phc.view.delete(
                namespace=self.config.config.phc.namespace,
                name=self.config.config.phc.name,
                tag=self.config.config.phc.tag,
                view_name=identifier,
            )
        except Exception as e:
            _LOGGER.error(f"Failed to delete view in pephub: {e}")
            if not nofail:
                raise
        return None

    def exists(self, identifier: str) -> bool:
        """
        Check if bedset exists in the database.

        :param identifier: bedset identifier
        :return: True if bedset exists, False otherwise
        """
        # Only the id column is selected; the full record is not needed.
        statement = select(BedSets.id).where(BedSets.id == identifier)
        with Session(self._db_engine.engine) as session:
            return session.execute(statement).one_or_none() is not None

    def add_bedfile(self, identifier: str, bedfile: str) -> None:
        """
        Add bed file to the bedset. Not implemented.

        :param identifier: bedset identifier
        :param bedfile: bed file identifier
        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def delete_bedfile(self, identifier: str, bedfile: str) -> None:
        """
        Delete bed file from the bedset. Not implemented.

        :param identifier: bedset identifier
        :param bedfile: bed file identifier
        :raises NotImplementedError: always
        """
        raise NotImplementedError