julian
07/02/2020, 2:13 AMcompose
operator?
(Rather than operating individual items emitted by a Flow, it would operate on the Flow itself, returning a new composed Flow.)Jamie Craane
07/02/2020, 6:46 AM@ExperimentalCoroutinesApi
class PersonsViewModel {
private val realApi = RealApi()
// cached list of persons
private var cachedPersons: List<Person>? = null
// indicates of the data should be retrieved from network or cache. This is a MutableStateFlow since its value
// can be updated by the UI (for example uses refreshes the page).
private val refreshTrigger: MutableStateFlow<RefreshTrigger> = MutableStateFlow(RefreshTrigger.FromCache())
// flatMapLatest on refreshTrigger because we are interested in the latest value only.
var persons: Flow<CommonDataContainer<List<Person>>> = refreshTrigger.flatMapLatest { trigger ->
flow<CommonDataContainer<List<Person>>> {
// Notify the UI we are loading data
emit(CommonDataContainer.Loading())
val refresh = trigger is RefreshTrigger.Refresh
val cached = cachedPersons
if (!refresh && cached != null) {
// Return the cached data if not explicitly refreshed and data is in cache.
emit(CommonDataContainer.Success(cached))
} else {
// Refresh or data is not in cache, retrieve from network.
retrievePersonsFromNetwork()
}
}
}
private suspend fun FlowCollector<CommonDataContainer<List<Person>>>.retrievePersonsFromNetwork() {
val response = realApi.retrievePersons()
when (response) {
is Success -> {
cachedPersons = response.data
// Emit the data.
emit(CommonDataContainer.Success(response.data))
}
// Something went wrong, emit a failure.
is Failure -> emit(CommonDataContainer.Failure())
}
}
suspend fun refresh() {
yield()
refreshTrigger.value = RefreshTrigger.Refresh()
}
}
// CommonDataContainer can represent various states which are interesting to collectors.
sealed class CommonDataContainer<out T> {
// Indicates the data is loading
class Loading<T> : CommonDataContainer<T>()
// Indicates an error occurred retrieving the data
class Failure<T> : CommonDataContainer<T>()
// Indicates success. Holds the data in the data property.
class Success<out T>(val data: T) : CommonDataContainer<T>()
}
// Indicates where the data should com from. Both classes have a unique id to make sure the Refresh trigger is always emitted by MutableStateFlow
// since MutableStateFLow does not emit the value if the new value is the same as the current value.
sealed class RefreshTrigger {
data class FromCache(private val id: String = Random.nextInt().toString()) : RefreshTrigger()
/**
* Every instance has a unique id so the refresh StateFlow sees it as a new value. StateFlow does not emit the same value twice if
* the current value is the same as the new value.
*/
data class Refresh(private val id: String = Random.nextInt().toString()) : RefreshTrigger()
}
ildar.i [Android]
07/02/2020, 10:56 AMsuspend
operations, each can be called independently:
foo1(list)
, foo2(list)
and foo3(list)
- each function changes state of items in list
- foo1
just makes request to the server and returns updated list (updates 1 step)
- foo2
calls foo1
and then with combined result makes a request (updates 2 steps)
- foo3
calls foo2
, combines result and makes a request (updates 3 steps)
Problem is, foo2
needs to gather data (SMS-code) via DialogFragment.
Is there a way to build a continuous pipeline, so the foo3
can continue its flow after foo2
called a dialog, gathered input and returned?vineethraj49
07/02/2020, 2:01 PMinit {}
?Raul
07/02/2020, 2:58 PMjean
07/03/2020, 6:50 AMexpectItem()
I get a timeout exception, but if I check for no more items, it tells me something was received. Any hints to fix this?Nemanja Scepanovic
07/03/2020, 1:44 PMfun getBooksFlow: Flow<List<Book>> =
getUserFlow().flatMapLatest { user -> getUserBooksFlow(user.userId) }.flowOn(Dispatchers.Default)
Marcin Wisniowski
07/03/2020, 10:03 PM.receive()
on both of them, so if there is anything in channel 1 or anything in channel 2 it resumes, but of course .receive()
doesn't actually work since I can't call it on both. What should I be using here? The point here is that channel 1 is higher priority, so I want to get the next item, from channel 1 if there are any, and from channel 2 if channel 1 is empty.ursus
07/04/2020, 11:35 AMTim Malseed
07/04/2020, 2:07 PMStateFlow
to sort of cache and share the result of a coroutine. The StateFlow
is held in a singleton, and accessed (collected) from multiple different places.
When the scope that launches one of these collectors is cancelled (via scope.cancel()
, it seems that the StateFlow
is cancelled and no longer emits to any collectors.
Can I cancel the collectors launched in a scope, without cancelling the parent producer?tseisel
07/05/2020, 12:32 PMFlow.cancellable
operator ? What kind of use case does it solve ?Michał Kalinowski
07/05/2020, 3:02 PMsuspend fun fooA(){
coroutineScope {
fooB(this)
}
}
fun fooB(coroutineScope: CoroutineScope){
continueWithSuspend(coroutineScope){
continueSuspendingFooA()
}
}
private suspend fun continueSuspendingFooA(){}
Jamie Craane
07/05/2020, 4:45 PMflowOf(getWeather(loc)).zip(flowOf(reverseGeocode(loc))) { weather, address ->
weather to address
}
.flowOn(Dispatchers.Default)
.collect { println(it) }
The time it takes to execute this is the total time of the getWeather and reverseGeocode calls combined. I know I can use a map
operator and two async/wait blocks to execute the two calls asynchronously. Is it possible to execute multiple coroutines
like this with flow operators? I tried a couple of different things but without success?
I saw https://github.com/Kotlin/kotlinx.coroutines/issues/1147 so perhaps all operators are sequential for now.dimsuz
07/05/2020, 9:04 PMscan
operator and hot event stream. Basically I want to configure the flow, send events and verify they are received:
runBlocking(Dispatchers.Default) {
val events = BroadcastChannel<Int>(capacity = Channel.BUFFERED)
launch {
events.asFlow().scan(1) { accumulator, value -> value }
.take(2)
.collect { println("received $it") }
}
launch {
events.send(42)
}
}
The above doesn't work, it receives 1(inital accum) item and then hangs indefinitely, because send
is done before collect begins.
I tried moving send
inside the collect
, after initial element is received, but this doesn't work, because scan
is implemented so that its internal collect
also starts after my send
is called (it starts after first collection completes).
Any advice on how to do that?Marcin Wisniowski
07/05/2020, 10:06 PMMark
07/06/2020, 2:22 AMContentResolver
you can optionally pass a CancellationSignal
argument. So I would like to call cancel()
on that object when the coroutine is cancelled. Is there a way to do this without using suspendCancellableCoroutine
(since I don’t need the whole resume()
functionality)?
suspendCancellableCoroutine<Result> { continuation ->
val cancellationSignal = CancellationSignal()
continuation.invokeOnCancellation {
cancellationSignal.cancel()
}
val result: Result = resolver.query(args, cancellationSignal)
continuation.resume(result)
}
bbaldino
07/06/2020, 5:11 PMMaciek
07/06/2020, 6:18 PMAritracrab
07/06/2020, 8:52 PMTim Malseed
07/06/2020, 11:50 PMLuis Munoz
07/07/2020, 4:54 PMfun listen(port: Int = 1337, nThreads: Int = 2) = callbackFlow<String> {
val ch = b.bind(port).coAwait { }
for (obj in coReceiveChannel) {
send(obj)
}
awaitClose {
coReceiveChannel.cancel()
ch.close()
}
} // end of listen
job = listen().onEach { println(it) }.launchIn(scope)
job.cancel() // doesn't call awaitClose
zak.taccardi
07/07/2020, 10:31 PMFlow<T>
? Specifically, if I begin observing a Flow<ViewState>
and it emits no ViewState.Loaded
in the first 300ms, I want to emit a ViewState.Loading
while ensuring that this ViewState.Loading
emits before any ViewState.Loaded
from the source Flow<ViewState>
?
Kind of equivalent to:
val sourceFlow: Flow<ViewState> = ..
sourceFlow
.onStart {
delay(300)
// if `sourceFlow` has not emitted any items yet
emit(ViewState.Loading)
}
https://developer.android.com/reference/androidx/core/widget/ContentLoadingProgressBarandylamax
07/08/2020, 4:16 AM(Mutable)StateFlow<T>
to another (Mutable)StateFlow<T>
? when I call the map
method I map the StateFlow<T>
to a Flow<T>
antrax
07/08/2020, 10:47 AMjdemeulenaere
07/08/2020, 1:56 PMpackage com.example.script
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() {
val count = MutableStateFlow(0)
runBlocking {
// Collect the values in a new coroutine.
val job = launch {
// Prints "3 4 5".
count.take(3).collect {
println("$it ")
}
}
// TODO: Somehow synchronize and suspend here until the coroutine starts collecting
// so that we print "0 1 2" instead.
repeat(3) {
count.value++
}
repeat(3) {
delay(500)
count.value++
}
}
}
aballano
07/08/2020, 2:32 PMinterface ASyntax {}
interface BSyntax : ASyntax
fun hello(f: ASyntax.() -> Unit) = println("A")
suspend fun hello(f: BSyntax.() -> Unit) = println("B")
fun f1() = hello { } // Might work or cause a compiler error because it resolves to the suspended version
suspend fun f2() = hello { } // Will call any hello version, but which one is unknown at first place
suspend fun main() {
f1()
f2()
}
The problem is that depending on the case, either f1 will cause a compilation error because it resolves to the suspended version or f2 will resolve to the non-suspended version.
So I’m wondering if someone knows if this is doable and it might be a bug (that I can report) or this is not really intended to be a thing in the first place (and I should rename one of the hello functions instead)conner
07/08/2020, 9:36 PMSIGSEGV (0xb)
, C [libsystem_pthread.dylib+0x15ad] pthread_mutex_lock+0x0
) when the value is created outside of a callbackFlow
but not when it's inside, what would that mean? in other words,
val thing = createNativeThing()
callbackFlow {
thing.onCallback = {
offer(it)
}
thing.start()
}
crashes but
callbackFlow {
val thing = createNativeThing()
thing.onCallback = {
offer(it)
}
thing.start()
}
does notSam
07/09/2020, 9:03 AMNot Work:
@GET("/users")
suspend fun getUsers(
@QueryMap params:Map<String,Any>): BaseResponse<List<UserDto>>
Worked:
@GET("/users")
suspend fun getUsers(
@Query("lat") lat:Double,
@Query("long") lon:Double,
@Query("genders")genders: String,
@Query("age_max") ageMax:Int,
@Query("age_min") ageMin:Int
): BaseResponse<List<UserDto>>
My Repository implement:
fun search(params:Map<String,Any>) = flow {
emit(service.getMatchedUsers(params
).data.map {
it.mapToDomainModel() }.toList())
}
Ayoub
07/09/2020, 2:30 PMRyan Simon
07/09/2020, 8:47 PMFlow<T>
situation that I'd love to get some feedback on.
the gist is that I'm wanting to transform some Flow<T>
to some Flow<R>
, where the transform
function receives a Result<T>
, but should spit out a Result<R>
to line up with the return type of my function
here's what the existing code looks like
override fun getEquipment(type: String): Flow<Result<List<Equipment>>> {
return apiCallHandler.process { equipmentApi.getEquipment(type) }
.transform { result ->
result.onSuccess { response ->
if (response.equipment.isNotEmpty()) {
val equipmentList = ArrayList<Equipment>()
for (equipment in response.equipment) {
equipmentList.add(equipment.toEquipment())
}
emit(Result.success(equipmentList))
} else {
emit(Result.failure(EquipmentDataFailure.NoEquipmentFound))
}
}
}
}
The main issue here is that if the result.onSuccess
is never hit, the Flow will not emit anything and we won't know about it
From what I can tell, this is because the result
parameter given to us in the transform
lambda is of type Result<T>
, so if we just pass it through the Flow
, it will terminate the Flow
because we're expecting a Flow
with a Result<R>
(that's List<Equipment> in the example above)
The way to work around this is to handle the result.onFailure
case and emit
from there. The thing is that there's no hint at having to do this from the IDE/compiler.
Am I doing something I shouldn't be? And if not, maybe this is something that should be filed with the Kotlin team? Weird one for sure.Ryan Simon
07/09/2020, 8:47 PMFlow<T>
situation that I'd love to get some feedback on.
the gist is that I'm wanting to transform some Flow<T>
to some Flow<R>
, where the transform
function receives a Result<T>
, but should spit out a Result<R>
to line up with the return type of my function
here's what the existing code looks like
override fun getEquipment(type: String): Flow<Result<List<Equipment>>> {
return apiCallHandler.process { equipmentApi.getEquipment(type) }
.transform { result ->
result.onSuccess { response ->
if (response.equipment.isNotEmpty()) {
val equipmentList = ArrayList<Equipment>()
for (equipment in response.equipment) {
equipmentList.add(equipment.toEquipment())
}
emit(Result.success(equipmentList))
} else {
emit(Result.failure(EquipmentDataFailure.NoEquipmentFound))
}
}
}
}
The main issue here is that if the result.onSuccess
is never hit, the Flow will not emit anything and we won't know about it
From what I can tell, this is because the result
parameter given to us in the transform
lambda is of type Result<T>
, so if we just pass it through the Flow
, it will terminate the Flow
because we're expecting a Flow
with a Result<R>
(that's List<Equipment> in the example above)
The way to work around this is to handle the result.onFailure
case and emit
from there. The thing is that there's no hint at having to do this from the IDE/compiler.
Am I doing something I shouldn't be? And if not, maybe this is something that should be filed with the Kotlin team? Weird one for sure.octylFractal
07/09/2020, 8:50 PMtransform
is a 1 -> N
mapping, where N can be zero, but map
is a 1 -> 1
mapping, which is what I think you were going forRyan Simon
07/09/2020, 8:54 PMmap
is the better operator for my use case
though what I really wanted was for the Result
to pass through if it wasn't successful
my process
function returns a Result<T>
that may or may not be a success
. if it's not a success
, i want the failure
to just pass through without being touched by the transform
octylFractal
07/09/2020, 8:57 PMFlow.map
to ensure 1 -> 1
(2) Result.mapCatching
to handle success/failure in mapping to new result
(3) equipment.map
for a simpler transformation to ListRyan Simon
07/09/2020, 9:03 PMoverride fun getEquipment(type: String): Flow<Result<List<Equipment>>> {
return apiCallHandler.process { equipmentApi.getEquipment(type) }
.map { result ->
result.mapCatching { response ->
if (response.equipment.isEmpty()) {
throw EquipmentDataFailure.NoEquipmentFound
} else {
response.equipment.map { it.toEquipment() }
}
}
}
}
Now I get this error from the compiler:
org.jetbrains.kotlin.codegen.CompilationException: Back-end (JVM) Internal error: Couldn't transform method node:
emit$$forInline (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;:
octylFractal
07/09/2020, 9:15 PM.map
to a separate functionRyan Simon
07/09/2020, 9:16 PM