#flow

Javier

01/12/2021, 9:59 PM
I am emitting a flow of `Either`, but the first emission seems to get mixed up when there is more than one emission. `store` and `store2` are different instances which should have the same emissions:
store.stream().take(1).toList().also { println(it) }
store2.stream().take(2).toList().also { println(it) }
That snippet prints:
[Right(right=Success(data=[1, 2, 3, 4], isLoading=true))]
[Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)), Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false))]
the flow block is:
flow {
    emit(sourceOfTruth.flow().first().asSuccessLoading<F, S>())

    when (val fetch = fetch()) {
        is ResourceFailure<F, S> -> emit(fetch)
        is ResourceSuccess<S> -> sourceOfTruth.insert(fetch.right.data)
    }

    emitAll(sourceOfTruth.flow().map { it.asSuccess() })
}
However, I never emit an `Either` that has `isLoading = true` together with `1, 2, 3, 4, 7, 8`, because `7` and `8` are inserted after the first emission, once the fetch completes, and after the first emission there should be no more `isLoading = true`.
As far as I can see, the first emit is waiting until fetch() is completed

Marc Knaup

01/13/2021, 9:31 AM
Are you emitting a mutable list that you modify after you've emitted it?

Javier

01/13/2021, 11:10 AM
Exactly
Maybe the problem is that the emission is so fast?
Well, I think I added delays, and the first emit doesn't emit until the fetch is completed

Marc Knaup

01/13/2021, 11:12 AM
You shouldn’t emit a mutable list but only a copy of that list. Otherwise you modify the list long after it was sent to Flow collectors.
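A minimal sketch of the effect described here, assuming the store is backed by a single `MutableList`. The function name `sharedListEmissions` and the `backing` list are hypothetical stand-ins, not the original code. Emitting the list itself hands collectors a reference that later inserts still modify; emitting `toList()` hands them a snapshot.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the store: one mutable list that is emitted,
// then modified, then emitted again.
fun sharedListEmissions(copyBeforeEmit: Boolean): List<List<Int>> = runBlocking {
    val backing = mutableListOf(1, 2, 3, 4)
    flow<List<Int>> {
        // First emission: either the live list or a snapshot of it.
        emit(if (copyBeforeEmit) backing.toList() else backing)
        // Simulates sourceOfTruth.insert(...) running after the first emission.
        backing += listOf(7, 8)
        emit(if (copyBeforeEmit) backing.toList() else backing)
    }.toList()
}

fun main() {
    // Both collected items are the same list instance, so both show 7 and 8.
    println(sharedListEmissions(copyBeforeEmit = false)) // [[1, 2, 3, 4, 7, 8], [1, 2, 3, 4, 7, 8]]
    // Each emission is an independent copy, so the first one stays intact.
    println(sharedListEmissions(copyBeforeEmit = true))  // [[1, 2, 3, 4], [1, 2, 3, 4, 7, 8]]
}
```

This matches the symptom in the printed output above: the first collected item is only printed after collection ends, by which time the shared list already contains `7` and `8`.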

Javier

01/13/2021, 11:30 AM
Yeah, I can't try to change it but I think it was waiting until fetch is completed, instead of emitting the first one

Marc Knaup

01/13/2021, 11:32 AM
I think the problem is in your `sourceOfTruth`. It emits mutable state. The problem is not that you emit the wrong thing (list). It’s just that that list is changed after it was emitted.
So both `Either` point to the same list that obviously must contain the same elements.
What does your `sourceOfTruth` look like?

Javier

01/13/2021, 12:24 PM
But why does it wait to emit?
The first emission really is instant; the fetch takes 3000 ms

Marc Knaup

01/13/2021, 12:25 PM
Where exactly does it wait? If `sourceOfTruth` emits instantly then
• the `take(1)` example should be instant
• the `take(2)` example should need 3000ms
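A small sketch of why the two cases are expected to differ, under the assumption that the flow emits a cached value first and only then runs a suspending fetch. `runWithTake` and the string values are made up for illustration, and a 50 ms `delay` stands in for the 3000 ms fetch. With `take(1)` the flow is cancelled right after the first emission, so the fetch never runs.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the collected items and whether the simulated fetch ran to completion.
fun runWithTake(n: Int): Pair<List<String>, Boolean> = runBlocking {
    var fetchRan = false
    val items = flow<String> {
        emit("cached")   // first emission, available immediately
        delay(50)        // hypothetical stand-in for the slow fetch
        fetchRan = true
        emit("fresh")    // second emission, only after the fetch
    }.take(n).toList()
    items to fetchRan
}

fun main() {
    println(runWithTake(1)) // ([cached], false)
    println(runWithTake(2)) // ([cached, fresh], true)
}
```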

Javier

01/13/2021, 12:26 PM
yeah, but even when collecting instead of using take
the first emission takes 3000 ms and the mutable list gets mixed up

Marc Knaup

01/13/2021, 12:27 PM
store.stream().take(1).toList().also { println(it) }
takes 3000ms?

Javier

01/13/2021, 12:27 PM
not that one
it is instant, and it works fine
but with store.stream().collect { println(it) }
the first emission should be instant, but it waits until the second
I think the problem was using
emit(sourceOfTruth.flow().first().asSuccessLoading<F, S>())

Marc Knaup

01/13/2021, 12:28 PM
Maybe it has no data to emit?

Javier

01/13/2021, 12:29 PM
I changed it to avoid `sourceOfTruth.flow().first()` and use something like `sourceOfTruth.get()` instead, without sharing the same flow, and it works, but I would still like to know what was happening
But why?
the data is the same
they are fake data sources
so if take(1) prints 1, 2, 3, 4
the first .collect {} should print 1, 2, 3, 4 instead of waiting until the fetch completes

Marc Knaup

01/13/2021, 12:30 PM
I don’t have enough code to better understand it at this point. Could be something within `sourceOfTruth`. I also don’t know the difference between `store` and `store2`, or if you reuse any stores in your tests.

Javier

01/13/2021, 12:31 PM
store and store2 are different instantiations of the same thing
Mmm I should have saved the previous state so I could share it here, sorry 😕

Marc Knaup

01/13/2021, 12:32 PM
It’s probably enough code to create a Gist :)
But in theory if `take(1).toList()` is instant then so should be `.first()` and `.collect { println(it) }`.
Unless something is blocking the thread
Does your `fetch` use `delay` or `Thread.sleep`? 🤔
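To illustrate why the question matters, here is a sketch (JVM only, with hypothetical names) of two coroutines sharing the single thread of `runBlocking`. `Thread.sleep` blocks that thread, so nothing else can run until it returns, while `delay` suspends and lets the other coroutine proceed. It is not a reconstruction of the store in this thread, only of the general difference.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which two coroutines on the same thread make progress.
fun interleaving(blockThread: Boolean): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch {
            events += "fetch started"
            // Thread.sleep holds the thread; delay releases it while waiting.
            if (blockThread) Thread.sleep(50) else delay(50)
            events += "fetch finished"
        }
        launch { events += "collector ran" }
    }
    return events
}

fun main() {
    println(interleaving(blockThread = true))  // [fetch started, fetch finished, collector ran]
    println(interleaving(blockThread = false)) // [fetch started, collector ran, fetch finished]
}
```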

Javier

01/13/2021, 12:36 PM
yeah, it uses `delay` but I think without the `delay` I was having the same problem

Marc Knaup

01/13/2021, 12:36 PM
Weird. `take(n)` uses `.collect` too so both should work.

Javier

01/13/2021, 12:37 PM
but the fetch runs afterwards, so why is it able to "pause" the first emission if the delay has not even been called yet?
Yeah, strange behavior

Marc Knaup

01/13/2021, 12:38 PM
put some `print` into the emitting code and see where it stops
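One way to do this kind of tracing, sketched with made-up names: record an event before and after each step in the flow body and another one in the collector, then compare the order. In a plain `flow { }` collected with `collect`, `emit` only returns once the collector has handled the value, so the first item should be seen before the fetch starts. The 50 ms `delay` is an assumed stand-in for the real fetch.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects a two-step flow and returns the order of emitter and collector events.
fun emissionTrace(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow<String> {
        events += "before first emit"
        emit("cached")
        events += "fetch started"
        delay(50) // hypothetical stand-in for fetch()
        events += "fetch finished"
        emit("fresh")
    }.collect { events += "collected $it" }
    events
}

fun main() {
    emissionTrace().forEach(::println)
    // before first emit
    // collected cached
    // fetch started
    // fetch finished
    // collected fresh
}
```

If a trace of the real code shows the collector's first event after the fetch has finished, the wait is happening somewhere between `emit` and the collector rather than inside `fetch`.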

Javier

01/13/2021, 12:38 PM
yeah, this is the incredible thing
I put a print
flow {
    val source = sourceOfTruth.flow().first().asSuccessLoading<F, S>()
    println(source)

    emit(source)

    when (val fetch = fetch()) {
        is ResourceFailure<F, S> -> emit(fetch)
        is ResourceSuccess<S> -> sourceOfTruth.insert(fetch.right.data)
    }

    emitAll(sourceOfTruth.flow().map { it.asSuccess() })
}
the println(source) works perfectly and shows 1, 2, 3, 4, but the first emission of source includes 1, 2, 3, 4, 7, 8, even though fetch() only runs afterwards and takes 3000 ms

Marc Knaup

01/13/2021, 12:40 PM
How do you know? When do you print the emission?

Javier

01/13/2021, 12:40 PM
really strange, I think the problem was using flow().first(); it suspends for some reason if the flow block doesn't end with it
because it prints this, with store.stream().take(2) or when collecting:
[Right(right=Success(data=[1, 2, 3, 4], isLoading=true))]
[Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)), Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false))]
the first print is the println(source), the second print is take(2)

Marc Knaup

01/13/2021, 12:42 PM
No, here again you print the list of the first emission after it has been messed up by `sourceOfTruth.insert`.

Javier

01/13/2021, 12:42 PM
but with collect I had the same problem
store.stream().collect{} prints
Right(right=Success(data=[1, 2, 3, 4], isLoading=true)) // internal println()
Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)) // first collect println(), and it takes 3000 ms
Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false)) // second collect println()

Marc Knaup

01/13/2021, 12:43 PM
With `fetch` having a delay?

Javier

01/13/2021, 12:43 PM
with fetch having a delay, but I think without it I had the same problem
but why does emit wait until the fetch with the delay is finished?

Marc Knaup

01/13/2021, 12:44 PM
Yeah there are two issues here. a) The first collection shouldn’t have a delay. b) The list shouldn’t be mutable.
I have no idea how a) happens without more code.
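For issue b), a possible shape for a fake source of truth that never exposes mutable state. This is only a sketch under assumptions: the class `InMemorySourceOfTruth`, the `Int` element type, and the `Pair<List<Int>, Boolean>` standing in for the `Either` with `isLoading` are all hypothetical, since the real `sourceOfTruth` was never shared. Each insert replaces the stored immutable list with a new one, so earlier emissions cannot change afterwards.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical fake data source holding an immutable list in a StateFlow.
class InMemorySourceOfTruth(initial: List<Int>) {
    private val state = MutableStateFlow(initial.toList())

    fun flow(): Flow<List<Int>> = state
    fun get(): List<Int> = state.value

    // Builds a new list instead of mutating the one already handed out.
    fun insert(items: List<Int>) {
        state.value = state.value + items
    }
}

// Mirrors the flow block from the thread; the Boolean plays the role of isLoading.
fun streamDemo(): List<Pair<List<Int>, Boolean>> = runBlocking {
    val source = InMemorySourceOfTruth(listOf(1, 2, 3, 4))
    flow<Pair<List<Int>, Boolean>> {
        emit(source.flow().first() to true)
        delay(50) // assumed stand-in for fetch()
        source.insert(listOf(7, 8))
        emitAll(source.flow().map { it to false })
    }.take(2).toList()
}

fun main() {
    println(streamDemo()) // [([1, 2, 3, 4], true), ([1, 2, 3, 4, 7, 8], false)]
}
```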

Javier

01/13/2021, 12:51 PM
Yeah, sorry, the problem is that I changed the LocalDataSource code and I don't use flow().first() now, and I don't have the previous state with the problem anymore. If I had it I could share it 😕