Tomas Kormanak
09/07/2021, 12:42 PM
coroutineScope {
    val job1 = launch { foo() }
    val job2 = launch { bar() }
    delay(1000)
    job1.cancel()
    job2.cancel()
}
Tower Guidev2
09/07/2021, 2:16 PM
Lilly
09/07/2021, 2:59 PM
Dispatchers.Default. I'm asking because in my VMs I have suspending functions as well as non-suspending functions (most of the time not expensive operations), and I'm wondering if there is a rule of thumb for when to switch from main to default. Or should we always switch to default when we are working outside the UI scope, no matter how expensive the operation is?
diego-gomez-olvera
09/07/2021, 4:07 PM
Lilly
09/07/2021, 11:55 PM
withIndex, takeWhile and takeIf, but it doesn't let me close the flow. I'm not sure, but does takeWhile close the flow if false is returned? Maybe a combination of multiple operators will do it. Any ideas?
Lilly
09/08/2021, 11:56 AM
suspend fun readPacketFlow(): Flow<Packet> = flow {
    api.readByteFlow().collect { bytes ->
        // parse the bytes and build packets from it
        ...
        val packet: Packet = someOperation()
        emit(packet)
    }
}
I'm observing strange behavior. At the call site I'm collecting the flow like this:
fun requestPacket() {
    coroutineScope {
        launch {
            dataSource.readPacketFlow()
                .onStart { Log.d("PacketFlow", "starting packet flow.") } // logged
                .onCompletion { Log.d("PacketFlow", "completing packet flow.") } // logged
                .collect {
                    val packet = it as ResponsePacket
                    Log.i("Received in Flow", "bytes: [${packet.bytes.toHexString()}]") // not logged
                }
            Log.i("Request", "after readPacketFlow.") // not logged
        }
    }
    Log.i("Request", "after scope.") // not logged
}
None of the logs (except the first two) are logged. I don't get what's going on here. Any ideas?
The Monster
09/08/2021, 7:52 PM
return webservice.getAllFacts()
    .also {
        coroutineScope {
            launch {
                catFactDao.insertAllFacts(it)
            }
        }
    }
The code compiles, but is it a bad idea to use a scope function on the return value (does it block even with the coroutine)?
Johnjake Talledo
09/09/2021, 8:50 AM
unit test
I was testing a hot flow / MutableSharedFlow inside my runBlockingTest. I need to collect the flows and then cancel the job. launch returns a Job and coroutineScope returns a Unit, and both of them can call suspending functions. If I use launch for both, it works and I get the desired output, which is 10. But if I use coroutineScope to collect the flows and cancel them using launch, I get "Job was cancelled". If I use coroutineScope for both instead of launch, I get the same error. Why? Also, if I move the delay to the first launch collection, I get the same "Job was cancelled" error. I thought I was running them in parallel, e.g. launch1, then launch2 after it completes, then cancel the entire TestCoroutineScope.
Norbi
09/09/2021, 9:43 AM
suspend function (called by a coroutine) can I somehow access the coroutineContext? To be more specific, I'd like to access the CoroutineName for debugging reasons, if specified.
The Monster
09/09/2021, 7:18 PM
getUser function is not suspend, but it is calling the refreshUser function. The code is from here: https://developer.android.com/jetpack/guide#persist-data
fun getUser(userId: String): Flow<User> {
    refreshUser(userId)
    // Returns a Flow object directly from the database.
    return userDao.load(userId)
}

private suspend fun refreshUser(userId: String) {
    // Check if user data was fetched recently.
    val userExists = userDao.hasUser(FRESH_TIMEOUT)
    if (!userExists) {
        // Refreshes the data.
        val response = webservice.getUser(userId)
        // Check for errors here.
        // Updates the database. Since userDao.load() returns an object of
        // Flow<User>, a new User object is emitted every time there's a
        // change in the User table.
        userDao.save(response.body()!!)
    }
}
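One possible way to keep getUser non-suspending while still calling a suspend function is to move the call into a flow builder, so it runs only when a collector starts. This is a minimal sketch under assumptions, not the guide's code: UserRepository, the in-memory table and the fake refresh logic are hypothetical stand-ins for userDao and webservice.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

data class User(val id: String)

// Hypothetical in-memory stand-in for userDao + webservice.
class UserRepository {
    private val table = MutableStateFlow<User?>(null)

    private suspend fun refreshUser(userId: String) {
        // Assumption: a real implementation would call the web service here.
        if (table.value == null) table.value = User(userId)
    }

    // Not a suspend function: the suspending call sits inside the flow
    // builder, so it executes when collection starts, not when getUser returns.
    fun getUser(userId: String): Flow<User> = flow {
        refreshUser(userId)
        emitAll(table.filterNotNull())
    }
}

fun main() = runBlocking {
    println(UserRepository().getUser("42").first())
}
```

The trade-off in this sketch is that the refresh is repeated for every new collector; whether that is acceptable depends on the caching policy.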
Lauren Yew
09/09/2021, 11:33 PM
Lilly
09/10/2021, 12:16 AM
channelFlow that stops sending to the collector after some time, and I can't figure out why:
override fun readByteArrayStream(): Flow<ByteArray> = channelFlow {
    try {
        requireNotNull(bluetoothSocket) { "Bluetooth socket is null. Connection has been closed." }
        val inputStream = bluetoothSocket!!.inputStream
        val buffer = ByteArray(1024)
        while (isActive) {
            val numBytes = inputStream.read(buffer)
            val readBytes = buffer.copyOf(numBytes)
            Log.d("Reading", "$readBytes")
            send(readBytes)
        }
    } catch (e: Exception) {
        Log.e("Error", "error: ${e.message}")
    }
}.flowOn(Dispatchers.IO)
It's still logging, so the flow didn't stop. Any ideas?
Alexey Yakovlev
09/10/2021, 11:04 AM
Daniele Segato
09/10/2021, 2:44 PM
val sourceA = MutableStateFlow<A>(null)
val sourceB = MutableStateFlow<B>(null)
val dataFromA = sourceA.map { it.x }.distinctUntilChanged()
val dataFromB = sourceB.map { it.y }.distinctUntilChanged()
val updateProcedure =
    dataFromA
        .combine(dataFromB) { x, y -> x to y }
        .distinctUntilChanged()
        .onEach { (x, y) ->
            otherMutableStateFlow.update { /*...*/ }
        }
        .launchIn(viewModelScope)
Now if I input changing data in sourceA, it is completely ignored after the first one: my onEach runs only once.
If I remove the combine and just get data from A and hardcode y to something, it gets called every time.
I've used combine SO MANY times and this is the first time something like this is happening to me.
Does anyone have ANY idea of what could be going on here?
dimsuz
09/10/2021, 4:22 PM
suspend fun doWork()
and want to also wrap it in "listener-form" like
fun doWork(onWorkDone: () -> Unit) {
    someScope.launch { doWork(); onWorkDone() }
}
I fear this might have some thread safety issues. What would be the "correct" way to do something like this?
Why I want this: I have a public API where coroutines are an implementation detail, so I don't want to expose suspend functions or CoroutineScope anywhere.
dimsuz
09/10/2021, 10:13 PM
withContext?
suspend fun callMe() {
    withContext(Dispatchers.IO) {
        doAllWorkHere()
    }
}
If withContext is not used, then I risk that the caller will use a context which is bad for this particular workload.
Or is there some other well-established pattern or design consideration for such cases?
brabo-hi
09/11/2021, 1:29 AM
Flow<List<T>> to Flow<T>
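For reference, one way to turn a Flow<List<T>> into a Flow<T> is to re-emit each list element with transform. A minimal sketch; flattenLists is a hypothetical helper name, not a library function.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits every element of each incoming list, in order.
fun <T> Flow<List<T>>.flattenLists(): Flow<T> = transform { list ->
    list.forEach { emit(it) }
}

fun main() = runBlocking {
    val flat = flowOf(listOf(1, 2), listOf(3)).flattenLists().toList()
    println(flat) // [1, 2, 3]
}
```

flatMapConcat { it.asFlow() } should give the same ordering, but it may require an opt-in annotation depending on the library version.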
TwoClocks
09/11/2021, 1:54 AM
Didier Villevalois
09/12/2021, 9:26 AM
nativeTest task in a multi-platform project (using coroutines 1.5.2-native-mt). I use runBlocking around each of my tests. This works correctly when I run each test independently, one by one. But when I run the whole test suite, at the start of the second test I get Uncaught Kotlin exception: kotlin.IllegalStateException: Cannot execute task because event loop was shut down (full stack trace in thread). Has anyone ever faced this?
benkuly
09/13/2021, 8:36 AM
L.C
09/13/2021, 10:47 AM
typealias AsyncCallback = (Result?, Throwable?) -> Unit
private val listeners = mutableListOf<Command>()
@ExperimentalUnsignedTypes
fun interface CommandInterface {
    fun performAsync(
        arg1: UByte,
        arg2: UByte?,
        arg3: ByteArray,
        callback: AsyncCallback
    )
}
@ExperimentalUnsignedTypes
data class Command(
    val arg1: UByte,
    val arg2: UByte? = null,
    val callback: AsyncCallback
)
private val executor = CommandInterface { arg1, arg2, arg3, callback ->
    listeners.add(
        Command(
            arg1,
            arg2,
            callback = callback
        )
    )
    // This method returns [Result] and resume [continuation]
    action(arg3)
}
suspend fun perform(
    arg1: UByte,
    arg2: UByte?,
    arg3: ByteArray
): Result? =
    try {
        suspendCoroutineWithTimeout(10000) { continuation ->
            try {
                continuation.invokeOnCancellation {
                    cancelRequest(arg1, arg2)
                }
                //Log.d(TAG,"arg1 type : ${arg1 is UByte}")
                executor.performAsync(
                    arg1,
                    arg2,
                    arg3 = arg3
                ) { result, exception ->
                    when {
                        result == null ->
                            continuation.resumeWithException(Exception("Null point exception"))
                        exception != null ->
                            continuation.resumeWithException(exception)
                        else -> {
                            continuation.resume(result)
                        }
                    }
                }
            } catch (e: Throwable) {
                e.message
                continuation.cancel()
            }
        }
    } catch (e: TimeoutCancellationException) {
        cancelRequest(arg1, arg2)
        null
    }
When the app called the perform() function, it got an error in a coroutine: abstract method “void CommandInterface.performAsync-vmjznIo(byte, kotlin.UByte, byte[], kotlin.jvm.functions.Function2)”
It seems the Kotlin compiler recognised the first argument arg1 as byte, but the interface defined it as UByte.
The Kotlin plugin I am using is 1.5.30, and the AGP version is 7.0.2.
lesincs
09/13/2021, 12:39 PM
maxmello
09/13/2021, 2:20 PM
callbackFlow + stateIn or a flow created with combine + stateIn. Now this is a StateFlow which has no emit that is callable from “outside”. Is there a way to achieve this, basically a .toMutableStateFlow()?
Karlo Lozovina
09/13/2021, 7:23 PM
Chris Wu
09/13/2021, 8:27 PM
Flux, but I want to have a Flow instead. Is this something people do, and if so, how do I go about doing it? Or am I thinking of this the wrong way? I have searched Google, but surprisingly I can’t find any examples to learn from. Thanks so much!
Jrichards1408
09/14/2021, 11:03 AM
hfhbd
09/14/2021, 11:24 AM
Dispatchers.Main when converting the Publisher to Flow and vice versa (maybe even too much), but I am always getting
Flow invariant is violated:
Emission from another coroutine is detected.
Child of null, expected child of DispatchedCoroutine{Completed}@806156c8.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
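The last line of that error message points at channelFlow. As a rough sketch of what it means (this does not address the Publisher conversion itself, and numbers() is a hypothetical example function): values sent from child coroutines inside channelFlow pass through a channel, so they may originate from other coroutines or dispatchers without violating the flow invariant.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical example: two child coroutines send concurrently.
// Calling emit() from a different coroutine inside flow { } is what
// triggers "Flow invariant is violated"; send() in channelFlow is safe.
fun numbers(): Flow<Int> = channelFlow {
    launch(Dispatchers.Default) { send(1) }
    launch(Dispatchers.Default) { send(2) }
    // The flow completes once both child coroutines have finished.
}

fun main() = runBlocking {
    // Arrival order is not deterministic, so sort before printing.
    println(numbers().toList().sorted())
}
```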
Justin Salér
09/14/2021, 11:55 AM
zip, but it cancels Flow B. I can get it to work with replayCache()?.latestOrNull() but that doesn't look good at all...
maxmello
09/15/2021, 8:07 AM
MutableStateFlow.update { newValue } vs. .emit(newValue)? Basically, just to be safe regarding concurrent updates?
thanh
09/16/2021, 8:23 AM
-Dkotlinx.coroutines.debug
in Android Studio to debug coroutines?
Albert Chang
09/16/2021, 8:37 AM
if (BuildConfig.DEBUG) {
    System.setProperty(
        kotlinx.coroutines.DEBUG_PROPERTY_NAME, kotlinx.coroutines.DEBUG_PROPERTY_VALUE_ON
    )
}
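To see what the property changes, here is a small JVM-only sketch (an assumption-laden illustration, not Android-specific): with debug mode on, the name of the thread running a coroutine is expected to include the coroutine's name and id. debugThreadName and the name "loader" are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.DEBUG_PROPERTY_NAME
import kotlinx.coroutines.DEBUG_PROPERTY_VALUE_ON
import kotlinx.coroutines.runBlocking

fun debugThreadName(): String {
    // Assumption: this runs before any coroutine is created in the process,
    // because the library reads the property once during initialization.
    System.setProperty(DEBUG_PROPERTY_NAME, DEBUG_PROPERTY_VALUE_ON)
    return runBlocking(CoroutineName("loader")) {
        // In debug mode the thread name carries the coroutine name and id.
        Thread.currentThread().name
    }
}

fun main() {
    println(debugThreadName())
}
```

Setting the property late, after coroutines have already been used, is likely to have no effect, which is why the snippet above is usually placed at application startup.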
thanh
09/16/2021, 8:49 AM
uli
09/16/2021, 8:44 PM