Dragos Rachieru
10/03/2022, 2:18 PMflowOf(1,2,5)
.operation { last, current -> current - last }
.onEach { println(it) }
.collect()
Result: 1,3Zoltan Demant
10/05/2022, 6:11 AMscope.launch
blocks that are running in parallell.
@JvmInline
internal value class Queue(
private val trigger: MutableSharedFlow<Unit> = MutableSharedFlow(extraBufferCapacity = 8),
) {
suspend fun flush() {
trigger.emit(Unit)
}
suspend fun await() {
trigger.first()
}
}
Cedric Lindigkeit
10/05/2022, 1:20 PMreactormonk
10/06/2022, 2:37 PMval resultsFlow = MutableSharedFlow<HdmiResult>()
val results: Flow<List<HdmiResult>> = resultsFlow
.runningFold(listOf()) { list: List<HdmiResult>, v: HdmiResult -> list + listOf(v) }
But somehow the first ~ 10 items are not present in the results
flowNino
10/07/2022, 10:21 AMCancellationException
).
Wouldn't using a big catch (exception: Exception)
around a suspending function api.getFooResponse()
defeat this purpose and prevent any coroutine calling getFoo()
to be cancelled correctly ? Wouldn't the CancellationException
be "swallowed" ? Should we re-throw it ?
suspend fun getFoo(): Foo? = try {
val fooResponse = api.getFooResponse()
Foo(bar = fooResponse.bar)
} catch (exception: Exception) {
exception.printStackTrace()
null
}
janvladimirmostert
10/07/2022, 2:10 PMDavid Corrado
10/07/2022, 4:19 PMExerosis
10/10/2022, 3:59 AMVaibhav Nalawade
10/11/2022, 2:25 AMNino
10/11/2022, 10:15 AMupdateFoo(foo: String?)
can be called any number of time, from multiple different coroutines
2/ A Flow should be available to emit the "last value that went through `updateFoo`", but should not emit quicker than every 100ms
3/ The flow should not re-emit similar values
4/ This is the bad part, the Flow should emit null values immediately
My approach :
companion object {
private val SAMPLE_DURATION = 100.milliseconds
}
private val fooMutableStateFlow = MutableStateFlow<String?>(null)
val fooFlow: Flow<String?> = fooMutableStateFlow
.sample(SAMPLE_DURATION)
.onStart { emit(fooMutableStateFlow.value) }
.distinctUntilChanged() // Avoid double emission of null value with .onStart...
fun updateFoo(foo: String?) {
fooMutableStateFlow.value = foo
}
I'd love an operator like debounce so I could do something like that :
val fooFlow: Flow<String?> = fooMutableStateFlow.sample {
if (it == null) {
Duration.ZERO
} else {
Toto.SAMPLE_DURATION
}
}
John Herrlin
10/11/2022, 7:51 PMGlobalScope.launch
to send fire-and-forget HTTP requests to other services. I don't like the GlobalScope.launch
idea and trying to thinking of other solutions. If I would have been in Clojure I think I would have created a global go-loop to handle the fire-and-forget requests and have a channel to communicate with that go-loop. Could that been done in Kotlin? Is that a good approach for concurrent fire-and-forget stuff or are there better ways of doing it?Christoph Wiesner
10/12/2022, 6:52 AMcombine
operator only emits once every flow has an emission.
in my case i want to combine flows and it might that one flow would not emit at all - in that case i want to have the transformation receive a null value for not yet existing values of those streams.
combineAlways(flow1, flow2) { v1, v2 (null as not yet an emission) ->
...
}
is there a way to achieve that?elye
10/12/2022, 9:58 AMchannelFlow
can do what flow
can't e.g.
• Retrieve value from a concurrently run coroutine
• Can use a non-suspending way i.e. `trySend` to send data
It looks like it is more powerful than flow
. I wonder
• if there's anything where flow
can do but not in channelFlow
, or
• if there's anything flow
is preferred over channelFlow
(e.g. in terms of efficiency or performance?)?
The reason I ask is, I want to see if we can just use channelFlow
for everything instead, or what's the scenario we should use flow
instead? I may have missed out something fundamental?reactormonk
10/12/2022, 3:33 PMretryWhen
alternative for regular suspend functions? Just like https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/retry-when.htmlzt
10/13/2022, 5:13 AMreactormonk
10/13/2022, 7:22 AMcollectLatest
isn't really working that well, I'm not sure how to declare an element invalid - or is collectLatest
the correct way here? Or should I be using a different abstraction that Flow
if a previous element becomes dead/invalid?reactormonk
10/13/2022, 8:31 AMFlow<Resource>
, how can I declare a Resource
to be unavailable? As an additional information, only the newest Resource
can be available.hfhbd
10/13/2022, 2:50 PMExerosis
10/14/2022, 3:57 AMrunBlocking {
SMR()
println("test")
}
it hangs when SMR is like:
suspend fun SMR() = withContext(dispatcher) {
launch { ... }
}
But not if it's like:
suspend fun CoroutineScope.SMR() {
launch { ... }
}
From the look of things withContext should just grab the old context, merge with new context and dispatch if the dispatcher changed.Michał Kalinowski
10/16/2022, 10:04 PMDelay
use millis time(scheduleResumeAfterDelay) instead of more accurate Duration
? For now there is no way to pass Duration to UI dispatcher which is accurate, i.e don't make errors with rounding 😞Eivind
10/17/2022, 11:08 AMLukasz Kalnik
10/18/2022, 9:41 AMFlow
or through a Channel
. After the last event I want to send a "complete" signal. Is there something like this in Flow
?Daniele Segato
10/18/2022, 10:34 AMSharingStarted.whileSubscribed*()
API be called SharingStarted.whileCollected*()
instead? since collect {}
is used on it, not subscribe {}
?Chuck Stein
10/18/2022, 6:37 PMcombine
vs combineTransform
?Zach Klippenstein (he/him) [MOD]
10/18/2022, 6:41 PMContinuationInterceptor
and CoroutineDispatcher
actually have separate “polymorphic” keys. This seems to mean it’s possible to create a coroutine context with a dispatcher A and an interceptor B that wraps A, where if you ask it for the interceptor you will get B but if you ask specifically for the dispatcher you’ll get A.
Has anyone written a blog or anything about gotchas with combinations of this, or done it before and hit snags? I’m seeing some weird behavior in this situation when B tries to wrap a continuation and then ask the wrapped dispatcher to intercept.David Corrado
10/18/2022, 9:45 PMChuck Stein
10/19/2022, 1:48 PMTestScope
(e.g. to inject into the class under test), then you must call runTest
on that scope, because there can only be one TestScope
instance in a test. But it doesn't say why? Doing this causes my runTest
block to timeout and fail the test, because the class I injected the TestScope
into uses it to shareIn
, which never completes. So my question is what are the repercussions if I disregard this rule and instead call runTest(testScope.testScheduler)
(which passes), rather than testScope.runTest
as the docs suggest (which fails). Or, if I should really be using the recommended way, how do I get it to not time out for my use case?reactormonk
10/19/2022, 3:58 PMFlow
to a non-suspend context? Collect to a var
?Hakon Grotte
10/21/2022, 1:10 PMRetry-After
header if it is not ready. (The same http request can be repeated after a delay.)
Moreover, the HTTP endpoint does not process the requests that created the polling ids sequentially.
When a resource is ready it should be produced to a new Kafka topic.
I do realize that Kafka Streams might be a suitable option here, but it sounds quite difficult with the async nature of the http endpoint.
I feel like Kotlin coroutines could benefit this implementation and with some research/thought I have made a "Proof of concept". Following is a very psuedo-ish code flow (pun intended):
fun main = runBlocking { <-- to obtain coroutinescope
// 1. method running 'while loop' consuming Kafka topic inside flow builder. returns Flow<PollingId>
// 2. method consuming flow from "1." and producing to new SharedFlow<Resource>
// flow.onEach { pollingId ->
// CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
// val resource = apiService.pollUntilReady(pollingid)
// resourceFlow.send(resource)
// }.collect()
^-- Launch each polling job in new coroutine job to obtain parallel processing. Should perhaps .map{} instead of .onEach {} to obtain job references for cleanup etc.
// 3. method consuming/collecting SharedFlow<Resource> that method "2." sends/produces to, and sending resources to new Kafka topic
}
Some feedback on the proposed implementation would be appreciated 🙂 Any pitfalls, improvements, is this a bad implementation, etc.Wai-Wai Ng
10/21/2022, 4:24 PMprivate suspend fun awaitX(arg1: String): X? = suspendCancellableCoroutine {
functionThatExecutesCallback{ result -> println("test"); it.resume(result) }
}
and for some reason, the function that calls this never resumes execution even though test
does get printed. Any obvious ideas as to what to check? IntelliJ isn't showing the Coroutine debugger tab so I'm somewhat limited to debugging by printing...Wai-Wai Ng
10/21/2022, 4:24 PMprivate suspend fun awaitX(arg1: String): X? = suspendCancellableCoroutine {
functionThatExecutesCallback{ result -> println("test"); it.resume(result) }
}
and for some reason, the function that calls this never resumes execution even though test
does get printed. Any obvious ideas as to what to check? IntelliJ isn't showing the Coroutine debugger tab so I'm somewhat limited to debugging by printing...Zach Klippenstein (he/him) [MOD]
10/21/2022, 5:11 PMWai-Wai Ng
10/21/2022, 6:24 PMZach Klippenstein (he/him) [MOD]
10/25/2022, 2:42 PMWai-Wai Ng
11/02/2022, 2:35 PMrunBlocking
inside functionThatExecutesCallback
that launched a separate coroutine that never finished. I'm not sure this is strictly intuitive behaviour but at the same time I'm not persuaded it's definitely wrong.Zach Klippenstein (he/him) [MOD]
11/02/2022, 4:39 PM