Coverage for examples / nats / e07_object_storage.py: 80%
10 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from io import BytesIO
3from faststream import FastStream, Logger
4from faststream.nats import NatsBroker
5from faststream.nats.annotations import ObjectStorage
# Broker instance; the subscriber and the startup hook below both go through it.
broker = NatsBroker()
# Application object built around the broker; provides the after_startup hook.
app = FastStream(broker)
@broker.subscriber("example-bucket", obj_watch=True)
async def handler(filename: str, storage: ObjectStorage, logger: Logger) -> None:
    """Load the object named by ``filename`` from ``storage`` and log its data.

    Registered on ``"example-bucket"`` with ``obj_watch=True``.
    NOTE(review): presumably invoked once per object written to the bucket,
    with the object's name as the message body -- confirm against the
    FastStream object-storage documentation.

    Args:
        filename: Name of the object to fetch; this example expects
            ``"file.txt"``.
        storage: Object-storage handle used to retrieve the object.
        logger: Logger that receives the retrieved object's ``data``.
    """
    # Example self-check: the only object this script uploads is "file.txt".
    assert filename == "file.txt"
    stored_object = await storage.get(filename)
    logger.info(stored_object.data)
@app.after_startup
async def test_send() -> None:
    """Upload a small mock file to ``"example-bucket"`` after startup.

    Registered through ``app.after_startup``. It stores the bytes
    ``b"File mock"`` under the name ``"file.txt"``, which is the name the
    ``handler`` subscriber asserts on.
    """
    payload = BytesIO(b"File mock")
    bucket = await broker.object_storage("example-bucket")
    await bucket.put("file.txt", payload)