Coverage for bbconf/modules/bedsets.py: 16%

176 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-17 04:01 +0000

1import logging 

2from typing import Dict, List 

3 

4from geniml.io.utils import compute_md5sum_bedset 

5from sqlalchemy import Float, Numeric, func, or_, select 

6from sqlalchemy.orm import Session 

7 

8from bbconf.config_parser import BedBaseConfig 

9from bbconf.const import PKG_NAME 

10from bbconf.db_utils import BedFileBedSetRelation, BedSets, BedStats, Files, Bed 

11from bbconf.exceptions import BedSetExistsError, BedSetNotFoundError 

12from bbconf.models.bed_models import BedStatsModel 

13from bbconf.models.bedset_models import ( 

14 BedMetadataBasic, 

15 BedSetBedFiles, 

16 BedSetListResult, 

17 BedSetMetadata, 

18 BedSetPlots, 

19 BedSetStats, 

20 FileModel, 

21) 

22 

# Module logger, registered under the package name constant PKG_NAME.
_LOGGER = logging.getLogger(PKG_NAME)

24 

25 

class BedAgentBedSet:
    """
    Class that represents Bedset in Database.

    This class has methods to add, delete, get files and metadata from the database.
    """

    def __init__(self, config: BedBaseConfig):
        """
        :param config: config object
        """
        self.config = config
        self._db_engine = self.config.db_engine

    def get(self, identifier: str, full: bool = False) -> BedSetMetadata:
        """
        Get bedset metadata by identifier.

        :param identifier: bedset identifier
        :param full: return full record with stats and plots in addition to
            the basic metadata
        :return: bedset metadata
        :raises BedSetNotFoundError: if no bedset with this identifier exists
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_obj = session.scalar(statement)
            if not bedset_obj:
                raise BedSetNotFoundError(identifier)

            list_of_bedfiles = [
                relation.bedfile_id for relation in bedset_obj.bedfiles
            ]

            if full:
                plots = BedSetPlots()
                for plot in bedset_obj.files:
                    setattr(plots, plot.name, FileModel(**plot.__dict__))

                # `create(statistics=False)` stores None for both columns, so
                # fall back to an empty mapping instead of crashing on `**None`.
                stats = BedSetStats(
                    mean=BedStatsModel(**(bedset_obj.bedset_means or {})),
                    sd=BedStatsModel(
                        **(bedset_obj.bedset_standard_deviation or {})
                    ),
                ).model_dump()
            else:
                plots = None
                stats = None

            bedset_metadata = BedSetMetadata(
                id=bedset_obj.id,
                name=bedset_obj.name,
                description=bedset_obj.description,
                md5sum=bedset_obj.md5sum,
                statistics=stats,
                plots=plots,
                bed_ids=list_of_bedfiles,
            )

        return bedset_metadata

    def get_plots(self, identifier: str) -> BedSetPlots:
        """
        Get plots for bedset by identifier.

        Only files whose name matches a field of BedSetPlots are returned.

        :param identifier: bedset identifier
        :return: bedset plots
        :raises BedSetNotFoundError: if no bedset with this identifier exists
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")

            bedset_files = BedSetPlots()
            for result in bedset_object.files:
                if result.name in bedset_files.model_fields:
                    setattr(
                        bedset_files,
                        result.name,
                        FileModel(
                            **result.__dict__,
                            object_id=f"bed.{identifier}.{result.name}",
                            access_methods=self.config.construct_access_method_list(
                                result.path
                            ),
                        ),
                    )
        return bedset_files

    def get_objects(self, identifier: str) -> Dict[str, FileModel]:
        """
        Get objects for bedset by identifier.

        :param identifier: bedset identifier
        :return: mapping of file name to file model for every file of the bedset
        :raises BedSetNotFoundError: if no bedset with this identifier exists
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")

            return_dict = {
                result.name: FileModel(
                    **result.__dict__,
                    object_id=f"bed.{identifier}.{result.name}",
                    access_methods=self.config.construct_access_method_list(
                        result.path
                    ),
                )
                for result in bedset_object.files
            }

        return return_dict

    def get_statistics(self, identifier: str) -> BedSetStats:
        """
        Get statistics for bedset by identifier.

        :param identifier: bedset identifier
        :return: bedset statistics
        :raises BedSetNotFoundError: if no bedset with this identifier exists
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            bedset_object = session.scalar(statement)
            if not bedset_object:
                raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.")

            # Statistics columns are None when the bedset was created with
            # `statistics=False`; avoid `**None` in that case.
            return BedSetStats(
                mean=BedStatsModel(**(bedset_object.bedset_means or {})),
                sd=BedStatsModel(
                    **(bedset_object.bedset_standard_deviation or {})
                ),
            )

    def create(
        self,
        identifier: str,
        name: str,
        bedid_list: List[str],
        description: str = None,
        statistics: bool = False,
        plots: dict = None,
        upload_pephub: bool = False,
        upload_s3: bool = False,
        local_path: str = "",
        no_fail: bool = False,
        overwrite: bool = False,
    ) -> None:
        """
        Create bedset in the database.

        :param identifier: bedset identifier
        :param name: bedset name
        :param description: bedset description
        :param bedid_list: list of bed file identifiers
        :param statistics: calculate statistics for bedset
        :param plots: dictionary with plots (only used when upload_s3 is True)
        :param upload_pephub: upload bedset to pephub (create view in pephub)
        :param upload_s3: upload bedset to s3
        :param local_path: local path to the output files
        :param no_fail: do not raise an error if bedset already exists or if
            creation fails; failures are logged instead
        :param overwrite: overwrite the record in the database
        :return: None
        :raises BedSetExistsError: if the bedset exists and neither overwrite
            nor no_fail is set
        """
        _LOGGER.info(f"Creating bedset '{identifier}'")

        stats = self._calculate_statistics(bedid_list) if statistics else None

        if self.exists(identifier):
            if not overwrite and not no_fail:
                raise BedSetExistsError(identifier)
            # NOTE(review): the existing record is also deleted when only
            # `no_fail` is set (overwrite=False) -- confirm this is intended.
            self.delete(identifier)

        if upload_pephub:
            # The helper already logs the failure and re-raises only when
            # `no_fail` is False, so no extra try/except is needed here.
            self._create_pephub_view(identifier, description, bedid_list, no_fail)

        new_bedset = BedSets(
            id=identifier,
            name=name,
            description=description,
            bedset_means=stats.mean.model_dump() if stats else None,
            bedset_standard_deviation=stats.sd.model_dump() if stats else None,
            md5sum=compute_md5sum_bedset(bedid_list),
        )

        if upload_s3:
            plots = BedSetPlots(**plots) if plots else BedSetPlots()
            plots = self.config.upload_files_s3(
                identifier, files=plots, base_path=local_path, type="bedsets"
            )

        try:
            with Session(self._db_engine.engine) as session:
                session.add(new_bedset)

                if no_fail:
                    # Drop duplicate ids while keeping the caller's order.
                    bedid_list = list(dict.fromkeys(bedid_list))
                for bedfile in bedid_list:
                    session.add(
                        BedFileBedSetRelation(bedset_id=identifier, bedfile_id=bedfile)
                    )

                if upload_s3:
                    # Iterating the plots model yields (field name, value) pairs.
                    for _, plot_file in plots:
                        if plot_file:
                            session.add(
                                Files(
                                    **plot_file.model_dump(
                                        exclude_none=True, exclude_unset=True
                                    ),
                                    bedset_id=identifier,
                                    type="plot",
                                )
                            )

                session.commit()
        except Exception as e:
            _LOGGER.error(f"Failed to create bedset: {e}")
            if not no_fail:
                raise
            # Creation failed and the error was suppressed: do not report success.
            return None

        _LOGGER.info(f"Bedset '{identifier}' was created successfully")
        return None

    def _calculate_statistics(self, bed_ids: List[str]) -> BedSetStats:
        """
        Calculate statistics for bedset.

        For every field of BedStatsModel the mean and standard deviation over
        the given bed files are computed in the database, rounded to 4 digits.

        :param bed_ids: list of bed file identifiers
        :return: statistics
        """
        _LOGGER.info("Calculating bedset statistics")
        numeric_columns = BedStatsModel.model_fields

        bedset_sd = {}
        bedset_mean = {}
        with Session(self._db_engine.engine) as session:
            for column_name in numeric_columns:
                column = getattr(BedStats, column_name)
                # Fetch mean and standard deviation in a single round trip.
                statement = select(
                    func.round(func.avg(column).cast(Numeric), 4).cast(Float),
                    func.round(func.stddev(column).cast(Numeric), 4).cast(Float),
                ).where(BedStats.id.in_(bed_ids))

                mean_value, sd_value = session.execute(statement).one()
                bedset_mean[column_name] = mean_value
                bedset_sd[column_name] = sd_value

        bedset_stats = BedSetStats(
            mean=bedset_mean,
            sd=bedset_sd,
        )

        _LOGGER.info("Bedset statistics were calculated successfully")
        return bedset_stats

    def _create_pephub_view(
        self,
        bedset_id: str,
        description: str = None,
        bed_ids: list = None,
        nofail: bool = False,
    ) -> None:
        """
        Create view in pephub for bedset.

        :param bedset_id: bedset identifier
        :param description: bedset description (currently not sent to pephub)
        :param bed_ids: list of bed file identifiers
        :param nofail: log instead of raising if the view cannot be created

        :return: None
        """
        _LOGGER.info(f"Creating view in pephub for bedset '{bedset_id}'")
        try:
            self.config.phc.view.create(
                namespace=self.config.config.phc.namespace,
                name=self.config.config.phc.name,
                tag=self.config.config.phc.tag,
                view_name=bedset_id,
                # description=description,
                sample_list=bed_ids,
            )
        except Exception as e:
            _LOGGER.error(f"Failed to create view in pephub: {e}")
            if not nofail:
                raise
        return None

    def get_ids_list(
        self, query: str = None, limit: int = 10, offset: int = 0
    ) -> BedSetListResult:
        """
        Get list of bedsets from the database.

        :param query: search query, matched case-insensitively as a substring
            of the bedset name or description
        :param limit: limit of results
        :param offset: offset of results
        :return: list of bedsets
        """
        statement = select(BedSets.id)
        count_statement = select(func.count(BedSets.id))
        if query:
            sql_search_str = f"%{query}%"
            search_filter = or_(
                BedSets.name.ilike(sql_search_str),
                BedSets.description.ilike(sql_search_str),
            )
            statement = statement.where(search_filter)
            count_statement = count_statement.where(search_filter)

        # NOTE(review): no ORDER BY is applied, so page contents depend on the
        # database's row order -- consider ordering if stable paging is needed.
        with Session(self._db_engine.engine) as session:
            # Materialize the ids while the session is still open.
            bedset_ids = session.scalars(
                statement.limit(limit).offset(offset)
            ).all()
            bedset_count = session.execute(count_statement).one()[0]

        result_list = [self.get(bedset_id) for bedset_id in bedset_ids]
        return BedSetListResult(
            count=bedset_count,
            limit=limit,
            offset=offset,
            results=result_list,
        )

    def get_bedset_bedfiles(self, identifier: str) -> BedSetBedFiles:
        """
        Get list of bedfiles in bedset.

        :param identifier: bedset identifier

        :return: list of bedfiles
        """
        sub_statement = select(BedFileBedSetRelation.bedfile_id).where(
            BedFileBedSetRelation.bedset_id == identifier
        )
        statement = select(Bed).where(Bed.id.in_(sub_statement))

        with Session(self._db_engine.engine) as session:
            bedfiles_list = session.scalars(statement)
            results = [
                BedMetadataBasic(**bedfile_obj.__dict__)
                for bedfile_obj in bedfiles_list
            ]

        return BedSetBedFiles(
            count=len(results),
            results=results,
        )

    def delete(self, identifier: str) -> None:
        """
        Delete bedset from the database.

        Also attempts to delete the pephub view (failures there are only
        logged) and removes the bedset's files from s3.

        :param identifier: bedset identifier
        :return: None
        :raises BedSetNotFoundError: if no bedset with this identifier exists
        """
        statement = select(BedSets).where(BedSets.id == identifier)

        with Session(self._db_engine.engine) as session:
            # Single lookup: serves both as existence check and as the object
            # to delete, so a missing row can never be dereferenced.
            bedset_obj = session.scalar(statement)
            if not bedset_obj:
                raise BedSetNotFoundError(identifier)

            _LOGGER.info(f"Deleting bedset '{identifier}'")

            # Collect file records before the row is gone; needed for s3 cleanup.
            files = [FileModel(**k.__dict__) for k in bedset_obj.files]

            session.delete(bedset_obj)
            session.commit()

        self.delete_phc_view(identifier, nofail=True)
        if files:
            self.config.delete_files_s3(files)

    def delete_phc_view(self, identifier: str, nofail: bool = False) -> None:
        """
        Delete view in pephub.

        :param identifier: bedset identifier
        :param nofail: log instead of raising if the view cannot be deleted
        :return: None
        """
        _LOGGER.info(f"Deleting view in pephub for bedset '{identifier}'")
        try:
            self.config.phc.view.delete(
                namespace=self.config.config.phc.namespace,
                name=self.config.config.phc.name,
                tag=self.config.config.phc.tag,
                view_name=identifier,
            )
        except Exception as e:
            _LOGGER.error(f"Failed to delete view in pephub: {e}")
            if not nofail:
                raise
        return None

    def exists(self, identifier: str) -> bool:
        """
        Check if bedset exists in the database.

        :param identifier: bedset identifier
        :return: True if bedset exists, False otherwise
        """
        statement = select(BedSets).where(BedSets.id == identifier)
        with Session(self._db_engine.engine) as session:
            result = session.execute(statement).one_or_none()
        return result is not None

    def add_bedfile(self, identifier: str, bedfile: str) -> None:
        """Not implemented yet."""
        raise NotImplementedError

    def delete_bedfile(self, identifier: str, bedfile: str) -> None:
        """Not implemented yet."""
        raise NotImplementedError