Hello! I am tasked with creating a "polling servic...
# coroutines
h
Hello! I am tasked with creating a "polling service" that consumes "polling ids" from a Kafka topic and runs HTTP requests to an API with these ids. Assuming no failures, the HTTP endpoint returns: • HTTP.OK and the applicable resource if it is ready • HTTP.Accepted and
Retry-After
header if it is not ready. (The same http request can be repeated after a delay.) Moreover, the HTTP endpoint does not process the requests that created the polling ids sequentially. When a resource is ready it should be produced to a new Kafka topic. I do realize that Kafka Streams might be a suitable option here, but it sounds quite difficult with the async nature of the http endpoint. I feel like Kotlin coroutines could benefit this implementation and with some research/thought I have made a "Proof of concept". Following is a very psuedo-ish code flow (pun intended):
Copy code
fun main = runBlocking { <-- to obtain coroutinescope
  // 1. method running 'while loop' consuming Kafka topic inside flow builder. returns Flow<PollingId>

 // 2. method consuming flow from "1." and producing to new SharedFlow<Resource> 
 // flow.onEach { pollingId ->
 //  CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch { 
 //    val resource = apiService.pollUntilReady(pollingid)
 //    resourceFlow.send(resource)
 //  }.collect()
^-- Launch each polling job in new coroutine job to obtain parallel processing. Should perhaps .map{} instead of .onEach {} to obtain job references for cleanup etc.

// 3. method consuming/collecting SharedFlow<Resource> that method "2." sends/produces to, and sending resources to new Kafka topic
}
Some feedback on the proposed implementation would be appreciated 🙂 Any pitfalls, improvements, is this a bad implementation, etc.