:wave: What could be the reason for `selectAsFlow`...
# supabase-kt
n
👋 What could be the reason for
selectAsFlow
to return an empty array when 100% there’s data in the table that matches the filter?
j
Could be misconfigured RLS policies/a signed out user. Does the "normal" select work?
n
Oh, the normal select is also empty 🤔
I’ll check RLS policies now
That looks correct:
Ok, it was RLS, thanks, man 🙂
👍 1
Sorry to disturb again, in the same operation now I’m getting an infinite loop of Supabase connecting to the realtime websocket. What could be the reason for that?
j
Can you share the whole log?
n
This is everything (Kotlin/JS)
j
Thanks, and the message "Connected to realtime websocket!" indicates the problem? It shouldn't be possible that this message gets sent more than once without any other realtime log
n
Yeah, it’s getting sent again and again (the 193 on the left indicates the ammount).
j
Not sure what could cause this, just tried a small snippet on K/JS and it works as expected:
Copy code
fun main() {
    GlobalScope.launch {
        val supabase = createSupabaseClient(
            supabaseUrl = "",
            supabaseKey = ""
        ) {
            install(Auth)
            install(Realtime)
            install(Postgrest)
        }
        val flow = supabase.from("messages").selectAsFlow(Message::id)
        flow.collect {
            println(it)
        }
    }
}
Can you share your code?
n
🤔 Yes, but it’s quite complex.
This is the flow:
Copy code
@OptIn(SupabaseExperimental::class)
    private suspend fun getPubsByManagerStream(userId: String): Flow<List<Pub>> =
        supabase.from("pub_admin")
            .selectAsFlow(
                primaryKeys = listOf(PubAdmin::pub, PubAdmin::user),
                filter = FilterOperation("user", FilterOperator.EQ, userId)
            )
            .flatMapLatest { adminPubs ->
                if (adminPubs.isEmpty()) flowOf(emptyList())
                else combine(adminPubs.map { (pubId, _) -> pubsService.getPubDetailsStream(pubId) }) {
                    it.toList()
                }
            }
j
is
getPubDetailsStream
also using realtime?
n
Yes:
Copy code
@OptIn(SupabaseExperimental::class)
    override suspend fun getPubDetailsStream(pubId: Int): Flow<Pub> =
        supabase.from(tableName)
            .selectSingleValueAsFlow(Pub::id) {
                Pub::id eq pubId
            }
I suspect of some misconfiguration in the DB, though, because I’ve seen this code working.
If you don’t know what could cause it from the top of your head, don’t worry, I’ll investigate the whole DB thing on my side.
j
Copy code
.flatMapLatest { adminPubs ->
                if (adminPubs.isEmpty()) flowOf(emptyList())
                else combine(adminPubs.map { (pubId, _) -> pubsService.getPubDetailsStream(pubId) }) {
                    it.toList()
                }
            }
So for every change you are basically retrieving details & listening for changes on them each on their own?
n
Yes
The list will get updated in number and content
j
I'm wondering if that could cause the creation of an basically infinite growing amount of flows and therefore realtime channels as you repeatedly call
selectSingleValueAsFlow
for every pub on every change. Not sure if these unused flows get cancelled at all 🤔
> Not sure if these unused flows get cancelled at all As a result, the realtime channels are never left. That would probably be hard to debug, but you can try to print out
Realtime#subscriptions
to check how many channels are used. That would be one for
getPubsByManagerStream
and one each for
getPubDetailsStream
calls
n
Thanks! I'll take a deeper look into that.
A single `selectAsFlow`with all the items would be the same result.
This infinite loop was caused due to a serialisation error in one of the objects. For some reason, in the case of an error, the whole flow was retried infinitely. sad panda
👀 1