Is it possible to re-open a cancelled/failed `Broa...
# coroutines
t
Is it possible to re-open a cancelled/failed `BroadcastChannel`? I'd like to re-execute the block passed to `broadcast` when receiving from it after it has been closed:
val channel = broadcast<String>(capacity = Channel.CONFLATED, start = LAZY) {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }

    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}

// At some point in time, receive the latest value from the channel.
// Because the broadcast is lazy, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
channel.consume { receive() }

// At a later time, I'd like "receive" to re-execute the producer coroutine, re-registering a new callback.
channel.consume { receive() }
z
No, a closed channel is considered a terminal event.
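To illustrate the "terminal event" point, here is a minimal sketch using a plain `Channel` instead of a `BroadcastChannel` (an assumption made for brevity; the helper name `rejectsSendAfterClose` is hypothetical). Once the channel is closed, later sends are rejected, and nothing re-opens it.

```kotlin
import kotlinx.coroutines.channels.Channel

// Returns true if the channel rejects a value sent after it was closed.
fun rejectsSendAfterClose(): Boolean {
    val channel = Channel<String>(Channel.CONFLATED)
    // Closing with a cause mirrors close(error) in the question.
    channel.close(IllegalStateException("subscription failed"))
    // trySend never suspends; on a closed channel it returns a "closed" result.
    return channel.trySend("late value").isClosed
}

fun main() {
    println(rejectsSendAfterClose())
}
```

The only way to get a working channel again is to create a new one, which is why the replies steer toward a cold `Flow`.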
d
I think you want a `Flow`.
👍 2
val flow = callbackFlow<String> {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }

    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}.conflate()

// At some point in time, receive the latest value from the flow.
// Because the flow is cold, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
flow.first()

// At a later time, collecting again re-executes the producer block, registering a new callback.
flow.first()
t
Thanks @zak.taccardi and @Dominaezzz. I initially thought of using a `Flow`, but noticed that the subscription is registered and unregistered every time the flow is collected with `first`. Since the subscription is somewhat costly, how can I maintain it until an error occurs? I tried buffering values into a `ConflatedBroadcastChannel`, collecting the flow on the first `receive` and then getting the latest value on subsequent `receive` calls, but now I don't know how to renew the subscription after receiving an error...
d
Don't use `consume` then.
t
I thought of keeping a reference to the `BroadcastChannel` in a mutable class property (`var`) and, when the latest value is requested, checking whether the channel is closed. If it is, I'd create a new one to re-collect the flow. Is that safe in a multithreaded context? Shouldn't the `var` be `@Volatile` or atomic?
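On the thread-safety question, a rough sketch may help. `@Volatile` alone makes writes visible to other threads, but it does not make the "check if closed, then replace" sequence atomic, so two threads could each create a replacement. The sketch below guards that sequence with a lock. `Resource` and `RenewingHolder` are hypothetical stand-ins for the channel and the class holding it, not part of any library.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the channel: something that can end up closed.
class Resource(val id: Int) {
    @Volatile
    var isClosed = false
        private set

    fun close() { isClosed = true }
}

// Hypothetical holder: returns the current resource, replacing it if it is closed.
class RenewingHolder(private val factory: () -> Resource) {
    private val lock = Any()
    private var current: Resource? = null

    // The check and the replacement happen under one lock, so concurrent
    // callers never create two replacements for the same closed resource.
    fun get(): Resource = synchronized(lock) {
        val existing = current
        if (existing != null && !existing.isClosed) existing
        else factory().also { current = it }
    }
}

fun main() {
    val created = AtomicInteger()
    val holder = RenewingHolder { Resource(created.incrementAndGet()) }

    val first = holder.get()
    check(holder.get() === first)  // reused while still open
    first.close()
    val second = holder.get()
    check(second !== first)        // replaced after close
    println("created ${created.get()} resources")
}
```

Since `current` is only touched inside `synchronized`, it needs no `@Volatile` annotation of its own.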
d
> but now I don't know how to renew the subscription after receiving an error
`Flow` has a `retry` operator.
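A minimal sketch of how `retry` could re-register the callback after a failure. The `Subscription` interface mirrors the one in the question, and `FakeSource` is a hypothetical source whose first subscription fails. It uses `trySend` in place of `offer`, assuming kotlinx.coroutines 1.5 or later.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.runBlocking

// Callback API as described in the question.
interface Subscription {
    fun onValue(value: String)
    fun onError(error: Exception)
}

// Hypothetical source: the first subscription fails, later ones deliver a value.
class FakeSource {
    var subscribeCount = 0
        private set

    fun subscribe(callback: Subscription) {
        subscribeCount++
        if (subscribeCount == 1) callback.onError(IllegalStateException("boom"))
        else callback.onValue("value from subscription #$subscribeCount")
    }

    fun unsubscribe(callback: Subscription) { /* nothing to release in this fake */ }
}

// Cold flow: every collection runs the block again and registers a new callback.
fun values(source: FakeSource): Flow<String> = callbackFlow {
    val callback = object : Subscription {
        override fun onValue(value: String) { trySend(value) }
        override fun onError(error: Exception) { close(error) }
    }
    source.subscribe(callback)
    awaitClose { source.unsubscribe(callback) }
}.conflate()

fun main() = runBlocking {
    val source = FakeSource()
    // retry re-collects the upstream flow when it fails, up to 3 more times here.
    val latest = values(source)
        .retry(retries = 3) { cause -> cause is IllegalStateException }
        .first()
    println(latest)
}
```

The predicate passed to `retry` limits which failures trigger a new subscription; any other exception still propagates to the collector.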