mbonnin
07/16/2019, 2:33 PMval downloadFlow = flow {
var read = 0
while (read < total) {
read += readFromNetwork() // How do I make this run from a background thread ? <http://Dispatcher.IO|Dispatcher.IO> or other ?
emit(read to total)
}
}
downloadFlow.collect {
// this should run from the main thread
displayProgress(it)
}
What would be the idiomatic way to have a long running background operation emit progress that is read from the main thread ? The flow
doc explicitely prohibits changing context in the flow block.nwh
07/16/2019, 5:25 PMursus
07/16/2019, 7:48 PMursus
07/16/2019, 11:19 PMmbonnin
07/17/2019, 9:59 AMdave08
07/17/2019, 12:17 PMdave08
07/17/2019, 12:38 PMPaul Woitaschek
07/17/2019, 1:58 PMFlow<ViewState>
. This ViewState
also has a boolean property if an error should be shown.
Upon error I want to emit the latest view state but call copy(showError = true)
and complete the flow so that a consumer can use sth like:
.repeatWhen { repeatClickedFlow() }
bitkid
07/17/2019, 3:25 PMThomas
07/17/2019, 6:42 PMFatal Exception: kotlinx.coroutines.flow.internal.ChildCancelledException Child of the scoped flow was cancelled
For some reason this crash does not have a stack trace (on both Firebase and Google Play). This makes it very hard to find the cause. Could someone here help me out? I am using Kotlin 1.3.41
and Coroutines 1.3.0-M2
.digitalsanctum
07/18/2019, 8:03 PMbogdoll
07/19/2019, 9:47 AMfun externalApi(query: String, callback: (Long,String)->Unit): Unit
I would like to convert this into something like
fun externalApiWrapper( query: String ): Flow<Pair<Long,String >>
Currently I have no idea how to do that.Eric Martori
07/19/2019, 11:23 AM@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
val channel = BroadcastChannel<Int>(Channel.CONFLATED)
val flow = channel.asFlow()
launch {
flow.collect { assert(true) }
channel.send(1)
}
}
this test fails with`kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs: ["coroutine#2":StandaloneCoroutine{Active}@2a2d45ba]`
it forces me to close the channel for the test to pass.
But if i change the flow
for a receiveChannel
like this:
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
val channel = BroadcastChannel<Int>(Channel.CONFLATED)
val receive = channel.openSubscription()
launch {
receive.consume { assert(true) }
channel.send(1)
}
}
The test passes without problemsadeln
07/19/2019, 1:57 PMfun <T> merge(f1: Flow<T>, f2: Flow<T>): Flow<T> =
flowOf(f1, f2).flattenMerge()
Vsevolod Tolstopyatov [JB]
07/19/2019, 3:49 PMkotlinx.coroutines
1.3.0-RC is here!
Changelog:
• Flow
core API leaves its experimental status and has a lot of new extensions!
• Reworked reactive integrations, including simplified lifecycle management and consistent handling of fatal exceptions
• kotlinx-coroutines-bom
for managing transitive dependencies
• Various minor bug fixes and improvements
• Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.3.0-rcasad.awadia
07/19/2019, 6:15 PMjw
07/20/2019, 1:08 AMMutex.withLock
pattern that supports a suspending body? Aside from just writing it myself, of course. Is there some anti-pattern or gotcha here that prevents it from being provided out-of-the-box?asad.awadia
07/20/2019, 7:58 PMMarko Mitic
07/22/2019, 12:30 PMinternal class BlockedAppsStoreDatabaseImpl(private val blockedAppDao: BlockedAppDao) : BlockedAppsStore {
private val channel = Channel<DbOp>(Channel.UNLIMITED)
init {
GlobalScope.launch {
select {
channel.onReceive { op ->
when (op) {
is Get -> op.cb.complete(blockedAppDao.getAll().toSet())
is Put -> blockedAppDao.putOrReplace(BlockedAppEntity(op.uid))
is Remove -> blockedAppDao.remove(op.uid)
is PutSet -> blockedAppDao.putOrReplace(op.uids)
is Fill -> blockedAppDao.fill(op.uids)
is RemoveAll -> blockedAppDao.removeAll()
}
}
}
}
}
override fun getAll(): Set<Uid> {
val deferred = CompletableDeferred<Set<Uid>>()
channel.sendBlocking(Get(deferred))
return runBlocking { deferred.await() }
}
override fun put(uid: Uid) {
channel.sendBlocking(Put(uid))
}
...
mbonnin
07/22/2019, 1:26 PMFlow<T>.firstOrNull(): T?
Or did I miss it ?Stephan Schroeder
07/22/2019, 3:17 PMfun main() = runBlocking { // this: CoroutineScope
coroutineScope {
launch {
delay(200L)
println("2Task from runBlocking")
}
coroutineScope { // Creates a coroutine scope
launch {
delay(500L)
println("3Task from nested launch")
}
delay(100L)
println("1Task from coroutine scope") // This line will be printed before the nested launch
}
println("4Coroutine scope is over") // This line is not printed until the nested launch completes
}
}
jsijsling
07/23/2019, 10:19 AM<http://Dispatchers.IO|Dispatchers.IO>
produced significant garbage overhead. Log snippet:
Background concurrent copying GC freed 1692(904KB) AllocSpace objects, 127(2MB) LOS objects, 49% free, 3MB/7MB, paused 390us total 103.292ms
Background concurrent copying GC freed 1965(1129KB) AllocSpace objects, 163(3MB) LOS objects, 50% free, 3MB/7MB, paused 552us total 118.408ms
Long wait of 8.836ms for Thread[20,tid=16153,Native,Thread*=0xe0c63c00,peer=0x12c40078,"DefaultDispatcher-worker-6"] suspension!
Background concurrent copying GC freed 2006(1273KB) AllocSpace objects, 197(4MB) LOS objects, 50% free, 3MB/7MB, paused 613us total 120.900ms
Background concurrent copying GC freed 2420(1437KB) AllocSpace objects, 216(4MB) LOS objects, 37% free, 10MB/16MB, paused 1.202ms total 256.912ms
Waiting for a blocking GC ProfileSaver
WaitForGcToComplete blocked ProfileSaver on HeapTrim for 29.549ms
Background concurrent copying GC freed 2242(1268KB) AllocSpace objects, 189(4MB) LOS objects, 49% free, 3MB/6MB, paused 520us total 102.964ms
Background concurrent copying GC freed 1782(1071KB) AllocSpace objects, 154(3MB) LOS objects, 49% free, 5MB/10MB, paused 386us total 112.534ms
Any clue why this might be?tseisel
07/23/2019, 2:00 PMFlow
to a Channel
than the following ?
flow {
coroutineScope {
val channel = produce {
aFlow.collect { send(it) }
}
}
}
I want to write a custom logic for combining 2 `Flow`s, zip
and combineLatest
don't fit my use case.Marko Mitic
07/23/2019, 2:02 PMsuspend fun getAThingThatMayTakeTime() : T
?
fun getAThing(): Future<T>
?Joaquim Ley
07/23/2019, 3:03 PMkotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to MY_MODEL
- The way I apply these transformations is with the .map() operator
To make it more clear
Layer 1 - DB -> Flowable -> .map() -> asFlow() - ✅
Layer 2 - flow (from layer 1) -> .map() -> flow() - 💥
I’ve been searching google and GitHub but it seems like this hasn’t been solved.
logs
Caused by: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=DispatchedCoroutine{Cancelled}@173d7e3
Caused by: java.lang.ClassCastException: kotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to MODEL_CLASS_HERE_
at CONTACT_REPOSITORY_CLASSl$fetchFlowContacts$1$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:73)
at kotlinx.coroutines.flow.FlowKt__TransformKt$map$$inlined$unsafeTransform$1$2.emit(Collect.kt:98)
at kotlinx.coroutines.reactive.flow.PublisherAsFlow.collect(PublisherAsFlow.kt:78)
at kotlinx.coroutines.reactive.flow.PublisherAsFlow$collect$1.invokeSuspend(Unknown Source:12)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
-------------
A workaround is using the flatMapConcat()
instead of map()
and return a flowOf(someTransformation())
which feels a little dirty creating a new flow every-time data is emitted.
---------------
// Kotlin
versions.kotlin = “1.3.41”
versions.kotlin_coroutines = “1.2.1"
versions.kotlin_coroutines_reactive = “1.3.0-RC”
versions.kotlin_rx = “2.1.0"
Ran on AndroidIcaro Temponi
07/23/2019, 5:14 PM-XXLanguage:+InlineClasses
for inline classes)eygraber
07/23/2019, 9:50 PMdavid.bilik
07/24/2019, 5:45 AMCoroutineScope
so I can correctly cancel jobs when my view is destroyed .. and I use repository pattern to fetch/store data. In one case though I use RxJava to observe database changes and If there are no data in database, i will fetch data from Api.. the method looks something like this
override suspend fun observeData(): Flowable<Data> {
return dao.observeData()
.doOnNext {
if (it.isEmpty()) {
dao.insertData(api.fetchData())
}
}
}
however as you may notice, this does not work because the doOnNext
method is not suspend and i cannot call my api. I’ve been trying some approaches, the current one which is working but I dont know if its correct is this one
override suspend fun observeData(scope: CoroutineScope): Flowable<Data> {
return dao.observeData()
.doOnNext {
if (it.isEmpty()) {
scope.launch {
dao.insertData(api.fetchData())
}
}
}
}
any better approaches? The easiest would be to implement CoroutineScope
in my repository but it does not have clear strict lifecycle so I could cancel jobs when its “ending”Tuan Kiet
07/24/2019, 8:32 AMGlobalScope
considered bad practice, what is the specific use case of it?ahulyk
07/24/2019, 9:11 AM