Hakon Grotte
10/21/2022, 1:10 PM
Retry-After header if it is not ready. (The same HTTP request can be repeated after a delay.)
Moreover, the HTTP endpoint does not process the requests that created the polling ids sequentially.
When a resource is ready it should be produced to a new Kafka topic.
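To make the polling behaviour concrete, here is a minimal sketch of a suspending poll loop that honours Retry-After. Everything named here is an assumption for illustration: `PollResult`, the `fetch` parameter (a stand-in for the real HTTP call) and `maxAttempts` are hypothetical, and the sketch assumes the header carries a number of seconds rather than an HTTP date.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical outcome of one poll attempt; not taken from the real API.
sealed interface PollResult {
    data class Ready(val body: String) : PollResult
    // Assumes Retry-After was sent as a number of seconds.
    data class NotReady(val retryAfterSeconds: Long) : PollResult
}

// Repeats the same request until the resource is ready, suspending (not
// blocking a thread) for as long as the server asked via Retry-After.
// `fetch` is a stand-in for the actual HTTP call.
suspend fun pollUntilReady(
    pollingId: String,
    maxAttempts: Int = 20,
    fetch: suspend (String) -> PollResult,
): String {
    repeat(maxAttempts) {
        when (val result = fetch(pollingId)) {
            is PollResult.Ready -> return result.body
            is PollResult.NotReady -> delay(result.retryAfterSeconds * 1000)
        }
    }
    error("Resource for $pollingId was not ready after $maxAttempts attempts")
}

fun main() = runBlocking {
    var calls = 0
    // Fake endpoint: not ready on the first two calls, ready on the third.
    val body = pollUntilReady("id-1") { id ->
        if (++calls < 3) PollResult.NotReady(retryAfterSeconds = 0)
        else PollResult.Ready("resource for $id")
    }
    println(body)
}
```

Because `delay` suspends instead of blocking, many of these loops can wait at the same time on a small thread pool, which is the main reason coroutines fit this kind of endpoint.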
I do realize that Kafka Streams might be a suitable option here, but it sounds quite difficult given the async nature of the HTTP endpoint.
I feel like Kotlin coroutines could benefit this implementation, and with some research/thought I have made a "proof of concept". The following is a very pseudo-ish code flow (pun intended):
fun main() = runBlocking { // <-- to obtain a CoroutineScope
    // 1. Method running a 'while' loop that consumes the Kafka topic inside a flow builder. Returns Flow<PollingId>.
    // 2. Method consuming the flow from "1." and producing to a new SharedFlow<Resource>:
    //    flow.onEach { pollingId ->
    //        CoroutineScope(Dispatchers.IO).launch {
    //            val resource = apiService.pollUntilReady(pollingId)
    //            resourceFlow.emit(resource)
    //        }
    //    }.collect()
    //    ^-- Launch each polling job in a new coroutine to obtain parallel processing. Should perhaps use .map {} instead of .onEach {} to obtain job references for cleanup etc.
    // 3. Method collecting the SharedFlow<Resource> that method "2." emits to, and sending the resources to a new Kafka topic.
}
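For comparison, the three steps above could also be sketched with `channelFlow`, which keeps every polling job a child of the collecting coroutine instead of launching into a detached `CoroutineScope(Dispatchers.IO)`. This is only a sketch under assumptions: `pollingIds()` and `pollUntilReady()` are stand-ins for the Kafka consumer and the API service, the `PollingId` and `Resource` shapes are invented, and `maxConcurrent` is a hypothetical limit on simultaneous polls.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Hypothetical types; the real ones depend on the Kafka messages and the API.
data class PollingId(val value: String)
data class Resource(val id: PollingId, val payload: String)

// Step 1 stand-in: the real version would wrap the Kafka consumer loop.
fun pollingIds(): Flow<PollingId> = flowOf("a", "b", "c").map { PollingId(it) }

// Stand-in for apiService.pollUntilReady: pretends the resource needs 10 ms.
suspend fun pollUntilReady(id: PollingId): Resource {
    delay(10)
    return Resource(id, "payload-${id.value}")
}

// Step 2: one child coroutine per polling id, so polls run in parallel.
// The children belong to the channelFlow scope: cancelling the collector
// cancels them, and the flow completes once all of them have finished.
fun resources(ids: Flow<PollingId>, maxConcurrent: Int = 16): Flow<Resource> = channelFlow {
    val permits = Semaphore(maxConcurrent)
    ids.collect { id ->
        permits.acquire() // suspends upstream when too many polls are in flight
        launch(Dispatchers.IO) {
            try {
                send(pollUntilReady(id))
            } finally {
                permits.release()
            }
        }
    }
}

// Step 3: collect the ready resources; println stands in for the Kafka producer.
fun main() = runBlocking {
    resources(pollingIds()).collect { resource ->
        println("produce ${resource.id.value} -> ${resource.payload}")
    }
}
```

Resources are emitted in the order they become ready, not in the order the polling ids arrived, which seems to match the non-sequential endpoint. A failure in any single poll cancels the whole flow in this sketch, so a real version would likely need per-id error handling and a decision about when Kafka offsets are committed.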
Some feedback on the proposed implementation would be appreciated 🙂 Any pitfalls, improvements, is this a bad implementation, etc.