is there a better way to check if a flow is empty ...
# coroutines
d
is there a better way to check if a flow is empty than
flow.count() == 0 ?
g
Flow is lazy and cold: there is no way to know how many elements it has until you start consuming it and consume all of them. So there is no other way, and in general this doesn't look like the right approach
also, this code may never return: it's valid for a flow to never send complete/error
what is your use case for it?
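A minimal sketch of what "cold" means here, assuming `kotlinx.coroutines` is on the classpath. The `users()` function and the `producerRuns` counter are hypothetical stand-ins for a repository call, and `hasNoElements()` is an illustrative helper, not a library function. It shows that `count()` runs the producer to the end and that a later collection runs it again, while `firstOrNull()` can stop after the first element (this only works as an emptiness check if the flow never emits null).

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the producer block was started.
var producerRuns = 0

// Hypothetical stand-in for a repository call returning a cold flow.
fun users(): Flow<String> = flow {
    producerRuns++          // runs again on every collection, because the flow is cold
    emit("alice")
    emit("bob")
}

// Illustrative helper: collects at most one element, then cancels the upstream.
// Assumes the flow never emits null.
suspend fun <T : Any> Flow<T>.hasNoElements(): Boolean = firstOrNull() == null

fun main() = runBlocking {
    val flow = users()
    val empty = flow.count() == 0   // first collection: consumes every element
    val all = flow.toList()         // second collection: the producer starts over
    println("empty=$empty, all=$all, producerRuns=$producerRuns")
    println(emptyFlow<String>().hasNoElements())
}
```

Either way the flow has to be collected at least partially, and a flow that never emits and never completes would still suspend forever in both checks.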
d
I have a connection from WebFlux (Spring) to Cosmos DB and this returns a Flow<T>, but if there are no elements I want to throw a custom exception from the backend and not return an empty list (because I am returning APPLICATION_JSON)
g
but if there are no elements
But a Flow<T> from a database is not a container for a list of elements; it's probably a stream of database updates
if you want to retrieve the current state of the DB, Flow as a return type is not the best solution; it should be a suspend function
d
like suspend fun : List<T> instead of fun : Flow<T> ?
all the docs I've seen so far say to use Flux for > 1 element in a repository
g
yes, like suspend for List
I'm really not sure how this Flow works. It's asynchronous; will it ever complete?
d
yeah, it'll complete
count() is a collector (it is also a suspend function)
it's working so far but maybe overkill here tbh 🙂
g
if it does, it's quite strange, but then you should just do this: val t = flow.toList(); t.size == 0
It's not overkill, I just think such an API is incorrect: exposing Flow<T> when it actually represents a List<T>
same pattern
g
In WebFlux, shouldn't you do this: select().blah.fetch().awaitOneOrNull()
or just await(), for a list
d
@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()
yeah, I am working with await for single events
as a replacement for Mono from the reactor package
we are pretty new as a team to all these topics but we are having a lot of fun :=
g
so it looks like WebFlux really requests all items asynchronously
interesting
findAll().toList()
is essentially what you need
but I'm still not sure why they don't have Mono<List<T>> together with suspend List<T> instead of Flux<T> and Flow<T>
because essentially, Flow or Flux as a result doesn't work for your case: you have to return the whole result in one JSON
so I really don't see any other solution except findAll().toList()
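A rough sketch of the "collect to a list, then decide" approach, assuming `kotlinx.coroutines` is available. `EmptyResultException` and `toListOrThrow()` are hypothetical names made up for illustration; the only library calls are `toList()`, `flowOf()` and `emptyFlow()`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical exception type, not part of any library.
class EmptyResultException(message: String) : RuntimeException(message)

// Collects the whole flow exactly once, then checks the materialized list.
suspend fun <T> Flow<T>.toListOrThrow(): List<T> {
    val items = toList()
    if (items.isEmpty()) throw EmptyResultException("No elements found")
    return items
}

fun main() = runBlocking {
    println(flowOf("alice", "bob").toListOrThrow())
    val failure = runCatching { emptyFlow<String>().toListOrThrow() }.exceptionOrNull()
    println(failure?.message)
}
```

Because the flow is collected only once, the emptiness check and the returned data come from the same query. The cost is that the whole result sits in memory and the caller gets a `List<T>` instead of a `Flow<T>`.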
d
yeah this is exactly what happens under the hood => return as whole json
g
yeah, but I'm quite surprised that such explicit behaviour can only be achieved implicitly, by assuming how Flow<T> or Flux<T> is implemented under the hood
d
so far it's working quite nicely
the count() == 0 feels ugly, but maybe I have to return an empty list and the frontend can do the differentiation
g
my problem with count() == 0 is that you need the data anyway, and count() consumes the flow, so if you do this, it would be incorrect:
suspend fun getData(): Flow<T> {
  require(flow.count() > 0) { "DB is empty" }
  // at this moment the DB may have changed, items deleted
  return flow // you still have to request the flow again
}
so maybe this will work for you: return flow.onEmpty { throw DbException("DB is empty") }
so the consumer will receive an exception if the flow is empty
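The `onEmpty` suggestion above can be sketched like this, assuming `kotlinx.coroutines` is available. `DbException` is the name used in the chat and is defined here as a hypothetical class; `failIfEmpty()` is an illustrative wrapper, not a library function.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical exception type, named as in the message above.
class DbException(message: String) : RuntimeException(message)

// onEmpty runs its action only if the upstream completes without emitting anything.
fun <T> Flow<T>.failIfEmpty(): Flow<T> = onEmpty { throw DbException("DB is empty") }

fun main() = runBlocking {
    println(flowOf(1, 2).failIfEmpty().toList())
    try {
        emptyFlow<Int>().failIfEmpty().collect { println(it) }
    } catch (e: DbException) {
        println("caught: ${e.message}")
    }
}
```

The signature stays `Flow<T>` and the flow is collected only once. Note that the exception is raised only when something actually collects the flow, so whoever collects it is the one who has to handle it.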
d
cool i am gonna try this approach
c
what's the downside of always converting to a list and throwing if it's empty?
g
Yeah, I think it's exactly what you need, and very reactive, but it's still a bit strange to me: suspend List<T> is what's actually needed, and it has nothing to do with blocking code
The downside is that you will change the signature of the method
which may be a problem if the consumer expects Flow<T>
and does some processing on each element. As pointed out in this article from the Spring blog, loading all the data into memory can be a problem for processing
but if all the consumer does is get the result and convert it to JSON, then from an API point of view I would rather use suspend List
Also, it depends on how WebFlux is implemented: maybe receiving every item of the result asynchronously is the more optimal solution, if they somehow parallelize it and it's supported by the DB driver
c
yeah, if the normal path is converting it to a list at some point, that's where it should throw
g
I'm just worried about such an approach in general: you receive some Flow<T>, and if your code has no option but to get all items, it becomes dangerous if someone changes the upstream implementation so that it never completes. In such cases it's probably safer to have some timeouts when collecting data, to avoid forever-suspended code, which, like a deadlock, is a very nasty bug and hard to debug. So providing a clear API to receive a List of data looks like a good solution; let the user decide whether to use List<T> or Flow<T> depending on the case
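One possible way to add the timeout mentioned above, assuming `kotlinx.coroutines` is available. `updates()` is a hypothetical upstream that never completes, and `toListOrNull()` is an illustrative helper built on the library's `withTimeoutOrNull`; the 200 ms value is arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical upstream: emits one item, then stays open forever.
fun updates(): Flow<Int> = flow {
    emit(1)
    awaitCancellation()   // suspends until the collector is cancelled
}

// Collects the whole flow, or gives up and returns null after timeoutMillis.
suspend fun <T> Flow<T>.toListOrNull(timeoutMillis: Long): List<T>? =
    withTimeoutOrNull(timeoutMillis) { toList() }

fun main() = runBlocking {
    println(updates().toListOrNull(200))        // never completes, so the timeout fires
    println(flowOf(1, 2, 3).toListOrNull(200))  // completes normally
}
```

Returning null hides the partial result; `withTimeout` could be used instead if a thrown `TimeoutCancellationException` is preferred over a null.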
l
I think you want to return Flow<User> because some database drivers can read 1 row at a time lazily
g
yes, this is what I meant by optimisations, that it's supported at the DB driver level. It's actually pretty cool, but if your application sends all results together, over something like HTTP and not some streaming socket, this lazy select will not help and will probably be a bit slower
d
true Andrey
c
in HTTP you can stream the reply, and even if you don't, you can start to build the reply while data is still coming in.
g
which is not necessarily always possible (like the case of an error for 0 elements) and not always faster, especially if building the reply is not a heavy operation
c
even if you just want to build a JSON string from a list of replies, you can avoid creating an intermediate List
g
it’s true
until you have something like a count property in your json