christiangruber
01/08/2020, 11:07 PMcollect{}
to run in a particular context? Should I move what was done in a collect() to an onEach{} and then let collect{} just trigger final logging?Abhishek Bansal
02/12/2020, 4:38 PMflowOf(
remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
.catch { error -> Timber.e(error) },
remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
Timber.i("Response Received")
}
my expectation here was that I will get first result first and then second after sometime as response comes from server. But the problem here is collect only gets called after server returns result.Adam Bennett
03/02/2020, 12:51 PMpublish(selector: (Flow<T> -> Flow<R>)
? I’m looking at this issue and I have the exact same use-case https://github.com/Kotlin/kotlinx.coroutines/issues/1086#issuecomment-585174549Antoine Gagnon
03/11/2020, 7:23 PMAbhishek Bansal
03/16/2020, 11:30 AMfun main() = runBlocking<Unit> {
val test = Test()
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
println("Receive 1")
test.getResponseChannel()
.collect { println("Collect 1 $it") }
}
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
println("Receive 2")
test.getResponseChannel()
.collect {
println("Collect 2 $it")
}
}
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
println("Receive 3")
test.getResponseChannel()
.collect {
println("Collect 3 $it")
}
}
println("Sending A")
test.responseChannel.send("A")
println("Sending B")
test.responseChannel.send("B")
println("Sending C")
test.responseChannel.send("C")
}
class Test {
val responseChannel = BroadcastChannel<String>(BUFFERED)
fun getResponseChannel(): Flow<String> {
val channel = responseChannel.openSubscription()
return channel
.consumeAsFlow()
.onCompletion {
println("Cancelled")
channel.cancel()
}
}
}
Here you will see that onCompletion()
and hence “Cancelled” never prints. My requirement is to be able to add and detach observers to a broadcast channel. It that achievable?Maciek
03/17/2020, 5:51 AMjdiaz
03/19/2020, 11:08 PMAli Sabzevari
05/15/2020, 6:19 PM"-Xopt-in=kotlinx.coroutines.ExperimentalCoroutinesApi"
to compiler args. Like this:
compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
apiVersion = "1.3"
languageVersion = "1.3"
freeCompilerArgs = ["-progressive", "-Xopt-in=kotlinx.coroutines.ExperimentalCoroutinesApi"]
}
}
fatih
05/16/2020, 4:53 PM// If the collecting scope is cancelled before this map operation,
// seems like map operation is not cancelled and throws an exception
numbersFlow.map {
// There is an operation which throws exception
}
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
I have to use ensureActive()
before mapping but is it the expected behaviour?enighma
05/20/2020, 9:55 PMMarcin Wisniowski
06/01/2020, 7:20 PMChannel
representing a job queue, and a method that `send`s a new job to the queue. A separate coroutine is an infinite receive
loop that processes the jobs. How would I represent that with Flow? I'm not sure how to use the flow {}
builder since I don't know what to emit
upfront, the jobs come from outside.adk
06/09/2020, 7:08 AMKoya Sivaji
06/15/2020, 9:27 PMI am new to kotlin flows and trying to add Flows to one of my project and facing an issue while converting list of objects of one type to another as shown below. Basically, am not sure which operator to be used for this.
//Repository - method return type is Flow<List<HistoryEvent>>
fun getHistoryEvents(): Flow<List<HistoryEvent>>
//ViewModel - calling repository method and trying to convert data to List<AccessoryEvent>
val accessoryEvents : LiveData<List<AccessoryEvent>>= liveData{
repository.getHistoryEvents()
// what operator to be used to convert from List<HistoryEvent> to List<AccessoryEvent>
.collect()
}
I tried like below
val eventHistory: LiveData<List<AccessoryEventType>> =
liveData {
deviceRepository.getDeviceEventHistoryFor("abcd")
.map { historyEvents ->
emit((historyEvents.map { AccessoryEventType.getEventType(it) }))
}
.collect()
}
after this, all elements in the returned list are same and equals to the last emitted element. Am not sure what I am doing wrong
Maciek
07/06/2020, 6:18 PMTony Blundell
07/10/2020, 1:12 AMvar highest = 0
for (i in myIntegers) {
if (i > highest) {
highest = i
doSomethingElse()
}
}
Oleh Havrysh
08/11/2020, 3:33 PMmiqbaldc
08/14/2020, 12:47 PM@Test
fun testFlow1() {
(1..5)
.asFlow()
.onEach { println(it) } // prints: 1, 2, 3, 4, 5
}
@Test
fun testFlow2() {
data class Abc(val id: Int)
val list: Flow<List<Abc>> = flow { emit(listOf(Abc(0), Abc(1), Abc(2))) }
list
.onEach { println(it) } // prints: list of object instead of each single abc object
}
Does currently possible to iterate each index for testFlow2
without using it like this:
list
.onEach {
it.forEach { abc -> println(abc.id) }
}
My goals:
list
.onEach { abc ->
println(abc.id) // prints: [0, 1, 2]
}
Ryan Pierce
08/15/2020, 7:55 PMSharedFlow
and a StateFlow.buffer(_UNLIMITED_)
? Buffering a StateFlow seems to mimic SharedFlow for a simple test I created. Is there a fundamental difference between the way those two flows behave?Canato
09/01/2020, 5:45 PMDoes any one can guide me to an article or sample app where this works?
Not sure what is the best single responsibility for this case.
• Do I retrieve all information from the Endpoint and emit after each map?
• Can I use Flow for pagination ?
• Maybe this is not a good case for Flow?
Thanks ;)Dario Pellegrini
09/03/2020, 7:27 AMansman
09/04/2020, 7:55 PMtransformLatest
but for combineTransform
? I'm looking specifically for something that cancels the work if a new item comes in. For now I can just zip up the items in combine
to a pair and use transformLatest
muthuraj
09/07/2020, 9:00 AMStateFlow
to emit some states and do some work based on that. My code looks like this
state.onEach(this::handleState)
.launchIn(scope)
Now since the onEach
block is a suspending function, if another state is emitted while it is executing, it doesn't cancel the current suspending function. Instead it waits for the handleState
function to finish it's execution and then next state's handling is being done. I realise this is actually done intentionally to handle back pressure and all, but what I want is to cancel onEach
execution when new state is received and run onEach
for the new state.
Currently I'm running the handleState
block in another coroutine using scope.launch { }
, store that job as member variable, then cancel that in onEach
before launching new coroutine. Is there any other way to achieve this?Matti MK
09/07/2020, 12:02 PMfun fetchData() {
val genresWithMovies: List<GenreWithMovies> = dataSource.queryGenres().map { genre ->
val movies = dataSource2.queryMovies(genre.id)
genre.movies = movies
genre
}
}
Any tips?Ryan Pierce
09/08/2020, 3:42 AMDaniele B
09/14/2020, 9:37 PMcommonMain
) looks like this:
class CoreViewModel {
internal val mutableStateFlow = MutableStateFlow(AppState())
val stateFlow: StateFlow<AppState>
get() = mutableStateFlow
}
Can anyone show how it’s possible to collect the StateFlow
value in Swift, on the iosApp side?
I am looking for a way to connect the StateFlow value to an ObservableObject.Christian S.
09/18/2020, 7:16 PMMarc Knaup
10/04/2020, 5:26 PMTrym Nilsen
10/05/2020, 4:09 PMcollect
with its own coroutine scope for each item it receives? E.g lets say I have an updating list of bicycles, a Flow<List<Bicycle>>
. For each bicycle I can request a price, unfortunately this API requires me to request the prices for a single bicycle at a time. This is all fine i can spin up some requests using a couple of async
builders, no problem so far. The API for requesting prices might be slow at times, slower than the stream of new bicycles that becomes available or taken. This however creates the issue of still be working on fetching prices for bikes that are no longer available or taken. Is there any way to create a coroutine scope for each item collected from my Flow<List<Bicycle>>
and then cancel it when a new item is collected, If I understand correctly due to the sequential nature of flows, the next item is not requested until the previous is finished/completed?Alex
10/13/2020, 9:53 AMMutableStateFlow
? I have been using ConflatedBroadcastChannel
, but would like to move to flow completely. I know there is Flow
itself, but I have not found a way to send values to it from outside the creation lambda.Marc Knaup
10/13/2020, 9:47 PMstateIn
need an initial value?
I keep writing .stateIn(…, …, initialValue = null).filterNotNull()
Marc Knaup
10/13/2020, 9:47 PMstateIn
need an initial value?
I keep writing .stateIn(…, …, initialValue = null).filterNotNull()
ephemient
10/13/2020, 9:55 PMStateFlow.value
return without an initial value?Marc Knaup
10/13/2020, 9:59 PMsuspend fun value()
instead.ephemient
10/13/2020, 10:00 PMgildor
10/13/2020, 11:19 PMMarc Knaup
10/14/2020, 12:41 AMephemient
10/14/2020, 12:51 AMMarc Knaup
10/14/2020, 1:21 AM.conflate().distinctUntilChanged()
before each .shareIn
to achieve the same behavior.gildor
10/14/2020, 1:52 AMSo it was designed for a subset of potential use casesExactly. as any other abstraction I believe, has some use cases in mind
MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)