Coverage for bbconf/config_parser/bedbaseconfig.py: 34%
184 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-17 04:01 +0000
1import logging
2import os
3import warnings
4from pathlib import Path
5from typing import List, Literal, Union
7import boto3
8import qdrant_client
9import s3fs
10import yacman
11import zarr
12from botocore.exceptions import BotoCoreError, EndpointConnectionError
13from geniml.region2vec import Region2VecExModel
14from geniml.search import BED2BEDSearchInterface, QdrantBackend, Text2BEDSearchInterface
15from geniml.search.query2vec import BED2Vec, Text2Vec
16from pephubclient import PEPHubClient
17from zarr import Group as Z_GROUP
19from bbconf.config_parser.const import (
20 S3_BEDSET_PATH_FOLDER,
21 S3_FILE_PATH_FOLDER,
22 S3_PLOTS_PATH_FOLDER,
23)
24from bbconf.config_parser.models import ConfigFile
25from bbconf.const import PKG_NAME, ZARR_TOKENIZED_FOLDER
26from bbconf.db_utils import BaseEngine
27from bbconf.exceptions import (
28 BadAccessMethodError,
29 BedBaseConfError,
30 BedbaseS3ConnectionError,
31)
32from bbconf.helpers import get_absolute_path, get_bedbase_cfg
33from bbconf.models.base_models import FileModel
34from bbconf.models.bed_models import BedFiles, BedPlots
35from bbconf.models.bedset_models import BedSetPlots
36from bbconf.models.drs_models import AccessMethod, AccessURL
# Module-wide logger, named after the package, used by every BedBaseConfig method.
_LOGGER = logging.getLogger(name=PKG_NAME)
class BedBaseConfig:
    """
    Central configuration object for bbconf.

    On construction it reads the YAML configuration file and initializes the
    clients described there: database engine, Qdrant backend, text-to-BED and
    BED-to-BED search interfaces, region2vec model, PEPHub client and boto3
    S3 client.

    The Qdrant backend, the two search interfaces, the PEPHub client and the
    boto3 client are optional. When creating one of them fails with a handled
    error, the error is logged, a ``UserWarning`` is issued and the attribute
    is set to ``None``. The database engine and the region2vec model are not
    guarded: errors while creating them propagate out of ``__init__``.
    """

    def __init__(self, config: Union[Path, str]):
        """
        :param config: path to the configuration file (resolved with
            ``get_bedbase_cfg``)
        """
        self.cfg_path = get_bedbase_cfg(config)
        self._config = self._read_config_file(self.cfg_path)

        self._db_engine = self._init_db_engine()
        self._qdrant_engine = self._init_qdrant_backend()
        # The search interfaces use the Qdrant backend, so they must be
        # created after it.
        self._t2bsi = self._init_t2bsi_object()
        self._b2bsi = self._init_b2bsi_object()
        self._r2v = self._init_r2v_object()

        self._phc = self._init_pephubclient()
        self._boto3_client = self._init_boto3_client()

    @staticmethod
    def _read_config_file(config_path: str) -> ConfigFile:
        """
        Read the configuration file and insert default values where not set.

        Each top-level field of ``ConfigFile`` is built from the YAML section
        with the same name, using the field's annotated model class:

        - a missing or empty section uses the model's default constructor;
        - a mapping section is passed to the model as keyword arguments;
        - any other value is ignored with a warning, and defaults are used.

        Errors raised by the section models themselves (e.g. validation
        errors) propagate to the caller.

        :param config_path: configuration file path
        :return: parsed configuration object
        """
        from collections.abc import Mapping

        raw_config = yacman.YAMLConfigManager(filepath=config_path).exp

        config_dict = {}
        for field_name, field_info in ConfigFile.model_fields.items():
            section_model = field_info.annotation
            section = raw_config.get(field_name)

            if section is None:
                config_dict[field_name] = section_model()
            elif isinstance(section, Mapping):
                config_dict[field_name] = section_model(**section)
            else:
                # Best-effort: keep running with defaults, but make the
                # malformed section visible instead of silently ignoring it.
                message = (
                    f"Config section '{field_name}' is not a mapping "
                    f"(got {type(section).__name__}); using default values."
                )
                _LOGGER.warning(message)
                warnings.warn(message, UserWarning)
                config_dict[field_name] = section_model()

        return ConfigFile(**config_dict)

    @property
    def config(self) -> ConfigFile:
        """
        Get configuration

        :return: configuration object
        """
        return self._config

    @property
    def db_engine(self) -> BaseEngine:
        """
        Get database engine

        :return: database engine
        """
        return self._db_engine

    @property
    def t2bsi(self) -> Union[Text2BEDSearchInterface, None]:
        """
        Get text2bednn object

        :return: text2bednn object, or None if it could not be created
        """
        return self._t2bsi

    @property
    def b2bsi(self) -> Union[BED2BEDSearchInterface, None]:
        """
        Get bed2bednn object

        :return: bed2bednn object, or None if it could not be created
        """
        return self._b2bsi

    @property
    def r2v(self) -> Region2VecExModel:
        """
        Get region2vec object

        :return: region2vec object
        """
        return self._r2v

    @property
    def qdrant_engine(self) -> Union[QdrantBackend, None]:
        """
        Get qdrant engine

        :return: qdrant engine, or None if the connection failed
        """
        return self._qdrant_engine

    @property
    def phc(self) -> Union[PEPHubClient, None]:
        """
        Get PEPHub client

        :return: PEPHub client, or None if it could not be created
        """
        return self._phc

    @property
    def boto3_client(self) -> boto3.client:
        """
        Get boto3 client

        :return: boto3 S3 client, or None if it could not be created
        """
        return self._boto3_client

    @property
    def zarr_root(self) -> Union[Z_GROUP, None]:
        """
        Get zarr root object (Group) stored in the configured S3 bucket.

        A new ``s3fs.S3FileSystem``, store and cache are created on every
        access of this property.

        :return: zarr root group object, or None if the S3 filesystem object
            could not be created
        """
        try:
            s3fc_obj = s3fs.S3FileSystem(
                endpoint_url=self._config.s3.endpoint_url,
                key=self._config.s3.aws_access_key_id,
                secret=self._config.s3.aws_secret_access_key,
            )
        except BotoCoreError as e:
            _LOGGER.error(f"Error in creating s3fs object: {e}")
            warnings.warn(f"Error in creating s3fs object: {e}", UserWarning)
            return None

        s3_path = f"s3://{self._config.s3.bucket}/{ZARR_TOKENIZED_FOLDER}"

        # `create` follows the config's modify_access flag, so the root is
        # only created when write access is enabled.
        zarr_store = s3fs.S3Map(
            root=s3_path, s3=s3fc_obj, check=False, create=self._config.s3.modify_access
        )
        # In-memory LRU cache in front of the S3 store (max_size = 2**28).
        cache = zarr.LRUStoreCache(zarr_store, max_size=2**28)

        return zarr.group(store=cache, overwrite=False)

    def _init_db_engine(self) -> BaseEngine:
        """
        Create the database engine from the ``database`` config section.

        :return: database engine
        """
        return BaseEngine(
            host=self._config.database.host,
            port=self._config.database.port,
            database=self._config.database.database,
            user=self._config.database.user,
            password=self._config.database.password,
            drivername=f"{self._config.database.dialect}+{self._config.database.driver}",
        )

    def _init_qdrant_backend(self) -> Union[QdrantBackend, None]:
        """
        Create qdrant backend object using credentials provided in config file

        :return: QdrantBackend, or None if connecting to qdrant failed
        """
        try:
            return QdrantBackend(
                collection=self._config.qdrant.collection,
                qdrant_host=self._config.qdrant.host,
                qdrant_port=self._config.qdrant.port,
                qdrant_api_key=self._config.qdrant.api_key,
            )
        except qdrant_client.http.exceptions.ResponseHandlingException as err:
            _LOGGER.error(f"error in Connection to qdrant! skipping... Error: {err}")
            warnings.warn(
                f"error in Connection to qdrant! skipping... Error: {err}", UserWarning
            )
            return None

    def _init_t2bsi_object(self) -> Union[Text2BEDSearchInterface, None]:
        """
        Create Text 2 BED search interface and return this object

        :return: Text2BEDSearchInterface object, or None if creation failed
        """
        # NOTE(review): if the Qdrant backend failed to initialize, the
        # backend passed here is None; whether Text2BEDSearchInterface
        # rejects that at construction time is not visible here.
        try:
            return Text2BEDSearchInterface(
                backend=self.qdrant_engine,
                query2vec=Text2Vec(
                    hf_repo=self._config.path.text2vec,
                    v2v=self._config.path.vec2vec,
                ),
            )
        except Exception as e:
            _LOGGER.error("Error in creating Text2BEDSearchInterface object: " + str(e))
            warnings.warn(
                "Error in creating Text2BEDSearchInterface object: " + str(e),
                UserWarning,
            )
            return None

    def _init_b2bsi_object(self) -> Union[BED2BEDSearchInterface, None]:
        """
        Create Bed 2 BED search interface and return this object

        :return: Bed2BEDSearchInterface object, or None if creation failed
        """
        # NOTE(review): same caveat as the text search interface, since the
        # backend may be None.
        try:
            return BED2BEDSearchInterface(
                backend=self.qdrant_engine,
                query2vec=BED2Vec(model=self._config.path.region2vec),
            )
        except Exception as e:
            _LOGGER.error("Error in creating BED2BEDSearchInterface object: " + str(e))
            warnings.warn(
                "Error in creating BED2BEDSearchInterface object: " + str(e),
                UserWarning,
            )
            return None

    @staticmethod
    def _init_pephubclient() -> Union[PEPHubClient, None]:
        """
        Create a PEPHub client object with its default settings.

        No values from the config file are passed to the client.

        :return: PEPHubClient, or None if creation failed
        """
        try:
            return PEPHubClient()
        except Exception as e:
            _LOGGER.error(f"Error in creating PephubClient object: {e}")
            warnings.warn(f"Error in creating PephubClient object: {e}", UserWarning)
            return None

    def _init_boto3_client(
        self,
    ) -> boto3.client:
        """
        Create boto3 S3 client object using credentials provided in config file

        :return: boto3 S3 client, or None if creation failed
        """
        try:
            return boto3.client(
                "s3",
                endpoint_url=self._config.s3.endpoint_url,
                aws_access_key_id=self._config.s3.aws_access_key_id,
                aws_secret_access_key=self._config.s3.aws_secret_access_key,
            )
        except Exception as e:
            _LOGGER.error(f"Error in creating boto3 client object: {e}")
            warnings.warn(f"Error in creating boto3 client object: {e}", UserWarning)
            return None

    def _init_r2v_object(self) -> Region2VecExModel:
        """
        Create Region2VecExModel object from the ``path.region2vec`` config value.

        Errors are not caught here; they propagate out of ``__init__``.

        :return: region2vec model
        """
        return Region2VecExModel(self.config.path.region2vec)

    def upload_s3(self, file_path: str, s3_path: Union[Path, str]) -> None:
        """
        Upload a local file to the configured S3 bucket.

        :param file_path: local path to the file
        :param s3_path: path to the file in s3 with file name (object key)
        :return: None
        :raises BedbaseS3ConnectionError: if the boto3 client was not created
        :raises BedBaseConfError: if ``file_path`` does not exist
        """
        if not self._boto3_client:
            _LOGGER.warning(
                "Could not upload file to s3. Connection to s3 not established. Skipping.."
            )
            raise BedbaseS3ConnectionError(
                "Could not upload file to s3. Connection error."
            )
        if not os.path.exists(file_path):
            raise BedBaseConfError(f"File {os.path.abspath(file_path)} does not exist.")
        _LOGGER.info(f"Uploading file to s3: {s3_path}")
        # boto3 expects the object key as a string; S3 keys use "/" separators.
        key = s3_path.as_posix() if isinstance(s3_path, Path) else s3_path
        return self._boto3_client.upload_file(file_path, self.config.s3.bucket, key)

    @staticmethod
    def _build_s3_key(folder: str, identifier: str, local_path: str) -> str:
        """
        Build the S3 key ``<folder>/<identifier[0]>/<identifier[1]>/<file name>``.

        ``posixpath`` is used instead of ``os.path`` because S3 keys always
        use forward slashes (``os.path`` would produce backslashes on Windows).

        :param folder: base folder in the bucket
        :param identifier: record identifier; its first two characters become
            the sharding sub-folders
        :param local_path: local file path whose base name becomes the file name
        :return: S3 object key
        :raises BedBaseConfError: if ``identifier`` has fewer than 2 characters
        """
        import posixpath

        if len(identifier) < 2:
            raise BedBaseConfError(
                f"Identifier '{identifier}' is too short to build an s3 path; "
                "at least 2 characters are required."
            )
        return posixpath.join(
            folder,
            identifier[0],
            identifier[1],
            os.path.basename(local_path),
        )

    def upload_files_s3(
        self,
        identifier: str,
        files: Union[BedFiles, BedPlots, BedSetPlots],
        base_path: str,
        type: Literal["files", "plots", "bedsets"] = "files",
    ) -> Union[BedFiles, BedPlots, BedSetPlots]:
        """
        Upload files (and their thumbnails, if any) to s3.

        Each uploaded entry of ``files`` is updated in place: ``name`` is set
        to the field name, ``size`` to the local file size, and ``path`` /
        ``path_thumbnail`` to the S3 keys. Empty entries are skipped.

        :param identifier: bed file identifier (at least 2 characters)
        :param files: model with files to upload
        :param base_path: local path to the output files
        :param type: type of files to upload [files, plots, bedsets]
        :return: the same ``files`` object, with updated entries
        :raises BedBaseConfError: on an invalid ``type`` or a too-short identifier
        """
        if type == "files":
            s3_output_base_folder = S3_FILE_PATH_FOLDER
        elif type == "plots":
            s3_output_base_folder = S3_PLOTS_PATH_FOLDER
        elif type == "bedsets":
            s3_output_base_folder = S3_BEDSET_PATH_FOLDER
        else:
            raise BedBaseConfError(
                f"Invalid type: {type}. Should be 'files', 'plots', or 'bedsets'"
            )

        # Iterating the model yields (field_name, value) pairs.
        for key, value in files:
            if not value:
                continue
            file_path = get_absolute_path(value.path, base_path)
            s3_path = self._build_s3_key(s3_output_base_folder, identifier, value.path)
            self.upload_s3(file_path, s3_path=s3_path)

            value.name = key
            value.size = os.path.getsize(file_path)
            value.path = s3_path

            if value.path_thumbnail:
                file_path_thumbnail = get_absolute_path(value.path_thumbnail, base_path)
                s3_path_thumbnail = self._build_s3_key(
                    s3_output_base_folder, identifier, value.path_thumbnail
                )
                self.upload_s3(file_path_thumbnail, s3_path=s3_path_thumbnail)
                value.path_thumbnail = s3_path_thumbnail

        return files

    def delete_s3(self, s3_path: str) -> None:
        """
        Delete file from s3.

        :param s3_path: path to the file in s3 (object key)
        :return: None
        :raises BedbaseS3ConnectionError: if the boto3 client was not created
            or the S3 endpoint could not be reached
        """
        if not self._boto3_client:
            _LOGGER.warning(
                "Could not delete file from s3. Connection to s3 not established. Skipping.."
            )
            raise BedbaseS3ConnectionError(
                "Could not delete file from s3. Connection error."
            )
        try:
            _LOGGER.info(f"Deleting file from s3: {s3_path}")
            return self._boto3_client.delete_object(
                Bucket=self.config.s3.bucket, Key=s3_path
            )
        except EndpointConnectionError as err:
            raise BedbaseS3ConnectionError(
                "Could not delete file from s3. Connection error."
            ) from err

    def delete_files_s3(self, files: List[FileModel]) -> None:
        """
        Delete files (and their thumbnails, if set) from s3.

        :param files: list of file objects
        :return: None
        """
        for file in files:
            self.delete_s3(file.path)
            if file.path_thumbnail:
                self.delete_s3(file.path_thumbnail)
        return None

    def get_prefixed_uri(self, postfix: str, access_id: str) -> str:
        """
        Return uri with correct prefix (schema)

        :param postfix: postfix of the uri (or everything after uri schema)
        :param access_id: access method name, e.g. http, s3, etc.
        :return: full uri path
        :raises BadAccessMethodError: if ``access_id`` is not a configured
            access method
        """
        import posixpath

        try:
            # getattr on a missing attribute (or `.prefix` on an unset, None
            # access method) raises AttributeError, not KeyError.
            prefix = getattr(self.config.access_methods, access_id).prefix
        except AttributeError as err:
            _LOGGER.error(f"Access method {access_id} is not defined.")
            raise BadAccessMethodError(
                f"Access method {access_id} is not defined."
            ) from err
        # URIs always use "/" separators, independent of the host OS.
        return posixpath.join(prefix, postfix)

    def construct_access_method_list(self, rel_path: str) -> List[AccessMethod]:
        """
        Construct access method list for a given record

        One ``AccessMethod`` is created for every configured access method.

        :param rel_path: relative path to the record
        :return: list of access methods
        """
        # Dump once instead of twice per iteration.
        dumped_methods = self.config.access_methods.model_dump()
        access_methods = []
        for access_id, method_config in dumped_methods.items():
            access_dict = AccessMethod(
                type=access_id,
                access_id=access_id,
                access_url=AccessURL(url=self.get_prefixed_uri(rel_path, access_id)),
                region=method_config.get("region", None),
            )
            access_methods.append(access_dict)
        return access_methods