
Comments (7)

jan-tennert commented on June 12, 2024

The problem is that

val changeFlow = postgresChangeFlow<PostgresAction>(schema) {
    this.table = table
    filter(primaryKey.columnName, FilterOperator.EQ, key)
}

has to be called before RealtimeChannel#subscribe, and since channelFlow is a cold flow, this call order would not be guaranteed if the call lived inside it. Making the function suspending and calling this method directly avoids the problem.

Note: The postgresChangeFlow method is not itself suspending, but a DB call has to happen before it.
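
The ordering hazard can be illustrated without the library. The sketch below uses a stdlib lazy `sequence` as a stand-in for a cold flow (an analogy only, not supabase-kt code): the builder body, where a hypothetical registration step would live, does not run until something consumes it, so a `subscribe` step placed in between runs first. The function name and event strings are made up for illustration.

```kotlin
// Stand-in for a cold flow: the body of a lazy sequence runs only when consumed.
// `coldOrderingDemo` and the event names are hypothetical, for illustration only.
fun coldOrderingDemo(): List<String> {
    val events = mutableListOf<String>()

    // Building the sequence does NOT run its body yet.
    val changes = sequence {
        events += "register postgres change" // would have to happen before subscribe
        yield("row changed")
    }

    events += "subscribe" // runs first, because nothing has consumed `changes` yet
    changes.toList()      // consumption finally triggers the registration

    return events
}

fun main() {
    println(coldOrderingDemo()) // prints [subscribe, register postgres change]
}
```

Calling the registration eagerly, outside the lazy block, is what restores the required order.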

from supabase-kt.

mlykotom commented on June 12, 2024

(OT: I understand now why I wasn't receiving the realtime messages in the app .. because I forgot to call channel.subscribe()!)

If what you say is required, then the API doesn't enforce it and maybe it should?

Let's say I want a repository that subscribes to realtime data and passes down just the Flow. I will have something like this:

suspend fun shopItems(): Flow<List<ShopItemModel>> {
    val channel = supabase.channel("db-changes")

    val flow = channel.postgresListDataFlow(
        schema = "public",
        table = "ShopItem",
        primaryKey = ShopItemDto::id
    )
        .map { items -> items.map { it.toModel() } }

    channel.subscribe()
    return flow
}

But I can just as easily call subscribe before creating the flow, which you say doesn't work:

suspend fun shopItems(): Flow<List<ShopItemModel>> {
    val channel = supabase.channel("db-changes")
    channel.subscribe()

    return channel.postgresListDataFlow(
        schema = "public",
        table = "ShopItem",
        primaryKey = ShopItemDto::id
    )
        .map { items -> items.map { it.toModel() } }
}

I can make it more like a chain of operators with .onStart { channel.subscribe() }, which raises several questions:

  1. It looks like only the initialData request is suspending. Can this be part of the channelFlow? That way you would get the initial data only when you subscribe to that flow (which is probably expected).

  2. I see addPostgresChange, which adds the config to the listener, but I don't see any remove* function. Is it possible that this keeps the listener even though someone unsubscribes from the channel?

  3. Should we also subscribe to the channel as part of the operator when it starts collecting (onStart)?

  4. And what's the recommendation about unsubscribing from the realtime channel? I don't see it in the example. I assume it should be called as well? (And therefore it could also be called as part of the stream.)

Sorry if these questions are obvious and I haven't looked properly (I'm new to Supabase)

Thanks!

jan-tennert commented on June 12, 2024

> It looks like only the initialData request is suspending. Can this be part of the channelFlow? That way you would get the initial data only when you subscribe to that flow (which is probably expected).

Looking at the implementation again, yes, we can probably move the initial data request there and remove the suspend modifier from this method. But the postgresChangeFlow call has to stay outside the channel flow. The single flow variant has to remain suspending, since the result of the DB call is used in the postgresChangeFlow method.

> I see addPostgresChange, which adds the config to the listener, but I don't see any remove* function. Is it possible that this keeps the listener even though someone unsubscribes from the channel?

The way Supabase Realtime works is that you specify what database changes you want to receive when subscribing to the channel. This method just adds the configuration to the join payload.
The actual listener gets added here:

val id = callbackManager.addPostgresCallback(config, callback)
awaitClose { callbackManager.removeCallbackById(id) }

But you are right: it seems the config entry added by addPostgresChange doesn't get removed in awaitClose.
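
As a rough illustration of the cleanup being discussed, the sketch below models a channel that tracks both the join configs and the callbacks, and hands back a single cleanup function that removes both. `FakeChannel`, `listen`, and `cleanupDemo` are hypothetical names; this is not how supabase-kt is implemented, only one way the two removals could be paired.

```kotlin
// Hypothetical stand-in for a realtime channel; not the supabase-kt implementation.
class FakeChannel {
    val joinConfigs = mutableListOf<String>() // what would go into the join payload
    private val callbacks = mutableMapOf<Int, (String) -> Unit>()
    private var nextId = 0

    val callbackCount: Int get() = callbacks.size

    // Registers both the join config and the callback, and returns a cleanup
    // function that undoes both, so nothing outlives the listener.
    fun listen(config: String, callback: (String) -> Unit): () -> Unit {
        joinConfigs += config
        val id = nextId++
        callbacks[id] = callback
        return {
            callbacks.remove(id)
            joinConfigs.remove(config)
        }
    }
}

// Returns (remaining join configs, remaining callbacks) after cleanup.
fun cleanupDemo(): Pair<Int, Int> {
    val channel = FakeChannel()
    val close = channel.listen("public.ShopItem") { }
    close() // in a channelFlow this would be the body of awaitClose { ... }
    return channel.joinConfigs.size to channel.callbackCount
}

fun main() {
    println(cleanupDemo()) // prints (0, 0)
}
```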

> Should we also subscribe to the channel as part of the operator when it starts collecting (onStart)?

It is only important that you call the postgresListDataFlow or any other postgres method before subscribing, because of the join payload.
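
Why the order matters can be shown with a toy model, assuming (as described above) that the requested changes are sent once, as part of the join payload at subscribe time. `JoinOnceChannel`, its simplified `addPostgresChange(table)` signature, and the table names are hypothetical.

```kotlin
// Hypothetical model: subscribing sends a one-time snapshot of the registered configs.
class JoinOnceChannel {
    private val pending = mutableListOf<String>()

    var sentJoinPayload: List<String>? = null
        private set

    fun addPostgresChange(table: String) {
        pending += table
    }

    fun subscribe() {
        // Only what is registered at this moment makes it into the payload.
        sentJoinPayload = pending.toList()
    }
}

fun joinPayloadDemo(): List<String> {
    val channel = JoinOnceChannel()
    channel.addPostgresChange("ShopItem") // registered before subscribe: included
    channel.subscribe()
    channel.addPostgresChange("Orders")   // registered after subscribe: missed
    return channel.sentJoinPayload.orEmpty()
}

fun main() {
    println(joinPayloadDemo()) // prints [ShopItem]
}
```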

> And what's the recommendation about unsubscribing from the realtime channel? I don't see it in the example. I assume it should be called as well? (And therefore it could also be called as part of the stream.)

You can probably just unsubscribe from the channel when the flow ends.

mlykotom commented on June 12, 2024

> It is only important that you call the postgresListDataFlow or any other postgres method before subscribing, because of the join payload.

If this is important, I'm wondering whether the subscribe call should be part of the method and part of the Flow stream?
Basically you can have it in onStart { subscribe() } and onCompletion { unsubscribe() }, which means that while this Flow is active, you're listening to the changes in the channel.

I don't know if it's expected to stay subscribed to the channel (you call .subscribe(), but you never listen to the changes)?
But this is what the API currently allows very easily.

If this were part of the Flow stream and you wanted to keep the subscription for the whole lifetime of the app, you could convert the flow to a SharedFlow, for example, which can keep the subscription active the whole time.
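
A minimal sketch of that idea, assuming kotlinx-coroutines-core is on the classpath. `RecordingChannel` and `subscribedWhileCollected` are hypothetical names; the channel only records calls, whereas a real one would talk to the server. `onStart` and `onCompletion` are the standard Flow operators.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical channel that only records calls, for illustration.
class RecordingChannel {
    val calls = mutableListOf<String>()
    suspend fun subscribe() { calls += "subscribe" }
    suspend fun unsubscribe() { calls += "unsubscribe" }
}

// Ties the channel's lifetime to collection of the flow.
fun <T> Flow<T>.subscribedWhileCollected(channel: RecordingChannel): Flow<T> =
    onStart { channel.subscribe() }
        .onCompletion { channel.unsubscribe() }

fun lifecycleDemo(): List<String> = runBlocking {
    val channel = RecordingChannel()
    val changes = flowOf("insert", "update").subscribedWhileCollected(channel)
    check(channel.calls.isEmpty()) // nothing happens until collection starts
    changes.toList()               // collect to completion
    channel.calls
}

fun main() {
    println(lifecycleDemo()) // prints [subscribe, unsubscribe]
}
```

One caveat: `onStart` runs before the upstream flow starts being collected, so with this shape any registration that has to precede `subscribe` would still need to happen eagerly, outside the cold upstream.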

This is a breaking change in the API, so probably not possible to change directly, but I'm keen on hearing your thoughts.

Thanks!

jan-tennert commented on June 12, 2024

> If this is important, I'm wondering whether the subscribe call should be part of the method and part of the Flow stream?
> Basically you can have it in onStart { subscribe() } and onCompletion { unsubscribe() }, which means that while this Flow is active, you're listening to the changes in the channel.
> I don't know if it's expected to stay subscribed to the channel (you call .subscribe(), but you never listen to the changes)?
> But this is what the API currently allows very easily.

Yes, so apart from postgres changes, you can also listen for broadcast and presence changes. The difference is that you can always call these flow methods, no matter whether you are subscribed to the channel or not.
But I agree that this is currently not ideal. The flow methods postgresListDataFlow and postgresSingleDataFlow are pretty new and not available in the other client libs. Because the idea is to make all client libs work essentially the same way, to make switching easier, this is currently a compromise.
Maybe we could add some extension functions to the Postgrest plugin that only listen for postgres changes and handle subscribing/unsubscribing automatically.

mlykotom commented on June 12, 2024

I think it's fine (and important?) to keep the building blocks available (subscribe, unsubscribe, adding the listeners, etc). If there's someone using it in a very specific way, they can just use the building blocks.

For very simple usage, I think it could be wrapped so that the actions always happen in the appropriate order, without any memory leaks or anything else you'd need to remember.

If this means adding extensions on Postgrest directly, then I agree.

I understand a bit better now that one channel can hold broadcast, presence, and DB changes together, which is probably the reason for this design. I also see the pattern coming from the implementations in other languages, where you need to implement the callback. In Kotlin, the Flow implementation is powerful enough to automatically subscribe to the channel while someone is listening and unsubscribe when nobody is.

I understand, though, that this probably requires some refactoring. I think this issue is getting bigger than necessary, as it started with the suspend functions :)

jan-tennert commented on June 12, 2024

Yes, I think we can close this issue. I removed the suspend modifier for the postgresListDataFlow methods and we can further discuss this issue in #570

