Hello, I'm wondering how I can improve the followi...
# coroutines
j
Hello, I'm wondering how I can improve the following implementation. The function executes several search APIs in different directories. I want to update my list with each API return and also be notified when everything is finished. The code is a bit simplified, I normally have a lot more APIs and I have to process the results before adding them to the list to, among other things, remove duplicates. I wonder if it isn't more idiomatic with Flow. But I haven't had a chance to get into it yet.
Copy code
private var mutex = Mutex()
private val results = mutableListOf<Any>()

private suspend fun addResultsToList(data: List<Any>) {
    mutex.withLock { results.addAll(data) }
}

fun search(query: String, searchCriteria: SearchCriteria, delayMs: Long = 500L) {
    searchJob?.cancel()
    searchJob = searchScope.launch {

            delay(delayMs)

            val apis = listOf(
                async { addResultsToList(api1(query, searchCriteria)) },
                async { addResultsToList(api2(query, searchCriteria)) },
                async { addResultsToList(api3(query, searchCriteria)) },
                async { addResultsToList(api4(query, searchCriteria)) }
            )

            apis.awaitAll()
    }
}
s
Hi, do you need to be able to to receive intermediate search results? For example, do you need to be able to access the result of
api1
call before
api2-4
results are available?
s
☝️ very good question. There are for sure ways to simplify that code, but it all depends on what the requirements are. Who else will access the
results
list, and when?
r
@jhiertz you could use a
channelFlow
+
debounce
Copy code
private val delay = 500.milliseconds
private val search = Channel<Search>()

fun search(query: String, searchCriteria: SearchCriteria) {
    search.trySendBlocking(Search(query, searchCriteria))
}

val results: Flow<List<Any>> = search.receiveAsFlow()
    .debounce(delay)
    .flatMapLatest { (query, searchCriteria) ->
        api(query, searchCriteria)
    }

private fun api(query: String, searchCriteria: SearchCriteria): Flow<List<Any>> {
    return channelFlow {
        launch { send(api1(query, searchCriteria)) }
        launch { send(api2(query, searchCriteria)) }
        launch { send(api3(query, searchCriteria)) }
        launch { send(api4(query, searchCriteria)) }
    }.runningFold(emptyList()) { acc, value -> acc + value }
}
I don't know if there are any signals to indicate that all of the api calls finished, but you could create your own type for that and this should get you started in the right direction.
j
First, I created an implementation where I waited for the results of all the APIs. But some of them can take a while, and I need to display the results as soon as they're available. The `results`list is used in a view to display search results as they come in.
s
Do you need to observe the
results
list from multiple places, e.g., from several screens? If not, you can delete
val results
and your
api
method should return a
Flow
emitting intermediate search results.
☝️ 1
j
There are several screens that use this search method, with different criteria. But never at the same time
s
implementation as suggested by Nathan:
Copy code
class SearchCriteria()

    fun api1(query: String, searchCriteria: SearchCriteria) = listOf("foo", "bar")
    fun api2(query: String, searchCriteria: SearchCriteria) = listOf("baz", "1")
    fun api3(query: String, searchCriteria: SearchCriteria) = listOf("2", "3")

    private fun search(query: String, searchCriteria: SearchCriteria): Flow<List<String>> {
        return channelFlow {
            launch { send(api1(query, searchCriteria)) }
            launch { send(api2(query, searchCriteria)) }
            launch { send(api3(query, searchCriteria)) }
        }.runningFold(emptyList()) { acc, value -> acc + value }
    }
when the flow completes, it indicates that all search results have been collected
kodee happy 1
I tried to cover it with a test:
Copy code
@Test
    fun searchTest() = runTest {
        val expectedApiSearchResults = listOf(
            listOf("foo", "bar"),
            listOf("baz", "1"),
            listOf("2", "3")
        )

        val expectedFinalSearchResult = expectedApiSearchResults.flatten()

        search(query = "testquery", searchCriteria = SearchCriteria()).test {
            // collect intermediate search results
            val intermediateSearchResults = mutableListOf<List<String>>()

            // verify there are exactly 4 intermediate search results (including an empty list)
            repeat(expectedApiSearchResults.size + 1) { i ->
                // await the next intermediate search result
                val intermediateSearchResult = awaitItem()

                // verify that the intermediate search results are subsets of the final search result
                assertTrue(
                    intermediateSearchResult.all {
                        it in expectedFinalSearchResult
                    }
                )

                intermediateSearchResults.add(intermediateSearchResult)

                if (i > 0) {
                    // verify that individual API results are emitted as part of the intermediate search results
                    val lastDiffBetweenIntermediateSearchResults = intermediateSearchResults[i] - intermediateSearchResults[i - 1]
                    assertTrue(lastDiffBetweenIntermediateSearchResults in expectedApiSearchResults)
                }
            }

            // verify the final search result
            assertEquals(expectedFinalSearchResult, intermediateSearchResults.last())

            awaitComplete()
        }
    }
j
@Stanislav Kral Thanks, it's a pleasant code to read I'll try this implementation