Dariusz Kuc
01/20/2021, 5:54 PMChannel
based pipelines and was wondering whether you have any suggestions to make it better.
NOTE: for this exercise want to stick to channels only
Consider following basic example - fetch list of items and for each one of them fetch additional information (details + reviews) that will be used to final result
val result = mutableListOf()
val summaries = retrieveSummaries()
for (summary in summaries) {
val details = retrieveDetails(summary.id)
val reviews = retrieveReviews(summary.id)
result.add(buildResult(summary, details, reviews)
}
return result
I could easily parallelize the above by doing something like
summaries.map { summary ->
async {
val details = async { retrieveDetails(summary.id) }
val reviews = async { retrieveReviews(summary.id) }
buildResult(summary, details.await(), reviews.await())
}
}
Currently trying to figure out what would be the "best" way to achieve the same using Channel
. "Simplest" would be to do a sequence of channels
val summaries: ReceiveChannel<Summary> = summariesChannel()
val detailsAndSummaries: ReceiveChannel<Pair<Summary, Detail>> = detailsAndSummariesChannel(summaries)
val result: ReceiveChannel<Result> = resultChannel(detailsAndSummaries)
Looks like BroadcastChannel
is marked obsolete but I guess logically it would make sense to broadcast summaries and then send both details and reviews from separate channels but I am unsure how to read from multiple channels at once (i.e. buildResult
would need results from 3 channels). Any ideas/suggestions?Dominaezzz
01/20/2021, 6:14 PMselect
and channel.onReceive
.Dariusz Kuc
01/20/2021, 6:45 PMselect
is able to pick first available element from X channels -> my use case in the above is that I need to consume messages from 3 channels to create a single result typezip
operator (which is marked as deprecated on channels)jimn
01/24/2021, 3:38 AMDariusz Kuc
01/24/2021, 7:05 AM