Coverage for bbconf/modules/bedfiles.py: 13%
410 statements
« prev ^ index » next — coverage.py v7.6.0, created at 2024-07-17 04:01 +0000
1import os
2from logging import getLogger
3from typing import Dict, Union
5import numpy as np
6from tqdm import tqdm
8from geniml.bbclient import BBClient
9from geniml.io import RegionSet
10from genimtools.tokenizers import RegionSet as GRegionSet
11from pephubclient.exceptions import ResponseError
12from qdrant_client.models import Distance, PointIdsList, VectorParams
13from sqlalchemy import and_, delete, func, select
14from sqlalchemy.orm import Session
16from bbconf.config_parser.bedbaseconfig import BedBaseConfig
17from bbconf.const import PKG_NAME, ZARR_TOKENIZED_FOLDER, DEFAULT_LICENSE
18from bbconf.db_utils import Bed, BedStats, Files, TokenizedBed, Universes
19from bbconf.exceptions import (
20 BedBaseConfError,
21 BedFIleExistsError,
22 BEDFileNotFoundError,
23 TokenizeFileExistsError,
24 TokenizeFileNotExistError,
25 UniverseNotFoundError,
26 QdrantInstanceNotInitializedError,
27)
28from bbconf.models.bed_models import (
29 BedClassification,
30 BedEmbeddingResult,
31 BedFiles,
32 BedListResult,
33 BedListSearchResult,
34 BedMetadata,
35 BedMetadataBasic,
36 BedPEPHub,
37 BedPlots,
38 BedStatsModel,
39 FileModel,
40 QdrantSearchResult,
41 TokenizedBedResponse,
42 UniverseMetadata,
43 TokenizedPathResponse,
44 BedPEPHubRestrict,
45 BedSetMinimal,
46)
# Module-level logger, namespaced under the package name.
_LOGGER = getLogger(PKG_NAME)

# Only this genome is indexed in qdrant (see upload_file_qdrant / reindex_qdrant).
QDRANT_GENOME = "hg38"
class BedAgentBedFile:
    """
    Class that represents Bedfile in Database.

    This class has method to add, delete, get files and metadata from the database.
    """

    def __init__(self, config: BedBaseConfig, bbagent_obj=None):
        """
        :param config: config object with database and qdrant engine and credentials
        :param bbagent_obj: BedBaseAgent object (Parent object)
        """
        # Keep both the raw SQLAlchemy engine (used by most queries) and the
        # wrapper object (used e.g. by reindex_qdrant).
        self._sa_engine = config.db_engine.engine
        self._db_engine = config.db_engine
        self._qdrant_engine = config.qdrant_engine
        self._boto3_client = config.boto3_client
        self._config = config
        self.bb_agent = bbagent_obj
