FastStream Extension
You can integrate AnyDI with FastStream easily. Since FastStream uses FastDepends, you can use the same Provide[...] annotation or Inject() marker as with FastAPI extension, instead of FastDepends' Depends.
Version Requirement
Only FastStream >= 0.6 is supported. There are breaking changes between FastStream 0.5 and 0.6 versions that make earlier versions incompatible with this extension.
Here's an example of how to make them work together:
from faststream.redis import RedisBroker
import anydi.ext.faststream
from anydi import Container, Provide
class GreetingService:
async def greet(self, name: str) -> str:
return f"Hello, {name}"
container = Container()
@container.provider(scope="singleton")
def greeting_service() -> GreetingService:
return GreetingService()
broker = RedisBroker()
@broker.subscriber("greetings")
async def handle_greeting(
name: str,
service: Provide[GreetingService],
) -> str:
return await service.greet(name=name)
anydi.ext.faststream.install(broker, container)
Note
To detect a dependency type, provide a valid type annotation.
Provide[Service] is equivalent to Annotated[Service, Inject()].
You can also use the Inject() marker as a default value:
from anydi import Inject
@broker.subscriber("greetings")
async def handle_greeting(
name: str,
service: GreetingService = Inject(),
) -> str:
return await service.greet(name=name)
Request-scoped dependencies
To use request-scoped dependencies with FastStream, use the RequestScopedMiddleware that wraps message handlers in request context. This is useful for message-level resources:
import uuid
from faststream.redis import RedisBroker
import anydi.ext.faststream
from anydi import Container, Provide
from anydi.ext.faststream import RequestScopedMiddleware
class MessageLogger:
def __init__(self) -> None:
self.message_id = str(uuid.uuid4())
def log(self, text: str) -> None:
print(f"[{self.message_id}] {text}")
container = Container()
@container.provider(scope="request")
def message_logger() -> MessageLogger:
return MessageLogger()
broker = RedisBroker(middlewares=(RequestScopedMiddleware,))
@broker.subscriber("orders.process")
async def process_order(
order_id: str,
logger: Provide[MessageLogger],
) -> None:
logger.log(f"Processing order {order_id}")
logger.log("Order processed successfully")
anydi.ext.faststream.install(broker, container)
The RequestScopedMiddleware processes each message in its own request context, so request-scoped dependencies are resolved and isolated per message.
Custom scoped dependencies
You can use custom scopes with FastStream by creating custom middleware for your scope:
from functools import cached_property
from typing import Any
from faststream import BaseMiddleware, StreamMessage
from faststream.redis import RedisBroker
import anydi.ext.faststream
from anydi import Container, Provide
from anydi.ext.faststream import get_container_from_context
class BatchContext:
def __init__(self, batch_id: str) -> None:
self.batch_id = batch_id
self.processed_count = 0
container = Container()
# Register custom "batch" scope
container.register_scope("batch")
@container.provider(scope="batch")
def batch_context() -> BatchContext:
return BatchContext(batch_id="batch-123")
class BatchScopedMiddleware(BaseMiddleware):
@cached_property
def container(self) -> Container:
return get_container_from_context(self.context)
async def consume_scope(
self, call_next: Any, msg: StreamMessage[Any]
) -> Any:
async with self.container.ascoped_context("batch"):
return await call_next(msg)
broker = RedisBroker(middlewares=(BatchScopedMiddleware,))
@broker.subscriber("batch.process")
async def process_message(
message: str,
ctx: Provide[BatchContext],
) -> None:
ctx.processed_count += 1
print(f"Batch {ctx.batch_id}: Processed {ctx.processed_count} messages")
anydi.ext.faststream.install(broker, container)
This pattern allows you to share state and resources across messages within a custom scope.