Coverage for configzen/processor.py: 38%

150 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-15 02:42 +0000

1""" 

2Replacement API processor for configuration data. 

3 

4Allows tweaking the configuration data programmatically before it is given 

5to the model config and reverting the changes back to the original data structure 

6when the configuration managed by that model is saved. 

7""" 

8 

9from __future__ import annotations 1abcdefgh

10 

11from collections import UserDict 1abcdefgh

12from collections.abc import Mapping, MutableMapping, MutableSequence, Sequence 1abcdefgh

13from contextvars import copy_context 1abcdefgh

14from copy import copy 1abcdefgh

15from dataclasses import dataclass 1abcdefgh

16from functools import partial 1abcdefgh

17from typing import TYPE_CHECKING, ClassVar, TypedDict, cast 1abcdefgh

18 

19from configzen.errors import ConfigProcessorError 1abcdefgh

20from configzen.sources import get_config_source 1abcdefgh

21 

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator
    from typing import NewType, TypeVar, overload

    from typing_extensions import TypeAlias

    from configzen.data import Data

    # A macro is a callable (any signature) annotated as returning ``Data``.
    Macro: TypeAlias = "Callable[..., Data]"
    MacroT = TypeVar("MacroT", bound=Macro)
    # Registry shape: macro name -> macro callable.
    MacroDict: TypeAlias = "dict[str, Macro]"
    # Type checkers see ``Char`` as a distinct subtype of ``str``.
    Char = NewType("Char", str)
else:

    def Char(s: str) -> str:  # noqa: N802
        """
        Validate that a string consists of exactly one character.

        Runtime counterpart of the ``Char`` type that type checkers see.

        Parameters
        ----------
        s
            The string to validate.

        Returns
        -------
        str
            The same string, unchanged.

        Raises
        ------
        ValueError
            If the string length is not exactly 1.

        """
        if len(s) == 1:
            return s
        msg = "Char must be a single character"
        raise ValueError(msg)

41 

42 

# Names exported by a star-import of this module.
__all__ = (
    "ConfigProcessor",
    "ProcessorOptions",
    "ProcessorReplacement",
)

# Name of the attribute that the module-level ``macro()`` decorator sets on a
# function (holding the macro name) and that
# ``ConfigProcessor.__init_subclass__`` looks for when collecting macros.
MACRO_FUNC: str = "__configzen_macro_func__"

50 

51 

class ProcessorOptions(TypedDict, total=False):
    """
    Prototype of the allowed options for the ConfigProcessor class.

    Declared with ``total=False``, so every key is optional for type checkers.
    """

    # Prefix that marks a key as a macro invocation.
    macro_prefix: Char
    # Prefix that marks a key as an update of an existing value.
    update_prefix: Char
    # NOTE(review): stored by ConfigProcessor but not read by any code visible
    # in this module -- confirm the intended semantics.
    macros_on_top: bool
    # When true, a key that names an unknown macro is kept as a regular item
    # instead of raising ConfigProcessorError.
    lenient: bool

59 

60 

61class _ProcessedReplacements: 1abcdefgh

62 def __init__(self) -> None: 1abcdefgh

63 self.__replacements: dict[str, ProcessorReplacement] = {} 

64 

65 def items(self) -> Iterator[tuple[str, ProcessorReplacement]]: 1abcdefgh

66 yield from self.__replacements.items() 

67 

68 def update(self, substitute: dict[str, ProcessorReplacement]) -> None: 1abcdefgh

69 replacements = self.__replacements 

70 if set(replacements) & set(substitute): # what then? 

71 msg = "Replacement collision" 

72 raise ValueError(msg) 

73 replacements.update(substitute) 

74 

75 

76# Note: Add generic type params for 3.9+ 

77class _ProcessedData(UserDict): # type: ignore[type-arg] 1abcdefgh

78 def __init__( 1abcdefgh

79 self, 

80 *, 

81 data: MutableMapping[str, object], 

82 options: ProcessorOptions, 

83 macros: MacroDict, 

84 ) -> None: 

85 self.macros = macros 

86 self.options = options 

87 self._context = copy_context() 

88 self.__replacements = _ProcessedReplacements() 

89 

90 super().__init__() 

91 

92 for key, value in data.items(): 

93 replacement = self.find_replacement( 

94 key, 

95 value, 

96 lenient=self.options.get("lenient", True), 

97 ) 

98 if replacement is None: 