72 def get(self, identifier: str, full: bool = False) -> BedMetadata:
73 """
74 Get file metadata by identifier.
76 :param identifier: bed file identifier
77 :param full: if True, return full metadata, including statistics, files, and raw metadata from pephub
78 :return: project metadata
79 """
80 statement = select(Bed).where(Bed.id == identifier)
82 bed_plots = BedPlots()
83 bed_files = BedFiles()
85 with Session(self._sa_engine) as session:
86 bed_object = session.scalar(statement)
87 if not bed_object:
88 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
90 if full:
91 for result in bed_object.files:
92 # PLOTS
93 if result.name in BedPlots.model_fields:
94 setattr(
95 bed_plots,
96 result.name,
97 FileModel(
98 **result.__dict__,
99 object_id=f"bed.{identifier}.{result.name}",
100 access_methods=self._config.construct_access_method_list(
101 result.path
102 ),
103 ),
104 )
105 # FILES
106 elif result.name in BedFiles.model_fields:
107 (
108 setattr(
109 bed_files,
110 result.name,
111 FileModel(
112 **result.__dict__,
113 object_id=f"bed.{identifier}.{result.name}",
114 access_methods=self._config.construct_access_method_list(
115 result.path
116 ),
117 ),
118 ),
119 )
121 else:
122 _LOGGER.error(
123 f"Unknown file type: {result.name}. And is not in the model fields. Skipping.."
124 )
125 bed_stats = BedStatsModel(**bed_object.stats.__dict__)
126 bed_bedsets = []
127 for relation in bed_object.bedsets:
128 bed_bedsets.append(
129 BedSetMinimal(
130 id=relation.bedset.id,
131 description=relation.bedset.description,
132 name=relation.bedset.name,
133 )
134 )
136 if bed_object.universe:
137 universe_meta = UniverseMetadata(**bed_object.universe.__dict__)
138 else:
139 universe_meta = UniverseMetadata()
140 else:
141 bed_plots = None
142 bed_files = None
143 bed_stats = None
144 universe_meta = None
145 bed_bedsets = []
147 try:
148 if full:
149 bed_metadata = BedPEPHubRestrict(
150 **self._config.phc.sample.get(
151 namespace=self._config.config.phc.namespace,
152 name=self._config.config.phc.name,
153 tag=self._config.config.phc.tag,
154 sample_name=identifier,
155 )
156 )
157 else:
158 bed_metadata = None
159 except Exception as e:
160 _LOGGER.warning(f"Could not retrieve metadata from pephub. Error: {e}")
161 bed_metadata = None
163 return BedMetadata(
164 id=bed_object.id,
165 name=bed_object.name,
166 stats=bed_stats,
167 plots=bed_plots,
168 files=bed_files,
169 description=bed_object.description,
170 submission_date=bed_object.submission_date,
171 last_update_date=bed_object.last_update_date,
172 raw_metadata=bed_metadata,
173 genome_alias=bed_object.genome_alias,
174 genome_digest=bed_object.genome_digest,
175 bed_type=bed_object.bed_type,
176 bed_format=bed_object.bed_format,
177 is_universe=bed_object.is_universe,
178 license_id=bed_object.license_id or DEFAULT_LICENSE,
179 universe_metadata=universe_meta,
180 full_response=full,
181 bedsets=bed_bedsets,
182 )
184 def get_stats(self, identifier: str) -> BedStatsModel:
185 """
186 Get file statistics by identifier.
188 :param identifier: bed file identifier
190 :return: project statistics as BedStats object
191 """
192 statement = select(BedStats).where(BedStats.id == identifier)
194 with Session(self._sa_engine) as session:
195 bed_object = session.scalar(statement)
196 if not bed_object:
197 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
198 bed_stats = BedStatsModel(**bed_object.__dict__)
200 return bed_stats
202 def get_plots(self, identifier: str) -> BedPlots:
203 """
204 Get file plots by identifier.
206 :param identifier: bed file identifier
207 :return: project plots
208 """
209 statement = select(Bed).where(Bed.id == identifier)
211 with Session(self._sa_engine) as session:
212 bed_object = session.scalar(statement)
213 if not bed_object:
214 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
215 bed_plots = BedPlots()
216 for result in bed_object.files:
217 if result.name in BedPlots.model_fields:
218 setattr(
219 bed_plots,
220 result.name,
221 FileModel(
222 **result.__dict__,
223 object_id=f"bed.{identifier}.{result.name}",
224 access_methods=self._config.construct_access_method_list(
225 result.path
226 ),
227 ),
228 )
229 return bed_plots
231 def get_files(self, identifier: str) -> BedFiles:
232 """
233 Get file files by identifier.
235 :param identifier: bed file identifier
236 :return: project files
237 """
238 statement = select(Bed).where(Bed.id == identifier)
240 with Session(self._sa_engine) as session:
241 bed_object = session.scalar(statement)
242 if not bed_object:
243 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
244 bed_files = BedFiles()
245 for result in bed_object.files:
246 if result.name in BedFiles.model_fields:
247 setattr(
248 bed_files,
249 result.name,
250 FileModel(
251 **result.__dict__,
252 object_id=f"bed.{identifier}.{result.name}",
253 access_methods=self._config.construct_access_method_list(
254 result.path
255 ),
256 ),
257 )
258 return bed_files
260 def get_raw_metadata(self, identifier: str) -> BedPEPHub:
261 """
262 Get file metadata by identifier.
264 :param identifier: bed file identifier
265 :return: project metadata
266 """
267 try:
268 bed_metadata = self._config.phc.sample.get(
269 namespace=self._config.config.phc.namespace,
270 name=self._config.config.phc.name,
271 tag=self._config.config.phc.tag,
272 sample_name=identifier,
273 )
274 except Exception as e:
275 _LOGGER.warning(f"Could not retrieve metadata from pephub. Error: {e}")
276 bed_metadata = {}
277 return BedPEPHubRestrict(**bed_metadata)
279 def get_classification(self, identifier: str) -> BedClassification:
280 """
281 Get file classification by identifier.
283 :param identifier: bed file identifier
284 :return: project classification
285 """
286 statement = select(Bed).where(Bed.id == identifier)
288 with Session(self._sa_engine) as session:
289 bed_object = session.scalar(statement)
290 if not bed_object:
291 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
292 bed_classification = BedClassification(**bed_object.__dict__)
294 return bed_classification
296 def get_objects(self, identifier: str) -> Dict[str, FileModel]:
297 """
298 Get all object related to bedfile
300 :param identifier: bed file identifier
301 :return: project objects dict
302 """
303 statement = select(Bed).where(Bed.id == identifier)
304 return_dict = {}
306 with Session(self._sa_engine) as session:
307 bed_object = session.scalar(statement)
308 if not bed_object:
309 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
310 for result in bed_object.files:
311 return_dict[result.name] = FileModel(**result.__dict__)
313 return return_dict
315 def get_embedding(self, identifier: str) -> BedEmbeddingResult:
316 """
317 Get bed file embedding of bed file from qdrant.
319 :param identifier: bed file identifier
320 :return: bed file embedding
321 """
322 if not self.exists(identifier):
323 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
324 result = self._qdrant_engine.qd_client.retrieve(
325 collection_name=self._config.config.qdrant.collection,
326 ids=[identifier],
327 with_vectors=True,
328 with_payload=True,
329 )
330 if not result:
331 raise BEDFileNotFoundError(
332 f"Bed file with id: {identifier} not found in qdrant database."
333 )
334 return BedEmbeddingResult(
335 identifier=identifier, embedding=result[0].vector, payload=result[0].payload
336 )
338 def get_ids_list(
339 self,
340 limit: int = 100,
341 offset: int = 0,
342 genome: str = None,
343 bed_type: str = None,
344 ) -> BedListResult:
345 """
346 Get list of bed file identifiers.
348 :param limit: number of results to return
349 :param offset: offset to start from
350 :param genome: filter by genome
351 :param bed_type: filter by bed type. e.g. 'bed6+4'
352 :param full: if True, return full metadata, including statistics, files, and raw metadata from pephub
354 :return: list of bed file identifiers
355 """
356 statement = select(Bed)
357 count_statement = select(func.count(Bed.id))
359 # TODO: make it generic, like in pephub
360 if genome:
361 statement = statement.where(Bed.genome_alias == genome)
362 count_statement = count_statement.where(Bed.genome_alias == genome)
364 if bed_type:
365 statement = statement.where(Bed.bed_type == bed_type)
366 count_statement = count_statement.where(Bed.bed_type == bed_type)
368 statement = statement.limit(limit).offset(offset)
370 result_list = []
371 with Session(self._sa_engine) as session:
372 bed_ids = session.scalars(statement)
373 count = session.execute(count_statement).one()
375 for result in bed_ids:
376 result_list.append(BedMetadataBasic(**result.__dict__))
378 return BedListResult(
379 count=count[0],
380 limit=limit,
381 offset=offset,
382 results=result_list,
383 )
385 def add(
386 self,
387 identifier: str,
388 stats: dict,
389 metadata: dict = None,
390 plots: dict = None,
391 files: dict = None,
392 classification: dict = None,
393 license_id: str = DEFAULT_LICENSE,
394 upload_qdrant: bool = False,
395 upload_pephub: bool = False,
396 upload_s3: bool = False,
397 local_path: str = None,
398 overwrite: bool = False,
399 nofail: bool = False,
400 ) -> None:
401 """
402 Add bed file to the database.
404 :param identifier: bed file identifier
405 :param stats: bed file results {statistics, plots, files, metadata}
406 :param metadata: bed file metadata (will be saved in pephub)
407 :param plots: bed file plots
408 :param files: bed file files
409 :param classification: bed file classification
410 :param license_id: bed file license id (default: 'DUO:0000042'). Full list of licenses:
411 https://raw.githubusercontent.com/EBISPOT/DUO/master/duo.csv
412 :param upload_qdrant: add bed file to qdrant indexs
413 :param upload_pephub: add bed file to pephub
414 :param upload_s3: upload files to s3
415 :param local_path: local path to the output files
416 :param overwrite: overwrite bed file if it already exists
417 :param nofail: do not raise an error for error in pephub/s3/qdrant or record exsist and not overwrite
418 :return: None
419 """
420 _LOGGER.info(f"Adding bed file to database. bed_id: {identifier}")
422 if self.exists(identifier):
423 _LOGGER.warning(f"Bed file with id: {identifier} exists in the database.")
424 if not overwrite:
425 if not nofail:
426 raise BedFIleExistsError(
427 f"Bed file with id: {identifier} already exists in the database."
428 )
429 _LOGGER.warning("Overwrite set to False. Skipping..")
430 return None
431 else:
432 self.delete(identifier)
434 if license_id not in self.bb_agent.list_of_licenses:
435 raise BedBaseConfError(
436 f"License: {license_id} is not in the list of licenses. Please provide a valid license."
437 f"List of licenses: {self.bb_agent.list_of_licenses}"
438 )
440 stats = BedStatsModel(**stats)
441 # TODO: we should not check for specific keys, of the plots!
442 plots = BedPlots(**plots)
443 files = BedFiles(**files)
445 classification = BedClassification(**classification)
446 if upload_pephub:
447 metadata = BedPEPHub(**metadata)
448 try:
449 self.upload_pephub(
450 identifier,
451 metadata.model_dump(exclude=set("input_file")),
452 overwrite,
453 )
454 except Exception as e:
455 _LOGGER.warning(
456 f"Could not upload to pephub. Error: {e}. nofail: {nofail}"
457 )
458 if not nofail:
459 raise e
460 else:
461 _LOGGER.info("upload_pephub set to false. Skipping pephub..")
463 if upload_qdrant:
464 if classification.genome_alias == "hg38":
465 _LOGGER.info(f"Uploading bed file to qdrant.. [{identifier}]")
466 self.upload_file_qdrant(
467 identifier,
468 files.bed_file.path,
469 metadata.model_dump(exclude=set("input_file")),
470 )
471 _LOGGER.info(f"File uploaded to qdrant. {identifier}")
472 else:
473 _LOGGER.warning(
474 f"Could not upload to qdrant. Genome: {classification.genome_alias} is not supported."
475 )
476 else:
477 _LOGGER.info("upload_qdrant set to false. Skipping qdrant..")
479 # Upload files to s3
480 if upload_s3:
481 if files:
482 files = self._config.upload_files_s3(
483 identifier, files=files, base_path=local_path, type="files"
484 )
486 if plots:
487 plots = self._config.upload_files_s3(
488 identifier, files=plots, base_path=local_path, type="plots"
489 )
490 with Session(self._sa_engine) as session:
491 new_bed = Bed(
492 id=identifier,
493 **classification.model_dump(),
494 license_id=license_id,
495 indexed=upload_qdrant,
496 pephub=upload_pephub,
497 )
498 session.add(new_bed)
499 if upload_s3:
500 for k, v in files:
501 if v:
502 new_file = Files(
503 **v.model_dump(
504 exclude_none=True,
505 exclude_unset=True,
506 exclude={"object_id", "access_methods"},
507 ),
508 bedfile_id=identifier,
509 type="file",
510 )
511 session.add(new_file)
512 for k, v in plots:
513 if v:
514 new_plot = Files(
515 **v.model_dump(
516 exclude_none=True,
517 exclude_unset=True,
518 exclude={"object_id", "access_methods"},
519 ),
520 bedfile_id=identifier,
521 type="plot",
522 )
523 session.add(new_plot)
525 new_bedstat = BedStats(**stats.model_dump(), id=identifier)
526 session.add(new_bedstat)
528 session.commit()
530 return None
532 def update(
533 self,
534 identifier: str,
535 stats: dict,
536 metadata: dict = None,
537 plots: dict = None,
538 files: dict = None,
539 classification: dict = None,
540 add_to_qdrant: bool = False,
541 upload_pephub: bool = False,
542 upload_s3: bool = False,
543 local_path: str = None,
544 overwrite: bool = False,
545 nofail: bool = False,
546 ):
547 """
548 Update bed file to the database.
550 !! WARNING: this method is in development. Please, void of using it!
552 :param identifier: bed file identifier
553 :param stats: bed file results {statistics, plots, files, metadata}
554 :param metadata: bed file metadata (will be saved in pephub)
555 :param plots: bed file plots
556 :param files: bed file files
557 :param classification: bed file classification
558 :param add_to_qdrant: add bed file to qdrant indexs
559 :param upload_pephub: add bed file to pephub
560 :param upload_s3: upload files to s3
561 :param local_path: local path to the output files
562 :param overwrite: overwrite bed file if it already exists
563 :param nofail: do not raise an error for error in pephub/s3/qdrant or record exsist and not overwrite
564 :return: None
565 """
566 if not self.exists(identifier):
567 raise BEDFileNotFoundError(
568 f"Bed file with id: {identifier} not found. Cannot update."
569 )
571 stats = BedStatsModel(**stats)
572 plots = BedPlots(**plots)
573 files = BedFiles(**files)
574 classification = BedClassification(**classification)
576 if upload_pephub:
577 metadata = BedPEPHub(**metadata)
578 try:
579 self.update_pephub(identifier, metadata.model_dump(), overwrite)
580 except Exception as e:
581 _LOGGER.warning(
582 f"Could not upload to pephub. Error: {e}. nofail: {nofail}"
583 )
584 if not nofail:
585 raise e
586 else:
587 _LOGGER.info("upload_pephub set to false. Skipping pephub..")
589 if add_to_qdrant:
590 self.upload_file_qdrant(
591 identifier, files.bed_file.path, payload=metadata.model_dump()
592 )
594 statement = select(Bed).where(Bed.id == identifier)
596 if upload_s3:
597 _LOGGER.warning("S3 upload is not implemented yet")
598 # if files:
599 # files = self._config.upload_files_s3(
600 # identifier, files=files, base_path=local_path, type="files"
601 # )
602 #
603 # if plots:
604 # plots = self._config.upload_files_s3(
605 # identifier, files=plots, base_path=local_path, type="plots"
606 # )
608 with Session(self._sa_engine) as session:
609 bed_object = session.scalar(statement)
611 setattr(bed_object, **stats.model_dump())
612 setattr(bed_object, **classification.model_dump())
614 bed_object.indexed = add_to_qdrant
615 bed_object.pephub = upload_pephub
617 if upload_s3:
618 _LOGGER.warning("S3 upload is not implemented yet")
619 # for k, v in files:
620 # if v:
621 # new_file = Files(
622 # **v.model_dump(exclude_none=True, exclude_unset=True),
623 # bedfile_id=identifier,
624 # type="file",
625 # )
626 # session.add(new_file)
627 # for k, v in plots:
628 # if v:
629 # new_plot = Files(
630 # **v.model_dump(exclude_none=True, exclude_unset=True),
631 # bedfile_id=identifier,
632 # type="plot",
633 # )
634 # session.add(new_plot)
636 session.commit()
638 raise NotImplementedError
640 def delete(self, identifier: str) -> None:
641 """
642 Delete bed file from the database.
644 :param identifier: bed file identifier
645 :return: None
646 """
647 _LOGGER.info(f"Deleting bed file from database. bed_id: {identifier}")
648 if not self.exists(identifier):
649 raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
651 with Session(self._sa_engine) as session:
652 statement = select(Bed).where(Bed.id == identifier)
653 bed_object = session.scalar(statement)
655 files = [FileModel(**k.__dict__) for k in bed_object.files]
656 delete_pephub = bed_object.pephub
657 delete_qdrant = bed_object.indexed
659 session.delete(bed_object)
660 session.commit()
662 if delete_pephub:
663 self.delete_pephub_sample(identifier)
664 if delete_qdrant:
665 self.delete_qdrant_point(identifier)
666 self._config.delete_files_s3(files)
    def upload_pephub(self, identifier: str, metadata: dict, overwrite: bool = False):
        """
        Upload bed-file metadata as a sample to pephub.

        :param identifier: bed file identifier (used as the pephub sample name)
        :param metadata: sample metadata dict; if falsy, the upload is skipped
        :param overwrite: overwrite the sample in pephub if it already exists
        :return: False when skipped because no metadata was provided, otherwise None
        """
        if not metadata:
            _LOGGER.warning("No metadata provided. Skipping pephub upload..")
            return False
        self._config.phc.sample.create(
            namespace=self._config.config.phc.namespace,
            name=self._config.config.phc.name,
            tag=self._config.config.phc.tag,
            sample_name=identifier,
            sample_dict=metadata,
            overwrite=overwrite,
        )
    def update_pephub(self, identifier: str, metadata: dict, overwrite: bool = False):
        """
        Update a bed-file sample in pephub.

        :param identifier: bed file identifier (pephub sample name)
        :param metadata: sample metadata dict; if falsy, the update is skipped
        :param overwrite: NOTE(review): currently unused — it is not forwarded
            to the pephub `update` call; confirm whether it should be
        :return: False when skipped because no metadata was provided, otherwise None
        """
        if not metadata:
            _LOGGER.warning("No metadata provided. Skipping pephub upload..")
            return False
        self._config.phc.sample.update(
            namespace=self._config.config.phc.namespace,
            name=self._config.config.phc.name,
            tag=self._config.config.phc.tag,
            sample_name=identifier,
            sample_dict=metadata,
        )
