#coroutines

tseisel

09/03/2019, 5:28 PM
Is it possible to re-open a cancelled/failed `BroadcastChannel`? I'd like to re-execute the block passed to `broadcast` when receiving from it after it has been closed:
val channel = broadcast<String>(capacity = Channel.CONFLATED, start = LAZY) {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }

    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}

// At some point in time, receive the latest value from the channel.
// Because the broadcast is lazy, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
channel.consume { receive() }

// At a later time, I'd like "receive" to re-execute the producer coroutine, re-registering a new callback.
channel.consume { receive() }

zak.taccardi

09/03/2019, 5:28 PM
no
a closed channel is considered a terminal event
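
To illustrate the point, here is a minimal sketch, assuming a recent kotlinx.coroutines version where `trySend` is available. The function name `closedChannelStaysClosed` is made up for this example. Once `close()` has been called, further sends are rejected and `receive()` fails; nothing on the channel lets you open it again.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports (send rejected?, receive failed?) for a closed channel.
fun closedChannelStaysClosed(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<String>(Channel.CONFLATED)
    channel.close() // terminal: the channel has no "reopen" operation

    // Sending after close is rejected instead of being delivered.
    val sendRejected = channel.trySend("late value").isClosed

    // Receiving from a closed, empty channel throws.
    val receiveFailed = try {
        channel.receive()
        false
    } catch (e: ClosedReceiveChannelException) {
        true
    }

    sendRejected to receiveFailed
}
```

The only way to get values again is to create a new channel, or to use a cold `Flow` whose block runs again on every collection.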

Dominaezzz

09/03/2019, 5:29 PM
I think you want a `Flow`.
👍 2
val flow = callbackFlow<String> {
    val callback = object : Subscription {
        override fun onValue(value: String) {
            // May be called multiple times.
            offer(value)
        }
        override fun onError(error: Exception) {
            close(error)
        }
    }

    subscribe(callback)
    awaitClose { unsubscribe(callback) }
}.conflate()

// At some point in time, receive the latest value from the flow.
// Because the flow is cold, this registers a callback and waits for a value.
// Let's assume that the callback failed; the exception is rethrown here.
flow.first()

// At a later time, calling first() again re-executes the callbackFlow block, registering a new callback.
flow.first()
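
A rough sketch of what this implies: a flow built with `callbackFlow` is cold, so each call to `first()` runs the block again and registers a fresh callback. `FakeSource`, `valuesOf` and `collectTwice` are hypothetical stand-ins for the real subscription API, and `trySend` assumes a recent kotlinx.coroutines version (the snippet above uses the older `offer`).

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical source that counts registrations and emits one value per subscription.
class FakeSource {
    var subscribeCount = 0
        private set

    fun subscribe(onValue: (String) -> Unit) {
        subscribeCount++
        onValue("value #$subscribeCount")
    }

    fun unsubscribe() { /* nothing to release in this fake */ }
}

fun valuesOf(source: FakeSource): Flow<String> = callbackFlow<String> {
    source.subscribe { value -> trySend(value) }
    // Runs when the collector stops collecting, e.g. after first() got its value.
    awaitClose { source.unsubscribe() }
}

// Collects the same flow twice and reports both values plus the number of subscriptions.
fun collectTwice(): Triple<String, String, Int> = runBlocking {
    val source = FakeSource()
    val flow = valuesOf(source)
    val a = flow.first() // first collection: subscribes
    val b = flow.first() // second collection: subscribes again
    Triple(a, b, source.subscribeCount)
}
```

Each collection is independent, which makes re-subscribing after an error trivial but also means the subscription is not shared between calls.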

tseisel

09/03/2019, 6:24 PM
Thanks @zak.taccardi and @Dominaezzz. I initially thought of using a `Flow`, but noticed that the subscription is registered and unregistered every time the flow is collected with `first`. Since the subscription is somewhat costly, how can I keep it alive until an error occurs? I tried buffering values into a `ConflatedBroadcastChannel`, collecting the flow on the first `receive` and then returning the latest value on subsequent `receive` calls, but now I don't know how to renew the subscription after receiving an error...

Dominaezzz

09/03/2019, 7:29 PM
Don't use `consume` then.

tseisel

09/03/2019, 7:50 PM
I thought of keeping a reference to the `BroadcastChannel` in a mutable class property (`var`) and, when the latest value is requested, checking whether the channel is closed. If it is, I would create a new one to re-collect the flow. Is that safe in a multithreaded context? Shouldn't the `var` be `@Volatile` or atomic?
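
One possible approach, not something proposed in the thread: `@Volatile` only makes writes visible to other threads, it does not make the "check if closed, then replace" sequence atomic, so two callers could both create a new channel. A minimal sketch that guards the whole sequence with a coroutine `Mutex` follows; `RenewableHolder`, `FakeChannel` and `renewalCount` are hypothetical names, and `FakeChannel` stands in for the real channel type.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for a channel that can end up closed.
class FakeChannel { var closed = false }

// Holds one instance and replaces it when it is reported as closed.
class RenewableHolder<T : Any>(
    private val isClosed: (T) -> Boolean,
    private val create: () -> T
) {
    private val mutex = Mutex()
    private var current: T? = null // only touched while holding the mutex

    suspend fun get(): T = mutex.withLock {
        val existing = current
        if (existing != null && !isClosed(existing)) existing
        else create().also { current = it }
    }
}

// Returns how many instances were created over three requests.
fun renewalCount(): Int = runBlocking {
    var created = 0
    val holder = RenewableHolder<FakeChannel>(
        isClosed = { it.closed },
        create = { created++; FakeChannel() }
    )
    val first = holder.get() // creates the first instance
    holder.get()             // still open: reused
    first.closed = true
    holder.get()             // closed: replaced by a new instance
    created
}
```

Because the check and the replacement happen under the same lock, concurrent callers always observe a single current instance.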

Dominaezzz

09/03/2019, 9:06 PM
"but now I don't know how to renew the subscription after receiving an error"
`Flow` has a `retry` operator.
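
A minimal sketch of how `retry` could be applied here, assuming a recent kotlinx.coroutines version. `FlakySource`, `resilientValues` and `firstValueWithRetry` are hypothetical; the source fails on its first subscription and succeeds on the second. When the callback closes the flow with an error, `retry` runs the `callbackFlow` block again, which registers a new callback.

```kotlin
import java.io.IOException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.runBlocking

// Hypothetical source: the first subscription reports an error, later ones deliver a value.
class FlakySource {
    var attempts = 0
        private set

    fun subscribe(onValue: (String) -> Unit, onError: (Exception) -> Unit) {
        attempts++
        if (attempts == 1) onError(IOException("subscription failed"))
        else onValue("value from attempt $attempts")
    }

    fun unsubscribe() { /* nothing to release in this fake */ }
}

fun resilientValues(source: FlakySource): Flow<String> =
    callbackFlow<String> {
        source.subscribe(
            onValue = { trySend(it) },
            onError = { close(it) } // fails the flow with this exception
        )
        awaitClose { source.unsubscribe() }
    }.retry(retries = 3) { cause -> cause is IOException } // re-run the block on I/O errors only

fun firstValueWithRetry(source: FlakySource): String = runBlocking {
    resilientValues(source).first()
}
```

The predicate limits retries to failures that are worth retrying, and `retries` bounds the number of attempts so a permanently broken source still surfaces its error.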