Aaron Stacy
11/24/2021, 11:12 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

fun main(args: Array<String>) = runBlocking {
    val requests = flow {
        var i = 0
        val serverDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        val handlerDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
        launch(serverDispatcher) {
            while (true) {
                i += 1
                val result = i
                launch(handlerDispatcher) {
                    delay(500)
                    launch(serverDispatcher) {
                        emit(result)
                    }
                }
            }
        }
    }
    // I want this to:
    //
    // 1. Suspend while collecting each emitted value.
    // 2. Continue to collect until seeing a value >= 4
    // 3. At that point continue.
    // 4. Cancel the flow (thereby closing the server's socket).
    //
    // But instead it suspends, the while loop keeps running and indicating it's emitting values, and the onEach print
    // statement never executes.
    requests.takeWhile { it < 4 }.onEach { println("got $it") }.collect()
}
Adam Powell
11/25/2021, 12:59 AM
Adam Powell
11/25/2021, 1:02 AM
ephemient
11/25/2021, 1:06 AM
emit (which would trip the runtime safety checks)
Aaron Stacy
11/29/2021, 4:08 AM
Adam Powell
11/29/2021, 4:28 AM
flow {
    var i = 0
    while (true) {
        emit(++i)
    }
}
What are the thread pools intended to do here? I think the intent got accidentally optimized out when simplifying for the example.
Aaron Stacy
11/29/2021, 1:51 PM
serverDispatcher is accepting the sockets; handlerDispatcher is where the coroutines that read requests and write responses are intended to dispatch. Come to think of it, this is all a bit much...
Adam Powell
11/29/2021, 2:11 PM
The flow {} builder goes way out of its way to keep you from breaking context preservation by emitting from some other dispatcher or a concurrent child job. If you start hopping dispatchers, that puts you either into channelFlow {} territory or into your own reimplementation of it.
Adam Powell
11/29/2021, 2:12 PM
Aaron Stacy
11/29/2021, 2:28 PM
channelFlow would be the thing I'd want.
Aaron Stacy
11/29/2021, 2:28 PM
Aaron Stacy
11/29/2021, 2:28 PM
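
For reference, here is a minimal sketch of the channelFlow approach suggested in the thread. It is not the code anyone in the conversation posted. The function name `requests`, the short delays standing in for accepting a socket and handling a request, and the executor setup in `main` are all assumptions made for illustration; no real socket is opened. The idea is that coroutines on either dispatcher call `send` on the channel that channelFlow provides, instead of calling `emit` from outside the collecting coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Hypothetical stand-in for the server: an "accept loop" on one dispatcher hands
// each request to a handler dispatcher, and handlers deliver results with send().
fun requests(
    serverDispatcher: CoroutineDispatcher,
    handlerDispatcher: CoroutineDispatcher,
): Flow<Int> = channelFlow {
    launch(serverDispatcher) {
        var i = 0
        while (isActive) {
            val result = ++i
            launch(handlerDispatcher) {
                delay(100)   // assumption: simulated request handling
                send(result) // allowed from any coroutine launched inside channelFlow
            }
            delay(10)        // assumption: simulated wait for the next connection;
                             // also gives the loop a suspension point for cancellation
        }
    }
}

fun main(): Unit = runBlocking {
    val server = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val handlers = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    try {
        // Once takeWhile sees a value >= 4, collection stops and the coroutines
        // launched inside channelFlow are cancelled.
        requests(server, handlers)
            .takeWhile { it < 4 }
            .collect { println("got $it") }
    } finally {
        // Shut down the executors so the JVM can exit.
        server.close()
        handlers.close()
    }
}
```

Two things differ from the original snippet besides the builder. The accept loop now contains a suspension point, so it no longer monopolizes the single server thread and can observe cancellation. The dispatchers are also created outside the flow and closed by the caller, so the thread pools do not outlive the collection. Handlers run concurrently, so the order in which values arrive is not strictly guaranteed.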