Coverage for fastagency/helpers.py: 96%

35 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import importlib 1aiefgjbcd

2import json 1aiefgjbcd

3import tempfile 1aiefgjbcd

4from collections.abc import Iterator 1aiefgjbcd

5from contextlib import contextmanager 1aiefgjbcd

6from json import JSONDecoder 1aiefgjbcd

7from pathlib import Path 1aiefgjbcd

8from typing import Optional 1aiefgjbcd

9 

# Star-import surface of this module: only `check_imports` is exported by
# `from ... import *`; the other helpers defined below must be imported by name.
__all__ = ["check_imports"]

11 

12 

def check_imports(package_names: list[str], target_name: str) -> None:
    """Verify that every package in ``package_names`` can be located.

    Args:
        package_names: Module names to look up with ``importlib.util.find_spec``.
        target_name: Name of the ``fastagency`` extra suggested in the error
            message when something is missing.

    Raises:
        ImportError: If at least one of the packages cannot be found. The
            message lists the missing packages and the ``pip install`` command
            for the ``target_name`` extra.
    """
    # A bare `import importlib` does not guarantee that the `importlib.util`
    # submodule has been loaded, so import it explicitly before using it.
    import importlib.util

    def _is_missing(package_name: str) -> bool:
        """Return True when ``package_name`` cannot be located."""
        try:
            return importlib.util.find_spec(package_name) is None
        except ModuleNotFoundError:
            # For a dotted name find_spec imports the parent package first and
            # raises when that parent is absent; report it as missing instead
            # of leaking an error without the installation hint.
            return True

    not_importable = [
        f"'{package_name}'"
        for package_name in package_names
        if _is_missing(package_name)
    ]
    if not_importable:
        raise ImportError(
            f'Package(s) {", ".join(not_importable)} not found. Please install it with:\n\npip install "fastagency[{target_name}]"\n'
        )

23 

24 

# based on https://stackoverflow.com/questions/61380028/how-to-detect-and-indent-json-substrings-inside-longer-non-json-text/61384796#61384796
def extract_json_objects(
    text: str, decoder: Optional[JSONDecoder] = None
) -> Iterator[str]:
    """Split ``text`` into plain-text pieces and decoded JSON objects.

    The text is scanned for ``{`` characters. At each one, the decoder tries to
    parse a JSON document starting at that position.

    Args:
        text: The string to scan.
        decoder: Decoder used for parsing; a default ``JSONDecoder`` is created
            when omitted.

    Yields:
        In order of appearance: the text preceding each ``{``, followed by
        either the decoded object (when parsing succeeded) or the literal
        ``"{"`` (when it failed), and finally the remaining text. Note that,
        despite the ``Iterator[str]`` annotation, decoded objects are yielded
        as Python objects rather than strings. Spaces directly around a
        successfully decoded object are dropped; text around a brace that does
        not start valid JSON is yielded unchanged.
    """
    decoder = decoder or JSONDecoder()
    pos = 0
    while True:
        match = text.find("{", pos)
        if match == -1:
            yield text[pos:]  # return the remaining text
            break
        prefix = text[pos:match]
        try:
            result, index = decoder.raw_decode(text[match:])
        except ValueError:
            # Not a JSON document: emit the preceding text verbatim (no
            # stripping, so spaces before the brace are preserved) and the
            # brace itself, then continue scanning right after it.
            yield prefix
            yield text[match]
            pos = match + 1
            continue
        # Only trim the trailing spaces of the text when a JSON object follows.
        yield prefix.rstrip(" ")
        yield result
        pos = match + index
        # move past space characters that directly follow the JSON object
        while pos < len(text) and text[pos] == " ":
            pos += 1

48 

49 

def jsonify_string(line: str) -> str:
    """Return ``line`` with embedded JSON objects pretty-printed in code fences.

    Every piece produced by ``extract_json_objects`` is kept in order: dict
    pieces are dumped with a four-space indent and wrapped in a fenced block
    surrounded by newlines, all other pieces are copied through unchanged.
    """
    chunks: list[str] = []
    for piece in extract_json_objects(line):
        if not isinstance(piece, dict):
            # plain text (or a lone brace): keep as is
            chunks.append(piece)
            continue
        # a decoded JSON object: render it as an indented, fenced block
        pretty = json.dumps(piece, indent=4)
        chunks.append(f"\n```\n{pretty}\n```\n")

    return "".join(chunks)

60 

61 

@contextmanager
def optional_temp_path(path: Optional[str] = None) -> Iterator[Path]:
    """Provide a directory path, creating a temporary one when none is given.

    Args:
        path: Path to use. When ``None``, a ``tempfile.TemporaryDirectory`` is
            created for the duration of the ``with`` block.

    Yields:
        ``Path(path)`` when a path was supplied, otherwise the path of the
        temporary directory.
    """
    if path is not None:
        # Caller supplied a location: hand it back as a Path.
        yield Path(path)
        return

    with tempfile.TemporaryDirectory() as scratch_dir:
        yield Path(scratch_dir)