Is there any guidance/best practice for mixing `Co...
# arrow
p
Is there any guidance/best practice for mixing `CoroutineScope` with `Resource`?
s
What are your use cases? There is really only one mistake you can make, and I've thought of a way to prevent it.
coroutineScope outer@{
  resourceScope {
    coroutineScope inner@{

    }
  }
}
Anything launched on `outer` will most likely leak, but everything related to `inner` is fine. By making `interface ResourceScope : CoroutineScope`, it's (almost) impossible to violate this rule because everything will automatically be called on `inner@`. Unless you explicitly name the scopes like in the sample above and do:
coroutineScope outer@{
  resourceScope inner@{
    // A labeled receiver is referenced as this@outer in Kotlin
    this@outer.launch { }
  }
}
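The leak can be reproduced without Arrow at all. The following is a minimal sketch using only kotlinx.coroutines, where `Handle` and `leakOnOuter` are hypothetical names and a plain `try`/`finally` stands in for `resourceScope`. The job launched on the outer scope is not awaited by the inner block, so the handle is closed before the job reads it.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a real resource; it only records whether it is open.
class Handle {
    var open = true
        private set

    fun close() {
        open = false
    }
}

// Returns what the job launched on the outer scope observed about the handle.
fun leakOnOuter(): Boolean = runBlocking {
    var sawOpen = true
    coroutineScope outer@{
        val handle = Handle()
        try {
            coroutineScope inner@{
                // Launched on the OUTER scope, so the inner scope does not wait for it.
                this@outer.launch {
                    delay(50)
                    sawOpen = handle.open
                }
            }
        } finally {
            // Runs as soon as the inner scope returns, before the outer job resumes.
            handle.close()
        }
    }
    sawOpen
}

fun main() {
    println("outer job saw an open handle: ${leakOnOuter()}")
}
```

Replacing `this@outer.launch` with a plain `launch` attaches the job to the inner scope, which then waits for it before the `finally` block runs.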
If we make `ResourceScope` a `CoroutineScope`, then it'll always await all jobs launched inside, guaranteeing that it respects resource safety even inside `launch` or `async`. It also means that `launch` can "fail" the `ResourceScope`, making it a full participant in Structured Concurrency.
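To illustrate what being a full participant means, here is a rough sketch with plain kotlinx.coroutines, again with `try`/`finally` standing in for the resource scope. The function name `failingChild` and the event strings are made up for the example. A failing `launch` cancels its sibling, fails the scope, and the release step still runs before the failure reaches the caller.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which acquire, cancellation, release and failure happen.
fun failingChild(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        events += "acquire"
        try {
            coroutineScope {
                // A long-running sibling that only ends when it is cancelled.
                launch {
                    try {
                        awaitCancellation()
                    } finally {
                        events += "sibling cancelled"
                    }
                }
                // A child that fails: this cancels the sibling and fails the scope.
                launch { throw IllegalStateException("boom") }
            }
        } finally {
            // The scope has already waited for both children at this point.
            events += "release"
        }
    } catch (e: IllegalStateException) {
        events += "failed: ${e.message}"
    }
    events
}

fun main() {
    failingChild().forEach(::println)
}
```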
p
Currently my use case is to encapsulate a flow consumer as a resource (in this case, a KafkaReceiver from your kotlin-kafka library 😉; as a separate note, it would be great to be able to hook into the created consumer to monitor metrics). I'm currently wrapping the result of `.launchIn(scope)` into a `Resource<Job>`. I was wary of launching from a `coroutineScope { }` block inside `resource {}` (Arrow 1.x) as, iirc, `resource { … }` is used in the allocate phase and is `NonCancellable`? Ideally, being able to pass the outer scope where `use {}` is called into allocate would mean we can launch/async/etc. within the usage scope rather than the definition scope. Not sure that makes sense, but I will clarify if needed 😉
s
Yes, it makes sense and I am actually doing the same in this project. https://github.com/47deg/gh-alerts-subscriptions-kotlin/blob/main/src/main/kotlin/alerts/notification/service.kt#L31 This should be fine:
val r: Resource<Job> =
  resource {
    val consumer = install({ KafkaConsumer(..) }) { r, _ -> r.close() }
    consumer.receive()
      .launchIn(this)
  }
This is available in `1.1.4-rc.3`. `resourceScope` is nothing more than `resource { }.use { }`. The params of `install` are what is `NonCancellable`; the rest here is cancellable.
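The split between non-cancellable acquire/release and cancellable usage can be sketched with kotlinx.coroutines alone. The `bracket` helper below is hypothetical and is not Arrow's implementation of `install`; it only shows the general pattern of wrapping acquire and release in `withContext(NonCancellable)` while leaving the body open to cancellation.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

// Hypothetical helper: acquire and release cannot be cancelled, `use` can.
suspend fun <A, B> bracket(
    acquire: suspend () -> A,
    release: suspend (A) -> Unit,
    use: suspend (A) -> B,
): B {
    val a = withContext(NonCancellable) { acquire() }
    return try {
        use(a)
    } finally {
        // Still runs after cancellation because the context is NonCancellable.
        withContext(NonCancellable) { release(a) }
    }
}

// Cancels a job while it is inside `use` and reports what happened.
fun cancelDuringUse(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        bracket<String, Unit>(
            acquire = { events += "acquire"; "consumer" },
            release = { events += "release $it" },
            use = { awaitCancellation() },
        )
    }
    yield() // let the job run until it suspends inside `use`
    job.cancelAndJoin()
    events
}

fun main() {
    println(cancelDuringUse())
}
```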
- a separate note it would be great to be able to hook into the created consumer to monitor metrics
Right, it creates a new `KafkaConsumer` and doesn't reuse the existing one created by `receive()`. reactor-kafka "solves" this by exposing a mutable `KafkaConsumer` instance, which gets overridden whenever you `subscribe` to any `Flux` created by `receive`, and explodes with `IllegalStateException` at the incorrect time. I'd like to solve that issue whilst preparing the new API to be compatible with KMP, so we can provide implementations for JS and Native as well, but I haven't been able to come up with a nice API yet. You want to create a `Consumer` and be able to call `receiveXX` on that. Ideally just once, but modelling that in a signature is annoying 😅
`fun receive(topicName: String): Pair<Consumer, Flow<Record<K, V>>>`
😭
p
Given the use case, you could have an optional `prepareConsumer: (KafkaConsumer) -> Unit = {}` on receive that you can hook into. My main use case is to hook up the metrics to Cohort.
s
In the `ReceiverSettings`? 🤔 Or take it into the `receive()` function? That could indeed work, could even be `suspend CoroutineScope.(Consumer<K, V>) -> Unit`.
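A rough sketch of how such a hook could look is below. Everything in it is hypothetical: `FakeConsumer` stands in for the real Kafka client, this `receive` is not the kotlin-kafka function, and the hook uses the simpler `suspend (FakeConsumer) -> Unit` shape rather than a `CoroutineScope` receiver. The point is only that the consumer is still created inside the flow, while the caller gets one chance to inspect it before polling starts.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a Kafka consumer; not the real client class.
class FakeConsumer(private val records: List<String>) {
    var closed = false
        private set

    fun poll(): List<String> = records

    fun close() {
        closed = true
    }
}

// Hypothetical `receive`: the consumer is created lazily on collection and
// `prepareConsumer` is invoked once before the first poll.
fun receive(
    records: List<String>,
    prepareConsumer: suspend (FakeConsumer) -> Unit = {},
): Flow<String> = flow {
    val consumer = FakeConsumer(records)
    try {
        prepareConsumer(consumer)
        consumer.poll().forEach { emit(it) }
    } finally {
        consumer.close()
    }
}

// Collects the flow and returns the hook invocations together with the records.
fun collectWithHook(): Pair<List<String>, List<String>> = runBlocking {
    val hookCalls = mutableListOf<String>()
    val received = receive(listOf("a", "b")) { consumer ->
        // A metrics binding would go here; the sketch only records the call.
        hookCalls += "prepared, closed=${consumer.closed}"
    }.toList()
    hookCalls to received
}

fun main() {
    println(collectWithHook())
}
```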
p
I was thinking on `receive()`, but in settings would also work. Generally I've just called receive straight after constructing settings anyway (and only for that one receive).
s
Right, that's actually a pretty good solution. If you're interested feel free to create a PR, and I can cut a new release when it's merged.
p
An alternative might be to provide a `Flow<KafkaConsumer>.receive(topics)` or a more delicate `KafkaConsumer.receiveFlow(topics)`, which allows the PollLoop/scheduler to be used with a manually created consumer (with the impact being no guarantee of the scheduler being the only one to operate `poll`).
s
Right, I thought about that, but there are a couple of other issues. `KafkaConsumer` methods can only be called from the thread that created it, so the constructor would still need to live inside `kotlin-kafka`, and so it cannot be a manually created consumer. The `PollLoop` needs to be able to intercept calls to `pause/resume` so the `PollLoop` can keep track of which partitions were
For KMP we probably need to create a facade, so we could potentially create `interface KafkaReceiver : AutoCloseable`, which wraps `KafkaConsumer` and provides the same methods as the current `KafkaReceiver` + `Consumer<K, V>` from Kafka.
That would solve the issue, but it would make it more difficult to interop with the current Java Kafka eco-system...
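As a sketch of what such a facade might look like, the block below defines a small interface that owns `pause`/`resume` and is `AutoCloseable`. All names here (`ReceiverFacade`, `InMemoryReceiver`, `Record`, `demoFacade`) are hypothetical and use an in-memory backlog instead of a real consumer. It only illustrates that routing `pause`/`resume` through the facade lets the library see every call.

```kotlin
// Hypothetical record type; not the Kafka client's ConsumerRecord.
data class Record<K, V>(val partition: Int, val key: K, val value: V)

// Hypothetical facade: callers never touch the underlying consumer directly.
interface ReceiverFacade<K, V> : AutoCloseable {
    val paused: Set<Int>
    fun poll(): List<Record<K, V>>
    fun pause(partition: Int)
    fun resume(partition: Int)
}

// In-memory implementation, used here only to exercise the interface.
class InMemoryReceiver<K, V>(
    private val backlog: List<Record<K, V>>,
) : ReceiverFacade<K, V> {
    private val pausedPartitions = mutableSetOf<Int>()
    private var closed = false

    override val paused: Set<Int>
        get() = pausedPartitions.toSet()

    // Every pause/resume goes through the facade, so it can be tracked.
    override fun pause(partition: Int) {
        pausedPartitions += partition
    }

    override fun resume(partition: Int) {
        pausedPartitions -= partition
    }

    override fun poll(): List<Record<K, V>> {
        check(!closed) { "receiver is closed" }
        return backlog.filter { it.partition !in pausedPartitions }
    }

    override fun close() {
        closed = true
    }
}

// Polls once with partition 1 paused and once after resuming it.
fun demoFacade(): Pair<List<String>, List<String>> {
    val receiver = InMemoryReceiver(
        listOf(Record(0, "k0", "v0"), Record(1, "k1", "v1")),
    )
    try {
        receiver.pause(1)
        val whilePaused = receiver.poll().map { it.value }
        receiver.resume(1)
        val afterResume = receiver.poll().map { it.value }
        return whilePaused to afterResume
    } finally {
        receiver.close()
    }
}

fun main() {
    println(demoFacade())
}
```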
Feedback, and more thoughts are very much appreciated! 🙏 Your feedback is always great @phldavies thank you.
p
Understood on the issues, which is exactly the reason I'd consider `KafkaConsumer.receiveFlow` to be delicate. I'm not sure if we'd want to couple with Arrow, but `Resource<KafkaConsumer>` might fit as a way of plugging in customised consumers (and lifecycle). May be worth using the `Consumer<K, V>` interface too at some point, as it may allow for using a `MockConsumer<K, V>` for testing (although that may have other issues). Is there a better place to discuss `kotlin-kafka` than the #arrow channel?
s
May be worth using the `Consumer<K, V>` interface too at some point,
Yes, indeed, but it might be a bit tricky to fit this into a KMP design. Although maybe not impossible. TBH I prefer working with test-containers instead of `MockConsumer`; things like rebalancing/pausing/resuming are quite hard to get right with mocks. So far I've avoided bringing in Arrow, but it would be nice to have `Resource` 😅 I even thought about splitting it into a separate base module so you can rely on only `Resource` 😂 I think the GitHub issues, or discussions, are best for now.