Coverage for tests / examples / nats / test_e07_object_storage.py: 80%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import pytest 

2 

3from faststream import TestApp 

4from faststream.nats import TestNatsBroker 

5 

6 

7@pytest.mark.connected() 

8@pytest.mark.asyncio() 

9@pytest.mark.nats() 

10async def test_basic() -> None: 

11 from examples.nats.e07_object_storage import app, broker, handler 

12 

13 async with TestNatsBroker(broker, with_real=True): 

14 await broker.start() 

15 

16 os = await broker.object_storage("example-bucket") 

17 try: 

18 existed_files = await os.list() 

19 except Exception: 

20 existed_files = () 

21 

22 call = True 

23 for file in existed_files: 

24 if file.name == "file.txt": 24 ↛ 23line 24 didn't jump to line 23 because the condition on line 24 was always true

25 call = False 

26 

27 if call: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 async with TestApp(app): 

29 pass 

30 

31 await handler.wait_call(3.0) 

32 handler.mock.assert_called_once_with("file.txt")