Code Monkey home page Code Monkey logo

Comments (14)

sunli829 avatar sunli829 commented on May 3, 2024 2

I'm adding this feature.😃

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024 1

Adding this feature took a little longer than I expected. 😁

async-graphql upgrade to 1.9.17.
async-graphql-actix-web upgrade to 1.1.6 if you use actix-web.
async-graphql-warp upgrade to 1.1.5 if you use warp.

https://github.com/sunli829/async-graphql-examples/blob/c1844f622a74c13b53cff0f209ba45d3ad9413fa/actix-web/token-from-header/src/main.rs#L57

You can test it in GraphQL playground.

The token for the GraphQL subscription is sent by payload of the websocket protocol

Set the following values into GraphQL playground HTTP Headers.

{
  "token": "123456"
}

This is the actual subscription request sent through the websocket.

{"type":"connection_init","payload":{"token":"123456"}}

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on May 3, 2024 1

Just in case anyone scratches their head like I did while trying to figure out how to embed the ctx into the stream.
Here is how I solved it using zip and repeat:

#[field]
async fn my_handler(&self, ctx: &Context<'_>) -> impl Stream<Item = MyEvent> {
    let repo = ctx.data::<BoxedRepository>();
    self.channel
        .subscribe()
        .zip(futures::stream::repeat(repo.clone()))
        .filter_map(|(_evt, _repo)| async move {
            //do something with repo like fetching data per event...
            None
        })
}

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024 1

@Ejhfast

StreamExt::map can only accept one synchronous closure, you should use StreamExt::then replace StreamExt::map.

fn map<T, F>(self, f: F) -> Map<Self, F>
where
    F: FnMut(Self::Item) -> T

fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future

StreamExt::filter does not have this problem

fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
where
    F: FnMut(&Self::Item) -> Fut,
    Fut: Future<Output = bool>,

Your code should be changed like this

#[async_graphql::Subscription]
impl SubscriptionRoot {
    #[field]
    async fn messages(&self, ctx: &Context<'_>, mutation_type: Option<MutationType>) -> impl Stream<Item = MessageChanged> {
        let convo_db = ctx.data::<Database>().conversations.clone();
        SimpleBroker::<MessageChanged>::subscribe().filter(move |event| {
            let convo_db = convo_db.clone(); // move ownership to async block
            async move {
               let convos = convo_db.get_all().await;
               // logic here...
               res
            }
        })
    }
}

I hope I can help you.😁

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on May 3, 2024 1

To recap: I guess the trick is to move your data into the closure, clone it and move the clone into the async block. This seems to work with or without Arc but for performance reasons this should of course be an Arc on most types to avoid expensive clones. My problem was that I always moved the data straight to the async block and tried to clone there.

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024

Maybe you can try async-stream https://crates.io/crates/async-stream, this might be more convenient.

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on May 3, 2024

It does not seem to fix the problem that I cannot prove that the stream does not outlive the Context because the returned stream has a 'static lifetime and &Context does not.

from async-graphql.

Ejhfast avatar Ejhfast commented on May 3, 2024

Awesome work with the fast update! Just to give my current status, I also ran into an issue using my async DB representation within the stream:

#[async_graphql::Subscription]
impl SubscriptionRoot {
    #[field]
    async fn mydata(&self, ctx: &Context<'_>, mutation_type: Option<MutationType>) -> impl Stream<Item = DataChanged> {
        let db = ctx.data::<Database>().data.clone();
        SimpleBroker::<DataChanged>::subscribe().filter(move |event| {
            let results = db.get_all().await;
            // logic here...
            async move { res }
        })
    }
}

And await is not allowed, as it is not an async block.

Will see if this can be solved with async-stream. I imagine that many people would be interacting with context that presents async APIs, so maybe SimpleBroker should produce an async-compatible stream by default?

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on May 3, 2024

I think this boils down to the same issue I was having.
What do you mean by async-compatible?
I think it would be nice to be able to get a clone of the context with every event. Something like this:

SimpleBroker::<MyEvent>::subscribe_with_ctx().map(|(evt, ctx)| async move { evt })

Should be easy to implement and simplify client code. Are there better solutions? What do you think?

from async-graphql.

Ejhfast avatar Ejhfast commented on May 3, 2024

@nicolaiunrein my Rust is not super strong, but I think there are two different problems here. One is the lifetime issue, giving whatever you want to pass into the stream a 'static lifetime. I also ran into this issue but in my case was able to solve it by wrapping the data I wanted to share in Arc.

let user = ctx.data::<Option<User>>().clone().unwrap(); // unsafe unwrap just for example
let user_for_stream = Arc::new(user);
// then can use any data from the user inside the .filter(|x| ...) block 
// or whatever else on each new event

But there is also the fact that the stream itself does not allow for calling async functions within the body of the map/filter (e.g., compiler error on any .await, specifically 'await' is only allowed inside 'async' functions and blocks), which it looks like async-stream might solve.

Does that make sense to you? In your case, would wrapping repo in Arc also solve your problem? Or is this undesirable for other reasons I'm not aware of?

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024

It does not seem to fix the problem that I cannot prove that the stream does not outlive the Context because the returned stream has a 'static lifetime and &Context does not.

@nicolaiunrein Yes, this does not solve the lifetime problem.😄

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024

I think this boils down to the same issue I was having.
What do you mean by async-compatible?
I think it would be nice to be able to get a clone of the context with every event. Something like this:

SimpleBroker::<MyEvent>::subscribe_with_ctx().map(|(evt, ctx)| async move { evt })

Should be easy to implement and simplify client code. Are there better solutions? What do you think?

@nicolaiunrein This is hard to implement, because the context borrows some data that can't be cloned, I think solve this problem by store Arc<T> to the context data.

from async-graphql.

sunli829 avatar sunli829 commented on May 3, 2024

This should be the best solution.

from async-graphql.

Dindaleon avatar Dindaleon commented on May 3, 2024

Is this still working? Because I tried it and getting the following error:

hidden type for `impl Stream<Item = BookChanged>` captures lifetime that does not appear in bounds

from async-graphql.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.