I am emitting a flow of `Either`, but it seems lik...
# flow
j
I am emitting a flow of
Either
, but it seems like the first emission is being mixed if there are more than one emission, so store and store2 are different instances with should have the same emissions:
Copy code
store.stream().take(1).toList().also { println(it) }
store2.stream().take(2).toList().also { println(it) }
That snippet prints:
Copy code
[Right(right=Success(data=[1, 2, 3, 4], isLoading=true))]
[Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)), Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false))]
the flow block is:
Copy code
flow {
    emit(sourceOfTruth.flow().first().asSuccessLoading<F, S>())

    when (val fetch = fetch()) {
        is ResourceFailure<F, S> -> emit(fetch)
        is ResourceSuccess<S> -> sourceOfTruth.insert(fetch.right.data)
    }

    emitAll(sourceOfTruth.flow().map { it.asSuccess() })
}
Even, I am not emitting an
Either
which has the property
isLoading = true
with
1, 2, 3, 4, 7, 8
, because
7
and
8
are inserted after the first emission, when it is fetched, and after the first emission, should not be more
isLoading = true
As I can see, first emit is waiting until fetch() is completed
m
Are you emitting a mutable list that you modify after you've emitted it?
j
Exactly
Maybe the problem is the emission is so fast?
well, I think I added delays and the first emit doesnt emit until the fetch is completed
m
You shouldn’t emit a mutable list but only a copy of that list. Otherwise you modify the list long after it was sent to Flow collectors.
j
Yeah, I can't try to change it but I think it was waiting until fetch is completed, instead of emitting the first one
m
I think the problem is in your
sourceOfTruth
. It emits mutable state. The problem is not that you emit the wrong thing (list). It’s just that that list is changed after it was emitted.
So both
Either
point to the same list that obviously must contain the same elements.
What does your
sourceOfTruth
look like?
j
But why it waits to emit?
Really the first emission is instant, the fetch takes 3000 ms
m
Where exactly does it wait? If
sourceOfTruth
emits instantly then • the
take(1)
example should be instant • the
take(2)
example should need 3000ms
j
yeah, but even collecting instead of using take
the first emission takes 3000 ms and mixed the mutable list
m
Copy code
store.stream().take(1).toList().also { println(it) }
takes 3000ms?
j
that no
it is instant, and it works fine
but store.stream.collect { println(it) }
the first emission should be instant, but it waits until second
I think the problem was using
emit(sourceOfTruth.flow().first().asSuccessLoading<F, S>())
m
Maybe it has no data to emit?
j
I moved it to avoid
sourceOfTruth.flow().first()
to something like
sourceOfTruth.get()
without sharing the same flow and it works but I still would like to know what was happening
But why?
there is the same data
they are fakes data sources
so if take(1) prints 1, 2, 3, 4
the first .collect {} should prints 1, 2, 3, 4 instead of waiting until fetch completes
m
I don’t have enough code to better understand it at this code. Could be something within
sourceOfTruth
. I also don’t know the difference between
store
and
store2
, or if you reuse any stores in your tests.
j
store and store2 are different instantiation of the same thing
Mmm I should save the process so I could share it here, sorry 😕
m
It’s probably enough code to create a Gist :)
But in theory if
take(1).toList()
is instant then should be
.first()
and
.collect { println(it) }
.
Unless something is blocking the thread
Does your
fetch
use
delay
or
Thread.sleep
? 🤔
j
yeah, it uses
delay
but I think without the
delay
I was having the same problem
m
Weird.
take(n)
uses
.collect
too so both should work.
j
but, the fetch is running after, so why it is able to "pause"the first emission if the delay is not even called?
Yeah, strange behavior
m
put some
print
into the emitting code and see where it stops
j
yeah, this is the increible thing
I put a print
Copy code
flow {
    val source = sourceOfTruth.flow().first().asSuccessLoading<F, S>()
    println(source)

    emit(source)

    when (val fetch = fetch()) {
        is ResourceFailure<F, S> -> emit(fetch)
        is ResourceSuccess<S> -> sourceOfTruth.insert(fetch.right.data)
    }

    emitAll(sourceOfTruth.flow().map { it.asSuccess() })
}
the println(source), works perfectly, 1, 2, 3, 4, but the first emission of source, includes 1, 2, 3, 4, 7, 8, even knowing that fetch() will run 3000ms later
m
How do you know? When do you print the emission?
j
really strange, I think the problem was using flow().first(), it is suspending for some reason if the flow blocks doesnt end with it
because it prints this, supposing store.stream().take(2) or collecting:
Copy code
[Right(right=Success(data=[1, 2, 3, 4], isLoading=true))]
[Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)), Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false))]
the first print is the println(source), the second print is take(2)
m
No, here again you print the list of the first emission after it has been messed up by
sourceOfTruth.insert
.
j
but with collect I had the same problem
store.stream().collect{} prints
Copy code
Right(right=Success(data=[1, 2, 3, 4], isLoading=true)) // internal println()
Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)) // first collect println(), and it takes 3000 ms
Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false)) // second collect println()
m
With
fetch
having a delay?
j
with fetch having a delay, but I think without it I had the same problem
but, why emit waits until the fetch with the delay is finished?
m
Yeah there are two issues here. a) The first collection shouldn’t have a delay. b) The list shouldn’t be mutable.
I have no idea how a) happens without more code.
j
Yeah, sorry, the problem is I changed the LocalDataSource code and I dont use flow().first() now, but I havent the previous state with the problem, if I have it I could share it 😕