Florian
01/30/2021, 10:46 AMLoading
keeps emitting database updates rather than only a single result (first
)?
flow {
val data = query().first()
val flow = if (shouldFetch(data)) {
emit(Resource.Loading(data))
try {
saveFetchResult(fetch())
onFetchSuccess()
query().map { Resource.Success(it) }
} catch (t: Throwable) {
onFetchFailed(t)
query().map { Resource.Error(t, it) }
}
} else {
query().map { Resource.Success(it) }
}
emitAll(flow)
}
Vikas Singh
01/30/2021, 1:27 PMGilles Barbier
01/30/2021, 6:04 PMFlorian
01/31/2021, 2:22 PMemitAll
inside a Flow, execute some other operation in parallel, and then switch to a different emitAll
once the operation is done?Dominaezzz
01/31/2021, 5:03 PM@ExperimentalCoroutinesApi
internal fun <T> Flow<T?>.coalesce(other: Flow<T>): Flow<T> {
return distinctUntilChanged { old, new -> old == null && new == null }
.transformLatest {
if (it != null) {
// TODO: Will "Latest" wrongly cancel this branch?
emit(it)
} else {
emitAll(other)
}
}
}
christophsturm
01/31/2021, 6:22 PMclass Worker(val id: Int) {
fun work(): Worker {
Thread.sleep(1000)
return Worker(id)
}
}
println(measureTimeMillis { runBlocking {
listOf(Worker(1), Worker(2), Worker(3))
.map { worker ->
async(Dispatchers.Default) {
val worker2 = worker.work()
worker2.work()
}
}.awaitAll()
}})
with 2 maps instead?
listOf(Worker(1), Worker(2), Worker(3)).map {it.work()}.map{it.work()}
but still running the workers in multiple threads?
I know how i could do it with a channel, but is it also possible with a flow?Vivek Sharma
01/31/2021, 6:47 PMAditya Wasan
02/01/2021, 6:05 AMprivate val _isCacheReady = MutableStateFlow(false)
val isCacheReady = _isCacheReady.asStateFlow()
After filling up the cache, I'm updating the value
_isCacheReady.value = true
And here's the observing code
init {
lobstersRepository.isCacheReady.onEach {
_savedPosts.value = lobstersRepository.getAllPostsFromCache()
}.launchIn(viewModelScope)
}
Florian
02/01/2021, 9:13 AMchannelFlow
? It seems to work as intended, I just want to make sure I'm not doing anything dumb. The point here is to have Loading inside the launch block emit updates while the try
block is running concurrently. I think I don't need to call awaitClose
because collection from the query never ends.
inline fun <ResultType, RequestType> networkBoundResource(
crossinline query: () -> Flow<ResultType>,
crossinline fetch: suspend () -> RequestType,
crossinline saveFetchResult: suspend (RequestType) -> Unit,
crossinline onFetchSuccess: () -> Unit = { },
crossinline onFetchFailed: (Throwable) -> Unit = { },
crossinline shouldFetch: (ResultType) -> Boolean = { true }
) = channelFlow {
val data = query().first()
if (shouldFetch(data)) {
val loading = launch {
query().collect { send(Resource.Loading(it)) }
}
try {
saveFetchResult(fetch())
onFetchSuccess()
loading.cancel()
query().collect { send(Resource.Success(it)) }
} catch (t: Throwable) {
onFetchFailed(t)
loading.cancel()
query().collect { send(Resource.Error(t, it)) }
}
} else {
query().collect { send(Resource.Success(it)) }
}
}
Pablo
02/01/2021, 9:43 AMsuspend fun
for dummies? I mean, if you would have to explain what is a suspend
for someone that is completely new on this world of coroutines what would be your answer?Christian Ekrem
02/01/2021, 1:16 PM// stream is _really_ someDao.loadAll(): Flow<Something>
val stream = MutableStateFlow("cachedValue")
// refresh is _really_ fetchSomethingFromApi.also { someDao.updateData(it) }
val refresh = suspend {
delay(1000)
stream.value = "freshValueFromAPI"
}
suspend fun whyDoesThisNotWork(): Flow<String> = stream
.onStart {
coroutineScope {
launch {
refresh()
}
}
}
suspend fun thisWorks(): Flow<String> = flow {
coroutineScope {
launch {
refresh()
}
stream.collect {
emit(it)
}
}
}
It’s not like the working version is horrible, I just want to understand why the first one does not work!Pablo
02/01/2021, 2:35 PMBrais Gabin
02/01/2021, 7:57 PMGuillermo Alcantara
02/02/2021, 12:58 AM@Test
fun testFoo() = runBlocking {
val testCoroutineScope = TestCoroutineScope().apply {
pauseDispatcher() // This needs to be here because the actual test, handles time.
}
val sharedFlow = MutableSharedFlow<Int>()
val values = mutableListOf<Int>()
println("before launch")
val job = testCoroutineScope.launch {
println("before collect")
sharedFlow.collect {
println("before adding $it")
values.add(it)
}
}
println("before emits")
sharedFlow.emit(1)
sharedFlow.emit(2)
testCoroutineScope.runCurrent()
assertEquals(mutableListOf(1, 2), values)
job.cancel()
}
Pablo
02/02/2021, 9:40 AMreturn suspendCoroutine{
continuation -> usecase.invoke{result -> continuation.resumeWith(Result.success(result)}
}
Is there any difference between suspendCancellableCoroutine? Becuase I know I've been using this for instance for firebase sign inalbertosh
02/02/2021, 1:29 PMFlow.onEmpty
operator was added (https://github.com/Kotlin/kotlinx.coroutines/issues/1890)
Is there any plan for including Flow.onNotEmpty
?
Use case is a method that return paginated data and you need all the items
fun getData(page: Int, pageSize: Int): Flow<Item> = ...
fun getAllData() = getAllData(0, 5)
private fun getAllData(page: Int, pageSize: Int): Flow<Item> = flow {
val data = getData(page, pageSize)
emitAll(data)
data.onNotEmpty { emitAll(getAllData(page + 1, pageSize) }
}
Maybe I’m not following a proper approach, some other ideas I had using count
private fun getAllData(page: Int, pageSize: Int): Flow<Item> = flow {
val data = getData(page, pageSize)
emitAll(data)
if (data.count() == pageSize) { // Since `count` does a `collect`, another flow is created
emitAll(getAllData(page + 1, pageSize)
}
}
To avoid creating two flows per page, I tried with a SharedFlow
but since it never ends you can’t invoke count
BTW, I’m intending to use it on backend side, so not sure if I can use the Paging Library from JetpackVivek Sharma
02/02/2021, 5:33 PMvoben
02/02/2021, 5:41 PM@Test
fun `test event is emitted`() = coroutineRule.runBlockingTest {
val viewModel = MyViewModel()
launch {
val elements = viewModel.mySharedFlow.take(1).toList()
elements[0] shouldBeEqualTo MyEvent
}
}
// ViewModel
val mySharedFlow = MutableSharedFlow<MyEvent>(extraBufferCapacity = 1)
init {
mySharedFlow.tryEmit(MyEvent)
}
Ben Butterworth
02/02/2021, 6:15 PMwithContext(context)
or async(context) {}
) and return Deferred
, or should callers have the responsibility to select the context? I would think its cleaner to encapsulate the context choice to a lower level function itself.
Fragment/Activity calls ViewModel calls Service calls API. The Service returns a Result<T>
, and I originally made ViewModel functions return Deferred<T>
, so Fragments need to await
it. However, I found lots of examples calling async {}
everywhere the functions are called, so callers need to know which context to use, which they will likely know less than the API designer. Also, the withContext
coroutine function seems to imply people should be calling it in fragments/ highest level. I assume this because withContext
seems ‘terminal’ (it suspends until it completes).
I am still working on my coroutine understanding, so I would appreciate any help 🙂taer
02/02/2021, 10:12 PMfirst
but I don't want to close the flow. I need the rest of the elements to be still available.Simon Lin
02/03/2021, 3:14 AMSendChannel::isClosedForSend
every time when I send? If the channel could be close in any time.zjuhasz
02/03/2021, 8:17 AMSharedFlow
that is set to BufferOverflow.DROP_OLDEST
actually drops a value from its buffer due to the buffer being full? I would like to log a warning when this occurs.Mikael Alfredsson
02/04/2021, 1:03 PMStephen Edwards
02/04/2021, 3:40 PMAfzal Najam
02/04/2021, 5:03 PMval parentScope = CoroutineScope(Dispatchers.Default + Job() + exceptionHandler)
But if I create it like this, and one of its children jobs had an exception, siblings didn't fail and other coroutines were able to launch just fine:
val parentScope = MyScope()
Where MyScope
is:
class MyScope: CoroutineScope {
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + Job() + exceptionHandler
}
I thought that both `parentScope`s would behave exactly the same. Am I missing something?
Here's a gist: https://gist.github.com/AfzalivE/3bfe168e879ba8b97ad62292217b0491Tristan Hamilton
02/04/2021, 10:18 PMwasyl
02/05/2021, 7:39 AMinCoroutine {
var foo: Foo? = Foo()
val fooRef = WeakReference(foo)
triggerAndWaitForGc()
assert(fooRef.get() != null)
println(foo) // Test passes if this is removed
inCoroutine {
foo = null
triggerAndWaitForGc()
assert(fooRef.get() == null)
}
}
So first I hold Foo
in a strong reference (variable) and assert WeakRef
also has a reference to it. But later I set foo = null
, at which point I expect that there’s no strong reference to Foo
and it can be GC-ed. This doesn’t happen in the snippet above, because println(foo)
generates the following bit:
Foo var9 = (Foo)foo.element;
var5 = false;
System.out.println(var9);
and var9
is never cleared, so there will be a strong reference to Foo
as long as that coroutine is active (?). Is this expected?
Snippet with full code: https://pl.kotl.in/25D0_X5w8allan.conda
02/05/2021, 8:42 AMNikola Milovic
02/05/2021, 12:41 PMclass CoroutineDownloadWorker(
context: Context,
params: WorkerParameters
) : CoroutineWorker(context, params) {
override suspend fun doWork(): Result {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
val data = downloadSynchronously("<https://www.google.com>")
saveData(data)
return Result.success()
}
}
}
Which wouldn't work? As it has to be return@withContext
and you need a return value outside of it? Or I am I missing somethingFlorian
02/05/2021, 12:55 PMFlorian
02/05/2021, 12:55 PMMarc Knaup
02/05/2021, 12:59 PMtakeWhile
and alike throw a CancellationException
internallyFlorian
02/05/2021, 1:03 PMMarc Knaup
02/05/2021, 1:26 PMlaunchIn(scope)
Florian
02/05/2021, 1:28 PM