Coverage for examples / nats / e07_object_storage.py: 80%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from io import BytesIO 

2 

3from faststream import FastStream, Logger 

4from faststream.nats import NatsBroker 

5from faststream.nats.annotations import ObjectStorage 

6 

7broker = NatsBroker() 

8app = FastStream(broker) 

9 

10 

11@broker.subscriber("example-bucket", obj_watch=True) 

12async def handler(filename: str, storage: ObjectStorage, logger: Logger) -> None: 

13 assert filename == "file.txt" 

14 file = await storage.get(filename) 

15 logger.info(file.data) 

16 

17 

18@app.after_startup 

19async def test_send() -> None: 

20 os = await broker.object_storage("example-bucket") 

21 await os.put("file.txt", BytesIO(b"File mock"))