Morning :smile: I currently have a sequential oper...
# announcements
e
Morning πŸ˜„ I currently have a sequential operation which uses a
Sequence<T>
Copy code
myProviders
  .asSequence()
  .sortedBy(configService::getProviderOrder)
  .flatMap { it.getCommitTypes(context.type) }
  .map { CommitTypePsiElement(project, it) }
  .mapIndexed(::CommitTypeLookupElement)
  .distinctBy(CommitTypeLookupElement::getLookupString)
  .forEach(rs::addElement)
Each
provider
call (
it.getCommitTypes(context.type)
) returns a
Collection<T>
and cannot be converted to Flows. I would like to call every provider asynchronously and then
flatMap
the results, and
collect
. How would you approach this in the code above? This part
Copy code
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
can remain sequential / blocking.
m
Maybe something like this?
Copy code
coroutineScope {
  myProviders
    .sortedBy(configService::getProviderOrder)
    .map{ async { it.getCommitTypes(context.type) } }
    .awaitAll()
    .asSequence()
    .flatten()
    .map { CommitTypePsiElement(project, it) }
    .mapIndexed(::CommitTypeLookupElement)
    .distinctBy(CommitTypeLookupElement::getLookupString)
    .forEach(rs::addElement)
}
e
@marstran mmmh, interesting, thanks! Just to understand, what would happen if one call to
getCommitTypes
throws? The entire chain would fail, right?
m
Yes. It will throw on
awaitAll
. Those which hasn't completed yet will be cancelled.
e
@marstran okay! I will try to come up with a version which doesn't abort the other operations. This has been a good starting point πŸ˜‰
@marstran Sorry if I bother you again! The example you gave me works perfectly and I adapted it. Now, after reading more on Coroutines and Flows I was trying to move to Flows (I'm used to RxJS). I've come up with
Copy code
runBlocking {
  myProviders
    .sortedBy(configService::getProviderOrder)
    .asFlow()
    .flatMapMerge { it.getCommitTypes(...).asFlow() }
    .onErrorCollect(emptyFlow())
    .flowOn(Dispatchers.Default)
    .toList()
    .map { CommitTypePsiElement(project, it) }
    .mapIndexed(::CommitTypeLookupElement)
    .distinctBy(CommitTypeLookupElement::getLookupString)
    .forEach(rs::addElement)
}
But it seems the calls are still blocking.
Oh btw, let me know if you prefer this to be on StackOverflow.
@marstran yes that's true. But doesn't even your version do it?
Or the await all considers the order of the DeferredResults?
m
Sorry, I deleted the comment. πŸ˜› My original version preserves the order.
Yes, it does
e
@marstran oh okay! Yeah, I would need a concat to preserve the order, but then I lose the asynchronicity I think
m
Yes, that's what I was thinking as well.
e
At least the docs says concat wait for the coroutine to end before starting another one
But I'd need to come up with a better sorting strategy anyway, so let's not focus on the order πŸ˜„
m
Correct. You may be able to use
buffer
to get around it though. Something like this:
Copy code
myProviders
    .sortedBy(configService::getProviderOrder)
    .asFlow()
    .map { 
      flow { 
        emit(it.getCommitTypes()) 
      }.onErrorCollect(emptyFlow()) 
    }
    .buffer(10)
    .flattenConcat()
    .flatMapConcat { it.asFlow() }
    .flowOn(Dispatchers.Default)
    .toList()
    .map { CommitTypePsiElement(project, it) }
    .mapIndexed(::CommitTypeLookupElement)
    .distinctBy(CommitTypeLookupElement::getLookupString)
    .forEach(rs::addElement)
Not sure if it works though. It's getting kinda complex πŸ˜›
I think the
async
version was cleaner. I don't think it makes really sense to use a lazy
Flow
when you want all the elements to be computed in parallel anyway. We're kinda trying to force it to be eager.
πŸ˜… 1
e
@marstran Thanks! So to give you more context, I'm running this inside an IntelliJ Plugin. Currently as I said above I'm using a sequence to call those Providers. To test coroutines "fast" I've done this
Copy code
fun fillResultSetWithTypes(context: TypeCommitContext) {
  val rs = resultSet.withPrefixMatcher(context.type)
  safelyReleaseSemaphore(parameters.process)

  launch {
    val currentTimeMillis = System.currentTimeMillis()
    TYPE_EP.getExtensions(project)
      .sortedBy(configService::getProviderOrder)
      .asFlow()
      .map {
        flow {
          emit(it.getCommitTypes(context.type))
        }.onErrorCollect(emptyFlow())
      }
      .flattenConcat()
      .flatMapConcat { it.asFlow() }
      .flowOn(Dispatchers.Default)
      .toList()
      .map { CommitTypePsiElement(project, it) }
      .mapIndexed(::CommitTypeLookupElement)
      .distinctBy(CommitTypeLookupElement::getLookupString)
      .forEach(rs::addElement)
    val l = System.currentTimeMillis() - currentTimeMillis
    printlnError("Time: $l")
  }
}
With two fake Providers which call
Thread.sleep(2000)
the final time is always 4000+ ms. Only the
async
versions did actually work
And yeah I understand the first version was cleaner. Just experimenting πŸ˜‰
As a last resort, I will go back to the "old" (let me say that) Java concurrency.
@marstran oh btw,
flattenConcat
is what screws it up. With
flattenMerge
it works.
m
But you're missing
buffer
.
e
@marstran yeah I can't use it, I'm on 1.2.21 and it seems missing
m
Oh, ok
e
Although I don't understand why flatMapMerge doesn't work...
Copy code
.flatMapMerge { it.getCommitTypes(...).asFlow() }
Or even what's the difference between
Copy code
.map {
  flow {
    emit(it.getCommitTypes(context.type))
  }.onErrorCollect(emptyFlow())
}
And
Copy code
.map {
  flowOf(it.getCommitTypes(context.type))
     .onErrorCollect(emptyFlow())
}
m
Using
asFlow
or
flowOf
will first eagerly execute
it.getCommitTypes
, and then create a flow from it.
flow { emit(x) }
will create a flow immediately and only emit items when
it.getCommitTypes
is done.
So in the first case,
flatMapMerge
will actually perform all actions, when it only wants to "trigger" it (which it does by collecting the nested flow).
Do you see what I mean?
e
@marstran I'm reading and trying to understand πŸ˜„ Looking at
flowOf
I see
Copy code
public fun <T> flowOf(vararg elements: T): Flow<T> = 
  unsafeFlow {
    for (element in elements) {
        emit(element)
    }
  }
Which is basically the same as
Copy code
flow {
  emit(it.getCommitTypes(context.type))
}
m
With
asFlow
or
flowOf
, you get a
Flow<Flow<T>>
, but all the work has already been done. With
flow { emit(it) }
, you get a
Flow<Flow<T>>
, but no work has been done yet.
No, it's not. Because
it.getCommitTypes()
has already been called when you call
flowOf
. The result is passed by value.
e
@marstran Oh heck... That's the basics. Sorry
Yeah I realized now hahaha
m
πŸ™‚
e
Let me try again to run that code!
@marstran yeah it works, as you said πŸ™‚
I wanted to try the Flow approach mostly because of the fluid error handling with
onError*
m
πŸ‘
πŸ‘ 1
e
@marstran Ok, I think I've got how everything works hahaha But damn, is using a try - catch block the only way to handle an error using
awaitAll
?
I miss the good
CompletableFuture#exceptionally
here!