Coverage for bbconf/config_parser/bedbaseconfig.py: 34%

184 statements  

coverage.py v7.6.0, created at 2024-07-17 04:01 +0000

import logging
import os
import warnings
from pathlib import Path
from typing import List, Literal, Union

import boto3
import qdrant_client
import s3fs
import yacman
import zarr
from botocore.exceptions import BotoCoreError, EndpointConnectionError
from geniml.region2vec import Region2VecExModel
from geniml.search import BED2BEDSearchInterface, QdrantBackend, Text2BEDSearchInterface
from geniml.search.query2vec import BED2Vec, Text2Vec
from pephubclient import PEPHubClient
from zarr import Group as Z_GROUP

from bbconf.config_parser.const import (
    S3_BEDSET_PATH_FOLDER,
    S3_FILE_PATH_FOLDER,
    S3_PLOTS_PATH_FOLDER,
)
from bbconf.config_parser.models import ConfigFile
from bbconf.const import PKG_NAME, ZARR_TOKENIZED_FOLDER
from bbconf.db_utils import BaseEngine
from bbconf.exceptions import (
    BadAccessMethodError,
    BedBaseConfError,
    BedbaseS3ConnectionError,
)
from bbconf.helpers import get_absolute_path, get_bedbase_cfg
from bbconf.models.base_models import FileModel
from bbconf.models.bed_models import BedFiles, BedPlots
from bbconf.models.bedset_models import BedSetPlots
from bbconf.models.drs_models import AccessMethod, AccessURL

_LOGGER = logging.getLogger(PKG_NAME)


class BedBaseConfig:
    def __init__(self, config: Union[Path, str]):
        self.cfg_path = get_bedbase_cfg(config)
        self._config = self._read_config_file(self.cfg_path)

        self._db_engine = self._init_db_engine()
        self._qdrant_engine = self._init_qdrant_backend()
        self._t2bsi = self._init_t2bsi_object()
        self._b2bsi = self._init_b2bsi_object()
        self._r2v = self._init_r2v_object()

        self._phc = self._init_pephubclient()
        self._boto3_client = self._init_boto3_client()

    @staticmethod
    def _read_config_file(config_path: str) -> ConfigFile:
        """
        Read configuration file and insert default values if not set

        :param config_path: configuration file path
        :return: configuration object with defaults filled in for missing sections
        :raises: raise_missing_key (if config key is missing)
        """
        _config = yacman.YAMLConfigManager(filepath=config_path).exp

        config_dict = {}
        for field_name, annotation in ConfigFile.model_fields.items():
            try:
                config_dict[field_name] = annotation.annotation(
                    **_config.get(field_name)
                )
            except TypeError:
                # TODO: this should be more specific
                config_dict[field_name] = annotation.annotation()

        return ConfigFile(**config_dict)
        # return ConfigFile.from_yaml(Path(config_path))
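    # A minimal sketch of the YAML shape that _read_config_file expects, inferred
    # from the attributes accessed elsewhere in this class. Keys mirror ConfigFile
    # field names; every value below is a placeholder, and sections left out fall
    # back to the defaults filled in above.
    #
    #   database:
    #     host: localhost
    #     port: 5432
    #     database: bedbase
    #     user: postgres
    #     password: example-password
    #     dialect: postgresql
    #     driver: psycopg
    #   qdrant:
    #     host: localhost
    #     port: 6333
    #     api_key: null
    #     collection: bedbase
    #   s3:
    #     endpoint_url: https://s3.example.org
    #     aws_access_key_id: EXAMPLE-KEY
    #     aws_secret_access_key: EXAMPLE-SECRET
    #     bucket: bedbase
    #     modify_access: false
    #   path:
    #     region2vec: example-namespace/region2vec-model
    #     text2vec: example-namespace/text2vec-model
    #     vec2vec: example-namespace/vec2vec-model
    #   access_methods:
    #     http:
    #       prefix: https://data.example.org/
    #       region: null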

    @property
    def config(self) -> ConfigFile:
        """
        Get configuration

        :return: configuration object
        """
        return self._config

    @property
    def db_engine(self) -> BaseEngine:
        """
        Get database engine

        :return: database engine
        """
        return self._db_engine

    @property
    def t2bsi(self) -> Union[Text2BEDSearchInterface, None]:
        """
        Get text2bednn object

        :return: text2bednn object
        """
        return self._t2bsi

    @property
    def b2bsi(self) -> Union[BED2BEDSearchInterface, None]:
        """
        Get bed2bednn object

        :return: bed2bednn object
        """
        return self._b2bsi

    @property
    def r2v(self) -> Region2VecExModel:
        """
        Get region2vec object

        :return: region2vec object
        """
        return self._r2v

    @property
    def qdrant_engine(self) -> QdrantBackend:
        """
        Get qdrant engine

        :return: qdrant engine
        """
        return self._qdrant_engine

    @property
    def phc(self) -> PEPHubClient:
        """
        Get PEPHub client

        :return: PEPHub client
        """
        return self._phc

    @property
    def boto3_client(self) -> boto3.client:
        """
        Get boto3 client

        :return: boto3 client
        """
        return self._boto3_client

    @property
    def zarr_root(self) -> Union[Z_GROUP, None]:
        """
        Get zarr root object (Group)

        :return: zarr root group object
        """

        try:
            s3fc_obj = s3fs.S3FileSystem(
                endpoint_url=self._config.s3.endpoint_url,
                key=self._config.s3.aws_access_key_id,
                secret=self._config.s3.aws_secret_access_key,
            )
        except BotoCoreError as e:
            _LOGGER.error(f"Error in creating s3fs object: {e}")
            warnings.warn(f"Error in creating s3fs object: {e}", UserWarning)
            return None

        s3_path = f"s3://{self._config.s3.bucket}/{ZARR_TOKENIZED_FOLDER}"

        zarr_store = s3fs.S3Map(
            root=s3_path, s3=s3fc_obj, check=False, create=self._config.s3.modify_access
        )
        cache = zarr.LRUStoreCache(zarr_store, max_size=2**28)

        return zarr.group(store=cache, overwrite=False)

    def _init_db_engine(self) -> BaseEngine:
        return BaseEngine(
            host=self._config.database.host,
            port=self._config.database.port,
            database=self._config.database.database,
            user=self._config.database.user,
            password=self._config.database.password,
            drivername=f"{self._config.database.dialect}+{self._config.database.driver}",
        )

    def _init_qdrant_backend(self) -> QdrantBackend:
        """
        Create qdrant client object using credentials provided in config file

        :return: QdrantBackend object
        """
        try:
            return QdrantBackend(
                collection=self._config.qdrant.collection,
                qdrant_host=self._config.qdrant.host,
                qdrant_port=self._config.qdrant.port,
                qdrant_api_key=self._config.qdrant.api_key,
            )
        except qdrant_client.http.exceptions.ResponseHandlingException as err:
            _LOGGER.error(f"Error in connection to qdrant! Skipping... Error: {err}")
            warnings.warn(
                f"Error in connection to qdrant! Skipping... Error: {err}", UserWarning
            )

    def _init_t2bsi_object(self) -> Union[Text2BEDSearchInterface, None]:
        """
        Create Text 2 BED search interface and return this object

        :return: Text2BEDSearchInterface object
        """

        try:
            return Text2BEDSearchInterface(
                backend=self.qdrant_engine,
                query2vec=Text2Vec(
                    hf_repo=self._config.path.text2vec,
                    v2v=self._config.path.vec2vec,
                ),
            )
        except Exception as e:
            _LOGGER.error("Error in creating Text2BEDSearchInterface object: " + str(e))
            warnings.warn(
                "Error in creating Text2BEDSearchInterface object: " + str(e),
                UserWarning,
            )
            return None

    def _init_b2bsi_object(self) -> Union[BED2BEDSearchInterface, None]:
        """
        Create BED 2 BED search interface and return this object

        :return: BED2BEDSearchInterface object
        """
        try:
            return BED2BEDSearchInterface(
                backend=self.qdrant_engine,
                query2vec=BED2Vec(model=self._config.path.region2vec),
            )
        except Exception as e:
            _LOGGER.error("Error in creating BED2BEDSearchInterface object: " + str(e))
            warnings.warn(
                "Error in creating BED2BEDSearchInterface object: " + str(e),
                UserWarning,
            )
            return None

    @staticmethod
    def _init_pephubclient() -> Union[PEPHubClient, None]:
        """
        Create PEPHub client object

        :return: PEPHubClient, or None if the client could not be created
        """
        try:
            return PEPHubClient()
        except Exception as e:
            _LOGGER.error(f"Error in creating PephubClient object: {e}")
            warnings.warn(f"Error in creating PephubClient object: {e}", UserWarning)
            return None

    def _init_boto3_client(
        self,
    ) -> boto3.client:
        """
        Create boto3 client object using credentials provided in config file

        :return: boto3 client, or None if the client could not be created
        """
        try:
            return boto3.client(
                "s3",
                endpoint_url=self._config.s3.endpoint_url,
                aws_access_key_id=self._config.s3.aws_access_key_id,
                aws_secret_access_key=self._config.s3.aws_secret_access_key,
            )
        except Exception as e:
            _LOGGER.error(f"Error in creating boto3 client object: {e}")
            warnings.warn(f"Error in creating boto3 client object: {e}", UserWarning)
            return None

    def _init_r2v_object(self) -> Region2VecExModel:
        """
        Create Region2VecExModel object using the model path provided in the config file
        """
        return Region2VecExModel(self.config.path.region2vec)

    def upload_s3(self, file_path: str, s3_path: Union[Path, str]) -> None:
        """
        Upload file to s3.

        :param file_path: local path to the file
        :param s3_path: path to the file in s3 with file name
        :return: None
        """
        if not self._boto3_client:
            _LOGGER.warning(
                "Could not upload file to s3. Connection to s3 not established. Skipping..."
            )
            raise BedbaseS3ConnectionError(
                "Could not upload file to s3. Connection error."
            )
        if not os.path.exists(file_path):
            raise BedBaseConfError(f"File {os.path.abspath(file_path)} does not exist.")
        _LOGGER.info(f"Uploading file to s3: {s3_path}")
        return self._boto3_client.upload_file(file_path, self.config.s3.bucket, s3_path)

    def upload_files_s3(
        self,
        identifier: str,
        files: Union[BedFiles, BedPlots, BedSetPlots],
        base_path: str,
        type: Literal["files", "plots", "bedsets"] = "files",
    ) -> Union[BedFiles, BedPlots, BedSetPlots]:
        """
        Upload files to s3.

        :param identifier: bed file identifier
        :param files: model with the files to upload (BedFiles, BedPlots, or BedSetPlots)
        :param base_path: local path to the output files
        :param type: type of files to upload [files, plots, bedsets]
        :return: the files object with paths updated to their s3 locations
        """

        if type == "files":
            s3_output_base_folder = S3_FILE_PATH_FOLDER
        elif type == "plots":
            s3_output_base_folder = S3_PLOTS_PATH_FOLDER
        elif type == "bedsets":
            s3_output_base_folder = S3_BEDSET_PATH_FOLDER
        else:
            raise BedBaseConfError(
                f"Invalid type: {type}. Should be 'files', 'plots', or 'bedsets'"
            )

        for key, value in files:
            if not value:
                continue
            file_base_name = os.path.basename(value.path)
            file_path = get_absolute_path(value.path, base_path)
            s3_path = os.path.join(
                s3_output_base_folder,
                identifier[0],
                identifier[1],
                file_base_name,
            )
            self.upload_s3(file_path, s3_path=s3_path)

            setattr(value, "name", key)
            setattr(value, "size", os.path.getsize(file_path))
            setattr(value, "path", s3_path)

            if value.path_thumbnail:
                file_base_name_thumbnail = os.path.basename(value.path_thumbnail)
                file_path_thumbnail = get_absolute_path(value.path_thumbnail, base_path)
                s3_path_thumbnail = os.path.join(
                    s3_output_base_folder,
                    identifier[0],
                    identifier[1],
                    file_base_name_thumbnail,
                )
                self.upload_s3(file_path_thumbnail, s3_path=s3_path_thumbnail)
                setattr(value, "path_thumbnail", s3_path_thumbnail)

        return files
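    # Illustrative sketch (file names and paths are hypothetical): for a record
    # with identifier "abc123" and a plot written to <base_path>/abc123_plot.png,
    # the loop above uploads it to "<S3_PLOTS_PATH_FOLDER>/a/b/abc123_plot.png"
    # (objects are sharded by the first two characters of the identifier) and
    # rewrites value.path to that s3 key:
    #
    #   bbc = BedBaseConfig("bedbase_config.yaml")
    #   plots = bbc.upload_files_s3(
    #       identifier="abc123",
    #       files=BedPlots(...),   # partially populated plots model
    #       base_path="/tmp/outputs",
    #       type="plots",
    #   )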

    def delete_s3(self, s3_path: str) -> None:
        """
        Delete file from s3.

        :param s3_path: path to the file in s3
        :return: None
        """
        if not self._boto3_client:
            _LOGGER.warning(
                "Could not delete file from s3. Connection to s3 not established. Skipping..."
            )
            raise BedbaseS3ConnectionError(
                "Could not delete file from s3. Connection error."
            )
        try:
            _LOGGER.info(f"Deleting file from s3: {s3_path}")
            return self._boto3_client.delete_object(
                Bucket=self.config.s3.bucket, Key=s3_path
            )
        except EndpointConnectionError:
            raise BedbaseS3ConnectionError(
                "Could not delete file from s3. Connection error."
            )

    def delete_files_s3(self, files: List[FileModel]) -> None:
        """
        Delete files from s3.

        :param files: list of file objects
        :return: None
        """
        for file in files:
            self.delete_s3(file.path)
            if file.path_thumbnail:
                self.delete_s3(file.path_thumbnail)
        return None

    def get_prefixed_uri(self, postfix: str, access_id: str) -> str:
        """
        Return uri with correct prefix (schema)

        :param postfix: postfix of the uri (or everything after uri schema)
        :param access_id: access method name, e.g. http, s3, etc.
        :return: full uri path
        """

        try:
            prefix = getattr(self.config.access_methods, access_id).prefix
            return os.path.join(prefix, postfix)
        except (KeyError, AttributeError):
            _LOGGER.error(f"Access method {access_id} is not defined.")
            raise BadAccessMethodError(f"Access method {access_id} is not defined.")
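    # Illustrative sketch, assuming an access method "s3" is configured with
    # prefix "s3://bedbase": get_prefixed_uri("files/a/b/abc123.bed.gz", "s3")
    # would return "s3://bedbase/files/a/b/abc123.bed.gz", while an unknown
    # access_id is reported via BadAccessMethodError.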

    def construct_access_method_list(self, rel_path: str) -> List[AccessMethod]:
        """
        Construct access method list for a given record

        :param rel_path: relative path to the record
        :return: list of access methods
        """
        access_methods = []
        for access_id in self.config.access_methods.model_dump().keys():
            access_dict = AccessMethod(
                type=access_id,
                access_id=access_id,
                access_url=AccessURL(url=self.get_prefixed_uri(rel_path, access_id)),
                region=self.config.access_methods.model_dump()[access_id].get(
                    "region", None
                ),
            )
            access_methods.append(access_dict)
        return access_methods
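
# Usage sketch, kept as comments so the module itself is unchanged; the config
# path and identifiers below are hypothetical placeholders:
#
#   from bbconf.config_parser.bedbaseconfig import BedBaseConfig
#
#   bbc = BedBaseConfig("/path/to/bedbase_config.yaml")
#   bbc.upload_s3("/tmp/abc123.bed.gz", s3_path="files/a/b/abc123.bed.gz")
#   access_methods = bbc.construct_access_method_list("files/a/b/abc123.bed.gz")
#   bbc.delete_s3("files/a/b/abc123.bed.gz")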