Hakon Grotte
02/07/2023, 10:42 AMKafkaProducer
object and has a public method 'A' that uses producer.send
. The k8s pod that runs the jvm has a short-lived keystore TLS certificate that is replaced by a certmanager after 2/3 of it's validity. I have a file watcher running in a coroutine that I create during the class' init{}
method with the IO dispatcher. When a file change event is detected I run a method 'B' inside the coroutine that closes the producer and then reinstantiates the object:
fun methodB(): {
val newProducer = KafkaProducer(...)
producer.close()
producer = newProducer
}
How can I ensure that method A does not run producer.send
while method B is running, and what would be a good way to do this? My current thoughts are using @Synchronized on the methods or using a ReentrantLock object. These do introduce some overhead though, which feels bad for the method A since method B is run maybe once a week tops.franztesca
02/07/2023, 2:50 PMval currentFileToProcess: StateFlow<File>
fun File.processedWithKafka(): Flow<Something> = flow {
// The Kafka producer implementation here.
}
fun currentFileProcessedWithKafka(): Flow<Something> =
currentFileToProcess.flatMapLatest { it.processedWithKafka() }
Hakon Grotte
02/08/2023, 7:49 AMclass CustomProducer(
val fileChangeFlow: Flow<FileWatchEvent,
var producer: Producer<String,AvroRecord>
) {
private val lock = ReentrantLock()
init {
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
fileChangeFlow.collect {
// reset the producer with method B
methodB()
}
}
}
/* method A, run conditionally after a REST api request */
fun methodA() {
lock.lock()
try { producer.send(producerRecord) }
finally { lock.unlock() }
}
fun methodB() {
lock.lock()
try {
val newProducer = KafkaProducer(...)
producer.close()
producer = newProducer
} finally { lock.unlock() }
}
}
My question is if the ReentrantLock
solution is the best way to prevent method A and method B from using the producer
object concurrently?franztesca
02/08/2023, 11:40 PMsynchronized
to make it a bit more readable and less error prone (dropping the reentrantlock
).Hakon Grotte
02/09/2023, 7:37 AMmethod A
is suspending, so @Synchronized
cannot be applied here