tseisel
09/03/2019, 5:28 PM
BroadcastChannel?
I'd like to re-execute the block passed to broadcast when receiving from it after being closed:
val channel = broadcast<String>(capacity = Channel.CONFLATED, start = LAZY) {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }
    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}
// At some point in time, receive the latest value from the channel.
// Because the broadcast is lazy, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
channel.consume { receive() }
// At a later time, I'd like "receive" to re-execute the producer coroutine, registering a new callback.
channel.consume { receive() }
zak.taccardi
09/03/2019, 5:28 PM
Dominaezzz
09/03/2019, 5:29 PM
Flow.
val flow = callbackFlow<String> {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }
    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}.conflate()
// At some point in time, receive the latest value from the flow.
// Because the flow is cold, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
flow.first()
// At a later time, calling "first" again re-executes the producer block, registering a new callback.
flow.first()
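The snippet above relies on callbackFlow being cold: each terminal call such as first() runs the builder block again and cancels it once a value has arrived. A minimal sketch of that behavior, assuming a hypothetical FakeSource whose counters stand in for subscribe/unsubscribe, and using trySend (the replacement for offer in newer kotlinx.coroutines releases):

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real callback-based API (not from the thread).
class FakeSource {
    val subscribeCount = AtomicInteger(0)
    val unsubscribeCount = AtomicInteger(0)

    fun values(): Flow<String> = callbackFlow {
        // Stands in for subscribe(callback): runs once per collection.
        val id = subscribeCount.incrementAndGet()
        trySend("value from subscription #$id")
        // Stands in for unsubscribe(callback): runs when the collector is done.
        awaitClose { unsubscribeCount.incrementAndGet() }
    }
}

// Two separate first() calls mean two separate runs of the builder block.
suspend fun collectTwice(source: FakeSource): List<String> {
    val flow = source.values()
    return listOf(flow.first(), flow.first())
}
```

In this sketch, collectTwice subscribes once per first() call, so a costly subscription is paid for on every read.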
tseisel
09/03/2019, 6:24 PM
Flow, but noticed that the subscription will be registered/unregistered every time the flow is collected with first.
But since the subscription is somewhat costly, how can I maintain it until an error occurs?
I tried to buffer values into a ConflatedBroadcastChannel, collecting the flow on the first receive and then getting the latest value on subsequent receives, but now I don't know how to renew the subscription after receiving an error...
Dominaezzz
09/03/2019, 7:29 PM
consume then.
tseisel
09/03/2019, 7:50 PM
BroadcastChannel in a mutable class var, and when requesting the latest value, check if the channel is closed. If it is, create a new one to re-collect the flow.
Is it safe in a multithreaded context? Shouldn't the var be @Volatile or atomic?
Dominaezzz
09/03/2019, 9:06 PM
"but now I don't know how to renew the subscription after receiving an error"
Flow has a retry operator.
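A minimal sketch of how retry could re-run the callbackFlow block after an error. FlakySource, its attempts counter, the IOException filter, and the limit of 3 retries are assumptions made for illustration; the first subscription is made to fail the way onError does in the thread, and trySend stands in for offer as in newer kotlinx.coroutines releases:

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.retry
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical source whose first subscription fails and later ones succeed.
class FlakySource {
    val attempts = AtomicInteger(0)

    fun values(): Flow<String> = callbackFlow {
        val attempt = attempts.incrementAndGet()
        if (attempt == 1) {
            // Mirrors onError(error) { close(error) } from the thread.
            close(IOException("subscription #$attempt failed"))
        } else {
            // Mirrors onValue(value) { offer(value) } from the thread.
            trySend("value from subscription #$attempt")
        }
        awaitClose { /* unsubscribe(callback) would go here */ }
    }
}

// retry re-collects the upstream flow, which re-runs the callbackFlow block.
// Assumption: only IOException is worth retrying, at most 3 times.
fun FlakySource.valuesWithRetry(): Flow<String> =
    values().retry(retries = 3) { cause -> cause is IOException }

// Reads one value; a failed subscription is renewed transparently.
suspend fun latestValue(source: FlakySource): String =
    source.valuesWithRetry().first()
```

Here the caller of latestValue never sees the first failure, because retry renews the subscription before a value is delivered. Once the retries are exhausted, or the predicate returns false, the exception reaches the collector as usual.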