phldavies
12/22/2022, 7:42 PM
CoroutineScope with Resource?

simon.vergauwen
12/23/2022, 8:00 AM
coroutineScope outer@{
    resourceScope {
        coroutineScope inner@{
        }
    }
}
Anything on outer will most likely leak, but everything related to inner is fine.
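A minimal sketch of that leak, using only kotlinx.coroutines. `FakeResource` and `withFakeResource` are hypothetical stand-ins for a resource and a resource scope, not Arrow types. The sketch only shows the ordering problem: a job launched on the outer scope can still be running after the resource has been released.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a resource; not an Arrow type.
class FakeResource {
    var closed = false
        private set

    fun close() { closed = true }
}

// Hand-rolled stand-in for a resource scope: run the block, then always close.
suspend fun <A> withFakeResource(block: suspend (FakeResource) -> A): A {
    val resource = FakeResource()
    return try {
        block(resource)
    } finally {
        resource.close()
    }
}

// Returns true if the launched job saw the resource already closed.
fun usedAfterClose(launchOnOuter: Boolean): Boolean = runBlocking {
    var sawClosed = false
    coroutineScope outer@{
        withFakeResource { resource ->
            coroutineScope inner@{
                val target: CoroutineScope = if (launchOnOuter) this@outer else this@inner
                target.launch {
                    delay(50) // still running when the inner block returns
                    sawClosed = resource.closed
                }
            }
        }
    }
    sawClosed
}
```

Launching on `inner` makes the inner `coroutineScope` wait for the job before the resource closes. Launching on `outer` lets the resource close first, so the job touches a closed resource.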
By making interface ResourceScope : CoroutineScope it's (almost) impossible to violate this rule, because everything will automatically be called on inner@. Unless you explicitly name the scopes like in the sample above and do:
coroutineScope outer@{
    resourceScope inner@{
        this@outer.launch { }
    }
}

simon.vergauwen
12/23/2022, 8:02 AM
[If we make] ResourceScope a CoroutineScope then it'll always await all jobs launched inside, guaranteeing that it respects resource safety even inside launch or async. It also means that launch can "fail" the ResourceScope, making it a full participant in Structured Concurrency.

phldavies
12/23/2022, 1:17 PM
[…] .launchIn(scope) into a Resource<Job>. I was wary of launching from a coroutineScope { } block inside resource {} (Arrow 1.x) as iirc resource { … } is used in the allocate phase and is NonCancellable? Ideally being able to pass in the outer scope where use {} is called into allocate would mean we can launch/async/etc within the usage scope rather than the definition scope. Not sure that makes sense but will clarify if needed 😉

simon.vergauwen
12/23/2022, 2:16 PM
val r: Resource<Job> =
    resource {
        val consumer = install({ KafkaConsumer(..) }) { r, _ -> r.close() }
        consumer.receive()
            .launchIn(this)
    }
This is available in 1.1.4-rc.3
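To illustrate the acquire/release pairing that install expresses, here is a rough sketch using only kotlinx.coroutines. `bracketSketch` is a hypothetical helper and not Arrow's implementation. It assumes acquire and release should run under `NonCancellable` while the usage block stays cancellable.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper, NOT Arrow's implementation: acquire and release run
// under NonCancellable, the use block stays cancellable.
suspend fun <A, B> bracketSketch(
    acquire: suspend () -> A,
    release: suspend (A) -> Unit,
    use: suspend (A) -> B,
): B {
    val resource = withContext(NonCancellable) { acquire() }
    return try {
        use(resource)
    } finally {
        withContext(NonCancellable) { release(resource) }
    }
}

// Cancels the job while it is suspended in `use` and records what ran.
fun cancelledUseStillReleases(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        bracketSketch(
            acquire = { events.add("acquire") },
            release = {
                delay(10) // a suspending release only completes under NonCancellable
                events.add("release")
            },
            use = {
                events.add("use")
                delay(10_000) // cancellation lands here
            },
        )
    }
    delay(50)
    job.cancelAndJoin()
    events
}
```

Without the `NonCancellable` wrapper around release, the `delay(10)` would throw as soon as the job is cancelled and "release" would never be recorded.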
resourceScope is nothing more than resource { }.use { }. The params of install are what is NonCancellable; the rest here is cancellable.

simon.vergauwen
12/23/2022, 2:24 PM
"[…] a separate note it would be great to be able to hook into the created consumer to monitor metrics"
Right, it creates a new KafkaConsumer and doesn't re-use the existing one created by receive().
reactor-kafka "solves" this by exposing a mutable KafkaConsumer instance... which gets overridden whenever you subscribe to any Flux created by receive, and explodes with IllegalStateException at the incorrect time.
I'd like to solve that issue, whilst preparing the new API to be compatible with KMP. So we can provide implementations for JS and Native as well but I haven't been able to come up with a nice API yet.
You want to create a Consumer and be able to call receiveXX on that. Ideally just once, but modelling that in a signature is annoying 😅
fun receive(topicName: String): Pair<Consumer, Flow<Record<K, V>>> 😭

phldavies
12/23/2022, 2:50 PM
[…] prepareConsumer: (KafkaConsumer) -> Unit = {} on receive that you can hook into. My main use case is to hook up the metrics to Cohort

simon.vergauwen
12/23/2022, 2:51 PM
[…] ReceiverSettings? 🤔
Or take it into the receive() function?
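A hook taken directly by receive() could look roughly like the sketch below. `SketchConsumer` and this `receive` signature are hypothetical and only mimic the shape of the prepareConsumer parameter discussed here. They are not the kotlin-kafka API.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a consumer; not the Kafka class.
class SketchConsumer(val groupId: String) {
    fun poll(): List<String> = listOf("record-1", "record-2")
}

// Hypothetical receive(): the hook runs once per collection, after the
// consumer is created and before the first poll.
fun receive(
    groupId: String,
    prepareConsumer: (SketchConsumer) -> Unit = {},
): Flow<String> = flow {
    val consumer = SketchConsumer(groupId)
    prepareConsumer(consumer)
    consumer.poll().forEach { emit(it) }
}

// Registers the consumer with a pretend metrics registry, then collects.
fun hookDemo(): Pair<List<String>, List<String>> = runBlocking {
    val registered = mutableListOf<String>()
    val records = receive("orders") { consumer -> registered.add(consumer.groupId) }.toList()
    registered to records
}
```

Because the default is an empty lambda, callers that do not need the hook can keep calling `receive("orders")` unchanged.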
That could indeed work, could even be suspend CoroutineScope.(Consumer<K, V>) -> Unit.

phldavies
12/23/2022, 2:52 PM
[…] receive() but in settings would also work. Generally I’ve just called receive straight after constructing settings anyway (and only for that one receive)

simon.vergauwen
12/23/2022, 2:54 PM

phldavies
12/23/2022, 2:56 PM
[…] Flow<KafkaConsumer>.receive(topics) or a more delicate KafkaConsumer.receiveFlow(topics) which allows for the PollLoop/scheduler to be used with a manually created consumer (with the impact being no guarantee of the scheduler being the only one to operate poll)

simon.vergauwen
12/23/2022, 3:03 PM
KafkaConsumer methods can only be called from the thread that created it, so the constructor would still need to live inside kotlin-kafka and so it cannot be a manually created consumer.
The PollLoop needs to be able to intercept calls to pause/resume so the PollLoop can keep track of which partitions were […]

simon.vergauwen
12/23/2022, 3:05 PM
[…] interface KafkaReceiver : AutoCloseable which wraps KafkaConsumer and provides the same methods as the current KafkaReceiver + Consumer<K, V> from Kafka.

simon.vergauwen
12/23/2022, 3:06 PM

simon.vergauwen
12/23/2022, 3:06 PM

phldavies
12/23/2022, 3:10 PM
[…] KafkaConsumer.receiveFlow to be delicate. I’m not sure if we’d want to couple with arrow, but Resource<KafkaConsumer> might fit as a way of plugging in customised consumers (and lifecycle). May be worth using the Consumer<K, V> interface too at some point, as it may allow for using a MockConsumer<K, V> for testing (although that may have other issues).
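As a sketch of why coding against an interface helps with testing: `PollingConsumer`, `receiveFlow` and `FakeConsumer` below are hypothetical and much smaller than Kafka's real Consumer<K, V> and MockConsumer<K, V>. Stopping on an empty batch is a simplification so the flow terminates. A real poll loop would keep polling.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical minimal interface; Kafka's real Consumer<K, V> is far larger.
interface PollingConsumer<K, V> : AutoCloseable {
    fun poll(): List<Pair<K, V>>
}

// Hypothetical receive function: the consumer is created when collection
// starts, polled until an empty batch (a simplification), and always closed.
fun <K, V> receiveFlow(create: () -> PollingConsumer<K, V>): Flow<Pair<K, V>> = flow {
    val consumer = create()
    try {
        while (true) {
            val batch = consumer.poll()
            if (batch.isEmpty()) break
            batch.forEach { emit(it) }
        }
    } finally {
        consumer.close()
    }
}

// Test double that replays fixed batches and records whether it was closed.
class FakeConsumer(batches: List<List<Pair<String, Int>>>) : PollingConsumer<String, Int> {
    private val remaining = ArrayDeque(batches)
    var closed = false
        private set

    override fun poll(): List<Pair<String, Int>> = remaining.removeFirstOrNull() ?: emptyList()
    override fun close() { closed = true }
}

fun collectFromFake(): Pair<List<Pair<String, Int>>, Boolean> = runBlocking {
    val fake = FakeConsumer(listOf(listOf("a" to 1, "b" to 2), listOf("c" to 3)))
    val records = receiveFlow<String, Int> { fake }.toList()
    records to fake.closed
}
```

The fake covers plain polling only. Rebalancing, pausing and resuming are not modelled here.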
Is there a better place to discuss kotlin-kafka than the #arrow channel?

simon.vergauwen
12/23/2022, 3:21 PM
"May be worth using the Consumer<K, V> interface too at some point,"
Yes, indeed, but it might be a bit tricky to fit this into a KMP design. Although maybe not impossible. TBH I prefer working with test-containers instead of MockConsumer; things like the rebalancing/pausing/resuming are quite hard to do right with mocks.
So far I avoided bringing in Arrow, but it would be nice to have Resource 😅 Even thought about splitting it into a separate base module so you can rely on only Resource 😂
I think the Github issues, or discussions, are best for now.