myanmarking
06/03/2020, 11:44 AMsealed class Updates{
data class LiveUpdate(val progress: Int): Updates()
data class History(val list: List<Int>): Updates()
}
private fun fetchUpdates(): Flow<Updates> {
TODO()
}
private fun save(list: List<Updates.History>){
TODO()
}
fun dispatchUpdates(): Flow<Updates.LiveUpdate>{
val history = mutableListOf<Updates.History>()
return fetchUpdates()
.onEach {
if(it is Updates.History){
history.add(it)
}
}
.onCompletion { if(it != null) save(history) }
.filterIsInstance<Updates.LiveUpdate>()
}
Is there any way to improve this code so that the save part doesn't end up a side-effect? Something like an inner flow. I'm not sure how to approach thisJonathan Mew
06/03/2020, 2:46 PMIvan Pavlov
06/03/2020, 5:36 PMval channel = BroadcastChannel<Event>(10)
val receiveChannel = channel.openSubscription()
val flow = receiveChannel.consumeAsFlow()
flow.onEach(someProcessor::doOnEach)
.catch { log.error(it) { "Error while processing event" } }
.launchIn(coroutineScope)
In my case I have to just skip an event if an exception is thrown in someProcessor.doOnEach(event: Event)
and continue processing of next events.
I can achieve what I want with
flow.onEach {
try {
someProcessor.doOnEach(it)
} catch (e: Throwable) {
//log
}
}
but I feel like I'm missing something. Is there a better way to do this?Eric Ampire [MOD]
06/03/2020, 6:36 PMvaskir
06/04/2020, 9:09 AMfun <T> (suspend () -> T).memoize() : suspend () -> T {
var value: T? = null
return suspend { value ?: this().also { value = it }}
}
suspend fun work() : Int = coroutineScope {
delay(1000)
22
}
val memoizedWork = ::work.memoize()
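Two things to watch with the memoize() above: `value ?: ...` recomputes whenever the cached result is null, and callers that arrive before the first result is stored each run the block again. A minimal sketch of one way around both, assuming kotlinx.coroutines is on the classpath; `memoizeOnce` is a hypothetical name, not a library function:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical variant of memoize(): the Mutex makes concurrent callers wait for the
// first computation, and the `computed` flag allows a null result to be cached too.
@Suppress("UNCHECKED_CAST")
fun <T> (suspend () -> T).memoizeOnce(): suspend () -> T {
    val mutex = Mutex()
    var computed = false
    var value: T? = null
    val block = this
    return suspend {
        mutex.withLock {
            if (!computed) {
                value = block()
                computed = true
            }
            value as T
        }
    }
}

fun main(): Unit = runBlocking {
    var calls = 0
    val work: suspend () -> Int = { calls++; delay(100); 22 }
    val memoized = work.memoizeOnce()
    // Three concurrent callers share one computation.
    val results = List(3) { async { memoized() } }.awaitAll()
    println("$results, calls=$calls") // prints "[22, 22, 22], calls=1"
}
```

The lock is held while the block runs, so later callers suspend (they do not block a thread) until the first result is available.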
vaskir
06/04/2020, 1:29 PMDispatchers.IO
rkeazor
06/04/2020, 2:55 PMclhols
06/04/2020, 3:58 PMsessionTimeOutJob = GlobalScope.launch {
delay(5.minutes)
activity.finishAffinity()
}
So after 5 minutes the coroutine should resume and call finishAffinity(). But that only happens on debug builds. On release builds it resumes only when the app is selected in the Recents menu, long past the 5-minute mark.
Is it somehow the app lifecycle that prevents the coroutine from resuming?vaskir
06/04/2020, 4:06 PMp
is Deferred
, then I expected it to be cancelled on timeout:
select<Unit> {
p.onAwait { println("ok") }
onTimeout(1000) { println("timed out!") }
}
but it's not: it prints "timed out!", but p
is not cancelled. What is the best way to cancel a Deferred
by timeout?Zach Klippenstein (he/him) [MOD]
06/04/2020, 4:51 PMFlow
operators for StateFlow
that preserve the value
property? I filed an issue for map
here, curious if people have other use cases or use cases for other operators (or opinions about this proposal).
https://github.com/Kotlin/kotlinx.coroutines/issues/2081Tash
06/04/2020, 11:01 PMFlow<T>
For example:
// For Flow<List<*>>
fun postsFlow(): Flow<List<Post>>
// OR
fun postLists(): Flow<List<Post>>
// For Flow<*>
fun eventFlow(): Flow<Event>
// OR
fun events(): Flow<Event>
Would be interesting to know if there are other preferred styles.Rick
06/05/2020, 11:10 AMtransactionAwait
: "suspend function can be called only within coroutine body".
callRealmSuspend { realm ->
realm.transactionAwait {
realm.delete(RegionRealmObject::class.java)
realm.delete(ServiceRealmObject::class.java)
}
}
and my callRealmSuspend
function looks like this:
suspend fun callRealmSuspend(call: (realm: Realm) -> Unit): Realm? {
var realm: Realm? = null
try {
realm = Realm.getDefaultInstance()
call(realm)
} catch (error: java.lang.Exception) {
LogHelper.e(messages = *arrayOf("Unhandled error occurred: ${error.localizedMessage}."))
}
return realm
}
so how do I change callRealmSuspend
function to fix the error/ provide a coroutine body? thanks!dpux
06/05/2020, 5:07 PMhandler
function):
https://pl.kotl.in/zyHCNXVLgvaskir
06/05/2020, 6:47 PMchannelFlow
does not cancel the child?
suspend fun run() = channelFlow {
suspendCancellableCoroutine<Unit> { cont ->
cont.invokeOnCancellation { println("Cancelled.") }
cont.resume(Unit) { }
}
for (x in 1..10) {
send(x)
}
}
suspend fun main(): Unit = coroutineScope {
withTimeout(1000) {
run().collect {
println(it)
delay(2000)
}
}
}
It produces:
1
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:158)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:128)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
at java.base/java.lang.Thread.run(Thread.java:834)
Process finished with exit code 1
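A note on the snippet above: `cont.resume(Unit) { }` completes the continuation right away, and an `invokeOnCancellation` handler only fires if the continuation is cancelled while it is still suspended. The ten `send` calls then fit into the default buffer, so the producer has already finished by the time the timeout hits. One way to observe cancellation from inside `channelFlow` is `awaitClose`. The sketch below assumes a recent kotlinx.coroutines; `numbers` and `onClosed` are hypothetical names used only for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits 1..10, then stays active until the collector goes away.
fun numbers(onClosed: () -> Unit): Flow<Int> = channelFlow {
    for (x in 1..10) send(x)
    // Suspends until the channel is closed or the collector is cancelled,
    // then runs the block. This is the place for cleanup.
    awaitClose { onClosed() }
}

fun main(): Unit = runBlocking {
    val result = withTimeoutOrNull(1000) {
        numbers { println("Cancelled.") }.collect {
            println(it)
            delay(2000)
        }
    }
    println("result = $result")
}
```

Here the producer is still suspended in `awaitClose` when the timeout cancels the collector, so the cleanup block runs before `withTimeoutOrNull` returns null.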
Martin Nordholts
06/05/2020, 7:20 PMBlockingMethodInNonBlockingContext
) on the exec()
call in this code, but this is a false positive, right? Since I run with Dispatchers.IO
I don’t see how the code would be problematic:
suspend fun String.test() = withContext(Dispatchers.IO) {
Runtime.getRuntime().exec(this@test)
}
droidrcc
06/06/2020, 9:57 PMrt
06/07/2020, 7:13 AMwhile (...) { delay(1000); emit(buffer); buffer.clear() }
to have timed buffering. I'm aware there are operators for this, just trying to figure things out for myself.
Also, any other notes on pitfalls of my implementation would be appreciated.Martin Nordholts
06/08/2020, 6:46 AMsuspend fun g(): String = "bar"
suspend fun f(mutableMap: MutableMap<String, String>) {
mutableMap.computeIfAbsent("foo") {
g()
}
}
gives me Suspension functions can be called only within coroutine body
compilation error on the g()
call.
The only solution I can come up with is to re-implement computeIfAbsent
in Kotlin and make the fun
inline
, but that does not seem like an elegant, idiomatic solution.
What is the proper solution to this problem?elizarov
06/08/2020, 9:08 AMRx/Channel/Flow/LiveData
to deliver those events (comment in :thread-please: what you do instead).
1️⃣ Using an Event
wrapper class, adapting solution in the above story to coroutines.
2️⃣ Using a [buffered or rendezvous] Channel
for events, exposing it to the views either as Channel
or as a Flow
via receiveAsFlow()
extension.
3️⃣ Some other approach (comment in :thread-please:).elizarov
06/08/2020, 12:29 PMChannel
) how do you work around the fact that Channel.receive
can deliver a message to an activity that is already destroyed, so that any attempt to work with it causes an IllegalStateException
? (see this demo https://pl.kotl.in/Rs18kihuJ) The same problem happens when you use Channel.receiveAsFlow
and then collect from the flow (see this demo https://pl.kotl.in/-ehZrPMl4).
😱 I never knew it could happen! All my code is broken!
👍 It never happens to me because I use immediate
dispatcher (Android provides that by default) and I always post from the main thread, so my activities cannot get destroyed between the time an event is posted and the time it is processed.
👉 I do something else (comment in :thread-please:)dewildte
06/08/2020, 12:48 PMken_kentan
06/09/2020, 1:34 AMInappropriate blocking method call
when calling a blocking method in a suspend function. I don't know why this code triggers the warning...
How do I fix this?
import java.net.URL
suspend fun foo(bar: String): String {
if (bar.isEmpty()) {
return ""
}
return withContext(Dispatchers.IO) {
// warning: Inappropriate blocking method call
URL("https://www.example.com")
.readText()
}
}
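Because the read above already runs inside `withContext(Dispatchers.IO)`, the blocking call does not occupy the caller's thread. Another way to express the same intent is `runInterruptible`, which takes a plain (non-suspending) lambda and interrupts the thread if the coroutine is cancelled. A minimal sketch, assuming a kotlinx.coroutines version that ships `runInterruptible`. `readTextBlocking` is a hypothetical helper, and a file read stands in for the URL read so the example runs offline. Whether the IDE inspection stays quiet with this form depends on the IDE and plugin version, so treat that part as an assumption:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import java.io.File

// Hypothetical helper: performs a blocking read on Dispatchers.IO.
suspend fun readTextBlocking(file: File): String {
    if (!file.exists()) return ""
    // The lambda is an ordinary blocking block, not a suspending one.
    // On cancellation the thread running it is interrupted.
    return runInterruptible(Dispatchers.IO) { file.readText() }
}

fun main(): Unit = runBlocking {
    val file = File.createTempFile("demo", ".txt").apply { writeText("hello") }
    println(readTextBlocking(file)) // prints "hello"
    file.delete()
}
```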
rkeazor
06/09/2020, 7:57 AMken_kentan
06/09/2020, 9:00 AMInappropriate blocking method call
.
So, should I not call a method that throws IOException in a suspend function?
suspend fun foo() {
// warning: Inappropriate blocking method call
throwIoException()
withContext(Dispatchers.IO) {
// warning: Inappropriate blocking method call
throwIoException()
}
}
@Throws(IOException::class)
fun throwIoException() {
// something blocking process
}
sdeleuze
06/09/2020, 5:16 PM1.4-M2
and related 1.3.7-1.4-M2
Coroutines dependencies with Gradle 6.5 and get this error:
> Could not resolve all files for configuration ':kotlin-coroutines:kotlinCompilerPluginClasspath'.
> Could not resolve org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1.
Required by:
project :kotlin-coroutines > org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:1.4-M2 > org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:1.4-M2
project :kotlin-coroutines > org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:1.4-M2 > org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:1.4-M2 > org.jetbrains.kotlin:kotlin-scripting-common:1.4-M2
> Cannot choose between the following variants of org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7-1.4-M2:
- iosArm32-api
- iosArm32-iosArm32MetadataElements
- iosArm64-api
- ...
All of them match the consumer attributes:
- Variant 'iosArm32-api' capability org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7-1.4-M2:
- Unmatched attributes:
- Provides org.gradle.status 'release' but the consumer didn't ask for it
- Provides org.gradle.usage 'kotlin-api' but the consumer didn't ask for it
- Provides org.jetbrains.kotlin.native.target 'ios_arm32' but the consumer didn't ask for it
- Provides org.jetbrains.kotlin.platform.type 'native' but the consumer didn't ask for it
- ...
Any hint about how to fix this (worked fine with Kotlin 1.3.72
and Coroutines 1.3.5
)?Kris Wong
06/09/2020, 8:19 PMrunBlocking(someContext)
, and within the block a call is made to a method that is calling launch
using a scope that was created with the same context. however, it is not blocking. am I doing something wrong?colintheshots
06/09/2020, 8:48 PMLuis Munoz
06/09/2020, 9:17 PMval singleThread = newSingleThreadContext("SingleThread")
val flow = flow {
emit("a")
// blocking call
delay(1000)
emit("b")
}.flowOn(singleThread)
launch(Dispatchers.IO) {
flow.collect {
println(it)
}
}
launch(Dispatchers.IO) {
flow.collect {
println(it)
}
}
groostav
06/10/2020, 11:04 PMscope.actor<>{}
always implement Job
? I want to implement some code that calls actor.close()
, and then just to keep things synchronous I want to call (actor as Job).join()
, will that work in future kotlinx-coroutines?Ryan Simon
06/11/2020, 12:51 AMStateFlow<State>
setup to track state changes of a screen in Android. these state changes are observed by the View
.
additionally, i use a Channel<Intent>
to offer
some Intent
from the View
to the ViewModel
. the problem is that my code will suspend and it feels like the main thread is blocked until an Intent
is fully processed by the ViewModel
.
i've added some of my code below to hopefully help with understanding the problem
// ViewModel
val intentChannel = Channel<Intent>(Channel.UNLIMITED)
private val _state = MutableStateFlow<State>(Idle)
val state: StateFlow<State>
get() = _state
init {
viewModelScope.launch {
handleIntents()
}
}
// TODO: we seem to suspend the main thread every time we process an intent; NOT GOOD!
private suspend fun handleIntents() {
intentChannel.consumeAsFlow().collect {
when (it) {
is Init -> with(it) {
// do something on initialization
}
is Load -> with(it) {
_state.value = Loading(page == 0)
getBrandProducts(page = page)
}
is Sort -> with(it) {
_state.value = Sorting
getBrandProducts(page = 0)
}
}
}
}
// brandRepository flows on Dispatchers.IO
private suspend fun getBrandProducts(page: Int) {
brandRepository.getPromotedProducts(
categorySlug = categorySlug,
sortBy = currentSortBy,
sortOrder = currentSortOrder,
offset = page * requestLimit,
limit = requestLimit
)
.map { result -> result.getOrNull()!! }
.collect { result -> _state.value = Loaded(result) }
}
Zach Klippenstein (he/him) [MOD]
06/11/2020, 12:53 AM"my code will suspend and block the main thread"
“suspend” and “block” mean different things. Blocking the main thread is (generally) bad. Suspending a coroutine that’s running on the main thread is perfectly fine.
Ryan Simon
06/11/2020, 12:53 AMLoad
intent with a new page, the main thread hangs as the user scrolls and delays the scrolling animation until the work is doneZach Klippenstein (he/him) [MOD]
06/11/2020, 12:56 AMgetPromotedProducts
isn’t the culprit?Ryan Simon
06/11/2020, 12:57 AMfun getPromotedProducts(
categorySlug: String,
sortBy: String?,
sortOrder: String?,
offset: Int,
limit: Int
): Flow<Either<List<BrandProduct>>> {
// TODO need to solve for retry/error handling for Retrofit
return flow {
val userLocation = locationRepository.getLocation().receive().getOrNull()
try {
val result = userLocation?.run {
api.getPromotedProducts(
categorySlug = categorySlug,
latlng = "${this.latitude},${this.longitude}",
sortBy = sortBy,
sortOrder = sortOrder,
offset = offset,
limit = limit
).run { success(this.data?.brandProducts!!) }
} ?: Either.error<List<BrandProduct>>(ServerError)
emit(result)
} catch (e: Exception) {
when (e) {
is HttpException -> {
Timber.d("HttpError ${e.response()?.errorBody()?.string()}")
}
}
emit(Either.error(ServerError))
}
}.flowOn(dispatcher)
}
Zach Klippenstein (he/him) [MOD]
06/11/2020, 12:57 AMconsumeAsFlow().collect
is unnecessary, you can just do consumeEach
.Ryan Simon
06/11/2020, 12:57 AMDispatchers.IO
Zach Klippenstein (he/him) [MOD]
06/11/2020, 1:04 AMdispatcher
here is Dispatchers.IO
?
(in the future, big code posts like this are generally easier to read when posted as snippets instead of inline)Ryan Simon
06/11/2020, 1:04 AMDispatchers.IO
Zach Klippenstein (he/him) [MOD]
06/11/2020, 1:05 AMgetPromotedProducts
isn’t even returning immediately?Ryan Simon
06/11/2020, 1:06 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:07 AMRyan Simon
06/11/2020, 1:09 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:10 AMLoaded
while you’re scrolling, and Loaded
doesn’t include any items, does your RecyclerView keep its current list until it gets a new one from a Loaded
state?Ryan Simon
06/11/2020, 1:12 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:13 AMRyan Simon
06/11/2020, 1:14 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:16 AM_state.value = Loading(page == 0)
line, does that change anything?Ryan Simon
06/11/2020, 1:17 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:18 AMwithContext(Dispatchers.IO) { }
around your entire getBrandProducts
method body?Ryan Simon
06/11/2020, 1:19 AMZach Klippenstein (he/him) [MOD]
06/11/2020, 1:27 AMRyan Simon
06/11/2020, 1:29 AMdelay(1000)
to the network request Flow
we make to fetch items, and the other video has no delay
the video with no delay
has the very obvious frame drops, and the one with a delay(1000)
doesn't