Hi everyone I'm working with Kafka via Kotlin and ...
# server
a
Hi everyone I'm working with Kafka via Kotlin and we want to transform its
Future
implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java) as
Deferred
. Could this be a solution?
Copy code
fun <T> Future<T>.asDeferred(dispatcher: CoroutineDispatcher = Dispatchers.Default): Deferred<T> {
    return GlobalScope.async(dispatcher) {
        get()
    }
}
r
The general approach to converting Future's is to use
suspendCoroutine
-- that away your suspending
await
for the Future's result is executed on the caller's scope. Here is how you could implement a suspending await for
KafkaFuture
--
FutureRecordMetadata
is probably pretty much the same:
Copy code
suspend inline fun <T> KafkaFuture<T>.await() = suspendCoroutine<T> { cont ->
  whenComplete { t, e ->
    if(e == null) cont.resume(t)
    else cont.resumeWithException(e)
  }
}
Though I just realized this relies on the
whenComplete
callback, which
FutureRecordMetadata
may or may not have.
Looking at
FutureRecordMetadata
it doesn't appear to have any callback, so its a basic blocking Future. That means you should probably run the async get on an IO scope.
a
Exactly
r
Check out the second response on this SO for a possible alternate approach that doesn't require a blocking wait on an IO thread: https://stackoverflow.com/questions/50793362/how-to-use-suspendcoroutine-to-turn-java-7-future-into-kotlin-suspending-functio
Yeah, that only works with a JDK8 CompletableFuture.
Note your approach uses
Dispatchers.Default
, not the IO dispatcher.
a
Is it better to use the IO as default value?
r
Yes,
get
is a blocking call which will tie up a thread until it completes. It should therefore run on the IO dispatcher.
a
So do you think that I should use the approach described in SO? Or can I just switch to the
IO
dispatcher?
r
I'd probably use the IO dispatcher. Might be a good question for #coroutines though.
b
Are `KafkaFuture`s cancellable? If so, you should wire that up using
suspendCancellableCoroutine
r
I believe they are, good tip
@bdawg.io Would this be the right implementation?
Copy code
suspend inline fun <T> KafkaFuture<T>.await() = suspendCancellableCoroutine<T> { cont ->
  cont.invokeOnCancellation { cancel(true) }
  whenComplete { t, e ->
    if(e == null) cont.resume(t)
    else cont.resumeWithException(e)
  }
}
b
@rocketraman That looks good. I wouldn’t pass
true
to the cancel though. Futures are fairly whacky and probably shouldn’t be cancelled via interupt
r
This is a KafkaFuture though -- I guess it depends what their implementation is.
Looking at KafkaFutureImpl, the value seems to be ignored in any case.
b
Another option could be to loop and
delay(10)
while
isDone()
is false to still be suspend compatible.
Copy code
suspend fun <T> FutureRecordMetadata.await(): T {
    while (!isDone) {
        delay(10)
    }
    if (isCancelled) {
        throw CancellationException("Future was cancelled")
    }
    return get()
}
Oh lol, looping while
isDone
is false is a suggestion on that SO question you shared
r
Yup
I'm curious about the efficiency of a coroutine-based spinloop like this. Would it cause lots of context switching?
b
I’ve always wondered the same thing. Too bad there isn’t a first class approach to suspending until some property value changes
r
I'm guessing yes because the underlying thread still has to switch context -- coroutines just let us avoid an extra thread allocation for the spinloop
b
Yeah.
IO
is still probably the better way, it’s just fun to think about ways to solve the same problem haha
r
Indeed