693 def delete_pephub_sample(self, identifier: str):
694 """
695 Delete sample from pephub
697 :param identifier: bed file identifier
698 """
699 try:
700 self._config.phc.sample.remove(
701 namespace=self._config.config.phc.namespace,
702 name=self._config.config.phc.name,
703 tag=self._config.config.phc.tag,
704 sample_name=identifier,
705 )
706 except ResponseError as e:
707 _LOGGER.warning(f"Could not delete from pephub. Error: {e}")
709 def upload_file_qdrant(
710 self,
711 bed_id: str,
712 bed_file: Union[str, RegionSet],
713 payload: dict = None,
714 ) -> None:
715 """
716 Convert bed file to vector and add it to qdrant database
718 !Warning: only hg38 genome can be added to qdrant!
720 :param bed_id: bed file id
721 :param bed_file: path to the bed file, or RegionSet object
722 :param payload: additional metadata to store alongside vectors
723 :return: None
724 """
725 if self._qdrant_engine is None:
726 raise QdrantInstanceNotInitializedError
728 if not self._config.r2v:
729 raise BedBaseConfError(
730 "Could not add add region to qdrant. Invalid type, or path. "
731 )
733 _LOGGER.debug(f"Adding bed file to qdrant. bed_id: {bed_id}")
734 if isinstance(bed_file, str):
735 bed_region_set = GRegionSet(bed_file)
736 elif isinstance(bed_file, RegionSet) or isinstance(bed_file, GRegionSet):
737 bed_region_set = bed_file
738 else:
739 raise BedBaseConfError(
740 "Could not add add region to qdrant. Invalid type, or path. "
741 )
742 # Not really working
743 # bed_embedding = np.mean([self._config.r2v.encode(r) for r in bed_region_set], axis=0)
745 bed_embedding = np.mean(self._config.r2v.encode(bed_region_set), axis=0)
747 # Upload bed file vector to the database
748 vec_dim = bed_embedding.shape[0]
749 self._qdrant_engine.load(
750 ids=[bed_id],
751 vectors=bed_embedding.reshape(1, vec_dim),
752 payloads=[{**payload}],
753 )
754 return None
756 def text_to_bed_search(
757 self, query: str, limit: int = 10, offset: int = 0
758 ) -> BedListSearchResult:
759 """
760 Search for bed files by text query in qdrant database
762 :param query: text query
763 :param limit: number of results to return
764 :param offset: offset to start from
766 :return: list of bed file metadata
767 """
768 _LOGGER.info(f"Looking for: {query}")
769 _LOGGER.info(f"Using backend: {self._config.t2bsi}")
771 results = self._config.t2bsi.query_search(query, limit=limit, offset=offset)
772 results_list = []
773 for result in results:
774 result_id = result["id"].replace("-", "")
775 try:
776 result_meta = self.get(result_id)
777 except BEDFileNotFoundError as e:
778 _LOGGER.warning(
779 f"Could not retrieve metadata for bed file: {result_id}. Error: {e}"
780 )
781 continue
782 if result_meta:
783 results_list.append(QdrantSearchResult(**result, metadata=result_meta))
784 return BedListSearchResult(
785 count=self.bb_agent.get_stats.bedfiles_number,
786 limit=limit,
787 offset=offset,
788 results=results_list,
789 )
    def bed_to_bed_search(
        self,
        region_set: RegionSet,
        limit: int = 10,
        offset: int = 0,
    ) -> BedListSearchResult:
        """
        Search for bed files similar to the given region set.

        :param region_set: region set used as the search query
        :param limit: number of results to return
        :param offset: offset to start from

        :return: list of bed file search results with metadata
        """
        results = self._config.b2bsi.query_search(
            region_set, limit=limit, offset=offset
        )
        results_list = []
        for result in results:
            # presumably ids come back dash-separated (UUID form) while stored
            # ids are dash-free — TODO confirm
            result_id = result["id"].replace("-", "")
            try:
                result_meta = self.get(result_id)
            except BEDFileNotFoundError as e:
                # skip hits whose metadata is no longer in the database
                _LOGGER.warning(
                    f"Could not retrieve metadata for bed file: {result_id}. Error: {e}"
                )
                continue
            if result_meta:
                results_list.append(QdrantSearchResult(**result, metadata=result_meta))
        return BedListSearchResult(
            count=self.bb_agent.get_stats.bedfiles_number,
            limit=limit,
            offset=offset,
            results=results_list,
        )
