Coverage for tests/examples/nats/test_e07_object_storage.py: 80%
19 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import pytest
3from faststream import TestApp
4from faststream.nats import TestNatsBroker
@pytest.mark.connected()
@pytest.mark.asyncio()
@pytest.mark.nats()
async def test_basic() -> None:
    """Exercise the NATS object-storage example end to end.

    The broker from the example module is wrapped in ``TestNatsBroker`` with
    ``with_real=True`` and started. The example app is run through ``TestApp``
    only when "file.txt" is not already listed in "example-bucket". In either
    case the example handler must have been called exactly once with
    "file.txt".
    """
    from examples.nats.e07_object_storage import app, broker, handler

    async with TestNatsBroker(broker, with_real=True):
        await broker.start()

        bucket = await broker.object_storage("example-bucket")
        try:
            stored = await bucket.list()
        except Exception:
            # Best-effort listing: any failure is treated as "nothing stored".
            stored = ()

        # Collect every entry name first, then decide whether the app has to
        # run at all.
        stored_names = [entry.name for entry in stored]
        if "file.txt" not in stored_names:
            async with TestApp(app):
                pass

        # NOTE(review): 3.0 is presumably a timeout in seconds - confirm
        # against the handler's wait_call API.
        await handler.wait_call(3.0)
        handler.mock.assert_called_once_with("file.txt")