Comments (10)
Hello, @a14n!
Unfortunately, we can't use the aio-pika RPC approach. The main goal of FastStream RPC is to avoid creating a special queue to consume responses, so FastStream uses the RMQ Direct Reply-To feature to send RPC requests. But this mechanism has a limitation: you can't send multiple requests at the same time from one channel-connection pair. So we can't send RPC requests concurrently until #975 is released.
That said, RPC is not really a correct use case for RMQ: we already discussed it in #1252.
If you want to consume a persistent request-reply stream, please create a persistent subscriber and use the reply_to header instead of RPC. If you want to match replies with their original requests, you can use something like the following snippet (from Discord):
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any
from uuid import uuid4

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker


@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: RabbitBroker,
    msg: Any,
    queue: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid4().hex
    container[cor_id] = result_future = Future()
    await broker.publish(msg, queue, correlation_id=cor_id)
    return await result_future


# Emulation
@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    return data


@app.after_startup
async def t(container: RepliesContainer):
    response = await custom_rpc(broker, "test", "in", container)
    print(response)
This code does the same thing as the aio-pika example, so it should solve your problem.
Thanks for your quick reply.
@broker.subscriber("replies")
Hello
If there are multiple producers and they are all subscribed to replies, that queue becomes a competing-consumer queue: a reply published by a consumer may be delivered to a different producer than the one that published the task, so the original producer never receives it.
Producer code:
import os
import random
import uuid
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any

import uvicorn
from fastapi import FastAPI

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker

RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()


@asynccontextmanager
async def fastapi_lifespan(fastapi_app: FastAPI):
    context.set_global("replies_container", {})
    await broker.start()
    yield
    await broker.close()


fastapi_app = FastAPI(lifespan=fastapi_lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    print('handle_reply receives message', data, cor_id, os.getpid())
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: RabbitBroker,
    msg: Any,
    subject: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid.uuid4().hex
    container[cor_id] = result_future = Future()
    print('Publish task', msg, cor_id, os.getpid())
    await broker.publish(msg, subject, correlation_id=cor_id)
    print('Publish task success', msg, cor_id, os.getpid())
    return await result_future


@fastapi_app.post('/push_task')
async def push_task():
    msg = 'test' + str(random.randint(100, 999))
    container = context.get('replies_container')
    response = await custom_rpc(broker, msg, "in", container)
    print('response received', msg, response, os.getpid())
Consumer code:
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker


@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    print('received task', data)
    return data
log:
Publish task test482 cc99276685bd4ac19227680b44743231 9744
Publish task success test482 cc99276685bd4ac19227680b44743231 9744
INFO: Application startup complete.
2024-04-03 11:18:44,145 INFO - default | replies | 1b9d8205b0 - Received
handle_reply receives message test482 cc99276685bd4ac19227680b44743231 9744
response received test482 test482 9744
INFO: 127.0.0.1:16378 - "POST /push_task HTTP/1.1" 200 OK
2024-04-03 11:18:44,155 INFO - default | replies | 1b9d8205b0 - Processed
Publish task test225 46555d6651c943d5a835f10e88a79789 9744
Publish task success test225 46555d6651c943d5a835f10e88a79789 9744
2024-04-03 11:18:47,772 INFO - default | replies | 95783da1b1 - Received
response received test225 46555d6651c943d5a835f10e88a79789 14688
2024-04-03 11:18:47,780 INFO - default | replies | 95783da1b1 - Processed
When the process with pid 9744 publishes a task and the reply happens to be delivered back to 9744, the response is received normally. However, if the reply to a task published by 9744 is delivered to 14688 instead, 9744 can never obtain the response.
Are there any good ideas to solve this problem?
Are there any good ideas to solve this problem?
Already solved: the reply queue name is generated from the pid, and the consumer publishes the reply to the queue specified in the task.
@broker.subscriber("in")
async def handle_request(data: dict, cor_id: str = Context("message.correlation_id")):
    """Your hardcoded not-FastStream service."""
    print('received task', data, cor_id)
    reply_queue = data.pop('reply_queue', None)
    await broker.publish(message=data, queue=reply_queue, correlation_id=cor_id)
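To make the routing idea concrete, here is a broker-free sketch of the same pattern using only stdlib dicts (the queue names and message fields are illustrative, not FastStream API): the producer embeds its own pid-based reply queue name in the task, and the consumer publishes the reply back to exactly that queue, so it always reaches the originating process.

```python
import os
from typing import Any
from uuid import uuid4

# In-memory stand-in for RabbitMQ queues: queue name -> list of messages.
# Real code would call broker.publish instead; this only shows the routing.
queues: dict[str, list[dict]] = {}


def reply_queue_name() -> str:
    # One reply queue per producer process, so a reply always goes back
    # to the process that published the task.
    return f"replies.{os.getpid()}"


def publish(queue: str, message: dict) -> None:
    queues.setdefault(queue, []).append(message)


def producer_publish_task(payload: Any) -> str:
    cor_id = uuid4().hex
    publish("in", {
        "data": payload,
        "correlation_id": cor_id,
        "reply_queue": reply_queue_name(),  # consumer reads this field
    })
    return cor_id


def consumer_handle_request() -> None:
    # Mirrors handle_request above: pop reply_queue from the task and
    # publish the result to that specific queue.
    task = queues["in"].pop(0)
    reply_queue = task.pop("reply_queue")
    publish(reply_queue, {"data": task["data"], "correlation_id": task["correlation_id"]})


cor_id = producer_publish_task("test")
consumer_handle_request()
reply = queues[reply_queue_name()][0]
print(reply["data"], reply["correlation_id"] == cor_id)  # prints: test True
```

The correlation_id is still needed on top of the per-process queue, because one process may have several in-flight requests at once.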
I met the same problem with redis, even with rpc=False. It acts like a sync program and takes no advantage of async.
I think there should be a parameter to decide how many tasks can run in parallel.
I think there should be a parameter to decide how many tasks can run in parallel.
You can use other methods to implement RPC calls.
I think there should be a parameter to decide how many tasks can run in parallel.
You can use other methods to implement RPC calls.
I don't mean the RPC request. The point is that the whole lib takes no advantage of asyncio. Tasks run one by one, just like a sync program. What's the point of building such a lib on asyncio/anyio?
I think there should be a parameter to decide how many tasks can run in parallel.
You can use other methods to implement RPC calls.
I don't mean the RPC request. The point is that the whole lib takes no advantage of asyncio. Tasks run one by one, just like a sync program. What's the point of building such a lib on asyncio/anyio?
It runs asynchronously; if you think it runs synchronously, please post your test code.
One more thing: your question doesn't seem to be related to this issue.
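For reference, a concurrency claim like this is easy to check with plain asyncio, independent of FastStream. The sketch below is stdlib-only (no FastStream code): two "handlers" that await an I/O-style sleep at the same time finish in roughly the duration of one sleep, not the sum. If handlers really ran one by one, an equivalent timing test against a subscriber would take the sum instead.

```python
import asyncio
import time


async def handler(name: str) -> str:
    await asyncio.sleep(0.2)  # stands in for awaiting a broker/network call
    return name


async def main() -> list[str]:
    start = time.monotonic()
    # Schedule both handlers concurrently; the event loop interleaves
    # them while each one is waiting on its sleep.
    results = await asyncio.gather(handler("a"), handler("b"))
    elapsed = time.monotonic() - start
    print(results, elapsed < 0.35)  # the two 0.2s sleeps overlapped
    return list(results)


asyncio.run(main())
```

Note that a handler doing CPU-bound work, or one that never awaits, will still block the loop; concurrency only helps while handlers are awaiting I/O.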