Coverage for bbconf/modules/bedfiles.py: 13%

410 statements  

coverage.py v7.6.0, created at 2024-07-17 04:01 +0000

1import os 

2from logging import getLogger 

3from typing import Dict, Union 

4 

5import numpy as np 

6from tqdm import tqdm 

7 

8from geniml.bbclient import BBClient 

9from geniml.io import RegionSet 

10from genimtools.tokenizers import RegionSet as GRegionSet 

11from pephubclient.exceptions import ResponseError 

12from qdrant_client.models import Distance, PointIdsList, VectorParams 

13from sqlalchemy import and_, delete, func, select 

14from sqlalchemy.orm import Session 

15 

16from bbconf.config_parser.bedbaseconfig import BedBaseConfig 

17from bbconf.const import PKG_NAME, ZARR_TOKENIZED_FOLDER, DEFAULT_LICENSE 

18from bbconf.db_utils import Bed, BedStats, Files, TokenizedBed, Universes 

19from bbconf.exceptions import ( 

20 BedBaseConfError, 

21 BedFIleExistsError, 

22 BEDFileNotFoundError, 

23 TokenizeFileExistsError, 

24 TokenizeFileNotExistError, 

25 UniverseNotFoundError, 

26 QdrantInstanceNotInitializedError, 

27) 

28from bbconf.models.bed_models import ( 

29 BedClassification, 

30 BedEmbeddingResult, 

31 BedFiles, 

32 BedListResult, 

33 BedListSearchResult, 

34 BedMetadata, 

35 BedMetadataBasic, 

36 BedPEPHub, 

37 BedPlots, 

38 BedStatsModel, 

39 FileModel, 

40 QdrantSearchResult, 

41 TokenizedBedResponse, 

42 UniverseMetadata, 

43 TokenizedPathResponse, 

44 BedPEPHubRestrict, 

45 BedSetMinimal, 

46) 

47 

48_LOGGER = getLogger(PKG_NAME) 

49 

50QDRANT_GENOME = "hg38" 

51 

52 

53class BedAgentBedFile: 

54 """ 

55 Class that represents Bedfile in Database. 

56 

57 This class has method to add, delete, get files and metadata from the database. 

58 """ 

59 

60 def __init__(self, config: BedBaseConfig, bbagent_obj=None): 

61 """ 

62 :param config: config object with database and qdrant engine and credentials 

63 :param bbagent_obj: BedBaseAgent object (Parent object) 

64 """ 

65 self._sa_engine = config.db_engine.engine 

66 self._db_engine = config.db_engine 

67 self._qdrant_engine = config.qdrant_engine 

68 self._boto3_client = config.boto3_client 

69 self._config = config 

70 self.bb_agent = bbagent_obj 

71 
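# --- Usage sketch (editor's illustration, not part of the original module) ---
# In practice this class is usually reached through the parent BedBaseAgent
# (passed here as bbagent_obj); direct construction is shown only to make the
# later sketches self-contained. The config path is hypothetical and the exact
# constructor arguments may differ.

from bbconf.config_parser.bedbaseconfig import BedBaseConfig

config = BedBaseConfig("/path/to/bedbase_config.yaml")  # hypothetical path
bed_agent = BedAgentBedFile(config=config)  # `bed_agent` is reused in the sketches below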

72 def get(self, identifier: str, full: bool = False) -> BedMetadata: 

73 """ 

74 Get file metadata by identifier. 

75 

76 :param identifier: bed file identifier 

77 :param full: if True, return full metadata, including statistics, files, and raw metadata from pephub 

78 :return: project metadata 

79 """ 

80 statement = select(Bed).where(Bed.id == identifier) 

81 

82 bed_plots = BedPlots() 

83 bed_files = BedFiles() 

84 

85 with Session(self._sa_engine) as session: 

86 bed_object = session.scalar(statement) 

87 if not bed_object: 

88 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

89 

90 if full: 

91 for result in bed_object.files: 

92 # PLOTS 

93 if result.name in BedPlots.model_fields: 

94 setattr( 

95 bed_plots, 

96 result.name, 

97 FileModel( 

98 **result.__dict__, 

99 object_id=f"bed.{identifier}.{result.name}", 

100 access_methods=self._config.construct_access_method_list( 

101 result.path 

102 ), 

103 ), 

104 ) 

105 # FILES 

106 elif result.name in BedFiles.model_fields: 

107 setattr(

108 bed_files,

109 result.name,

110 FileModel(

111 **result.__dict__,

112 object_id=f"bed.{identifier}.{result.name}",

113 access_methods=self._config.construct_access_method_list(

114 result.path

115 ),

116 ),

117 )

118

119

120 

121 else: 

122 _LOGGER.error( 

123 f"Unknown file type: {result.name}. And is not in the model fields. Skipping.." 

124 ) 

125 bed_stats = BedStatsModel(**bed_object.stats.__dict__) 

126 bed_bedsets = [] 

127 for relation in bed_object.bedsets: 

128 bed_bedsets.append( 

129 BedSetMinimal( 

130 id=relation.bedset.id, 

131 description=relation.bedset.description, 

132 name=relation.bedset.name, 

133 ) 

134 ) 

135 

136 if bed_object.universe: 

137 universe_meta = UniverseMetadata(**bed_object.universe.__dict__) 

138 else: 

139 universe_meta = UniverseMetadata() 

140 else: 

141 bed_plots = None 

142 bed_files = None 

143 bed_stats = None 

144 universe_meta = None 

145 bed_bedsets = [] 

146 

147 try: 

148 if full: 

149 bed_metadata = BedPEPHubRestrict( 

150 **self._config.phc.sample.get( 

151 namespace=self._config.config.phc.namespace, 

152 name=self._config.config.phc.name, 

153 tag=self._config.config.phc.tag, 

154 sample_name=identifier, 

155 ) 

156 ) 

157 else: 

158 bed_metadata = None 

159 except Exception as e: 

160 _LOGGER.warning(f"Could not retrieve metadata from pephub. Error: {e}") 

161 bed_metadata = None 

162 

163 return BedMetadata( 

164 id=bed_object.id, 

165 name=bed_object.name, 

166 stats=bed_stats, 

167 plots=bed_plots, 

168 files=bed_files, 

169 description=bed_object.description, 

170 submission_date=bed_object.submission_date, 

171 last_update_date=bed_object.last_update_date, 

172 raw_metadata=bed_metadata, 

173 genome_alias=bed_object.genome_alias, 

174 genome_digest=bed_object.genome_digest, 

175 bed_type=bed_object.bed_type, 

176 bed_format=bed_object.bed_format, 

177 is_universe=bed_object.is_universe, 

178 license_id=bed_object.license_id or DEFAULT_LICENSE, 

179 universe_metadata=universe_meta, 

180 full_response=full, 

181 bedsets=bed_bedsets, 

182 ) 

183 
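# Usage sketch (illustrative identifier): fetching one record. With full=False only the
# core Bed row is returned; full=True also resolves stats, plots, files, bedsets and
# PEPhub metadata, as implemented above.

record = bed_agent.get("233479aab145cffe46221475d5af5fae", full=True)  # id is illustrative
print(record.name, record.genome_alias, record.bed_type)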

184 def get_stats(self, identifier: str) -> BedStatsModel: 

185 """ 

186 Get file statistics by identifier. 

187 

188 :param identifier: bed file identifier 

189 

190 :return: project statistics as BedStats object 

191 """ 

192 statement = select(BedStats).where(BedStats.id == identifier) 

193 

194 with Session(self._sa_engine) as session: 

195 bed_object = session.scalar(statement) 

196 if not bed_object: 

197 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

198 bed_stats = BedStatsModel(**bed_object.__dict__) 

199 

200 return bed_stats 

201 

202 def get_plots(self, identifier: str) -> BedPlots: 

203 """ 

204 Get file plots by identifier. 

205 

206 :param identifier: bed file identifier 

207 :return: project plots 

208 """ 

209 statement = select(Bed).where(Bed.id == identifier) 

210 

211 with Session(self._sa_engine) as session: 

212 bed_object = session.scalar(statement) 

213 if not bed_object: 

214 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

215 bed_plots = BedPlots() 

216 for result in bed_object.files: 

217 if result.name in BedPlots.model_fields: 

218 setattr( 

219 bed_plots, 

220 result.name, 

221 FileModel( 

222 **result.__dict__, 

223 object_id=f"bed.{identifier}.{result.name}", 

224 access_methods=self._config.construct_access_method_list( 

225 result.path 

226 ), 

227 ), 

228 ) 

229 return bed_plots 

230 

231 def get_files(self, identifier: str) -> BedFiles: 

232 """ 

233 Get files by identifier. 

234 

235 :param identifier: bed file identifier 

236 :return: project files 

237 """ 

238 statement = select(Bed).where(Bed.id == identifier) 

239 

240 with Session(self._sa_engine) as session: 

241 bed_object = session.scalar(statement) 

242 if not bed_object: 

243 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

244 bed_files = BedFiles() 

245 for result in bed_object.files: 

246 if result.name in BedFiles.model_fields: 

247 setattr( 

248 bed_files, 

249 result.name, 

250 FileModel( 

251 **result.__dict__, 

252 object_id=f"bed.{identifier}.{result.name}", 

253 access_methods=self._config.construct_access_method_list( 

254 result.path 

255 ), 

256 ), 

257 ) 

258 return bed_files 

259 

260 def get_raw_metadata(self, identifier: str) -> BedPEPHub: 

261 """ 

262 Get raw file metadata (from PEPhub) by identifier. 

263 

264 :param identifier: bed file identifier 

265 :return: project metadata 

266 """ 

267 try: 

268 bed_metadata = self._config.phc.sample.get( 

269 namespace=self._config.config.phc.namespace, 

270 name=self._config.config.phc.name, 

271 tag=self._config.config.phc.tag, 

272 sample_name=identifier, 

273 ) 

274 except Exception as e: 

275 _LOGGER.warning(f"Could not retrieve metadata from pephub. Error: {e}") 

276 bed_metadata = {} 

277 return BedPEPHubRestrict(**bed_metadata) 

278 

279 def get_classification(self, identifier: str) -> BedClassification: 

280 """ 

281 Get file classification by identifier. 

282 

283 :param identifier: bed file identifier 

284 :return: project classification 

285 """ 

286 statement = select(Bed).where(Bed.id == identifier) 

287 

288 with Session(self._sa_engine) as session: 

289 bed_object = session.scalar(statement) 

290 if not bed_object: 

291 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

292 bed_classification = BedClassification(**bed_object.__dict__) 

293 

294 return bed_classification 

295 

296 def get_objects(self, identifier: str) -> Dict[str, FileModel]: 

297 """ 

298 Get all objects related to a bed file 

299 

300 :param identifier: bed file identifier 

301 :return: project objects dict 

302 """ 

303 statement = select(Bed).where(Bed.id == identifier) 

304 return_dict = {} 

305 

306 with Session(self._sa_engine) as session: 

307 bed_object = session.scalar(statement) 

308 if not bed_object: 

309 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

310 for result in bed_object.files: 

311 return_dict[result.name] = FileModel(**result.__dict__) 

312 

313 return return_dict 

314 

315 def get_embedding(self, identifier: str) -> BedEmbeddingResult: 

316 """ 

317 Get the embedding of a bed file from qdrant. 

318 

319 :param identifier: bed file identifier 

320 :return: bed file embedding 

321 """ 

322 if not self.exists(identifier): 

323 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

324 result = self._qdrant_engine.qd_client.retrieve( 

325 collection_name=self._config.config.qdrant.collection, 

326 ids=[identifier], 

327 with_vectors=True, 

328 with_payload=True, 

329 ) 

330 if not result: 

331 raise BEDFileNotFoundError( 

332 f"Bed file with id: {identifier} not found in qdrant database." 

333 ) 

334 return BedEmbeddingResult( 

335 identifier=identifier, embedding=result[0].vector, payload=result[0].payload 

336 ) 

337 
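# Usage sketch: retrieving the stored qdrant vector and payload for a record.

emb = bed_agent.get_embedding("233479aab145cffe46221475d5af5fae")  # illustrative id
print(emb.identifier, len(emb.embedding))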

338 def get_ids_list( 

339 self, 

340 limit: int = 100, 

341 offset: int = 0, 

342 genome: str = None, 

343 bed_type: str = None, 

344 ) -> BedListResult: 

345 """ 

346 Get list of bed file identifiers. 

347 

348 :param limit: number of results to return 

349 :param offset: offset to start from 

350 :param genome: filter by genome 

351 :param bed_type: filter by bed type. e.g. 'bed6+4' 

352

353 

354 :return: list of bed file identifiers 

355 """ 

356 statement = select(Bed) 

357 count_statement = select(func.count(Bed.id)) 

358 

359 # TODO: make it generic, like in pephub 

360 if genome: 

361 statement = statement.where(Bed.genome_alias == genome) 

362 count_statement = count_statement.where(Bed.genome_alias == genome) 

363 

364 if bed_type: 

365 statement = statement.where(Bed.bed_type == bed_type) 

366 count_statement = count_statement.where(Bed.bed_type == bed_type) 

367 

368 statement = statement.limit(limit).offset(offset) 

369 

370 result_list = [] 

371 with Session(self._sa_engine) as session: 

372 bed_ids = session.scalars(statement) 

373 count = session.execute(count_statement).one() 

374 

375 for result in bed_ids: 

376 result_list.append(BedMetadataBasic(**result.__dict__)) 

377 

378 return BedListResult( 

379 count=count[0], 

380 limit=limit, 

381 offset=offset, 

382 results=result_list, 

383 ) 

384 
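# Usage sketch: paging through records with the optional genome and bed_type filters.

page = bed_agent.get_ids_list(limit=50, offset=0, genome="hg38", bed_type="bed6+4")
print(page.count, [record.id for record in page.results])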

385 def add( 

386 self, 

387 identifier: str, 

388 stats: dict, 

389 metadata: dict = None, 

390 plots: dict = None, 

391 files: dict = None, 

392 classification: dict = None, 

393 license_id: str = DEFAULT_LICENSE, 

394 upload_qdrant: bool = False, 

395 upload_pephub: bool = False, 

396 upload_s3: bool = False, 

397 local_path: str = None, 

398 overwrite: bool = False, 

399 nofail: bool = False, 

400 ) -> None: 

401 """ 

402 Add bed file to the database. 

403 

404 :param identifier: bed file identifier 

405 :param stats: bed file results {statistics, plots, files, metadata} 

406 :param metadata: bed file metadata (will be saved in pephub) 

407 :param plots: bed file plots 

408 :param files: bed file files 

409 :param classification: bed file classification 

410 :param license_id: bed file license id (default: 'DUO:0000042'). Full list of licenses: 

411 https://raw.githubusercontent.com/EBISPOT/DUO/master/duo.csv 

412 :param upload_qdrant: add bed file to the qdrant index 

413 :param upload_pephub: add bed file to pephub 

414 :param upload_s3: upload files to s3 

415 :param local_path: local path to the output files 

416 :param overwrite: overwrite bed file if it already exists 

417 :param nofail: do not raise an error on pephub/s3/qdrant failures, or if the record exists and overwrite is False 

418 :return: None 

419 """ 

420 _LOGGER.info(f"Adding bed file to database. bed_id: {identifier}") 

421 

422 if self.exists(identifier): 

423 _LOGGER.warning(f"Bed file with id: {identifier} exists in the database.") 

424 if not overwrite: 

425 if not nofail: 

426 raise BedFIleExistsError( 

427 f"Bed file with id: {identifier} already exists in the database." 

428 ) 

429 _LOGGER.warning("Overwrite set to False. Skipping..") 

430 return None 

431 else: 

432 self.delete(identifier) 

433 

434 if license_id not in self.bb_agent.list_of_licenses: 

435 raise BedBaseConfError( 

436 f"License: {license_id} is not in the list of licenses. Please provide a valid license." 

437 f"List of licenses: {self.bb_agent.list_of_licenses}" 

438 ) 

439 

440 stats = BedStatsModel(**stats) 

441 # TODO: we should not check for specific keys, of the plots! 

442 plots = BedPlots(**plots) 

443 files = BedFiles(**files) 

444 

445 classification = BedClassification(**classification) 

446 if upload_pephub: 

447 metadata = BedPEPHub(**metadata) 

448 try: 

449 self.upload_pephub( 

450 identifier, 

451 metadata.model_dump(exclude={"input_file"}), 

452 overwrite, 

453 ) 

454 except Exception as e: 

455 _LOGGER.warning( 

456 f"Could not upload to pephub. Error: {e}. nofail: {nofail}" 

457 ) 

458 if not nofail: 

459 raise e 

460 else: 

461 _LOGGER.info("upload_pephub set to false. Skipping pephub..") 

462 

463 if upload_qdrant: 

464 if classification.genome_alias == "hg38": 

465 _LOGGER.info(f"Uploading bed file to qdrant.. [{identifier}]") 

466 self.upload_file_qdrant( 

467 identifier, 

468 files.bed_file.path, 

469 metadata.model_dump(exclude={"input_file"}), 

470 ) 

471 _LOGGER.info(f"File uploaded to qdrant. {identifier}") 

472 else: 

473 _LOGGER.warning( 

474 f"Could not upload to qdrant. Genome: {classification.genome_alias} is not supported." 

475 ) 

476 else: 

477 _LOGGER.info("upload_qdrant set to false. Skipping qdrant..") 

478 

479 # Upload files to s3 

480 if upload_s3: 

481 if files: 

482 files = self._config.upload_files_s3( 

483 identifier, files=files, base_path=local_path, type="files" 

484 ) 

485 

486 if plots: 

487 plots = self._config.upload_files_s3( 

488 identifier, files=plots, base_path=local_path, type="plots" 

489 ) 

490 with Session(self._sa_engine) as session: 

491 new_bed = Bed( 

492 id=identifier, 

493 **classification.model_dump(), 

494 license_id=license_id, 

495 indexed=upload_qdrant, 

496 pephub=upload_pephub, 

497 ) 

498 session.add(new_bed) 

499 if upload_s3: 

500 for k, v in files: 

501 if v: 

502 new_file = Files( 

503 **v.model_dump( 

504 exclude_none=True, 

505 exclude_unset=True, 

506 exclude={"object_id", "access_methods"}, 

507 ), 

508 bedfile_id=identifier, 

509 type="file", 

510 ) 

511 session.add(new_file) 

512 for k, v in plots: 

513 if v: 

514 new_plot = Files( 

515 **v.model_dump( 

516 exclude_none=True, 

517 exclude_unset=True, 

518 exclude={"object_id", "access_methods"}, 

519 ), 

520 bedfile_id=identifier, 

521 type="plot", 

522 ) 

523 session.add(new_plot) 

524 

525 new_bedstat = BedStats(**stats.model_dump(), id=identifier) 

526 session.add(new_bedstat) 

527 

528 session.commit() 

529 

530 return None 

531 
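# Usage sketch (all values illustrative): registering a new record without touching
# external services. The dicts must be valid inputs for BedStatsModel, BedPlots,
# BedFiles and BedClassification; the field names below are assumptions for illustration.

bed_agent.add(
    identifier="233479aab145cffe46221475d5af5fae",  # illustrative digest
    stats={"number_of_regions": 100},                # assumed subset of BedStatsModel fields
    classification={"bed_type": "bed6+4", "bed_format": "bed", "genome_alias": "hg38"},  # assumed fields
    files={},
    plots={},
    metadata={},
    upload_s3=False,
    upload_pephub=False,
    upload_qdrant=False,
)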

532 def update( 

533 self, 

534 identifier: str, 

535 stats: dict, 

536 metadata: dict = None, 

537 plots: dict = None, 

538 files: dict = None, 

539 classification: dict = None, 

540 add_to_qdrant: bool = False, 

541 upload_pephub: bool = False, 

542 upload_s3: bool = False, 

543 local_path: str = None, 

544 overwrite: bool = False, 

545 nofail: bool = False, 

546 ): 

547 """ 

548 Update bed file to the database. 

549 

550 !! WARNING: this method is in development. Please avoid using it! 

551 

552 :param identifier: bed file identifier 

553 :param stats: bed file results {statistics, plots, files, metadata} 

554 :param metadata: bed file metadata (will be saved in pephub) 

555 :param plots: bed file plots 

556 :param files: bed file files 

557 :param classification: bed file classification 

558 :param add_to_qdrant: add bed file to the qdrant index 

559 :param upload_pephub: add bed file to pephub 

560 :param upload_s3: upload files to s3 

561 :param local_path: local path to the output files 

562 :param overwrite: overwrite bed file if it already exists 

563 :param nofail: do not raise an error on pephub/s3/qdrant failures, or if the record exists and overwrite is False 

564 :return: None 

565 """ 

566 if not self.exists(identifier): 

567 raise BEDFileNotFoundError( 

568 f"Bed file with id: {identifier} not found. Cannot update." 

569 ) 

570 

571 stats = BedStatsModel(**stats) 

572 plots = BedPlots(**plots) 

573 files = BedFiles(**files) 

574 classification = BedClassification(**classification) 

575 

576 if upload_pephub: 

577 metadata = BedPEPHub(**metadata) 

578 try: 

579 self.update_pephub(identifier, metadata.model_dump(), overwrite) 

580 except Exception as e: 

581 _LOGGER.warning( 

582 f"Could not upload to pephub. Error: {e}. nofail: {nofail}" 

583 ) 

584 if not nofail: 

585 raise e 

586 else: 

587 _LOGGER.info("upload_pephub set to false. Skipping pephub..") 

588 

589 if add_to_qdrant: 

590 self.upload_file_qdrant( 

591 identifier, files.bed_file.path, payload=metadata.model_dump() 

592 ) 

593 

594 statement = select(Bed).where(Bed.id == identifier) 

595 

596 if upload_s3: 

597 _LOGGER.warning("S3 upload is not implemented yet") 

598 # if files: 

599 # files = self._config.upload_files_s3( 

600 # identifier, files=files, base_path=local_path, type="files" 

601 # ) 

602 # 

603 # if plots: 

604 # plots = self._config.upload_files_s3( 

605 # identifier, files=plots, base_path=local_path, type="plots" 

606 # ) 

607 

608 with Session(self._sa_engine) as session: 

609 bed_object = session.scalar(statement) 

610 

611 for key, value in {**stats.model_dump(), **classification.model_dump()}.items(): 

612 setattr(bed_object, key, value) 

613 

614 bed_object.indexed = add_to_qdrant 

615 bed_object.pephub = upload_pephub 

616 

617 if upload_s3: 

618 _LOGGER.warning("S3 upload is not implemented yet") 

619 # for k, v in files: 

620 # if v: 

621 # new_file = Files( 

622 # **v.model_dump(exclude_none=True, exclude_unset=True), 

623 # bedfile_id=identifier, 

624 # type="file", 

625 # ) 

626 # session.add(new_file) 

627 # for k, v in plots: 

628 # if v: 

629 # new_plot = Files( 

630 # **v.model_dump(exclude_none=True, exclude_unset=True), 

631 # bedfile_id=identifier, 

632 # type="plot", 

633 # ) 

634 # session.add(new_plot) 

635 

636 session.commit() 

637 

638 raise NotImplementedError 

639 

640 def delete(self, identifier: str) -> None: 

641 """ 

642 Delete bed file from the database. 

643 

644 :param identifier: bed file identifier 

645 :return: None 

646 """ 

647 _LOGGER.info(f"Deleting bed file from database. bed_id: {identifier}") 

648 if not self.exists(identifier): 

649 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") 

650 

651 with Session(self._sa_engine) as session: 

652 statement = select(Bed).where(Bed.id == identifier) 

653 bed_object = session.scalar(statement) 

654 

655 files = [FileModel(**k.__dict__) for k in bed_object.files] 

656 delete_pephub = bed_object.pephub 

657 delete_qdrant = bed_object.indexed 

658 

659 session.delete(bed_object) 

660 session.commit() 

661 

662 if delete_pephub: 

663 self.delete_pephub_sample(identifier) 

664 if delete_qdrant: 

665 self.delete_qdrant_point(identifier) 

666 self._config.delete_files_s3(files) 

667 

668 def upload_pephub(self, identifier: str, metadata: dict, overwrite: bool = False): 

669 if not metadata: 

670 _LOGGER.warning("No metadata provided. Skipping pephub upload..") 

671 return False 

672 self._config.phc.sample.create( 

673 namespace=self._config.config.phc.namespace, 

674 name=self._config.config.phc.name, 

675 tag=self._config.config.phc.tag, 

676 sample_name=identifier, 

677 sample_dict=metadata, 

678 overwrite=overwrite, 

679 ) 

680 

681 def update_pephub(self, identifier: str, metadata: dict, overwrite: bool = False): 

682 if not metadata: 

683 _LOGGER.warning("No metadata provided. Skipping pephub upload..") 

684 return False 

685 self._config.phc.sample.update( 

686 namespace=self._config.config.phc.namespace, 

687 name=self._config.config.phc.name, 

688 tag=self._config.config.phc.tag, 

689 sample_name=identifier, 

690 sample_dict=metadata, 

691 ) 

692 

693 def delete_pephub_sample(self, identifier: str): 

694 """ 

695 Delete sample from pephub 

696 

697 :param identifier: bed file identifier 

698 """ 

699 try: 

700 self._config.phc.sample.remove( 

701 namespace=self._config.config.phc.namespace, 

702 name=self._config.config.phc.name, 

703 tag=self._config.config.phc.tag, 

704 sample_name=identifier, 

705 ) 

706 except ResponseError as e: 

707 _LOGGER.warning(f"Could not delete from pephub. Error: {e}") 

708 

709 def upload_file_qdrant( 

710 self, 

711 bed_id: str, 

712 bed_file: Union[str, RegionSet], 

713 payload: dict = None, 

714 ) -> None: 

715 """ 

716 Convert bed file to vector and add it to qdrant database 

717 

718 !Warning: only hg38 genome can be added to qdrant! 

719 

720 :param bed_id: bed file id 

721 :param bed_file: path to the bed file, or RegionSet object 

722 :param payload: additional metadata to store alongside vectors 

723 :return: None 

724 """ 

725 if self._qdrant_engine is None: 

726 raise QdrantInstanceNotInitializedError 

727 

728 if not self._config.r2v: 

729 raise BedBaseConfError( 

730 "Could not add add region to qdrant. Invalid type, or path. " 

731 ) 

732 

733 _LOGGER.debug(f"Adding bed file to qdrant. bed_id: {bed_id}") 

734 if isinstance(bed_file, str): 

735 bed_region_set = GRegionSet(bed_file) 

736 elif isinstance(bed_file, RegionSet) or isinstance(bed_file, GRegionSet): 

737 bed_region_set = bed_file 

738 else: 

739 raise BedBaseConfError( 

740 "Could not add add region to qdrant. Invalid type, or path. " 

741 ) 

742 # Not really working 

743 # bed_embedding = np.mean([self._config.r2v.encode(r) for r in bed_region_set], axis=0) 

744 

745 bed_embedding = np.mean(self._config.r2v.encode(bed_region_set), axis=0) 

746 

747 # Upload bed file vector to the database 

748 vec_dim = bed_embedding.shape[0] 

749 self._qdrant_engine.load( 

750 ids=[bed_id], 

751 vectors=bed_embedding.reshape(1, vec_dim), 

752 payloads=[{**payload}], 

753 ) 

754 return None 

755 
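# Note (editor): the vector stored in qdrant is the mean of the per-region embeddings
# produced by the configured r2v model, so its dimensionality must match the qdrant
# collection created in create_qdrant_collection below. A direct call might look like:

bed_agent.upload_file_qdrant(
    bed_id="233479aab145cffe46221475d5af5fae",  # illustrative id
    bed_file="/path/to/regions.bed",            # hypothetical local path (hg38 only)
    payload={"genome": "hg38"},                 # free-form metadata stored with the vector
)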

756 def text_to_bed_search( 

757 self, query: str, limit: int = 10, offset: int = 0 

758 ) -> BedListSearchResult: 

759 """ 

760 Search for bed files by text query in qdrant database 

761 

762 :param query: text query 

763 :param limit: number of results to return 

764 :param offset: offset to start from 

765 

766 :return: list of bed file metadata 

767 """ 

768 _LOGGER.info(f"Looking for: {query}") 

769 _LOGGER.info(f"Using backend: {self._config.t2bsi}") 

770 

771 results = self._config.t2bsi.query_search(query, limit=limit, offset=offset) 

772 results_list = [] 

773 for result in results: 

774 result_id = result["id"].replace("-", "") 

775 try: 

776 result_meta = self.get(result_id) 

777 except BEDFileNotFoundError as e: 

778 _LOGGER.warning( 

779 f"Could not retrieve metadata for bed file: {result_id}. Error: {e}" 

780 ) 

781 continue 

782 if result_meta: 

783 results_list.append(QdrantSearchResult(**result, metadata=result_meta)) 

784 return BedListSearchResult( 

785 count=self.bb_agent.get_stats.bedfiles_number, 

786 limit=limit, 

787 offset=offset, 

788 results=results_list, 

789 ) 

790 
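# Usage sketch: free-text search against the qdrant index through the configured
# text-to-bed search backend (self._config.t2bsi).

hits = bed_agent.text_to_bed_search("human liver open chromatin", limit=5)  # illustrative query
for hit in hits.results:
    print(hit.metadata.id, hit.metadata.name)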

791 def bed_to_bed_search( 

792 self, 

793 region_set: RegionSet, 

794 limit: int = 10, 

795 offset: int = 0, 

796 ) -> BedListSearchResult: 

797 results = self._config.b2bsi.query_search( 

798 region_set, limit=limit, offset=offset 

799 ) 

800 results_list = [] 

801 for result in results: 

802 result_id = result["id"].replace("-", "") 

803 try: 

804 result_meta = self.get(result_id) 

805 except BEDFileNotFoundError as e: 

806 _LOGGER.warning( 

807 f"Could not retrieve metadata for bed file: {result_id}. Error: {e}" 

808 ) 

809 continue 

810 if result_meta: 

811 results_list.append(QdrantSearchResult(**result, metadata=result_meta)) 

812 return BedListSearchResult( 

813 count=self.bb_agent.get_stats.bedfiles_number, 

814 limit=limit, 

815 offset=offset, 

816 results=results_list, 

817 ) 

818 

819 def reindex_qdrant(self) -> None: 

820 """ 

821 Re-upload all files to qdrant. 

822 !Warning: only the hg38 genome can be added to qdrant! 

823 

824 If you want to fully reindex/re-upload to qdrant, first delete the collection and create a new one. 

825 

826

827 """ 

828 bb_client = BBClient() 

829 

830 statement = select(Bed.id).where(Bed.genome_alias == QDRANT_GENOME) 

831 

832 with Session(self._db_engine.engine) as session: 

833 bed_ids = session.execute(statement).all() 

834 

835 bed_ids = [bed_result[0] for bed_result in bed_ids] 

836 

837 with tqdm(total=len(bed_ids), position=0, leave=True) as pbar: 

838 for record_id in bed_ids: 

839 try: 

840 bed_region_set_obj = GRegionSet(bb_client.seek(record_id)) 

841 except FileNotFoundError: 

842 bed_region_set_obj = bb_client.load_bed(record_id) 

843 

844 pbar.set_description(f"Processing file: {record_id}") 

845 metadata = self._config.phc.sample.get( 

846 namespace=self._config.config.phc.namespace, 

847 name=self._config.config.phc.name, 

848 tag=self._config.config.phc.tag, 

849 sample_name=record_id, 

850 ) 

851 

852 self.upload_file_qdrant( 

853 bed_id=record_id, 

854 bed_file=bed_region_set_obj, 

855 payload=BedPEPHubRestrict(**metadata).model_dump(), 

856 ) 

857 pbar.write(f"File: {record_id} uploaded to qdrant successfully.") 

858 pbar.update(1) 

859 

860 return None 

861 

862 def delete_qdrant_point(self, identifier: str) -> None: 

863 """ 

864 Delete bed file from qdrant. 

865 

866 :param identifier: bed file identifier 

867 :return: None 

868 """ 

869 

870 result = self._config.qdrant_engine.qd_client.delete( 

871 collection_name=self._config.config.qdrant.collection, 

872 points_selector=PointIdsList( 

873 points=[identifier], 

874 ), 

875 ) 

876 return result 

877 

878 def create_qdrant_collection(self) -> bool: 

879 """ 

880 Create qdrant collection for bed files. 

881 """ 

882 return self._config.qdrant_engine.qd_client.create_collection( 

883 collection_name=self._config.config.qdrant.collection, 

884 vectors_config=VectorParams(size=100, distance=Distance.DOT), 

885 ) 

886 
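# Note (editor): the collection is created with 100-dimensional vectors and dot-product
# distance, which must agree with the embeddings produced by the r2v model used in
# upload_file_qdrant. A one-off setup call:

bed_agent.create_qdrant_collection()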

887 def exists(self, identifier: str) -> bool: 

888 """ 

889 Check if bed file exists in the database. 

890 

891 :param identifier: bed file identifier 

892 :return: True if bed file exists, False otherwise 

893 """ 

894 statement = select(Bed).where(Bed.id == identifier) 

895 

896 with Session(self._sa_engine) as session: 

897 bed_object = session.scalar(statement) 

898 if not bed_object: 

899 return False 

900 return True 

901 

902 def exists_universe(self, identifier: str) -> bool: 

903 """ 

904 Check if universe exists in the database. 

905 

906 :param identifier: universe identifier 

907 

908 :return: True if universe exists, False otherwise 

909 """ 

910 statement = select(Universes).where(Universes.id == identifier) 

911 

912 with Session(self._sa_engine) as session: 

913 bed_object = session.scalar(statement) 

914 if not bed_object: 

915 return False 

916 return True 

917 

918 def add_universe( 

919 self, bedfile_id: str, bedset_id: str = None, construct_method: str = None 

920 ) -> str: 

921 """ 

922 Add universe to the database. 

923 

924 :param bedfile_id: bed file identifier 

925 :param bedset_id: bedset identifier 

926 :param construct_method: method used to construct the universe 

927 

928 :return: universe identifier. 

929 """ 

930 

931 if not self.exists(bedfile_id): 

932 raise BEDFileNotFoundError 

933 with Session(self._sa_engine) as session: 

934 new_univ = Universes( 

935 id=bedfile_id, bedset_id=bedset_id, method=construct_method 

936 ) 

937 session.add(new_univ) 

938 session.commit() 

939 

940 _LOGGER.info(f"Universe added to the database successfully. id: {bedfile_id}") 

941 return bedfile_id 

942 

943 def delete_universe(self, identifier: str) -> None: 

944 """ 

945 Delete universe from the database. 

946 

947 :param identifier: universe identifier 

948 :return: None 

949 """ 

950 if not self.exists_universe(identifier): 

951 raise UniverseNotFoundError(f"Universe not found. id: {identifier}") 

952 

953 with Session(self._sa_engine) as session: 

954 statement = delete(Universes).where(Universes.id == identifier) 

955 session.execute(statement) 

956 session.commit() 

957 

958 def add_tokenized( 

959 self, bed_id: str, universe_id: str, token_vector: list, overwrite: bool = False 

960 ) -> str: 

961 """ 

962 Add tokenized bed file to the database 

963 

964 :param bed_id: bed file identifier 

965 :param universe_id: universe identifier 

966 :param token_vector: list of tokens 

967 :param overwrite: overwrite tokenized file if it already exists 

968 

969 :return: token path 

970 """ 

971 

972 with Session(self._sa_engine) as session: 

973 if not self.exists_universe(universe_id): 

974 raise UniverseNotFoundError( 

975 f"Universe not found in the database. id: {universe_id}" 

976 f"Please add universe first." 

977 ) 

978 

979 if self.exist_tokenized(bed_id, universe_id): 

980 if not overwrite: 

981

982 raise TokenizeFileExistsError( 

983 "Tokenized file already exists in the database. " 

984 "Set overwrite to True to overwrite it." 

985 ) 

986 else: 

987 self.delete_tokenized(bed_id, universe_id) 

988 

989 path = self._add_zarr_s3( 

990 bed_id=bed_id, 

991 universe_id=universe_id, 

992 tokenized_vector=token_vector, 

993 overwrite=overwrite, 

994 ) 

995 path = os.path.join(f"s3://{self._config.config.s3.bucket}", path) 

996 new_token = TokenizedBed(bed_id=bed_id, universe_id=universe_id, path=path) 

997 

998 session.add(new_token) 

999 session.commit() 

1000 return path 

1001 
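# Usage sketch (illustrative ids and tokens): the universe must already exist as a bed
# file record before it can be registered; the tokenized vector is then written to the
# zarr store on s3 and its path recorded in the database.

universe_id = bed_agent.add_universe("universe_bedfile_digest")  # illustrative id of an existing bed file
path = bed_agent.add_tokenized(
    bed_id="233479aab145cffe46221475d5af5fae",  # illustrative id
    universe_id=universe_id,
    token_vector=[1, 5, 42, 127],               # toy token ids
)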

1002 def _add_zarr_s3( 

1003 self, 

1004 universe_id: str, 

1005 bed_id: str, 

1006 tokenized_vector: list, 

1007 overwrite: bool = False, 

1008 ) -> str: 

1009 """ 

1010 Add tokenized vector to the zarr store on s3 

1011 

1012 :param universe_id: universe identifier 

1013 :param bed_id: bed file identifier 

1014 :param tokenized_vector: tokenized vector 

1015 

1016 :return: zarr path 

1017 """ 

1018 univers_group = self._config.zarr_root.require_group(universe_id) 

1019 

1020 if not univers_group.get(bed_id): 

1021 _LOGGER.info("Saving tokenized vector to s3") 

1022 path = univers_group.create_dataset(bed_id, data=tokenized_vector).path 

1023 elif overwrite: 

1024 _LOGGER.info("Overwriting tokenized vector in s3") 

1025 path = univers_group.create_dataset( 

1026 bed_id, data=tokenized_vector, overwrite=True 

1027 ).path 

1028 else: 

1029 raise TokenizeFileExistsError( 

1030 "Tokenized file already exists in the database. " 

1031 "Set overwrite to True to overwrite it." 

1032 ) 

1033 

1034 return os.path.join(ZARR_TOKENIZED_FOLDER, path) 

1035 

1036 def get_tokenized(self, bed_id: str, universe_id: str) -> TokenizedBedResponse: 

1037 """ 

1038 Get the tokenized bed vector from the zarr store 

1039 

1040 :param bed_id: bed file identifier 

1041 :param universe_id: universe identifier 

1042 

1043 :return: TokenizedBedResponse with the tokenized vector 

1044 """ 

1045 if not self.exist_tokenized(bed_id, universe_id): 

1046 raise TokenizeFileNotExistError("Tokenized file not found in the database.") 

1047 univers_group = self._config.zarr_root.require_group(universe_id) 

1048 

1049 return TokenizedBedResponse( 

1050 universe_id=universe_id, 

1051 bed_id=bed_id, 

1052 tokenized_bed=list(univers_group[bed_id]), 

1053 ) 

1054 

1055 def delete_tokenized(self, bed_id: str, universe_id: str) -> None: 

1056 """ 

1057 Delete tokenized bed file from the database 

1058 

1059 :param bed_id: bed file identifier 

1060 :param universe_id: universe identifier 

1061 

1062 :return: None 

1063 """ 

1064 if not self.exist_tokenized(bed_id, universe_id): 

1065 raise TokenizeFileNotExistError("Tokenized file not found in the database.") 

1066 univers_group = self._config.zarr_root.require_group(universe_id) 

1067 

1068 del univers_group[bed_id] 

1069 

1070 with Session(self._sa_engine) as session: 

1071 statement = delete(TokenizedBed).where( 

1072 and_( 

1073 TokenizedBed.bed_id == bed_id, 

1074 TokenizedBed.universe_id == universe_id, 

1075 ) 

1076 ) 

1077 session.execute(statement) 

1078 session.commit() 

1079 

1080 return None 

1081 

1082 def _get_tokenized_path(self, bed_id: str, universe_id: str) -> str: 

1083 """ 

1084 Get the path to the tokenized file 

1085 

1086 :param bed_id: bed file identifier 

1087 :param universe_id: universe identifier 

1088 

1089 :return: token path 

1090 """ 

1091 if not self.exist_tokenized(bed_id, universe_id): 

1092 raise TokenizeFileNotExistError("Tokenized file not found in the database.") 

1093 

1094 with Session(self._sa_engine) as session: 

1095 statement = select(TokenizedBed).where( 

1096 and_( 

1097 TokenizedBed.bed_id == bed_id, 

1098 TokenizedBed.universe_id == universe_id, 

1099 ), 

1100 ) 

1101 tokenized_object = session.scalar(statement) 

1102 return tokenized_object.path 

1103 

1104 def exist_tokenized(self, bed_id: str, universe_id: str) -> bool: 

1105 """ 

1106 Check if tokenized bed file exists in the database 

1107 

1108 :param bed_id: bed file identifier 

1109 :param universe_id: universe identifier 

1110 

1111 :return: bool 

1112 """ 

1113 with Session(self._sa_engine) as session: 

1114 statement = select(TokenizedBed).where( 

1115 and_( 

1116 TokenizedBed.bed_id == bed_id, 

1117 TokenizedBed.universe_id == universe_id, 

1118 ) 

1119 ) 

1120 tokenized_object = session.scalar(statement) 

1121 if not tokenized_object: 

1122 return False 

1123 return True 

1124 

1125 def get_tokenized_link( 

1126 self, bed_id: str, universe_id: str 

1127 ) -> TokenizedPathResponse: 

1128 """ 

1129 Get the link to the tokenized file 

1130 

1131 :param bed_id: bed file identifier 

1132 :param universe_id: universe identifier 

1133 

1134 :return: token link 

1135 :raises: TokenizeFileNotExistError 

1136 """ 

1137 file_path = self._get_tokenized_path(bed_id, universe_id) 

1138 

1139 return TokenizedPathResponse( 

1140 endpoint_url=self._config.config.s3.endpoint_url, 

1141 file_path=file_path, 

1142 bed_id=bed_id, 

1143 universe_id=universe_id, 

1144 )