Mike Conley
03/17/2020, 6:09 AMfoo
actually doesAlessandro Tagliapietra
03/17/2020, 6:14 AMis Iterable<*>
) which makes senseMike Conley
03/17/2020, 6:15 AMA
in Iterable<A>
is
checks happen at runtimeAlessandro Tagliapietra
03/17/2020, 6:16 AMMike Conley
03/17/2020, 6:16 AMAlessandro Tagliapietra
03/17/2020, 6:17 AMMike Conley
03/17/2020, 6:18 AMT
here?Alessandro Tagliapietra
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:19 AMMike Conley
03/17/2020, 6:19 AMAlessandro Tagliapietra
03/17/2020, 6:20 AMorg.apache.avro.specific.SpecificRecord
Mike Conley
03/17/2020, 6:20 AMAlessandro Tagliapietra
03/17/2020, 6:20 AMMike Conley
03/17/2020, 6:20 AMAlessandro Tagliapietra
03/17/2020, 6:22 AMMike Conley
03/17/2020, 6:22 AMIterable
-- separate into three?Alessandro Tagliapietra
03/17/2020, 6:23 AMMike Conley
03/17/2020, 6:23 AMfoo(stuff: Iterable<Metric>, metricProducer)
Alessandro Tagliapietra
03/17/2020, 6:24 AMMike Conley
03/17/2020, 6:24 AMAlessandro Tagliapietra
03/17/2020, 6:24 AMMike Conley
03/17/2020, 6:24 AMAlessandro Tagliapietra
03/17/2020, 6:25 AMMike Conley
03/17/2020, 6:25 AMAlessandro Tagliapietra
03/17/2020, 6:26 AMMike Conley
03/17/2020, 6:26 AMAlessandro Tagliapietra
03/17/2020, 6:27 AMprivate var stateProducer = KafkaProducer<String, State>(Settings.getProducer("api-states"))
and then
stateProducer.send(ProducerRecord("states", machineId, stateRecord))
Mike Conley
03/17/2020, 6:28 AMprivate fun producerFor(thing: SpecificRecord): ProducerType {
val klass = thing::class
when (klass) {
is Metric -> metricProducer,
is Event -> eventProducer,
...
}
Alessandro Tagliapietra
03/17/2020, 6:28 AMMike Conley
03/17/2020, 6:29 AMfun foo(stuff: Iterable<SpecificRecord>) {
stuff.forEach {
val producer = producerFor(it)
// do stuff with producer and it
}
}
Alessandro Tagliapietra
03/17/2020, 6:29 AM...each(
producer = producerFot(it)
producer.startTransaction()
producer.send(it)
producer.commitTransaction()
)
Mike Conley
03/17/2020, 6:30 AMAlessandro Tagliapietra
03/17/2020, 6:30 AMMike Conley
03/17/2020, 6:30 AMAlessandro Tagliapietra
03/17/2020, 6:30 AMproducer.startTransaction()
list.each(
producer.send(it)
)
producer.commitTransaction()
Mike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:31 AMMike Conley
03/17/2020, 6:31 AMAlessandro Tagliapietra
03/17/2020, 6:32 AMMike Conley
03/17/2020, 6:32 AMAlessandro Tagliapietra
03/17/2020, 6:32 AMyes but the type within a single iterator is always the same^ that's what I meant by this sorry
Mike Conley
03/17/2020, 6:32 AMAlessandro Tagliapietra
03/17/2020, 6:35 AMMike Conley
03/17/2020, 6:35 AMAlessandro Tagliapietra
03/17/2020, 6:36 AMtry {
stateProducer.beginTransaction()
stateProducer.send(ProducerRecord("states", machineId, state))
stateProducer.commitTransaction()
} catch (e: ProducerFencedException) {
stateProducer.close()
recreateStateProducer()
} catch (e: OutOfOrderSequenceException) {
stateProducer.close()
recreateStateProducer()
} catch (e: AuthorizationException) {
stateProducer.close()
recreateStateProducer()
} catch (e: KafkaException) {
stateProducer.abortTransaction()
}
Mike Conley
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:37 AMMike Conley
03/17/2020, 6:37 AMAlessandro Tagliapietra
03/17/2020, 6:37 AMMike Conley
03/17/2020, 6:38 AMAlessandro Tagliapietra
03/17/2020, 6:38 AMMike Conley
03/17/2020, 6:41 AMcedric
03/17/2020, 8:06 PMproducer.transaction {
list.each {
producer.send(this)
}
}
and that would automatically start and end the transaction.Alessandro Tagliapietra
03/17/2020, 8:06 PMMike Conley
03/17/2020, 8:08 PMcedric
03/17/2020, 8:09 PMclass Producer {
fun transaction(lambda: () -> Boolean) {
startTransaction()
lambda()
commitTransaction()
}
}
try/catch
, etc... but you get the ideaAlessandro Tagliapietra
03/17/2020, 8:15 PMcedric
03/17/2020, 8:15 PMMike Conley
03/17/2020, 8:15 PMcedric
03/17/2020, 8:15 PMfun Producer.transaction(...) {
Alessandro Tagliapietra
03/17/2020, 8:16 PMprivate fun <K, V> wrapInTransaction(producer: KafkaProducer<K, V>, lambda: (producer: KafkaProducer<K, V>) -> Unit) {
try {
producer.beginTransaction()
lambda(producer)
producer.commitTransaction()
} catch (e: ProducerFencedException) {
eventProducer.close()
} catch (e: OutOfOrderSequenceException) {
eventProducer.close()
} catch (e: AuthorizationException) {
eventProducer.close()
} catch (e: KafkaException) {
eventProducer.abortTransaction()
}
}
Mike Conley
03/17/2020, 8:16 PMcedric
03/17/2020, 8:17 PMfoo.x()
foo.y()
I introduce an apply
foo
once. DRY!Mike Conley
03/17/2020, 8:18 PMcedric
03/17/2020, 8:18 PMMike Conley
03/17/2020, 8:18 PMAlessandro Tagliapietra
03/17/2020, 8:21 PMempty
Mike Conley
03/17/2020, 8:22 PMempty
? because I read apply
Alessandro Tagliapietra
03/17/2020, 8:22 PMcedric
03/17/2020, 8:22 PMfoo.x()
foo.y()
// becomes
foo.apply {
x()
y()
}
Alessandro Tagliapietra
03/17/2020, 8:22 PMMike Conley
03/17/2020, 8:23 PMwith(foo) {
x()
y()
}
cedric
03/17/2020, 8:23 PMwith
Mike Conley
03/17/2020, 8:23 PMAlessandro Tagliapietra
03/17/2020, 8:24 PM