Edoardo Luppi
04/01/2020, 9:52 AMSequence<T>
myProviders
.asSequence()
.sortedBy(configService::getProviderOrder)
.flatMap { it.getCommitTypes(context.type) }
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
Each provider
call (it.getCommitTypes(context.type)
) returns a Collection<T>
and cannot be converted to Flows.
I would like to call every provider asynchronously and then flatMap
the results, and collect
.
How would you approach this in the code above?
This part
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
can remain sequential / blocking.marstran
04/01/2020, 9:56 AMcoroutineScope {
myProviders
.sortedBy(configService::getProviderOrder)
.map{ async { it.getCommitTypes(context.type) } }
.awaitAll()
.asSequence()
.flatten()
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
}
Edoardo Luppi
04/01/2020, 9:57 AMgetCommitTypes
throws? The entire chain would fail, right?marstran
04/01/2020, 10:02 AMawaitAll
. Those which hasn't completed yet will be cancelled.Edoardo Luppi
04/01/2020, 10:04 AMEdoardo Luppi
04/01/2020, 12:00 PMrunBlocking {
myProviders
.sortedBy(configService::getProviderOrder)
.asFlow()
.flatMapMerge { it.getCommitTypes(...).asFlow() }
.onErrorCollect(emptyFlow())
.flowOn(Dispatchers.Default)
.toList()
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
}
But it seems the calls are still blocking.Edoardo Luppi
04/01/2020, 12:08 PMEdoardo Luppi
04/01/2020, 12:11 PMEdoardo Luppi
04/01/2020, 12:12 PMmarstran
04/01/2020, 12:12 PMmarstran
04/01/2020, 12:12 PMEdoardo Luppi
04/01/2020, 12:13 PMmarstran
04/01/2020, 12:13 PMEdoardo Luppi
04/01/2020, 12:13 PMEdoardo Luppi
04/01/2020, 12:14 PMmarstran
04/01/2020, 12:17 PMbuffer
to get around it though. Something like this:
myProviders
.sortedBy(configService::getProviderOrder)
.asFlow()
.map {
flow {
emit(it.getCommitTypes())
}.onErrorCollect(emptyFlow())
}
.buffer(10)
.flattenConcat()
.flatMapConcat { it.asFlow() }
.flowOn(Dispatchers.Default)
.toList()
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
marstran
04/01/2020, 12:18 PMmarstran
04/01/2020, 12:23 PMasync
version was cleaner. I don't think it makes really sense to use a lazy Flow
when you want all the elements to be computed in parallel anyway. We're kinda trying to force it to be eager.Edoardo Luppi
04/01/2020, 12:23 PMfun fillResultSetWithTypes(context: TypeCommitContext) {
val rs = resultSet.withPrefixMatcher(context.type)
safelyReleaseSemaphore(parameters.process)
launch {
val currentTimeMillis = System.currentTimeMillis()
TYPE_EP.getExtensions(project)
.sortedBy(configService::getProviderOrder)
.asFlow()
.map {
flow {
emit(it.getCommitTypes(context.type))
}.onErrorCollect(emptyFlow())
}
.flattenConcat()
.flatMapConcat { it.asFlow() }
.flowOn(Dispatchers.Default)
.toList()
.map { CommitTypePsiElement(project, it) }
.mapIndexed(::CommitTypeLookupElement)
.distinctBy(CommitTypeLookupElement::getLookupString)
.forEach(rs::addElement)
val l = System.currentTimeMillis() - currentTimeMillis
printlnError("Time: $l")
}
}
With two fake Providers which call Thread.sleep(2000)
the final time is always 4000+ ms.
Only the async
versions did actually workEdoardo Luppi
04/01/2020, 12:27 PMEdoardo Luppi
04/01/2020, 12:27 PMEdoardo Luppi
04/01/2020, 12:34 PMflattenConcat
is what screws it up. With flattenMerge
it works.marstran
04/01/2020, 12:38 PMbuffer
.Edoardo Luppi
04/01/2020, 12:39 PMmarstran
04/01/2020, 12:39 PMEdoardo Luppi
04/01/2020, 12:39 PM.flatMapMerge { it.getCommitTypes(...).asFlow() }
Edoardo Luppi
04/01/2020, 12:41 PM.map {
flow {
emit(it.getCommitTypes(context.type))
}.onErrorCollect(emptyFlow())
}
And
.map {
flowOf(it.getCommitTypes(context.type))
.onErrorCollect(emptyFlow())
}
marstran
04/01/2020, 12:41 PMasFlow
or flowOf
will first eagerly execute it.getCommitTypes
, and then create a flow from it.marstran
04/01/2020, 12:42 PMflow { emit(x) }
will create a flow immediately and only emit items when it.getCommitTypes
is done.marstran
04/01/2020, 12:43 PMflatMapMerge
will actually perform all actions, when it only wants to "trigger" it (which it does by collecting the nested flow).marstran
04/01/2020, 12:43 PMEdoardo Luppi
04/01/2020, 12:44 PMflowOf
I see
public fun <T> flowOf(vararg elements: T): Flow<T> =
unsafeFlow {
for (element in elements) {
emit(element)
}
}
Which is basically the same as
flow {
emit(it.getCommitTypes(context.type))
}
marstran
04/01/2020, 12:44 PMasFlow
or flowOf
, you get a Flow<Flow<T>>
, but all the work has already been done.
With flow { emit(it) }
, you get a Flow<Flow<T>>
, but no work has been done yet.marstran
04/01/2020, 12:46 PMit.getCommitTypes()
has already been called when you call flowOf
. The result is passed by value.Edoardo Luppi
04/01/2020, 12:46 PMEdoardo Luppi
04/01/2020, 12:46 PMmarstran
04/01/2020, 12:47 PMEdoardo Luppi
04/01/2020, 12:47 PMEdoardo Luppi
04/01/2020, 12:57 PMEdoardo Luppi
04/01/2020, 12:58 PMonError*
marstran
04/01/2020, 12:58 PMEdoardo Luppi
04/01/2020, 2:46 PMawaitAll
?Edoardo Luppi
04/01/2020, 2:46 PMCompletableFuture#exceptionally
here!