99 self.data[key] = value 

100 continue 

101 substitute = replacement.content 

102 self.data.update(substitute) 

103 self.__replacements.update(dict.fromkeys(substitute, replacement)) 

104 

105 def find_replacement( 1abcdefgh

106 self, 

107 key: str | None, 

108 value: object, 

109 *, 

110 lenient: bool = True, 

111 ) -> ProcessorReplacement | None: 

112 """ 

113 Find a replacement for a single item, for programmatic use. 

114 

115 Return None if not found. 

116 """ 

117 macro_prefix = self.options["macro_prefix"] 

118 update_prefix = self.options["update_prefix"] 

119 

120 if not key: 

121 return None 

122 

123 # Note: Use str.removeprefix() for 3.9+ 

124 if key.startswith(macro_prefix): 

125 macro_name = key[len(macro_prefix) :] 

126 try: 

127 macro = self.macros[macro_name] 

128 except KeyError as err: 

129 if lenient: 

130 return None 

131 msg = f"No such macro: {macro_name!r}" 

132 raise ConfigProcessorError(msg) from err 

133 return ProcessorReplacement( 

134 key=key, 

135 value=value, 

136 content=self._context.run(macro, value), 

137 ) 

138 

139 if key.startswith(update_prefix): 

140 update_key = key[len(update_prefix) :] 

141 return ProcessorReplacement( 

142 key=key, 

143 value=value, 

144 content={update_key: self.update_existing(update_key, value)}, 

145 ) 

146 

147 return None 

148 

149 def update_existing(self, key: str, value: object) -> object: 1abcdefgh

150 """Update (NOT replace) a value for key with a new value.""" 

151 missing = object() 

152 existent = self.get(key, missing) 

153 if existent is missing: 

154 return value 

155 substitute: object 

156 if isinstance(existent, MutableMapping) and isinstance(value, Mapping): 

157 substitute = copy(existent) 

158 substitute.update(value) 

159 return substitute 

160 if isinstance(existent, MutableSequence) and isinstance(value, Sequence): 

161 substitute = copy(existent) 

162 substitute.extend(value) 

163 return substitute 

164 msg = f"Cannot update {type(existent)} with {type(value)}" 

165 raise TypeError(msg) 

166 

167 def revert_replacements(self) -> Data: 1abcdefgh

168 """Revert all replacements and return the original data structure.""" 

169 before_replacements = {} 

170 skip_keys = set() 

171 for key, replacement in self.__replacements.items(): 

172 before_replacements[replacement.key] = replacement.value 

173 skip_keys.add(key) 

174 for key, value in self.data.items(): 

175 if key not in skip_keys: 

176 before_replacements[key] = value 

177 return before_replacements 

178 

179 

