andrea.santurbano
03/26/2019, 12:44 PM
Future implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java) as Deferred. Could this be a solution?
fun <T> Future<T>.asDeferred(dispatcher: CoroutineDispatcher = Dispatchers.Default): Deferred<T> {
    return GlobalScope.async(dispatcher) {
        get()
    }
}
rocketraman
03/26/2019, 1:08 PM
suspendCoroutine -- that way your suspending await for the Future's result is executed on the caller's scope.
Here is how you could implement a suspending await for KafkaFuture -- FutureRecordMetadata is probably pretty much the same:
suspend inline fun <T> KafkaFuture<T>.await() = suspendCoroutine<T> { cont ->
    whenComplete { t, e ->
        if (e == null) cont.resume(t)
        else cont.resumeWithException(e)
    }
}
rocketraman
03/26/2019, 1:10 PM
whenComplete callback, which FutureRecordMetadata may or may not have.
rocketraman
03/26/2019, 1:14 PM
FutureRecordMetadata: it doesn't appear to have any callback, so it's a basic blocking Future. That means you should probably run the async get on an IO scope.
andrea.santurbano
03/26/2019, 1:22 PM
andrea.santurbano
03/26/2019, 1:23 PM
rocketraman
03/26/2019, 1:23 PM
rocketraman
03/26/2019, 1:23 PM
rocketraman
03/26/2019, 1:25 PM
Dispatchers.Default, not the IO dispatcher.
andrea.santurbano
03/26/2019, 1:28 PM
rocketraman
03/26/2019, 1:29 PM
get is a blocking call which will tie up a thread until it completes. It should therefore run on the IO dispatcher.
andrea.santurbano
03/26/2019, 1:30 PM
IO dispatcher?
rocketraman
03/26/2019, 3:15 PM
bdawg.io
03/27/2019, 4:20 PM
suspendCancellableCoroutine
rocketraman
03/27/2019, 4:21 PM
rocketraman
03/27/2019, 4:25 PM
suspend inline fun <T> KafkaFuture<T>.await() = suspendCancellableCoroutine<T> { cont ->
    cont.invokeOnCancellation { cancel(true) }
    whenComplete { t, e ->
        if (e == null) cont.resume(t)
        else cont.resumeWithException(e)
    }
}
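A minimal runnable sketch of the same cancellable bridge, for anyone who wants to try it without a Kafka dependency. It assumes kotlinx-coroutines-core is on the classpath and swaps in the JDK's CompletableFuture as a stand-in for KafkaFuture, since both expose whenComplete and cancel. The name awaitCancellable is made up for illustration, and cancel(false) is used here instead of cancel(true).

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.*

// Hypothetical stand-in for KafkaFuture<T>.await(): CompletableFuture has the
// same whenComplete/cancel shape but ships with the JDK.
suspend fun <T> CompletableFuture<T>.awaitCancellable(): T =
    suspendCancellableCoroutine { cont ->
        // Propagate coroutine cancellation to the future without interruption.
        cont.invokeOnCancellation { cancel(false) }
        whenComplete { value, error ->
            if (error == null) cont.resume(value)
            else cont.resumeWithException(error)
        }
    }

fun main(): Unit = runBlocking {
    val pending = CompletableFuture<String>()      // never completed by anyone
    val job = launch { pending.awaitCancellable() }
    yield()                  // let the child coroutine start and suspend
    job.cancelAndJoin()      // cancellation runs the invokeOnCancellation handler
    println(pending.isCancelled)
}
```

Cancelling the coroutine should leave the future cancelled as well, which is the behaviour suspendCoroutine alone does not give you.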
bdawg.io
03/27/2019, 4:26 PM
true to the cancel though. Futures are fairly wacky and probably shouldn’t be cancelled via interrupt.
rocketraman
03/27/2019, 4:27 PM
rocketraman
03/27/2019, 4:28 PM
bdawg.io
03/27/2019, 4:29 PM
delay(10) while isDone() is false to still be suspend compatible.
suspend fun <T> FutureRecordMetadata.await(): T {
    while (!isDone) {
        delay(10)
    }
    if (isCancelled) {
        throw CancellationException("Future was cancelled")
    }
    return get()
}
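For comparison with the polling loop, here is a rough sketch of the blocking alternative discussed earlier in the thread: calling get() on the IO dispatcher. It assumes kotlinx-coroutines-core is available. The helper name awaitBlocking is hypothetical, and an executor-backed Future stands in for the one returned by the Kafka producer.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper; works for any java.util.concurrent.Future,
// including callback-less ones such as FutureRecordMetadata.
suspend fun <T> Future<T>.awaitBlocking(): T =
    // get() parks a thread until the result is ready, so run it on the
    // dispatcher meant for blocking calls rather than Dispatchers.Default.
    withContext(Dispatchers.IO) { get() }

fun main(): Unit = runBlocking {
    val executor = Executors.newSingleThreadExecutor()
    try {
        // Stand-in for a producer send call that returns a plain Future.
        val future: Future<Int> = executor.submit(Callable { 21 * 2 })
        println(future.awaitBlocking()) // prints 42
    } finally {
        executor.shutdown()
    }
}
```

One trade-off to keep in mind: cancelling the calling coroutine does not interrupt a thread that is already blocked inside get(), so the IO thread stays occupied until the future completes.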
bdawg.io
03/27/2019, 4:30 PM
isDone is false is a suggestion on that SO question you shared
rocketraman
03/27/2019, 4:30 PM
rocketraman
03/27/2019, 4:31 PM
bdawg.io
03/27/2019, 4:33 PM
rocketraman
03/27/2019, 4:33 PM
bdawg.io
03/27/2019, 4:34 PM
IO is still probably the better way, it’s just fun to think about ways to solve the same problem haha
rocketraman
03/27/2019, 4:34 PM