Peter Kievits
09/08/2023, 7:53 AMNorbi
09/09/2023, 3:20 PMrunBlocking()
in my application if the main()
function itself is suspend
?pers
09/10/2023, 1:16 PMviewModelScope.launch {
_uiState.update(newState)
}
#B
_uiState.update(newState)
#C
_uiState.value = newState
koreatlwls
09/11/2023, 7:06 AMreactormonk
09/11/2023, 9:10 AMDeferred
out form a callback? got this pattern:
kotlin
val contract = ActivityResultContracts.StartActivityForResult()
suspendCoroutine { cont ->
registerForActivityResult(contract, contract.createIntent(this, client.signInIntent)) {
<createing a Deferred here>
}
}
Can I somehow pull the Deferred
directly into the suspendCoroutine
, or do I have to do all the plumbing manually?Norbi
09/11/2023, 6:11 PMrunTest()
and I got this error:
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out after 10m of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
I was wondering why limitedParallelism(1)
is necessary, and how my test would behave without it?
Thanks.Remon Shehata
09/11/2023, 6:58 PMCoroutineScope.launch(Dispatchers.Main){
flow.collect{}
}
Eugene
09/12/2023, 10:22 AMkotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled
It has no stacktrace (due to https://github.com/Kotlin/kotlinx.coroutines/issues/1866).
The only case that I was able to reproduce is when you throw this exception outside of coroutines.
Any ideas?Jacob
09/12/2023, 12:20 PMwithContext(iodispatcher)
and when to use runInterruptible(iodispatcher)?
Jacob
09/12/2023, 12:31 PMconsumeAsFlow
how do I make sure that emission happens interruptibly?윤동환
09/13/2023, 12:15 AM/-- no1
val job = async(dispatcher) { /** do something */ }
/** do other processes */
val result = job.await()
/-- no2
val result = withContext(dispatcher) { /** do something */ }
Miguel Vargas
09/13/2023, 2:09 AMcombine()
when you have a long list of Flows? I worry about the Flows and the param names becoming unsynced as the code is changed over time.
combine(
stringFlow1,
stringFlow2,
intFlow1,
intFlow2,
) { string2, string1, int2, int1 ->
}
Tower Guidev2
09/14/2023, 6:11 AMPatrick Steiger
09/15/2023, 5:36 PMval flow = MutableSharedFlow<Int>()
val job = CoroutineScope(Job()).launch {
flow.collect { println(it) }
}
suspend fun main() {
yield()
Thread.sleep(1000)
flow.emit(1)
yield()
Thread.sleep(1000)
job.cancelAndJoin()
}
why does collector get cancelled before receiving the emission in above example? In order to avoid race conditions, I put some yields and sleeps to make sure first collector appears before the emission, and also to make sure collector has time to collect after the emission. Still never happens
It only happens with delay(1)
after emissionAbhimanyu
09/16/2023, 5:19 AMPeter Kievits
09/18/2023, 3:41 PMDontsu
09/19/2023, 1:35 AMKotlin Coroutines deep dive
by Marcin Moskala
.
In the part of understanding Kotlin Coroutine
of the book(specifically, How does suspension work?
), I read this code below.
suspend fun main() {
println("Before")
suspendCoroutine<Unit> { continuation ->
continuation.resume(Unit)
}
println("After")
}
// Before
// After
Notice that “After” in the example above is printed because we call
resume in suspendCoroutine. This statement is true, but I need to clarify. You might imagine
that here we suspend and immediately resume. This is a good intuition, but the truth is that there is an optimization that prevents a suspension if resuming is immediate.Speaking of the
optimization
, in this case, Does it mean that "There is no suspension point in the Coroutine. So, there is no need to be suspended. That's why the compiler optimizes."??
For instances,
1)
suspend fun getUsers(): List<User> {
// no asynchronous work and suspension point here
// ....
return users
}
in the code above, the IDE informs me that I don't need to use "suspend"(Redundant 'suspend' modifier). So, it also would be an optimazation.
2)
suspend fun getUsers(): List<User> {
suspendCoroutine<Unit> { continuation -> // this code line is definitely a suspention point.
// list sorting or something but not asynchronous work.
continuation.resume(Unit)
}
return users
}
in the code above, it's almost same code of the example of the book. but i wonder how the compiler optimizes the code under the hood.
and I wonder if there are any other optimizations for susepnd
. i hope someone tells me about it.reactormonk
09/19/2023, 8:32 AMsuspend fun Flow<R>.mapState(initial: T, fun: (T, R) -> (T, Flow<S>)): Flow<S>
somewhere?Rohan Maity
09/20/2023, 6:41 AMflowOn
Davio
09/20/2023, 12:21 PMsuspend fun foo() =
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
blockingCall()
}
I think this code is handled by a thread from the thread pool that is shared with the Default dispatchers pool, which means that if this suspending function was called from another thread, that thread can now be used for something else, correct?
But if this code is reached from another suspending function already executing on that Dispatcher or the default dispatcher, I guess it will try to not switch to another thread?
And the thread in the pool that is performing the blocking code is blocked I think. If there are by default 64 threads in that pool, does that mean that for instance in a microservice with a REST controller with suspending functions, only 64 requests can be handled in parallel?Lilly
09/21/2023, 11:44 AMsingleOrNull()/firstOrNull()
that does not suspend until first item is emitted? My use case:
suspend fun execute(dispatcher: Coroutines.Dispatcher) {
withContext(dispatcher) {
// do something that returns result
// also process outstanding tasks from SharedFlow<() -> Unit>
outStandingTask.state.singleOrNull()?.invoke() // suspends, so no result is returned
return result
}
}
Would a simple Queue fit better?Manoj
09/22/2023, 8:47 AMMike Dawson
09/22/2023, 5:26 PM윤동환
09/24/2023, 11:58 PMinterface EventDispatcher {
suspend fun start()
}
interface EventBus<T> {
val events: SharedFlow<T>
}
interface MutableEventBus<T> : EventBus<T> {
suspend fun emit(event: T)
}
interface EventConsumer : CoroutineScope {
suspend fun consume()
}
@Component
class LifecycleManager(
private val dispatcher: EventDispatcher,
private val consumers: Set<EventConsumer>,
) {
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
@PostConstruct
fun start() {
consumers.forEach { consumer ->
scope.launch {
consumer.consume()
}
}
scope.launch { dispatcher.start() }
}
@PreDestroy
fun close() = runBlocking {
consumers
.map { consumer -> scope.async { consumer.cancel() } }
.joinAll()
scope.cancel()
}
}
It’s my applications’ event driven structure.
• EventDispatcher
implementation should subscribe event stream service like AWS Kinesis or Kafka. And EventDispatcher dispatch event by call emit method of MutableEventBus.
• EventBus
implementation should broadcast T type event’s. And it’s provide only readonly events.
• MutableEventBus
implementation is extension of EventBus that provide emit(produce) event method.
• EventConsumer
implementation should consume events from EventBus.
• LifecycleManager
is Spring bean that managing whole lifecycle of event components.肖志康
09/25/2023, 9:16 AMmyanmarking
09/25/2023, 10:31 AMazabost
09/25/2023, 9:11 PMSharedFlow
where I emit all the incoming WebSocket-like messages where
• some incoming messages are "commands" I'm supposed to execute
• some incoming messages are "responses" to the requests my app makes in order to get more details about said "commands"
• (unfortunately, the details can't be transmitted along with the "commands" and must be explicitly requested by the app)
A single collector of said SharedFlow
determines what action should be taken once a "command" arrives.
Processing a "command" may involve sending a request (for the details) and awaiting another answer using another ad-hoc spawned collector from the same SharedFlow
.
In short, it looks more or less like this (pseudo-code)
class SomeClassThatHasALifecycleBoundScope {
fun onStarted() {
lifecycleScope.launch {
commandExecutor.collectCommands()
}
}
}
interface WebSocketClient {
// replay = 0, buffer = 0, onBufferOverflow = SUSPEND, commands emitted using emit()
val commands: SharedFlow<Command>
suspend fun sendRequestForCommandDetails(commandId: String, clientToken: String)
}
class CommandExecutor {
suspend fun collectCommands() = withContext(observingDispatcher) {
webSocketClient.commands.collect {
processCommand(it)
}
}
suspend fun processCommand(command: Command) = coroutineScope {
val clientToken = UUID.randomUUID().toString()
val detailsJob = async {
webSocketClient.commands
.timeout(10.seconds)
.first { it.clientToken == clientToken }
}
webSocketClient.sendRequestForCommandDetails(command.id, clientToken)
val commandDetails = detailsJob.await()
doSomethingWithCommandDetails(commandDetails)
}
}
Obviously, the problem above is that detailsJob
can't receive an answer from webSocketClient.commands
until the collector from collectCommands()
unblocks the SharedFlow
(emit()
is suspended)
So I'm thinking about processing the commands in dedicated coroutines to unblock the SharedFlow
.
The question is: is it OK from the structured concurrency point of view to launch
in collect
e.g. like this?
suspend fun collectCommands() = withContext(observingDispatcher) {
webSocketClient.commands.collect {
launch {
try {
processCommand(it)
catch (e: Exception) { ... }
}
}
}
Is the new coroutine a child (or "grandchild") of SomeClassThatHasALifecycleBoundScope.lifecycleScope
? Will it be cancelled properly when lifecycleScope
gets cancelled?Remon Shehata
09/27/2023, 11:47 AMSharedFlow
, collectLatest
and debounce
so the endpoint won't be hit every time there is a text change.
However, collectLatest
is being called many times. why? and how can I fix it?
full code
searchText
.debounce(1000)
.distinctUntilChanged { prev, next -> prev != next }
.filter { it.isNotEmpty() }
.sample(500)
.collectLatest {
Log.d("Remon", "search collect - value: $it")
}
Patrick Steiger
09/27/2023, 7:52 PMMutableSharedFlow(0, 1, DROP_OLDEST)
guaranteed to immediately deliver events (without dropping any event ever) when emitting with a single single/threaded emitter (tryEmit
) to collectors collecting with a DirectDispatcher
if collectors with direct dispatcher don’t suspend?
Even if there are other collectors with other dispatchers that can suspend?
object DirectDispatcher : CoroutineDispatcher() {
override fun dispatch(…) {
block.run()
}
}
Eugen Martynov
09/28/2023, 8:29 AM