FastStream Extension
Integrating AnyDI with FastStream is straightforward. Since FastStream uses FastDepends library, there is a simple workaround for using the two together using custom Inject parameter instead of standard Depends.
Here's an example of how to make them work together:
from faststream.redis import RedisBroker
import anydi.ext.faststream
from anydi import Container
from anydi.ext.faststream import Inject
class HelloService:
async def say_hello(self, name: str) -> str:
return f"Hello, {name}"
container = Container()
@container.provider(scope="singleton")
def hello_service() -> HelloService:
return HelloService()
broker = RedisBroker()
@broker.subscriber("hello")
async def say_hello(
name: str,
hello_service: HelloService = Inject(),
) -> str:
return await hello_service.say_hello(name=name)
anydi.ext.faststream.install(broker, container)
Note
To detect a dependency interface, provide a valid type annotation.