class ConfigProcessor:
    """
    A class that takes in configuration data and processes it.

    Resolves & applies replacements (macros and updates) found in the data.
    Macros are collected per class: functions marked with the module-level
    ``macro()`` decorator are gathered when a subclass is created, and
    `ConfigProcessor.macro` registers one explicitly.
    """

    # Factory for the processed mapping; called with data/options/macros.
    _get_processed_data: Callable[..., _ProcessedData] = _ProcessedData

    # Macro registry of this class (name -> unbound callable).
    _macros: ClassVar[MacroDict]

    def __init__(  # noqa: PLR0913
        self,
        initial: Data,
        *,
        macro_prefix: Char = Char("^"),  # noqa: B008
        update_prefix: Char = Char("+"),  # noqa: B008
        macros_on_top: bool = False,
        lenient: bool = True,
    ) -> None:
        """
        Store the initial data and the processing options.

        The data is not processed here; see `get_processed_data`.
        """
        self.__initial = initial
        # Filled lazily by get_processed_data().
        self.__data: _ProcessedData = None  # type: ignore[assignment]

        self.options = ProcessorOptions(
            macro_prefix=macro_prefix,
            update_prefix=update_prefix,
            macros_on_top=macros_on_top,
            lenient=lenient,
        )

    @property
    def macros(self) -> MacroDict:
        """Get macros bound to this processor."""
        return {
            # Bind through the descriptor protocol so that the macro receives
            # this processor as its first argument.
            macro_name: macro.__get__(self, type(self))
            for macro_name, macro in self._macros.items()
        }

    @property
    def roundtrip_initial(self) -> Data:
        """The initial configuration data that the processor was given."""
        return self.__initial

    def create_processor(self, data: Data) -> ConfigProcessor:
        """Create a new configuration processor with identical options."""
        return type(self)(data, **self.options)

    def get_processed_data(
        self,
        *,
        force: bool = False,
    ) -> _ProcessedData:
        """
        Create the data with replacements or return the one already cached.

        Parameters
        ----------
        force
            Whether to forcibly parse the original data even if it was already parsed.
            Default is False.

        """
        if force or self.__data is None:
            self.__data = self._get_processed_data(
                data=self.__initial,
                options=self.options,
                macros=self.macros,
            )
        return self.__data

    def __init_subclass__(cls) -> None:
        """
        Merge macro registries on subclass.

        The new registry is a fresh dict made of the registries of all bases
        plus every attribute of the class body that carries the macro marker.
        """
        super().__init_subclass__()
        macros: MacroDict = {}
        # Walk the MRO from the most generic base to the closest one, so that
        # closer bases win on name clashes and macros coming from additional
        # bases (multiple inheritance) are not lost.
        for base in reversed(cls.__mro__[1:]):
            macros.update(vars(base).get("_macros", {}))
        # Macros declared in this class body override inherited ones.
        macros.update(
            {
                macro_name: func
                for func in vars(cls).values()
                if (macro_name := getattr(func, MACRO_FUNC, None))
            },
        )
        cls._macros = macros

    @staticmethod
    def sanitize_macro_name(name: str) -> str:
        """Ensure a uniform name of every macro."""
        return name.strip().casefold()

    @classmethod
    def macro(cls, name: str, macro: MacroT) -> MacroT:
        """
        Override a macro.

        The name is sanitized before the macro is stored in the registry of
        the class this is called on. The macro is returned unchanged.
        """
        name = cls.sanitize_macro_name(name)
        cls._macros[name] = macro
        return macro


# The hook does not run for the class that defines it, so initialize the
# registry of the base class explicitly.
ConfigProcessor.__init_subclass__()

277 

278 

if TYPE_CHECKING:

    @overload
    def macro(func_or_name: MacroT, func: None = None) -> MacroT: ...
    @overload
    def macro(func_or_name: str, func: MacroT) -> MacroT: ...
    @overload
    def macro(func_or_name: str, func: None = None) -> Callable[[MacroT], MacroT]: ...


def macro(
    func_or_name: MacroT | str,
    func: MacroT | None = None,
) -> MacroT | Callable[[MacroT], MacroT]:
    """
    Mark a function as a macro.

    Supported call shapes:

    - ``macro(func)``: mark ``func`` under its own ``__name__``.
    - ``macro(name, func)``: mark ``func`` under ``name``.
    - ``macro(name)``: return a decorator that marks under ``name``.

    Marking stores the name on the function under the ``MACRO_FUNC``
    attribute and returns the function itself.

    Raises
    ------
    ValueError
        If the first argument is callable and a second one is given as well.

    """
    if not callable(func_or_name):
        # The first argument is the macro name.
        if func is None:
            return partial(macro, func_or_name)
        setattr(func, MACRO_FUNC, func_or_name)
        return func

    # The first argument is the function itself.
    if func is not None:
        msg = "Invalid macro() usage"
        raise ValueError(msg)
    target = cast("MacroT", func_or_name)
    return macro(target.__name__, target)

303 

304 

@dataclass
class ProcessorReplacement:
    """
    A change that was made to the configuration data during processing.

    Keeps both the original item and what it was substituted with, which is
    what allows the processed data to be reverted later.

    Attributes
    ----------
    key
        The key of the item before alteration (prefix included).
    value
        The value of the item before alteration.
    content
        The value to unpack in place of the alteration key.

    """

    key: str
    value: object
    content: Data

324 

325 

class FileSystemAwareConfigProcessor(ConfigProcessor):
    """
    Config processor that is aware of the file system.

    Adds the ``extend`` macro, which transcludes other configuration
    sources into the configuration being processed.
    """

    @macro
    def extend(self, sources: str | dict[str, str]) -> Data:
        """
        Transclude a config in this config.

        Parameters
        ----------
        sources
            Either a single source, whose loaded content is returned
            directly, or a mapping of keys to sources, in which case each
            key is mapped to the loaded content of its source.

        """
        if not isinstance(sources, str):
            return {
                key: get_config_source(location).load()
                for key, location in sources.items()
            }
        return get_config_source(sources).load()