# coroutines

rook

08/05/2019, 9:29 PM
I’m trying to coerce Realm to work with coroutines, but my `ReceiveChannel` is coming back already closed from this signature:
@ExperimentalCoroutinesApi
override fun CoroutineScope.messageUpdateProducer(): ReceiveChannel<List<Message>> = produce {
    realm.where(RealmMessage::class.java).findAll().addChangeListener { results ->
        offer(results.takeIf { it.isNotEmpty() }?.map { it.messageActual } ?: listOf())
    }
}
From the documentation, it looks like the producer will close automatically if it encounters an error, but I’m not sure what’s generating an error in there. I’m handling all the null cases, and my crashes only generate a stack trace internal to the channel mechanism:
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1094)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveSelect.resumeReceiveClosed(AbstractChannel.kt:993)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:332)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:271)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:95)
        at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:98)
        at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:91)
        at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:102)
        at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:274)
        at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:784)
        at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:764)
        at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
        at android.os.Handler.handleCallback(Handler.java:739)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:135)
        at android.app.ActivityThread.main(ActivityThread.java:5223)
        at java.lang.reflect.Method.invoke(Native Method)
        at java.lang.reflect.Method.invoke(Method.java:372)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:899)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:694)

zak.taccardi

08/05/2019, 9:30 PM
once the `produce` completes, the receive channel may be closed? I don’t remember
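
A minimal sketch of that behaviour, assuming a recent kotlinx.coroutines version (1.5 or later, for `receiveCatching`): the channel returned by `produce` is closed as soon as the block returns, with or without an error. In the snippet from the question the block only registers a listener and then returns, so the channel would close almost immediately, and a later `receive()` on it throws `ClosedReceiveChannelException`. The function name `receivedFromShortProducer` is made up for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects everything a short-lived producer sends,
// then reports whether the channel is closed afterwards.
@OptIn(ExperimentalCoroutinesApi::class)
fun receivedFromShortProducer(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = produce<Int> {
        send(1)
        send(2)
        // The block returns here, so produce closes the channel.
    }
    val received = mutableListOf<Int>()
    // The for loop ends once the channel is closed and drained.
    for (value in channel) received += value
    // receiveCatching() reports the closed state instead of throwing.
    val closed = channel.receiveCatching().isClosed
    Pair(received.toList(), closed)
}
```

Under these assumptions the function returns the two sent values together with `true` for the closed flag.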

rook

08/05/2019, 9:31 PM
oh interesting, I thought channels were hot 🤔

Dominaezzz

08/05/2019, 9:34 PM
You don't need `produce` here.
Just create a channel and then return it.
💯 2
@zak.taccardi is right.
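
One way the "create a channel and return it" suggestion might look, as a sketch only. `FakeResults` is a hypothetical stand-in for the Realm results object so the example runs without Realm, and `messageUpdates` and `demoReceive` are made-up names. It uses `trySend`, which replaced the `offer` call from the question in kotlinx.coroutines 1.5. Note that nothing here unregisters the listener.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a Realm query result; not the real Realm API.
class FakeResults {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    fun addChangeListener(listener: (List<String>) -> Unit) { listeners.add(listener) }
    fun emit(values: List<String>) = listeners.toList().forEach { it(values) }
}

// The channel is created directly, so it stays open until someone closes or cancels it.
fun messageUpdates(source: FakeResults): ReceiveChannel<List<String>> {
    // CONFLATED keeps only the latest value, so trySend never fails on a full buffer.
    val channel = Channel<List<String>>(Channel.CONFLATED)
    source.addChangeListener { results -> channel.trySend(results) }
    return channel
}

// Hypothetical demo: push one update through the listener and receive it.
fun demoReceive(): List<String> = runBlocking {
    val source = FakeResults()
    val updates = messageUpdates(source)
    source.emit(listOf("hello"))
    updates.receive()
}
```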

rook

08/05/2019, 9:40 PM
Thanks. I guess I still don’t fully understand `produce`

Dico

08/05/2019, 10:11 PM
You need to unregister the change listener when the channel is closed though
👍 1
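
A sketch of one possible way to do that with a hand-made channel, assuming `SendChannel.invokeOnClose` (still marked experimental) is acceptable. `FakeResults`, `messageUpdates` and `demoUnregister` are hypothetical names; the fake class only mimics an add/remove listener pair and is not the Realm API.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical stand-in for a Realm query result; not the real Realm API.
class FakeResults {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addChangeListener(listener: (List<String>) -> Unit) { listeners.add(listener) }
    fun removeChangeListener(listener: (List<String>) -> Unit) { listeners.remove(listener) }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun messageUpdates(source: FakeResults): ReceiveChannel<List<String>> {
    val channel = Channel<List<String>>(Channel.CONFLATED)
    // Keep a reference to the listener so the same instance can be removed later.
    val listener: (List<String>) -> Unit = { results -> channel.trySend(results) }
    source.addChangeListener(listener)
    // Runs when the channel is closed or the receiving side cancels it.
    channel.invokeOnClose { source.removeChangeListener(listener) }
    return channel
}

// Hypothetical demo: listener count while the channel is open, then after cancel().
fun demoUnregister(): Pair<Int, Int> {
    val source = FakeResults()
    val updates = messageUpdates(source)
    val whileOpen = source.listenerCount
    updates.cancel()
    return Pair(whileOpen, source.listenerCount)
}
```

With the fake source, the listener count goes from 1 while the channel is open to 0 after the consumer calls `cancel()`.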

Adam Powell

08/05/2019, 11:59 PM
in a `ProducerScope` you can do that with `awaitClose { ... }` at the end of the produce block and unregister in that block
or make the whole thing simpler and use the same code from the `produce` block with the `awaitClose` added, but with `channelFlow` instead of `produce` - you won't need the `CoroutineScope` and you get a cold stream
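
A sketch of the `channelFlow` plus `awaitClose` variant, under the assumption that the listener can be removed from inside the `awaitClose` block. `FakeResults`, `messageUpdates` and `demoChannelFlow` are hypothetical names, and the fake class is not the Realm API. The small `yield()` loop exists only so the demo emits after the listener has been registered.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a Realm query result; not the real Realm API.
class FakeResults {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addChangeListener(listener: (List<String>) -> Unit) { listeners.add(listener) }
    fun removeChangeListener(listener: (List<String>) -> Unit) { listeners.remove(listener) }
    fun emit(values: List<String>) = listeners.toList().forEach { it(values) }
}

// No CoroutineScope receiver is needed; the block runs once per collector.
fun messageUpdates(source: FakeResults): Flow<List<String>> = channelFlow {
    val listener: (List<String>) -> Unit = { results -> trySend(results) }
    source.addChangeListener(listener)
    // Suspends until the collector goes away, then unregisters the listener.
    awaitClose { source.removeChangeListener(listener) }
}

// Hypothetical demo: listener count before collecting, first value, count afterwards.
fun demoChannelFlow(): Triple<Int, List<String>, Int> = runBlocking {
    val source = FakeResults()
    val updates = messageUpdates(source)
    val beforeCollect = source.listenerCount
    launch {
        // Wait until the flow has registered its listener, then push one update.
        while (source.listenerCount == 0) yield()
        source.emit(listOf("hello"))
    }
    val first = updates.first()
    Triple(beforeCollect, first, source.listenerCount)
}
```

In this sketch no listener exists before collection starts, and it is removed again once `first()` has taken its value and stopped collecting.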

rook

08/06/2019, 4:01 PM
Cold stream meaning that it doesn’t start emitting values until it’s been “subscribed” to?
👌 1
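
A small sketch of what "cold" means here, assuming a single-threaded `runBlocking` setup: the `channelFlow` block does not run when the flow is created, and it runs again for every collector. The name `coldStreamLog` is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records the order in which things happen.
fun coldStreamLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = channelFlow<Int> {
        log += "producer started"
        send(1)
        send(2)
    }
    // Creating the flow has not run the block yet.
    log += "flow created"
    // Each terminal operator (here toList) starts the block from scratch.
    log += "collected ${numbers.toList()}"
    log += "collected ${numbers.toList()}"
    log.toList()
}
```

The log shows "flow created" first, followed by a "producer started" entry before each of the two collections.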