Comments (7)
The problem is that
val changeFlow = postgresChangeFlow<PostgresAction>(schema) {
    this.table = table
    filter(primaryKey.columnName, FilterOperator.EQ, key)
}
has to be called before RealtimeChannel#subscribe, and since channelFlow is a cold flow, that call order would not be guaranteed if the call were made inside the flow. By making the function suspending and calling this method directly, there can't be any problems.
Note: The postgresChangeFlow method is not directly suspending, but there has to be a DB call before it.
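The ordering problem described above can be reproduced without supabase-kt. The following is a minimal sketch, assuming kotlinx.coroutines is on the classpath. FakeChannel, register, lazyChanges, eagerChanges and callOrder are hypothetical names that only record call order; they are not part of the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a realtime channel: it only records call order.
class FakeChannel {
    val calls = mutableListOf<String>()
    fun register() { calls += "register" }
    fun subscribe() { calls += "subscribe" }
}

// Registration sits inside the cold flow, so it runs only once the flow is collected.
fun lazyChanges(channel: FakeChannel): Flow<String> = channelFlow {
    channel.register()
    send("change")
}

// Registration runs immediately, before the flow is returned to the caller.
fun eagerChanges(channel: FakeChannel): Flow<String> {
    channel.register()
    return channelFlow { send("change") }
}

// Builds the flow, subscribes, then collects, and reports the recorded order.
fun callOrder(build: (FakeChannel) -> Flow<String>): List<String> = runBlocking {
    val channel = FakeChannel()
    val flow = build(channel)
    channel.subscribe()
    flow.toList()
    channel.calls.toList()
}

fun main() {
    println(callOrder(::lazyChanges))   // [subscribe, register]
    println(callOrder(::eagerChanges))  // [register, subscribe]
}
```

With the registration inside the flow builder, the caller has no way to force it to happen before subscribe(); doing it eagerly in the function body removes that dependency on collection time.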
from supabase-kt.
(OT: I understand now why I wasn't receiving the realtime messages in the app... because I forgot to call channel.subscribe()!)
If what you say is required, then the API doesn't enforce it and maybe it should?
Let's say I want to have a repository where I subscribe to realtime data and pass down just the Flow. I will have something like this:
suspend fun shopItems(): Flow<List<ShopItemModel>> {
    val channel = supabase.channel("db-changes")
    // Create the postgres flow first, then subscribe to the channel.
    val flow = channel.postgresListDataFlow(
        schema = "public",
        table = "ShopItem",
        primaryKey = ShopItemDto::id
    )
        .map { items -> items.map { it.toModel() } }
    channel.subscribe()
    return flow
}
But I can easily have subscribe before the flow, which you say doesn't work:
suspend fun shopItems(): Flow<List<ShopItemModel>> {
    val channel = supabase.channel("db-changes")
    // Here subscribe() runs before the postgres flow is created.
    channel.subscribe()
    return channel.postgresListDataFlow(
        schema = "public",
        table = "ShopItem",
        primaryKey = ShopItemDto::id
    )
        .map { items -> items.map { it.toModel() } }
}
I can make it more like a chain of operators with .onStart { channel.subscribe() }, which brings several questions:
- It looks that only initialData request is suspend, can this be part of the channelFlow? Therefore you get initial data only when you subscribe to that flow (which is probably expected).
- I see addPostgresChange that adds the config to the listener, but I don't see any remove* function. Is it possible that this keeps the listener even though someone unsubscribes from the channel?
- Should we subscribe the channel as part of the operator as well when it starts collecting (onStart)?
- And what's the recommendation about unsubscribing the realtime channel? I don't see it in the example. I assume it should be called as well? (And therefore it could also be called part of the stream)
Sorry if these questions are obvious and I haven't looked properly (I'm new to Supabase)
Thanks!
from supabase-kt.
It looks that only initialData request is suspend, can this be part of the channelFlow? Therefore you get initial data only when you subscribe to that flow (which is probably expected).
Looking at the implementation again, yes, we can probably move the initial data there and remove the suspending modifier for this method. But the postgresChangeFlow call has to be outside the channel flow. The single flow variant has to be suspending, since the result of the DB call is used in the postgresChangeFlow method.
I see addPostgresChange that adds the config to the listener, but I don't see any remove* function. Is it possible that this keeps the listener even though someone unsubscribes from the channel?
The way Supabase Realtime works is that you specify what database changes you want to receive when subscribing to the channel. This method just adds the configuration to the join payload.
The actual listener gets added here:
But you are right, it seems like that entry doesn't get removed in awaitClose.
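For illustration, removing a listener entry in awaitClose could look roughly like the sketch below. It assumes kotlinx.coroutines is available and uses a hypothetical ListenerRegistry and changesOf function; it is not the supabase-kt implementation, only the general callbackFlow cleanup pattern.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical registry of callbacks, standing in for a channel's listener map.
class ListenerRegistry {
    private val listeners = mutableMapOf<Int, (String) -> Unit>()
    private var nextId = 0
    val size: Int get() = listeners.size

    fun add(listener: (String) -> Unit): Int {
        val id = nextId++
        listeners[id] = listener
        return id
    }

    fun remove(id: Int) { listeners.remove(id) }

    fun emit(value: String) { listeners.values.toList().forEach { it(value) } }
}

// Registers a listener when collection starts and removes it when the flow closes.
fun changesOf(registry: ListenerRegistry): Flow<String> = callbackFlow {
    val id = registry.add { value -> trySend(value) }
    awaitClose { registry.remove(id) } // cleanup on completion or cancellation
}

// Returns the listener count while collecting and after collection has ended.
fun listenerCounts(): Pair<Int, Int> = runBlocking {
    val registry = ListenerRegistry()
    val firstChange = async { changesOf(registry).first() }
    while (registry.size == 0) yield() // wait until the collector has registered
    val whileCollecting = registry.size
    registry.emit("insert")
    firstChange.await()
    whileCollecting to registry.size
}

fun main() {
    println(listenerCounts()) // (1, 0)
}
```

Without the remove call in awaitClose, the second number would stay at 1 after the collector is gone, which is the kind of leftover entry discussed here.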
should we subscribe the channel as part of the operator as well when it starts collecting (onStart) ?
It is only important that you call the postgresListDataFlow or any other postgres method before subscribing, because of the join payload.
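To make the join payload argument concrete, here is a toy model in plain Kotlin. ToyChannel, listenTo and joinPayload are hypothetical and heavily simplified; the only assumption taken from the explanation above is that the payload is built from whatever has been configured at the moment subscribe() is called.

```kotlin
// Hypothetical model of a channel, not supabase-kt code: the join payload is a
// snapshot of the change configurations known at the moment subscribe() runs.
class ToyChannel {
    private val configs = mutableListOf<String>()

    // What the server would be asked to send; null until subscribe() is called.
    var joinPayload: List<String>? = null
        private set

    // Stand-in for a postgres flow method: it only records the wanted table.
    fun listenTo(table: String) {
        configs += table
    }

    // Freezes the current configuration into the join payload.
    fun subscribe() {
        joinPayload = configs.toList()
    }
}

fun payloadWhenFlowCreatedFirst(): List<String>? {
    val channel = ToyChannel()
    channel.listenTo("ShopItem")
    channel.subscribe()
    return channel.joinPayload
}

fun payloadWhenSubscribedFirst(): List<String>? {
    val channel = ToyChannel()
    channel.subscribe()
    channel.listenTo("ShopItem") // too late: the payload was already built
    return channel.joinPayload
}

fun main() {
    println(payloadWhenFlowCreatedFirst()) // [ShopItem]
    println(payloadWhenSubscribedFirst())  // []
}
```

In the second case the server is never told about the table, so no changes for it would arrive even though a listener exists locally.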
And what's the recommendation about unsubscribing the realtime channel? I don't see it in the example. I assume it should be called as well? (And therefore it could also be called part of the stream)
You can probably just unsubscribe from the channel when the flow ends.
from supabase-kt.
It is only important that you call the postgresListDataFlow or any other postgres method before subscribing, because of the join payload.
If this is important, I'm thinking if the subscribe method should be part of the call and part of the Flow stream?
Basically you can have it in onStart { subscribe() } and onCompletion { unsubscribe() }, which means that when this Flow is active, you're listening to the changes in the channel.
I don't know if it's expected to stay subscribed in the channel (you call .subscribe(), but you never listen to the changes)?
But this is what the API currently allows very easily.
If this was part of the Flow stream and you'd like to keep the subscription for the whole lifetime of the app, you can transform the flow to a SharedFlow, for example, which can keep the subscription active the whole time.
This is a breaking change in the API, so probably not possible to change directly, but I'm keen on hearing your thoughts.
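A rough sketch of that idea, using the onStart and onCompletion operators from kotlinx.coroutines (assumed to be on the classpath). LoggingChannel and whileSubscribedTo are hypothetical names made up for this example; they are not part of supabase-kt.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a realtime channel: it only logs lifecycle calls.
class LoggingChannel {
    val log = mutableListOf<String>()
    fun subscribe() { log += "subscribe" }
    fun unsubscribe() { log += "unsubscribe" }
}

// Hypothetical helper: subscribes when collection starts and unsubscribes
// when it ends, whether it completes, fails, or is cancelled.
fun <T> Flow<T>.whileSubscribedTo(channel: LoggingChannel): Flow<T> =
    onStart { channel.subscribe() }
        .onCompletion { channel.unsubscribe() }

// Collects all items, or only the first `takeOnly`, and returns the lifecycle log.
fun lifecycleLog(takeOnly: Int? = null): List<String> = runBlocking {
    val channel = LoggingChannel()
    val changes = flowOf("insert", "update", "delete").whileSubscribedTo(channel)
    val collected = if (takeOnly == null) changes.toList() else changes.take(takeOnly).toList()
    channel.log + "collected=${collected.size}"
}

fun main() {
    println(lifecycleLog())            // [subscribe, unsubscribe, collected=3]
    println(lifecycleLog(takeOnly = 1)) // [subscribe, unsubscribe, collected=1]
}
```

The second call shows that the unsubscribe step also runs when the collector stops early. With a real channel, where unsubscribing suspends, cleanup after cancellation may need extra care (for example a NonCancellable context). To keep one subscription alive for several collectors, such a flow could be shared with shareIn in a long-lived scope.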
Thanks!
from supabase-kt.
If this is important, I'm thinking if the subscribe method should be part of the call and part of the Flow stream?
Basically you can have it in onStart { subscribe() } and onCompletion { unsubscribe() }, which means that when this Flow is active, you're listening to the changes in the channel.
I don't know if it's expected to stay subscribed in the channel (you call .subscribe(), but you never listen to the changes)?
But this is what the API currently allows very easily.
Yes, so apart from postgres changes, you can also listen for broadcast and presence changes. The difference is that you can always call these flow methods no matter whether you are subscribed to the channel or not.
But I agree that this is currently not ideal. The flow methods postgresListDataFlow and postgresSingleDataFlow are pretty new and not available in the other client libs, and because the idea is to make all client libs work essentially the same to make switching easier, this is currently a compromise.
Maybe we could add some extension functions to the Postgrest plugin for really only listening for postgres changes, where subscribing/unsubscribing gets handled automatically.
from supabase-kt.
I think it's fine (and important?) to keep the building blocks available (subscribe, unsubscribe, adding the listeners, etc). If there's someone using it in a very specific way, they can just use the building blocks.
For very simple usage I think it could be wrapped to always do the actions in the appropriate way, without any memory leaks or anything else that you'd need to remember.
If this means adding extensions on Postgrest directly, then I agree.
I understand now a bit better that one channel can hold broadcast, presence, and db changes in one, which is probably the reason for this design. I also see the pattern coming from the other language implementations, where you need to implement a callback. In Kotlin, Flow is powerful enough that it can automatically subscribe to the channel when someone starts listening and unsubscribe when nobody is listening anymore.
I understand, though, that this probably requires some refactoring. I think this issue is getting bigger than necessary, as it started with the suspend functions :)
from supabase-kt.
Yes, I think we can close this issue. I removed the suspend modifier for the postgresListDataFlow methods and we can further discuss this issue in #570.
from supabase-kt.