I’m trying to coerce Realm to work with coroutines...
# coroutines
r
I’m trying to coerce Realm to work with coroutines, but my `ReceiveChannel` is coming back already closed from this signature:
@ExperimentalCoroutinesApi
override fun CoroutineScope.messageUpdateProducer(): ReceiveChannel<List<Message>> = produce {
    realm.where(RealmMessage::class.java).findAll().addChangeListener { results ->
        offer(results.takeIf { it.isNotEmpty() }?.map { it.messageActual } ?: listOf())
    }
}
From the documentation, it looks like the producer closes its channel automatically if it encounters an error, but I’m not sure what’s generating an error in there. I’m handling all the null cases, and my crashes only produce a stack trace internal to the channel mechanism:
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1094)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveSelect.resumeReceiveClosed(AbstractChannel.kt:993)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:332)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:271)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:95)
        at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:98)
        at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:91)
        at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:102)
        at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:274)
        at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:784)
        at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:764)
        at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
        at android.os.Handler.handleCallback(Handler.java:739)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:135)
        at android.app.ActivityThread.main(ActivityThread.java:5223)
        at java.lang.reflect.Method.invoke(Native Method)
        at java.lang.reflect.Method.invoke(Method.java:372)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:899)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:694)
z
once the `produce` completes, the receive channel may be closed? I don’t remember
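A minimal sketch of that behaviour, assuming a recent `kotlinx.coroutines` version: a `produce` block that returns without suspending (as one that only registers a listener does) completes its coroutine, and the channel is closed on completion. The function name `closedAfterBlockReturns` is made up for illustration, and Realm is left out entirely.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class) // produce is marked experimental
fun closedAfterBlockReturns(): Boolean = runBlocking {
    val channel = produce<Int> {
        // A listener registration would go here. It returns without suspending,
        // so the block finishes and the producer coroutine completes.
    }
    // receiveCatching() reports closure as a result instead of throwing.
    channel.receiveCatching().isClosed
}

fun main() {
    println(closedAfterBlockReturns()) // true
}
```

Calling `receive()` instead of `receiveCatching()` on that channel throws `ClosedReceiveChannelException`, which matches the stack trace above.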
r
oh interesting, I thought channels were hot 🤔
d
You don't need `produce` here. Just create a channel and then return it.
💯 2
@zak.taccardi is right.
r
Thanks. I guess I still don’t fully understand `produce`
d
You need to unregister the change listener when the channel is closed though
👍 1
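The two suggestions above (return a plain channel, and unregister the listener when it closes) might look roughly like the sketch below. `FakeStore` is a hypothetical stand-in for the Realm query and its change listener, `updatesChannel` is an invented name, and `trySend` is used where the original snippet used `offer`. `invokeOnClose` is an experimental API, hence the opt-in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical callback source standing in for a Realm result set.
class FakeStore {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(l: (List<String>) -> Unit) { listeners += l }
    fun removeListener(l: (List<String>) -> Unit) { listeners -= l }
    fun push(items: List<String>) = listeners.toList().forEach { it(items) }
}

@OptIn(ExperimentalCoroutinesApi::class) // invokeOnClose is marked experimental
fun FakeStore.updatesChannel(): ReceiveChannel<List<String>> {
    // Conflated: only the most recent update is kept for a slow receiver.
    val channel = Channel<List<String>>(Channel.CONFLATED)
    val listener: (List<String>) -> Unit = { channel.trySend(it) }
    addListener(listener)
    // Runs when the channel is closed or the receiving side is cancelled.
    channel.invokeOnClose { removeListener(listener) }
    return channel
}

fun main() = runBlocking<Unit> {
    val store = FakeStore()
    val updates = store.updatesChannel()
    store.push(listOf("hello"))
    println(updates.receive())   // [hello]
    updates.cancel()             // triggers the close handler
    println(store.listenerCount) // 0
}
```

The channel stays open here because nothing completes on its own; the caller is responsible for cancelling it.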
a
in a `ProducerScope` you can do that with `awaitClose { ... }` at the end of the produce block and unregister in that block
or make the whole thing simpler and use the same code from the `produce` block with the `awaitClose` added, but with `channelFlow` instead of `produce` - you won't need the `CoroutineScope` and you get a cold stream
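A rough sketch of the `channelFlow` plus `awaitClose` variant, again with a hypothetical `FakeStore` standing in for Realm and an invented `updates()` extension. `awaitClose` keeps the block suspended so the channel stays open, and its lambda runs when the collector goes away.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback source standing in for a Realm result set.
class FakeStore {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(l: (List<String>) -> Unit) { listeners += l }
    fun removeListener(l: (List<String>) -> Unit) { listeners -= l }
    fun push(items: List<String>) = listeners.toList().forEach { it(items) }
}

fun FakeStore.updates(): Flow<List<String>> = channelFlow {
    val listener: (List<String>) -> Unit = { trySend(it) }
    addListener(listener)
    // Suspends until the flow is cancelled or closed, then unregisters.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking<Unit> {
    val store = FakeStore()
    val first = async { store.updates().first() }
    // Wait until the collector has actually registered its listener.
    while (store.listenerCount == 0) yield()
    store.push(listOf("hello"))
    println(first.await())       // [hello]
    println(store.listenerCount) // 0
}
```

No `CoroutineScope` receiver is needed on `updates()`: the flow borrows the scope of whoever collects it.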
r
Cold stream meaning that it doesn’t start emitting values until it’s been “subscribed” to?
👌 1
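To make "cold" concrete, here is a small sketch using a plain `flow { }` builder rather than Realm: the body does not run when the flow is created, and it runs again from the start for every collector. The helper name `coldStartCounts` is made up for this example.

```kotlin
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns how often the flow body had run before and after two collections.
fun coldStartCounts(): Pair<Int, Int> = runBlocking {
    var starts = 0
    val cold = flow {
        starts++ // executed once per collector, not at creation time
        emit(starts)
    }
    val beforeCollect = starts
    cold.collect { }
    cold.collect { }
    beforeCollect to starts
}

fun main() {
    println(coldStartCounts()) // (0, 2)
}
```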