phldavies
12/22/2022, 7:42 PM
[…] `CoroutineScope` with Resource?
simon.vergauwen
12/23/2022, 8:00 AM
coroutineScope outer@{
  resourceScope {
    coroutineScope inner@{
    }
  }
}
Anything on `outer` will most likely leak, but everything related to `inner` is fine.
By making `interface ResourceScope : CoroutineScope` it's (almost) impossible to violate this rule, because everything will automatically be called on `inner@`. Unless you explicitly name them like in the sample above and do:
coroutineScope outer@{
  resourceScope inner@{
    outer.launch { }
  }
}
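The leak described above can be reproduced with plain kotlinx.coroutines. This is only a minimal sketch: `FakeResource` and `withFakeResource` are hypothetical stand-ins for a real resource and for `resourceScope`, not Arrow APIs. The helper runs its block in an inner `coroutineScope` and releases the resource once that scope completes, so a job launched on the inner scope is awaited before release, while a job launched on the outer scope can still be running afterwards.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a real resource (e.g. a consumer); not an Arrow or Kafka type.
class FakeResource {
    var closed = false
        private set

    fun close() {
        closed = true
    }
}

// Hypothetical helper with the acquire/use/release shape of resourceScope.
// The block runs in an inner coroutineScope, which waits for all of its children
// before the finally block releases the resource.
suspend fun <A> withFakeResource(block: suspend CoroutineScope.(FakeResource) -> A): A {
    val resource = FakeResource()
    return try {
        coroutineScope { block(this, resource) }
    } finally {
        resource.close()
    }
}

// Returns (inner job saw a closed resource, outer job saw a closed resource).
suspend fun demo(): Pair<Boolean, Boolean> {
    var innerSawClosed = false
    var outerSawClosed = false
    coroutineScope {
        val outer = this
        withFakeResource { resource ->
            // Child of the inner scope: finishes before the resource is released.
            launch {
                delay(10)
                innerSawClosed = resource.closed
            }
            // Child of the outer scope: the inner scope does not wait for it,
            // so it touches the resource after release.
            outer.launch {
                delay(100)
                outerSawClosed = resource.closed
            }
        }
    }
    return innerSawClosed to outerSawClosed
}

fun main() = runBlocking {
    val (inner, outer) = demo()
    println("inner job saw a closed resource: $inner")
    println("outer job saw a closed resource: $outer")
}
```

With these delays the inner job should observe an open resource and the outer job a closed one, which is the leak: the outer job outlives the resource it captured.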
simon.vergauwen
12/23/2022, 8:02 AM
[…] `ResourceScope` a `CoroutineScope` then it'll always await all jobs launched inside, guaranteeing that it respects resource safety even inside `launch` or `async`. It will also mean that `launch` can "fail" the `ResourceScope`, thus making it a full participant in Structured Concurrency.
phldavies
12/23/2022, 1:17 PM
[…] `.launchIn(scope)` into a `Resource<Job>`. I was wary of launching from a `coroutineScope { }` block inside `resource {}` (Arrow 1.x) as, iirc, `resource { … }` is used in the allocate phase and is `NonCancellable`? Ideally, being able to pass the outer scope where `use {}` is called into allocate would mean we can launch/async/etc. within the usage scope rather than the definition scope. Not sure that makes sense, but will clarify if needed 😉
simon.vergauwen
12/23/2022, 2:16 PM
val r: Resource<Job> =
  resource {
    val consumer = install({ KafkaConsumer(..) }) { r, _ -> r.close() }
    consumer.receive()
      .launchIn(this)
  }
This is available in 1.1.4-rc.3.
`resourceScope` is nothing more than `resource { }.use { }`. The params of `install` are what is `NonCancellable`; the rest here is cancellable.
simon.vergauwen
12/23/2022, 2:24 PM
> […] a separate note it would be great to be able to hook into the created consumer to monitor metrics
Right, it creates a new `KafkaConsumer` and doesn't re-use the existing one created by `receive()`.
reactor-kafka "solves" this by exposing a mutable `KafkaConsumer` instance... which gets overridden whenever you `subscribe` to any `receive`-created `Flux`, and explodes with `IllegalStateException` at the incorrect time.
I'd like to solve that issue, whilst preparing the new API to be compatible with KMP. So we can provide implementations for JS and Native as well but I haven't been able to come up with a nice API yet.
You want to create a `Consumer` and be able to call `receiveXX` on that. Ideally just once, but modelling that in a signature is annoying 😅
fun receive(topicName: String): Pair<Consumer, Flow<Record<K, V>>>
😭
phldavies
12/23/2022, 2:50 PM
[…] `prepareConsumer: (KafkaConsumer) -> Unit = {}` on receive that you can hook into. My main use case is to hook up the metrics to Cohort.
simon.vergauwen
12/23/2022, 2:51 PM
[…] `ReceiverSettings`? 🤔
Or take it into the `receive()` function?
That could indeed work, could even be `suspend CoroutineScope.(Consumer<K, V>) -> Unit`.
phldavies
12/23/2022, 2:52 PM
[…] `receive()` but in settings would also work. Generally I’ve just called receive straight after constructing settings anyway (and only for that one receive).
simon.vergauwen
12/23/2022, 2:54 PM
phldavies
12/23/2022, 2:56 PM
[…] `Flow<KafkaConsumer>.receive(topics)` or a more delicate `KafkaConsumer.receiveFlow(topics)`, which allows for the PollLoop/scheduler to be used with a manually created consumer (with the impact being no guarantee of the scheduler being the only one to operate `poll`).
simon.vergauwen
12/23/2022, 3:03 PM
[…] `KafkaConsumer` methods can only be called from the thread that created it, so the constructor would still need to live inside `kotlin-kafka`, and so it cannot be a manually created consumer.
The `PollLoop` needs to be able to intercept calls to `pause/resume` so the `PollLoop` can keep track of which partitions were […]
simon.vergauwen
12/23/2022, 3:05 PM
[…] `interface KafkaReceiver : AutoCloseable` which wraps `KafkaConsumer` and provides the same methods as the current `KafkaReceiver` + `Consumer<K, V>` from Kafka.
simon.vergauwen
12/23/2022, 3:06 PM
simon.vergauwen
12/23/2022, 3:06 PM
phldavies
12/23/2022, 3:10 PM
[…] `KafkaConsumer.receiveFlow` to be delicate. I’m not sure if we’d want to couple with arrow, but `Resource<KafkaConsumer>` might fit as a way of plugging in customised consumers (and lifecycle). May be worth using the `Consumer<K, V>` interface too at some point, as it may allow for using a `MockConsumer<K, V>` for testing (although that may have other issues).
Is there a better place to discuss `kotlin-kafka` than the #arrow channel?
simon.vergauwen
12/23/2022, 3:21 PM
> May be worth using the `Consumer<K, V>` interface too at some point,
Yes, indeed, but it might be a bit tricky to fit this into a KMP design. Although maybe not impossible. TBH I prefer working with test-containers instead of `MockConsumer`; things like the rebalancing/pausing/resuming are quite hard to do right with mocks.
So far I avoided bringing in Arrow, but it would be nice to have `Resource` 😅 Even thought about splitting it into a separate base module so you can rely on only `Resource` 😂
I think the Github issues, or discussions, are best for now.