necati
08/22/2019, 3:03 PM
val myFlow = flow {
    emit(myRepository.suspendFunctionReturnsAValue())
}
Is there a built-in way to do this more concisely, such as myRepository.suspendFunctionReturnsAValue().asFlow()?
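A minimal sketch of one possible answer, assuming the preview asFlow() extension on suspending function types from kotlinx.coroutines.flow. MyRepository and valueFlow are hypothetical stand-ins for the code in the question, and @OptIn assumes a recent Kotlin compiler.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the repository in the question.
class MyRepository {
    suspend fun suspendFunctionReturnsAValue(): String = "value"
}

// asFlow() on a suspending function type is a preview API, hence the opt-in.
// It is equivalent to flow { emit(suspendFunctionReturnsAValue()) }.
@OptIn(FlowPreview::class)
fun MyRepository.valueFlow(): Flow<String> =
    suspend { suspendFunctionReturnsAValue() }.asFlow()

fun main() = runBlocking {
    // The suspending call runs once per collection and emits a single value.
    println(MyRepository().valueFlow().toList()) // prints [value]
}
```

The flow { emit(...) } form from the question does the same thing without a preview API, so it may be the safer choice for library code.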
yshrsmz
08/23/2019, 6:51 AM
f does not emit the latest value.
Any idea what's going on?
Actual output is in the thread.
const val DURATION = 100L

class ExampleUnitTest {
    @Test
    fun test() = runBlocking {
        val job = launch {
            val f = generateSequence(1) { it + 1 }.asFlow()
                .onEach { delay(DURATION) }
                .onStart { emit(0) }
                .onEach { println("${System.currentTimeMillis()}:onNext-f: $it") }
            val f2 = generateSequence(1) { it + 1 }.asFlow()
                .onEach { delay(DURATION * 3) }
                .onStart { emit(0) }
                .onEach { println("${System.currentTimeMillis()}:onNext-f2: $it") }
            f2.zip(f.conflate()) { a, b -> "$a, $b" }
                .collectIndexed { index, value -> println("${System.currentTimeMillis()}:collect-$index: $value") }
        }
        job.join()
    }
}
Jan Skrasek
08/23/2019, 11:57 AM
combine & combineTransform functions cancelling if a new value is emitted? It seems they are not, and there is no alternative that does so.
E.g. I want this https://play.kotlinlang.org/#eyJ2ZXJzaW9uIjoiMS4zLjMxIiwiY29kZSI6ImltcG9ydCBrb3RsaW54LmNvcm91dGluZXMuKlxuaW1wb3J0IGtvdGxpbnguY29yb3V0aW5lcy5mbG93Lipcblxuc3VzcGVuZCBmdW4gbWFpbigpIHtcbiAgICB2YWwgZmxvd04gPSBmbG93T2YoMSwgMiwgMywgNCwgNSkuY29uZmxhdGUoKS5kZWxheUVhY2goNSlcbiAgICB2YWwgZmxvd0wgPSBmbG93T2YoXCJBXCIsIFwiQlwiLCBcIkNcIikuY29uZmxhdGUoKS5kZWxheUVhY2goMTApXG4gICAgXG4gICAgY29tYmluZShmbG93TiwgZmxvd0wpIHsgbiwgbCAtPlxuICAgICAgICBkZWxheSgxMDApXG4gICAgICAgIG4udG9TdHJpbmcoKSArIGxcbiAgICB9LmNvbGxlY3QgeyBwcmludGxuKGl0KSB9XG59IiwicGxhdGZvcm0iOiJqYXZhIiwiYXJncyI6IiJ9 to print just "5C".
Second, I wonder why this example returns such a result now.
Vsevolod Tolstopyatov [JB]
08/23/2019, 12:58 PM
kotlinx.coroutines 1.3.0 is here!
This is the first stable release with Flow. Try it, use it, break it!
Full changelog with all compatibility guarantees: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.3.0
U75957
08/23/2019, 6:53 PM
Flow
https://pl.kotl.in/nEwZYT3Kl (I have an implementation with recursive launch, but I think it could be more elegant with Flow, but I don't know how)
I want to launch a limited number of requests and, as requests complete, take the next ones from the queue.
For instance, we have a queue/list of 100 URLs, but we want to limit the number of simultaneously executing requests to 5. Not 20 batches of 5 requests, but a first batch of 5 requests and then the next request whenever one finishes. How can we process the result of each request using collect?
rook
08/23/2019, 7:55 PM
suspend fun fooFlow(): Flow<List<Foo>>
fun CoroutineScope.onFooEmission(action: (List<Foo>) -> Unit) {
    launch {
        fooFlow().collect { foos -> action(foos) }
    }
}
But at the collect call-site, I’m getting Type mismatch.
Required: FlowCollector<List<Foo>>
Found: (Nothing) -> Unit
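One common cause of this exact mismatch on kotlinx.coroutines 1.3.x is a missing import: there the lambda form of collect is an extension function in kotlinx.coroutines.flow, and without it the compiler only sees the member collect(FlowCollector). A minimal sketch, assuming that is the problem here; Foo and the body of fooFlow are hypothetical placeholders.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.* // wildcard also brings in the collect { } extension on 1.3.x
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical element type and flow source, standing in for the ones in the question.
data class Foo(val id: Int)

suspend fun fooFlow(): Flow<List<Foo>> =
    flowOf(listOf(Foo(1)), listOf(Foo(2), Foo(3)))

// Returns the Job so callers can join or cancel the collection.
fun CoroutineScope.onFooEmission(action: (List<Foo>) -> Unit): Job = launch {
    fooFlow().collect { foos -> action(foos) }
}

fun main() = runBlocking {
    val sizes = mutableListOf<Int>()
    onFooEmission { foos -> sizes += foos.size }.join()
    println(sizes) // prints [1, 2]
}
```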
dave08
08/25/2019, 1:42 AM
ursus
08/25/2019, 8:16 AM
diesieben07
08/25/2019, 6:42 PM
flow { } and channelFlow { } is that the latter is "threadsafe", meaning you can switch dispatcher inside the suspending block? Or is there something else?
synhershko
08/25/2019, 9:19 PM
return@runBlocking tagsChannel.toList().
The problem is, I run my code and while I can see things being emitted from one channel to another, the code runs forever without any of the BroadcastChannel subscribers receiving any message.
What am I missing here?
ansman
08/26/2019, 12:37 AM
Dispatchers.Main.immediate? yield() will not do anything. delay(1) works but is kind of ugly
coder82
08/26/2019, 8:16 AM
halim
08/26/2019, 10:03 AM
val stream = flow {
    emit(1)
    emit(2)
}
The emit(T) method is used to send data to the flow, but how does the flow store this data?
Matej Drobnič
08/26/2019, 11:03 AM
SelectClause so it can get triggered in select statement
Dominaezzz
08/26/2019, 3:41 PM
kotlinx-coroutines to expose an Async<T> type alias? That'll resolve to Promise<T> in JS, CompletableFuture<T> in Java8, ListenableFuture<T> on Android and Deferred<T> on Native. (Basically an appropriate async type for each platform that can be easily consumed from their respective languages). Then a GlobalScope.platformAsync or something to go along with it. I find myself needing to do this already since suspension only works within Kotlin itself and not from Java, JS or Obj-C.
Ive Vasiljevic
08/26/2019, 6:11 PM
sam
08/26/2019, 6:35 PM
suspend fun foo() { async { ... } }. I’d like to use async inside foo, and inherit from whatever scope foo was called with.
Seri
08/26/2019, 9:26 PM
David Glasser
08/27/2019, 4:14 AM
withContext(Dispatchers.Default) {
    var x = false
    withContext(Dispatchers.IO) {
        x = true
    }
    println(x)
}
I understand that the three lines (var x = false, x = true, println(x)) will run "one at a time", but are all the right things done with the memory model to guarantee that the write will be seen by the println, without needing to use atomics or the like?
Sergio Crespo Toubes
08/27/2019, 6:24 AM
private val uiJob = SupervisorJob()
val uiScope = CoroutineScope(Dispatchers.Main + uiJob)
val journeys = MutableLiveData<List<Journey>>()
fun loadJourneys() = uiScope.launch {
    val token = preferences.getProfileToken() ?: return@launch
    try {
        journeys.value = journeysService.getJourneys(token)
    } catch (e: Exception) {
        showGenericError.value = Unit
    }
}
Hello, I am trying coroutines with the MVVM pattern. What am I doing wrong here? It works, but the UI is blocked while the journeys are being fetched. How can I fix this? Thanks.
Matej Drobnič
08/27/2019, 9:49 AM
launch {
    val channelA = openChannelA()
    val channelB = openChannelB()
    val channelC = openChannelC()
    while (isActive) {
        select {
            channelA.onReceive {
                ...
            }
            channelB.onReceive {
                ...
            }
            channelC.onReceive {
                ...
            }
        }
    }
}
However, I also want all of those channels closed if the actor is terminated for any reason
Jan Skrasek
08/27/2019, 11:48 AM
3A or 1A\n3A
2) Why does it never produce 2B?
3) Why does it also give random results without conflate()?
4) Why does it sometimes return 2B without conflate()?
Link for playground: https://play.kotlinlang.org/#eyJ2ZXJzaW9uIjoiMS4zLjMxIiwiY29kZSI6ImltcG9ydCBrb3RsaW54LmNvcm91dGluZXMuKlxuaW1wb3J0IGtvdGxpbnguY29yb3V0aW5lcy5jaGFubmVscy4qXG5pbXBvcnQga290bGlueC5jb3JvdXRpbmVzLmZsb3cuKlxuXG5zdXNwZW5kIGZ1biBtYWluKCkge1xuXHR2YWwgZmxvdzE6IEZsb3c8SW50PiA9IGZsb3dPZigxLCAyLCAzKS5jb25mbGF0ZSgpXG4gICAgdmFsIGZsb3cyOiBGbG93PFN0cmluZz4gPSBmbG93T2YoXCJBXCIpLmNvbmZsYXRlKClcbiAgICBcbiAgICBjb21iaW5lKGZsb3cxLCBmbG93MikgeyBhLCBiIC0+IFxuXHRcdGEudG9TdHJpbmcoKSArIGJcbiAgICB9LmNvbGxlY3QgeyBwcmludGxuKGl0KSB9XG59IiwicGxhdGZvcm0iOiJqYXZhIiwiYXJncyI6IiJ9
kalpeshp0310
08/27/2019, 12:03 PM
fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit.")
}
How does Kotlin stop execution of the launch code block?
paulblessing
08/27/2019, 3:52 PM
@Test
fun testFooWithLaunchAndDelay() = runBlockingTest {
    foo()
    // the coroutine launched by foo has not completed here, it is suspended waiting for delay(1_000)
    advanceTimeBy(1_000) // progress time, this will cause the delay to resume
    // the coroutine launched by foo has completed here
    // ...
}
suspend fun CoroutineScope.foo() {
    launch {
        println(1) // executes eagerly when foo() is called due to runBlockingTest
        delay(1_000) // suspends until time is advanced by at least 1_000
        println(2) // executes after advanceTimeBy(1_000)
    }
}
Trying to learn how to use kotlinx-coroutines-test using the above code sample from the docs. Even if the advanceTimeBy is removed or if the delay time is increased, the test seems to print both 1 and 2 immediately. What am I missing?
Paulius Ruminas
08/27/2019, 6:58 PM
CancellationException? Cancelling the job is a normal operation that is not considered an error. Code snippet:
job = scope.launch {
    try {
        // process the action
    } catch (e: CancellationException) {
        throw e
    } catch (e: Throwable) {
        // log the error
    }
}
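The snippet above can be exercised end to end as a rough sketch. runLoggingErrors is a hypothetical helper, not a library function; it rethrows CancellationException so cancellation propagates, and reports only other failures.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs block, logs real failures, lets cancellation through.
suspend fun <T> runLoggingErrors(log: (Throwable) -> Unit, block: suspend () -> T): T? =
    try {
        block()
    } catch (e: CancellationException) {
        throw e // cancellation is normal control flow, not an error to log
    } catch (e: Throwable) {
        log(e)
        null
    }

fun main() = runBlocking {
    val logged = mutableListOf<String>()
    val job = launch {
        runLoggingErrors<Unit>({ logged += "unexpected: $it" }) { delay(10_000) }
    }
    yield()      // let the job start and suspend in delay
    job.cancel() // delay throws CancellationException, which is rethrown and not logged
    job.join()
    runLoggingErrors<Unit>({ logged += "failure: ${it.message}" }) { error("boom") }
    println(logged) // prints [failure: boom]
}
```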
Luis Munoz
08/27/2019, 8:26 PM
Josh Feinberg
08/27/2019, 8:31 PM
gregd
08/27/2019, 8:49 PM
withContext in production code (as opposed to hello world examples)…
Where do you put withContext in your code? Is it inside the suspending function body, or at the call site?
Also, if you have nested suspending functions, let’s say a typical Android app with ViewModel > Repository > DataSource, where DataSource contains suspending functions - where do you put withContext?
Are there any official guidelines for this?
David Glasser
08/27/2019, 10:44 PM
suspendCoroutine and suspendCancellableCoroutine when writing a wrapper around an external callback-based API.
My original understanding was that the difference between the two is just whether or not the continuation you get access to in the block has methods that let the block you write right there cancel it. So you should use the latter if the API you're wrapping has some way of indicating that it cancelled so that you would want to cancel the suspended coroutine, but otherwise there's no need to use the longer-named function.
But when I test these out,
fun main() = runBlocking {
    withTimeout(10) {
        suspendCancellableCoroutine { continuation: CancellableContinuation<Unit> -> }
    }
}
throws a TimeoutCancellationException (as I expected), but
fun main() = runBlocking {
    withTimeout(10) {
        suspendCoroutine { continuation: Continuation<Unit> -> }
    }
}
appears to hang, which surprises me.
Does that imply that when you run suspendCoroutine then your current coroutine can't be cancelled "from above"? In that case, why would you ever use suspendCoroutine instead of suspendCancellableCoroutine?
ursus
08/28/2019, 4:19 AM
launch exist if async can return Unit?
basher
08/28/2019, 4:26 AM
ursus
08/28/2019, 6:17 AM
gildor
08/28/2019, 6:32 AM
run and forget is fine, for async is not fine