819 def reindex_qdrant(self) -> None:
820 """
821 Re-upload all files to quadrant.
822 !Warning: only hg38 genome can be added to qdrant!
824 If you want want to fully reindex/reupload to qdrant, first delete collection and create new one.
826 Upload all files to qdrant.
827 """
828 bb_client = BBClient()
830 statement = select(Bed.id).where(Bed.genome_alias == QDRANT_GENOME)
832 with Session(self._db_engine.engine) as session:
833 bed_ids = session.execute(statement).all()
835 bed_ids = [bed_result[0] for bed_result in bed_ids]
837 with tqdm(total=len(bed_ids), position=0, leave=True) as pbar:
838 for record_id in bed_ids:
839 try:
840 bed_region_set_obj = GRegionSet(bb_client.seek(record_id))
841 except FileNotFoundError:
842 bed_region_set_obj = bb_client.load_bed(record_id)
844 pbar.set_description(f"Processing file: {record_id}")
845 metadata = self._config.phc.sample.get(
846 namespace=self._config.config.phc.namespace,
847 name=self._config.config.phc.name,
848 tag=self._config.config.phc.tag,
849 sample_name=record_id,
850 )
852 self.upload_file_qdrant(
853 bed_id=record_id,
854 bed_file=bed_region_set_obj,
855 payload=BedPEPHubRestrict(**metadata).model_dump(),
856 )
857 pbar.write(f"File: {record_id} uploaded to qdrant successfully.")
858 pbar.update(1)
860 return None
862 def delete_qdrant_point(self, identifier: str) -> None:
863 """
864 Delete bed file from qdrant.
866 :param identifier: bed file identifier
867 :return: None
868 """
870 result = self._config.qdrant_engine.qd_client.delete(
871 collection_name=self._config.config.qdrant.collection,
872 points_selector=PointIdsList(
873 points=[identifier],
874 ),
875 )
876 return result
878 def create_qdrant_collection(self) -> bool:
879 """
880 Create qdrant collection for bed files.
881 """
882 return self._config.qdrant_engine.qd_client.create_collection(
883 collection_name=self._config.config.qdrant.collection,
884 vectors_config=VectorParams(size=100, distance=Distance.DOT),
885 )
887 def exists(self, identifier: str) -> bool:
888 """
889 Check if bed file exists in the database.
891 :param identifier: bed file identifier
892 :return: True if bed file exists, False otherwise
893 """
894 statement = select(Bed).where(Bed.id == identifier)
896 with Session(self._sa_engine) as session:
897 bed_object = session.scalar(statement)
898 if not bed_object:
899 return False
900 return True
902 def exists_universe(self, identifier: str) -> bool:
903 """
904 Check if universe exists in the database.
906 :param identifier: universe identifier
908 :return: True if universe exists, False otherwise
909 """
910 statement = select(Universes).where(Universes.id == identifier)
912 with Session(self._sa_engine) as session:
913 bed_object = session.scalar(statement)
914 if not bed_object:
915 return False
916 return True
918 def add_universe(
919 self, bedfile_id: str, bedset_id: str = None, construct_method: str = None
920 ) -> str:
921 """
922 Add universe to the database.
924 :param bedfile_id: bed file identifier
925 :param bedset_id: bedset identifier
926 :param construct_method: method used to construct the universe
928 :return: universe identifier.
929 """
931 if not self.exists(bedfile_id):
932 raise BEDFileNotFoundError
933 with Session(self._sa_engine) as session:
934 new_univ = Universes(
935 id=bedfile_id, bedset_id=bedset_id, method=construct_method
936 )
937 session.add(new_univ)
938 session.commit()
940 _LOGGER.info(f"Universe added to the database successfully. id: {bedfile_id}")
941 return bedfile_id
943 def delete_universe(self, identifier: str) -> None:
944 """
945 Delete universe from the database.
947 :param identifier: universe identifier
948 :return: None
949 """
950 if not self.exists_universe(identifier):
951 raise UniverseNotFoundError(f"Universe not found. id: {identifier}")
953 with Session(self._sa_engine) as session:
954 statement = delete(Universes).where(Universes.id == identifier)
955 session.execute(statement)
956 session.commit()
958 def add_tokenized(
959 self, bed_id: str, universe_id: str, token_vector: list, overwrite: bool = False
960 ) -> str:
961 """
962 Add tokenized bed file to the database
964 :param bed_id: bed file identifier
965 :param universe_id: universe identifier
966 :param token_vector: list of tokens
967 :param overwrite: overwrite tokenized file if it already exists
969 :return: token path
970 """
972 with Session(self._sa_engine) as session:
973 if not self.exists_universe(universe_id):
974 raise UniverseNotFoundError(
975 f"Universe not found in the database. id: {universe_id}"
976 f"Please add universe first."
977 )
979 if self.exist_tokenized(bed_id, universe_id):
980 if not overwrite:
981 if not overwrite:
982 raise TokenizeFileExistsError(
983 "Tokenized file already exists in the database. "
984 "Set overwrite to True to overwrite it."
985 )
986 else:
987 self.delete_tokenized(bed_id, universe_id)
989 path = self._add_zarr_s3(
990 bed_id=bed_id,
991 universe_id=universe_id,
992 tokenized_vector=token_vector,
993 overwrite=overwrite,
994 )
995 path = os.path.join(f"s3://{self._config.config.s3.bucket}", path)
996 new_token = TokenizedBed(bed_id=bed_id, universe_id=universe_id, path=path)
998 session.add(new_token)
999 session.commit()
1000 return path
1002 def _add_zarr_s3(
1003 self,
1004 universe_id: str,
1005 bed_id: str,
1006 tokenized_vector: list,
1007 overwrite: bool = False,
1008 ) -> str:
1009 """
1010 Add zarr file to the database
1012 :param universe_id: universe identifier
1013 :param bed_id: bed file identifier
1014 :param tokenized_vector: tokenized vector
1016 :return: zarr path
1017 """
1018 univers_group = self._config.zarr_root.require_group(universe_id)
1020 if not univers_group.get(bed_id):
1021 _LOGGER.info("Saving tokenized vector to s3")
1022 path = univers_group.create_dataset(bed_id, data=tokenized_vector).path
1023 elif overwrite:
1024 _LOGGER.info("Overwriting tokenized vector in s3")
1025 path = univers_group.create_dataset(
1026 bed_id, data=tokenized_vector, overwrite=True
1027 ).path
1028 else:
1029 raise TokenizeFileExistsError(
1030 "Tokenized file already exists in the database. "
1031 "Set overwrite to True to overwrite it."
1032 )
1034 return os.path.join(ZARR_TOKENIZED_FOLDER, path)
1036 def get_tokenized(self, bed_id: str, universe_id: str) -> TokenizedBedResponse:
1037 """
1038 Get zarr file from the database
1040 :param bed_id: bed file identifier
1041 :param universe_id: universe identifier
1043 :return: zarr path
1044 """
1045 if not self.exist_tokenized(bed_id, universe_id):
1046 raise TokenizeFileNotExistError("Tokenized file not found in the database.")
1047 univers_group = self._config.zarr_root.require_group(universe_id)
1049 return TokenizedBedResponse(
1050 universe_id=universe_id,
1051 bed_id=bed_id,
1052 tokenized_bed=list(univers_group[bed_id]),
1053 )
1055 def delete_tokenized(self, bed_id: str, universe_id: str) -> None:
1056 """
1057 Delete tokenized bed file from the database
1059 :param bed_id: bed file identifier
1060 :param universe_id: universe identifier
1062 :return: None
1063 """
1064 if not self.exist_tokenized(bed_id, universe_id):
1065 raise TokenizeFileNotExistError("Tokenized file not found in the database.")
1066 univers_group = self._config.zarr_root.require_group(universe_id)
1068 del univers_group[bed_id]
1070 with Session(self._sa_engine) as session:
1071 statement = delete(TokenizedBed).where(
1072 and_(
1073 TokenizedBed.bed_id == bed_id,
1074 TokenizedBed.universe_id == universe_id,
1075 )
1076 )
1077 session.execute(statement)
1078 session.commit()
1080 return None
1082 def _get_tokenized_path(self, bed_id: str, universe_id: str) -> str:
1083 """
1084 Get tokenized path to tokenized file
1086 :param bed_id: bed file identifier
1087 :param universe_id: universe identifier
1089 :return: token path
1090 """
1091 if not self.exist_tokenized(bed_id, universe_id):
1092 raise TokenizeFileNotExistError("Tokenized file not found in the database.")
1094 with Session(self._sa_engine) as session:
1095 statement = select(TokenizedBed).where(
1096 and_(
1097 TokenizedBed.bed_id == bed_id,
1098 TokenizedBed.universe_id == universe_id,
1099 ),
1100 )
1101 tokenized_object = session.scalar(statement)
1102 return tokenized_object.path
1104 def exist_tokenized(self, bed_id: str, universe_id: str) -> bool:
1105 """
1106 Check if tokenized bed file exists in the database
1108 :param bed_id: bed file identifier
1109 :param universe_id: universe identifier
1111 :return: bool
1112 """
1113 with Session(self._sa_engine) as session:
1114 statement = select(TokenizedBed).where(
1115 and_(
1116 TokenizedBed.bed_id == bed_id,
1117 TokenizedBed.universe_id == universe_id,
1118 )
1119 )
1120 tokenized_object = session.scalar(statement)
1121 if not tokenized_object:
1122 return False
1123 return True
1125 def get_tokenized_link(
1126 self, bed_id: str, universe_id: str
1127 ) -> TokenizedPathResponse:
1128 """
1129 Get tokenized link to tokenized file
1131 :param bed_id: bed file identifier
1132 :param universe_id: universe identifier
1134 :return: token link
1135 :raises: TokenizeFileNotExistError
1136 """
1137 file_path = self._get_tokenized_path(bed_id, universe_id)
1139 return TokenizedPathResponse(
1140 endpoint_url=self._config.config.s3.endpoint_url,
1141 file_path=file_path,
1142 bed_id=bed_id,
1143 universe_id=universe_id,
1144 )