George
06/01/2022, 3:35 PMoverride fun encode(
inputStream: Publisher<out Any>,
bufferFactory: DataBufferFactory,
elementType: ResolvableType,
mimeType: MimeType?,
hints: MutableMap<String, Any>?
): Flux<DataBuffer> {
1) return Flux.from(inputStream).map { bufferFactory.wrap(protobufSerializer.encodeToByteArray(elementType)) }
vs
2) return inputStream
.asFlow()
.map { bufferFactory.wrap(protobufSerializer.encodeToByteArray(elementType)) }
.asFlux()
}
Is it worth it to keep the transformation asFlow -> map -> and then return again the Flux ?Stefan Oltmann
06/02/2022, 2:37 PMchannel
or flow
in parallel.
I don't really find something here, just a discussion on GitHub: https://github.com/Kotlin/kotlinx.coroutines/issues/172
I understood that flow.buffer(3).collect { action() }
doesn't do that.
What I want is receive N (N = cpu cores) entries from a collection in parallel to process them.
I don't like how channel.receive() waits forever or throws an exception.
What would be a simple solution for that?Paul Woitaschek
06/03/2022, 6:38 AMval search = MutableStateFlow("")
suspend fun track(query: String) {
delay(1000)
println(query)
}
runBlocking {
search
.onEach { track(it) }
.map {
"The search is $it"
}
.collectLatest {
println("Collected $it")
}
}
In onEach I could launch a coroutine on a scope and store the job to cancel the previous tracking action but is there a better solution for that?Colton Idle
06/03/2022, 8:08 AMGreg Rynkowski
06/03/2022, 6:05 PMrunTest
.
Have a look at the two tests below. The first uses runBlockingTest, the other runTest.
@Test
fun `test shared flow with deferred`() = runBlockingTest {
val sharedFlow = MutableSharedFlow<Int>(replay = 0)
val deferred = async { sharedFlow.first() }
sharedFlow.emit(1)
assertEquals(1, deferred.await())
}
@Test
fun `test shared flow with deferred - runTest`() = runTest {
val sharedFlow = MutableSharedFlow<Int>(replay = 0)
val deferred = async { sharedFlow.first() }
sharedFlow.emit(1)
assertEquals(1, deferred.await())
}
Why the second one never finish?Jan
06/05/2022, 4:57 PMGabi
06/05/2022, 8:18 PMException in thread "DefaultDispatcher-worker-6 @track-session/cu#80" java.lang.NullPointerException: Cannot invoke "kotlinx.coroutines.flow.Flow.collect(kotlinx.coroutines.flow.FlowCollector, kotlin.coroutines.Continuation)" because "this.$this_unsafeTransform$inlined" is null
at kotlinx.coroutines.flow.FlowKt__TransformKt$onEach$$inlined$unsafeTransform$1.collect(SafeCollector.common.kt:113)
at kotlinx.coroutines.flow.FlowKt__CollectKt.collect(Collect.kt:30)
at kotlinx.coroutines.flow.FlowKt.collect(Unknown Source)
at gragas.play.TrackSession.playRegisteredTracks(TrackSession.kt:117)
at gragas.play.TrackSession.access$playRegisteredTracks(TrackSession.kt:38)
at gragas.play.TrackSession$3.invokeSuspend(TrackSession.kt:58)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineName(track-session/cu), CoroutineId(80), "track-session/cu#80":StandaloneCoroutine{Cancelling}@6465a67d, Dispatchers.Default]
This is the source of playRegisteredTracks
private suspend fun playRegisteredTracks() {
queue
.onEach { song ->
player.playTrack(song)
}
.collect()
}
And this is the full source of TrackSession.kt
https://gist.github.com/2cc063411159091c45c3a3e1d4cc8155Exerosis
06/06/2022, 4:05 AMRobert Kempton
06/06/2022, 4:32 PMTrey
06/06/2022, 9:41 PMLucas
06/07/2022, 10:22 PMmyanmarking
06/08/2022, 1:41 PMwithContext(<http://Dispatchers.IO|Dispatchers.IO> + NonCancellable)
uli
06/09/2022, 4:32 PMfun <V : Any> SharedPreferences.asFlow(
key: String,
defaultValue: V?,
retrieve: SharedPreferences.(String, V?) -> V?,
): Flow<Optional<V>> {
return callbackFlow {
val prefsListener = SharedPreferences
.OnSharedPreferenceChangeListener { sharedPreferences, k ->
if (k == key) {
val value = sharedPreferences.retrieve(key, defaultValue)
trySendBlocking(value.asOptional())
}
}
registerOnSharedPreferenceChangeListener(prefsListener)
awaitClose {
unregisterOnSharedPreferenceChangeListener(prefsListener)
}
}
.onStart {
emit(Optional.ofNullable(retrieve(key, defaultValue)))
}
.distinctUntilChanged()
As it looks, this code is racy, if shared preferences are changed after onStart
emits, but collecting of the callbackFlow has not yet started.
Anyone has any hints on an elegant solution to this? Like a way to start collecting the callback flow immediately, but still injecting a first element?Lukas Lechner
06/10/2022, 10:41 AMhttps://youtu.be/coq9XDMB-yU▾
Michal Klimczak
06/13/2022, 6:15 AMjuliocbcotta
06/13/2022, 7:06 AMrunBLocking
, but I everybody says we shouldn't be using that, so what should I be using?Michal Klimczak
06/13/2022, 8:50 AMfun test() = runTest(UnconfinedTestDispatcher(), dispatchTimeoutMs = 200) {
val scope = this
val counter = MutableStateFlow(0)
counter.test {
awaitItem() shouldBe 0
scope.launch {
counter.value = 1
counter.value = 0
}
awaitItem() shouldBe 1
awaitItem() shouldBe 0
}
}
This code will run fine on StandardTestDispatcher
but will time out on UnconfinedTestDispatcher
. It's not flakiness, it will time out every single time, so I assume it's a difference between the two dispatchers.Cody Mikol
06/13/2022, 1:44 PMscope.launch {
fooSuspendedFun()
barSuspendedFun()
}
will barSuspendedFun wait until the completion of fooSuspendedFun, or do you need to use something to await the return of the first function?Will Henry
06/13/2022, 9:22 PMclass AppIOCoroutineScope : CoroutineScope by CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
When I receive a webhook from an external service, I create a background job like:
appIOCoroutineScope.launch {
// do the work
}
Eventually, the coroutine never launches. The first line in the launch block is a log statement which I do not see in my logs when I detect that this job isn't running. A server restart always fixes the issue.
I've taken various thread dumps at this point and nothing has stuck out to me like the usual deadlocks. Are there any coroutine debug tools that I can use to better understand what's going on here?Exerosis
06/14/2022, 9:11 AMcontext(Toggled) @Base
suspend fun simultaneously(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> (Unit)
) {
val current = currentCoroutineContext()[ContinuationInterceptor]!! + context
val scope = object : CoroutineScope { override val coroutineContext = current }
var task: Job? = null
onEnabled { task = scope.launch(context) { block() } }
onDisabled {
println("Children: ${task?.children?.count()}")
println("Active: ${currentCoroutineContext()[Job]?.isActive}")
try {
task?.cancelAndJoin(); task = null
} catch (reason: Throwable) {
reason.printStackTrace()
}
println("Active: ${currentCoroutineContext()[Job]?.isActive}")
}
}
The issue is that onDisable callback is SOMETIMES called by code in block
which causes cancelAndJoin() to throw cancellation exception. I figure that if I just catch it and finish letting the disable listeners get called that would be bad... since for example if another disable listener suspends execution it won't be able to return to executing state since it's job is no longer active. But I'm not sure how else I can make sure that the call which is causing cancellation is allowed to complete before it's actually done.
Hopefully, that makes some sense.Mattias Flodin
06/14/2022, 1:47 PMEdwar D Day
06/15/2022, 6:28 AMnull
. I thought of something like that (for 2 `Flow`s):
fun <T : Any> takeFirstNotNull(flow1: Flow<T?>, flow2: Flow<T?>): Flow<T?> {
return flow1.transform {
if (it != null) emit(it) else emitAll(flow2)
},
}
Is there a simpler solution for this (especially, if this might be done for more `Flow`s)?jmfayard
06/16/2022, 6:26 AMrunTest { ... }
hack
The issue is there https://github.com/junit-team/junit5/issues/1914
but very little happened since 3 years
I created an issue, please star ⭐️ https://youtrack.jetbrains.com/issue/KT-52818/Provide-a-quick-fix-against-using-suspending-functions-in-Unit-Testjuliocbcotta
06/16/2022, 1:59 PMmyScope (supervisor + Main + interceptor)
myScope.launch {
text = mySusFunc()
}
suspend fun mySusFunc(): String {
return withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
delay(1000)
"myValue"
}
}
Would the attribution text =
always happen in the Main ? Because from some experimenting here, it looks like it happens from the IO thread. Help ?Ky
06/16/2022, 3:12 PMmng
06/16/2022, 4:59 PMval callA = async { the call }
val callB = async { the call }
if (callA.await().isSuccess) {
callB.cancel()
return callA.await()
}else {
callA.cancel()
return callB.await()
}
However, it doesn’t quite get the behavior I want because I’m essentially only waiting for Call A to complete first before I even check on Call B. Can anyone offer some advice, please?Philipp Kleber
06/17/2022, 11:46 AM@OptIn(ExperimentalCoroutinesApi::class)
inline fun <T, S> Flow<T>.mapDisposable(
crossinline transform: suspend (value: T) -> S,
crossinline onDispose: (S) -> Unit
) : Flow<S> {
return flatMapLatest { value ->
callbackFlow {
val transformed: S = transform(value)
trySend(transformed)
awaitClose {
onDispose(transformed)
}
}
}
}
Is there a better or even a built-in way for achieving this?tylerwilson
06/17/2022, 1:18 PMDoru N.
06/17/2022, 2:16 PMclass Repository {
private val idsFlow = MutableStateFlow<List<String>>(emptyList())
fun getMyFlow(): Flow<List<String>> = idsFlow.onSubscription {
if (idsFlow.value.isEmpty() idsFlow.value = getRemoteIds())
}
private suspend fun getRemoteIds(): List<String> {
delay(2_000)
return listOf("1", "2")
}
..
// other functions that can modify myFlow (ie. add / delete / update an item from it)
}
problem is, if multiple collectors are subscribing fast enough, getRemoteData gets called multiple times (which I want to avoid).Exerosis
06/18/2022, 4:50 PMwithContext(Dispatchers.Unconfined) {
println(1)
withContext(Dispatchers.Unconfined) { // Nested unconfined
println(2)
}
println(3)
}
println("Done")
Not always print 1, 2, 3 shouldn't the existence of the unconfined dispatcher have 0 impact on a block of code that doesn't make suspending calls? Is it possible for withContext to modify the way something executes if you aren't inserting a different dispatcher?
What is the meaning of: The invocation of cancel with exception (other than CancellationException) on this supervisor job also cancels parent.
Isn't is the case that every "well formed" call to cancel has either a CancellationException or null? Does that mean cancel() doesn't cancel parent but cancel(CancellationException()) does? Is that a standard understanding for jobs?
Very much struggling to understand how to adapt all of this to my system in an "idiomatic" way.Exerosis
06/18/2022, 4:50 PMwithContext(Dispatchers.Unconfined) {
println(1)
withContext(Dispatchers.Unconfined) { // Nested unconfined
println(2)
}
println(3)
}
println("Done")
Not always print 1, 2, 3 shouldn't the existence of the unconfined dispatcher have 0 impact on a block of code that doesn't make suspending calls? Is it possible for withContext to modify the way something executes if you aren't inserting a different dispatcher?
What is the meaning of: The invocation of cancel with exception (other than CancellationException) on this supervisor job also cancels parent.
Isn't is the case that every "well formed" call to cancel has either a CancellationException or null? Does that mean cancel() doesn't cancel parent but cancel(CancellationException()) does? Is that a standard understanding for jobs?
Very much struggling to understand how to adapt all of this to my system in an "idiomatic" way.Nick Allen
06/18/2022, 7:30 PMExerosis
06/22/2022, 12:59 AMspecialCancellableButtonOrSomething.onClick { event ->
if (!runBlocking(Dispatchers.Main) { someCallbackOrSomething() })
event.cancel()
}
Because the button onClick is on the main thread and so if we dispatch is going to schedule a task rather than running the task right away on the current thread (immediate) then until the onClick listener returns the someCallbackOrSomething() cannot be dispatched... However the onClick listener cannot return until after it has been (deadlock).
Couldn't you just give dispatchers canYield and yield() or just yield and they suspendUninterceptedOrReturn where they internally return if they cannot yield and suspend if they can? IG I still don't understand properly or something.
Ah yes launch would make more sense. If I understand correctly the thread responsible for running launched tasks is either what thread resumes the withContext, or the thread that called launch.
Thanks again!Nick Allen
06/22/2022, 2:09 AMval result = myScope.async(start = CoroutineState.Lazy) { //doesn't run
println("running")
delay(100)
42
}
delay(1000) //still doesn't run
println("result=${result.await()}") //now it'll start, wait 100, and then we'll print "result=42" 1100 millis after the coroutine was created
println("again, result=${result.await()}") //Will print immediately, it only runs once
Undispatched is more of an optimization, to avoid unnecessary thread switches. It takes extra time to schedule and wait for the scheduled task to actually run. It's extra convenient when you know you are already on the dispatcher you want.
You should not be calling runBlocking
from a click listener. It blocks which you should never do on main thread. Even if passing in other dispatchers. You can launch a coroutine using Dispatchers.Main and then you get the benefits of coroutines and can modify UI elements. runBlocking is more for APIs like OkHttp that have a private thread pool and then invoke callbacks on that thread pool expecting you to block so runBlocking is ok.
Of course they could have designed yield differently to always suspend ... but why? It does what it's supposed to do which is pass control back to the dispatcher. If you describe your usecase, then I can probably redirect you to the right API to use.Exerosis
06/23/2022, 1:11 AMNick Allen
06/23/2022, 3:51 AMrunBlocking
as it is for Future.get
.