Mike Conley
03/17/2020, 6:09 AMfoo actually doesAlessandro Tagliapietra
03/17/2020, 6:14 AMis Iterable<*>) which makes senseMike Conley
03/17/2020, 6:15 AMMike Conley
03/17/2020, 6:15 AMA in Iterable<A>Mike Conley
03/17/2020, 6:15 AMis checks happen at runtimeAlessandro Tagliapietra
03/17/2020, 6:16 AMMike Conley
03/17/2020, 6:16 AMMike Conley
03/17/2020, 6:16 AMMike Conley
03/17/2020, 6:17 AMAlessandro Tagliapietra
03/17/2020, 6:17 AMAlessandro Tagliapietra
03/17/2020, 6:18 AMMike Conley
03/17/2020, 6:18 AMT here?Alessandro Tagliapietra
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:20 AMorg.apache.avro.specific.SpecificRecordMike Conley
03/17/2020, 6:20 AMAlessandro Tagliapietra
03/17/2020, 6:20 AMMike Conley
03/17/2020, 6:20 AMMike Conley
03/17/2020, 6:21 AMAlessandro Tagliapietra
03/17/2020, 6:22 AMMike Conley
03/17/2020, 6:22 AMMike Conley
03/17/2020, 6:22 AMIterable -- separate into three?Alessandro Tagliapietra
03/17/2020, 6:23 AMAlessandro Tagliapietra
03/17/2020, 6:23 AMAlessandro Tagliapietra
03/17/2020, 6:23 AMAlessandro Tagliapietra
03/17/2020, 6:23 AMMike Conley
03/17/2020, 6:23 AMMike Conley
03/17/2020, 6:24 AMfoo(stuff: Iterable<Metric>, metricProducer)Alessandro Tagliapietra
03/17/2020, 6:24 AMMike Conley
03/17/2020, 6:24 AMAlessandro Tagliapietra
03/17/2020, 6:24 AMMike Conley
03/17/2020, 6:24 AMAlessandro Tagliapietra
03/17/2020, 6:25 AMAlessandro Tagliapietra
03/17/2020, 6:25 AMMike Conley
03/17/2020, 6:25 AMAlessandro Tagliapietra
03/17/2020, 6:26 AMMike Conley
03/17/2020, 6:26 AMMike Conley
03/17/2020, 6:26 AMMike Conley
03/17/2020, 6:26 AMAlessandro Tagliapietra
03/17/2020, 6:27 AMprivate var stateProducer = KafkaProducer<String, State>(Settings.getProducer("api-states"))
and then
stateProducer.send(ProducerRecord("states", machineId, stateRecord))Alessandro Tagliapietra
03/17/2020, 6:28 AMMike Conley
03/17/2020, 6:28 AMprivate fun producerFor(thing: SpecificRecord): ProducerType {
val klass = thing::class
when (klass) {
is Metric -> metricProducer,
is Event -> eventProducer,
...
}Alessandro Tagliapietra
03/17/2020, 6:28 AMAlessandro Tagliapietra
03/17/2020, 6:28 AMAlessandro Tagliapietra
03/17/2020, 6:28 AMMike Conley
03/17/2020, 6:29 AMMike Conley
03/17/2020, 6:29 AMMike Conley
03/17/2020, 6:29 AMMike Conley
03/17/2020, 6:29 AMfun foo(stuff: Iterable<SpecificRecord>) {
stuff.forEach {
val producer = producerFor(it)
// do stuff with producer and it
}
}Mike Conley
03/17/2020, 6:29 AMAlessandro Tagliapietra
03/17/2020, 6:29 AM...each(
producer = producerFot(it)
producer.startTransaction()
producer.send(it)
producer.commitTransaction()
)Mike Conley
03/17/2020, 6:30 AMAlessandro Tagliapietra
03/17/2020, 6:30 AMMike Conley
03/17/2020, 6:30 AMAlessandro Tagliapietra
03/17/2020, 6:30 AMproducer.startTransaction()
list.each(
producer.send(it)
)
producer.commitTransaction()Alessandro Tagliapietra
03/17/2020, 6:30 AMMike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:32 AMMike Conley
03/17/2020, 6:32 AMAlessandro Tagliapietra
03/17/2020, 6:32 AMyes but the type within a single iterator is always the same^ that's what I meant by this sorry
Mike Conley
03/17/2020, 6:32 AMMike Conley
03/17/2020, 6:34 AMAlessandro Tagliapietra
03/17/2020, 6:35 AMMike Conley
03/17/2020, 6:35 AMAlessandro Tagliapietra
03/17/2020, 6:36 AMAlessandro Tagliapietra
03/17/2020, 6:36 AMtry {
stateProducer.beginTransaction()
stateProducer.send(ProducerRecord("states", machineId, state))
stateProducer.commitTransaction()
} catch (e: ProducerFencedException) {
stateProducer.close()
recreateStateProducer()
} catch (e: OutOfOrderSequenceException) {
stateProducer.close()
recreateStateProducer()
} catch (e: AuthorizationException) {
stateProducer.close()
recreateStateProducer()
} catch (e: KafkaException) {
stateProducer.abortTransaction()
}Alessandro Tagliapietra
03/17/2020, 6:36 AMAlessandro Tagliapietra
03/17/2020, 6:36 AMMike Conley
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:37 AMMike Conley
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:38 AMMike Conley
03/17/2020, 6:38 AMAlessandro Tagliapietra
03/17/2020, 6:38 AMMike Conley
03/17/2020, 6:41 AMcedric
03/17/2020, 8:06 PMproducer.transaction {
list.each {
producer.send(this)
}
}
and that would automatically start and end the transaction.Alessandro Tagliapietra
03/17/2020, 8:06 PMAlessandro Tagliapietra
03/17/2020, 8:08 PMMike Conley
03/17/2020, 8:08 PMcedric
03/17/2020, 8:09 PMclass Producer {
fun transaction(lambda: () -> Boolean) {
startTransaction()
lambda()
commitTransaction()
}
}cedric
03/17/2020, 8:09 PMtry/catch, etc... but you get the ideaAlessandro Tagliapietra
03/17/2020, 8:15 PMcedric
03/17/2020, 8:15 PMMike Conley
03/17/2020, 8:15 PMcedric
03/17/2020, 8:15 PMfun Producer.transaction(...) {Alessandro Tagliapietra
03/17/2020, 8:16 PMprivate fun <K, V> wrapInTransaction(producer: KafkaProducer<K, V>, lambda: (producer: KafkaProducer<K, V>) -> Unit) {
try {
producer.beginTransaction()
lambda(producer)
producer.commitTransaction()
} catch (e: ProducerFencedException) {
eventProducer.close()
} catch (e: OutOfOrderSequenceException) {
eventProducer.close()
} catch (e: AuthorizationException) {
eventProducer.close()
} catch (e: KafkaException) {
eventProducer.abortTransaction()
}
}Alessandro Tagliapietra
03/17/2020, 8:16 PMAlessandro Tagliapietra
03/17/2020, 8:16 PMMike Conley
03/17/2020, 8:16 PMcedric
03/17/2020, 8:17 PMfoo.x()
foo.y()
I introduce an applycedric
03/17/2020, 8:17 PMfoo once. DRY!Mike Conley
03/17/2020, 8:18 PMcedric
03/17/2020, 8:18 PMMike Conley
03/17/2020, 8:18 PMAlessandro Tagliapietra
03/17/2020, 8:21 PMemptyMike Conley
03/17/2020, 8:22 PMempty? because I read applyMike Conley
03/17/2020, 8:22 PMMike Conley
03/17/2020, 8:22 PMAlessandro Tagliapietra
03/17/2020, 8:22 PMcedric
03/17/2020, 8:22 PMfoo.x()
foo.y()
// becomes
foo.apply {
x()
y()
}Alessandro Tagliapietra
03/17/2020, 8:22 PMMike Conley
03/17/2020, 8:23 PMwith(foo) {
x()
y()
}Mike Conley
03/17/2020, 8:23 PMMike Conley
03/17/2020, 8:23 PMcedric
03/17/2020, 8:23 PMwithMike Conley
03/17/2020, 8:23 PMAlessandro Tagliapietra
03/17/2020, 8:24 PM