Comments (14)
I'm adding this feature.😃
from async-graphql.
Adding this feature took a little longer than I expected. 😁
async-graphql: upgrade to 1.9.17.
async-graphql-actix-web: upgrade to 1.1.6 if you use actix-web.
async-graphql-warp: upgrade to 1.1.5 if you use warp.
You can test it in GraphQL playground.
The token for the GraphQL subscription is sent in the payload of the websocket protocol's connection_init message.
Set the following values in the HTTP Headers panel of GraphQL playground:
{
  "token": "123456"
}
This is the actual subscription request sent through the websocket.
{"type":"connection_init","payload":{"token":"123456"}}
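If you want to send the same message from your own test client instead of the playground, it can be built by hand. This is only a minimal std-only sketch: `connection_init_message` is a hypothetical helper, not part of async-graphql, and it assumes the token contains no control characters (only backslashes and double quotes are escaped).

```rust
/// Hypothetical helper (not an async-graphql API): builds the
/// `connection_init` message shown above for a given token.
/// Assumption: the token has no control characters, so only
/// backslashes and double quotes need escaping.
fn connection_init_message(token: &str) -> String {
    let escaped = token.replace('\\', "\\\\").replace('"', "\\\"");
    format!(
        r#"{{"type":"connection_init","payload":{{"token":"{}"}}}}"#,
        escaped
    )
}
```

Calling `connection_init_message("123456")` yields the exact text of the request above.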
from async-graphql.
Just in case anyone scratches their head like I did while trying to figure out how to embed the ctx into the stream.
Here is how I solved it using zip and repeat:
#[field]
async fn my_handler(&self, ctx: &Context<'_>) -> impl Stream<Item = MyEvent> {
    let repo = ctx.data::<BoxedRepository>();
    self.channel
        .subscribe()
        .zip(futures::stream::repeat(repo.clone()))
        .filter_map(|(_evt, _repo)| async move {
            // do something with repo, like fetching data per event...
            None
        })
}
from async-graphql.
StreamExt::map only accepts a synchronous closure, so you should use StreamExt::then in place of StreamExt::map.
fn map<T, F>(self, f: F) -> Map<Self, F>
where
    F: FnMut(Self::Item) -> T
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future
StreamExt::filter does not have this problem, because its closure already returns a future:
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
where
    F: FnMut(&Self::Item) -> Fut,
    Fut: Future<Output = bool>,
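To make the difference between those bounds concrete, here is a minimal std-only sketch. It does not use the futures crate: `map_all` and `then_all` are hypothetical helpers that only mirror the closure bounds of StreamExt::map and StreamExt::then over a Vec, `lookup` is a made-up async function, and `block_on` is a toy executor that is only suitable for futures that complete without waiting.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (assumption: the future never actually has to wait).
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Same closure bound as StreamExt::map: the closure's return value is
// stored as-is. If the closure returns a future, nobody awaits it.
fn map_all<T, F>(items: Vec<u32>, f: F) -> Vec<T>
where
    F: FnMut(u32) -> T,
{
    items.into_iter().map(f).collect()
}

// Same closure bound as StreamExt::then: the closure returns a future,
// and that future is driven to completion for every item.
fn then_all<Fut, F>(items: Vec<u32>, mut f: F) -> Vec<Fut::Output>
where
    F: FnMut(u32) -> Fut,
    Fut: Future,
{
    items.into_iter().map(|item| block_on(f(item))).collect()
}

// Hypothetical async lookup standing in for a database call.
async fn lookup(id: u32) -> String {
    format!("user-{}", id)
}

fn demo() -> (usize, Vec<String>) {
    // With the map-style bound we only collect two unresolved futures.
    let pending = map_all(vec![1, 2], lookup);
    // With the then-style bound each future is resolved to a String.
    let resolved = then_all(vec![1, 2], lookup);
    (pending.len(), resolved)
}
```

The map-style helper hands back the futures untouched, which is why an async closure passed to StreamExt::map produces a stream of futures rather than a stream of values.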
Your code should be changed like this
#[async_graphql::Subscription]
impl SubscriptionRoot {
    #[field]
    async fn messages(&self, ctx: &Context<'_>, mutation_type: Option<MutationType>) -> impl Stream<Item = MessageChanged> {
        let convo_db = ctx.data::<Database>().conversations.clone();
        SimpleBroker::<MessageChanged>::subscribe().filter(move |event| {
            let convo_db = convo_db.clone(); // move ownership to async block
            async move {
                let convos = convo_db.get_all().await;
                // logic here...
                res
            }
        })
    }
}
I hope I can help you.😁
from async-graphql.
To recap: I guess the trick is to move your data into the closure, clone it, and move the clone into the async block. This seems to work with or without Arc, but for performance reasons this should of course be an Arc on most types to avoid expensive clones. My problem was that I always moved the data straight into the async block and tried to clone there.
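The recap can be sketched with the standard library alone. Everything here is an assumption made for illustration: `Db` is a hypothetical stand-in for the shared database handle, `filter_events` only mirrors the closure bound of StreamExt::filter over a Vec, and `block_on` is a toy executor for futures that complete without waiting.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (assumption: the future never actually has to wait).
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for a database handle with an async API.
struct Db {
    allowed: Vec<u32>,
}

impl Db {
    async fn contains(&self, id: u32) -> bool {
        self.allowed.contains(&id)
    }
}

// Mirrors the closure bound of StreamExt::filter, applied to a Vec.
fn filter_events<F, Fut>(events: Vec<u32>, mut keep: F) -> Vec<u32>
where
    F: FnMut(&u32) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut out = Vec::new();
    for event in events {
        if block_on(keep(&event)) {
            out.push(event);
        }
    }
    out
}

fn allowed_events(db: Arc<Db>, events: Vec<u32>) -> Vec<u32> {
    // Step 1: `move` transfers the Arc into the closure.
    filter_events(events, move |event| {
        // Step 2: clone inside the closure, once per call. The closure
        // keeps its own Arc, so it can be called again (FnMut).
        let db = db.clone();
        // Copy the id out so the future does not borrow the event.
        let id = *event;
        // Step 3: the async block takes ownership of the clone.
        async move { db.contains(id).await }
    })
}
```

Moving `db` straight into the async block without the per-call clone would consume the closure's only copy on the first call, so the closure could not satisfy the FnMut bound.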
from async-graphql.
Maybe you can try async-stream (https://crates.io/crates/async-stream); this might be more convenient.
from async-graphql.
It does not seem to fix the problem that I cannot prove that the stream does not outlive the Context, because the returned stream has a 'static lifetime and &Context does not.
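The lifetime constraint itself can be reproduced without any async code. In this minimal std-only sketch an Iterator stands in for the stream, and `Ctx`, `Repo` and `labels` are hypothetical names: the returned value is declared 'static, so it may not hold a borrow of the context, but it may own an Arc cloned out of it.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the context and the data stored in it.
struct Repo {
    prefix: String,
}

struct Ctx {
    repo: Arc<Repo>,
}

// The return type is 'static, so it must not borrow from `ctx`.
// Cloning the Arc out of the context gives the iterator its own
// handle to the shared data instead of a reference into `ctx`.
fn labels(ctx: &Ctx, events: Vec<u32>) -> impl Iterator<Item = String> + 'static {
    let repo = Arc::clone(&ctx.repo);
    events
        .into_iter()
        .map(move |event| format!("{}-{}", repo.prefix, event))
}
```

Because the iterator owns its Arc, the caller can drop the context before consuming it. Capturing `&ctx.repo` in the closure instead is rejected by the compiler.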
from async-graphql.
Awesome work with the fast update! Just to give my current status, I also ran into an issue using my async DB representation within the stream:
#[async_graphql::Subscription]
impl SubscriptionRoot {
    #[field]
    async fn mydata(&self, ctx: &Context<'_>, mutation_type: Option<MutationType>) -> impl Stream<Item = DataChanged> {
        let db = ctx.data::<Database>().data.clone();
        SimpleBroker::<DataChanged>::subscribe().filter(move |event| {
            let results = db.get_all().await;
            // logic here...
            async move { res }
        })
    }
}
And await is not allowed, as it is not an async block.
Will see if this can be solved with async-stream. I imagine that many people would be interacting with context that presents async APIs, so maybe SimpleBroker should produce an async-compatible stream by default?
from async-graphql.
I think this boils down to the same issue I was having.
What do you mean by async-compatible?
I think it would be nice to be able to get a clone of the context with every event. Something like this:
SimpleBroker::<MyEvent>::subscribe_with_ctx().map(|(evt, ctx)| async move { evt })
Should be easy to implement and simplify client code. Are there better solutions? What do you think?
from async-graphql.
@nicolaiunrein my Rust is not super strong, but I think there are two different problems here. One is the lifetime issue, giving whatever you want to pass into the stream a 'static lifetime. I also ran into this issue but in my case was able to solve it by wrapping the data I wanted to share in Arc.
let user = ctx.data::<Option<User>>().clone().unwrap(); // unsafe unwrap just for example
let user_for_stream = Arc::new(user);
// then can use any data from the user inside the .filter(|x| ...) block
// or whatever else on each new event
But there is also the fact that the stream itself does not allow for calling async functions within the body of the map/filter (e.g., compiler error on any .await, specifically 'await' is only allowed inside 'async' functions and blocks), which it looks like async-stream might solve.
Does that make sense to you? In your case, would wrapping repo in Arc also solve your problem? Or is this undesirable for other reasons I'm not aware of?
from async-graphql.
It does not seem to fix the problem that I cannot prove that the stream does not outlive the Context because the returned stream has a 'static lifetime and &Context does not.
@nicolaiunrein Yes, this does not solve the lifetime problem.😄
from async-graphql.
I think this boils down to the same issue I was having.
What do you mean by async-compatible?
I think it would be nice to be able to get a clone of the context with every event. Something like this:
SimpleBroker::<MyEvent>::subscribe_with_ctx().map(|(evt, ctx)| async move { evt })
Should be easy to implement and simplify client code. Are there better solutions? What do you think?
@nicolaiunrein This is hard to implement, because the context borrows some data that can't be cloned. I think you can solve this problem by storing Arc<T> in the context data.
from async-graphql.
This should be the best solution.
from async-graphql.
Is this still working? I tried it and I am getting the following error:
hidden type for `impl Stream<Item = BookChanged>` captures lifetime that does not appear in bounds
from async-graphql.