Hello! Question about synchronizing two independen...
# coroutines
h
Hello! Question about synchronizing two independent methods: I have a class that uses a private
KafkaProducer
object and has a public method 'A' that uses
producer.send
. The k8s pod that runs the JVM has a short-lived keystore TLS certificate that is replaced by a certmanager after 2/3 of its validity. I have a file watcher running in a coroutine that I create during the class'
init{}
block with the IO dispatcher. When a file change event is detected, I run a method 'B' inside the coroutine that closes the producer and replaces it with a new instance:
fun methodB() {
   val newProducer = KafkaProducer(...)
   producer.close()
   producer = newProducer
}
How can I ensure that method A does not run
producer.send
while method B is running, and what would be a good way to do this? My current thoughts are using @Synchronized on the methods or using a ReentrantLock object. These do introduce some overhead though, which feels wasteful for method A, since method B runs maybe once a week at most.
f
val currentFileToProcess: StateFlow<File>

fun File.processedWithKafka(): Flow<Something> = flow {
    // The Kafka producer implementation here.
}

fun currentFileProcessedWithKafka(): Flow<Something> =
    currentFileToProcess.flatMapLatest { it.processedWithKafka() }
h
Thank you for your answer. Unfortunately, I think my poor explanation made you misunderstand the problem:
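import java.util.concurrent.locks.ReentrantLock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer

class CustomProducer(
    val fileChangeFlow: Flow<FileWatchEvent>,
    var producer: Producer<String, AvroRecord>
) {
    private val lock = ReentrantLock()

    init {
        CoroutineScope(Dispatchers.IO).launch {
            fileChangeFlow.collect {
                // reset the producer with method B
                methodB()
            }
        }
    }

    /* method A, run conditionally after a REST api request */
    fun methodA() {
        lock.lock()
        try { producer.send(producerRecord) }
        finally { lock.unlock() }
    }

    // method B: builds the replacement producer, then swaps it in under the same lock
    fun methodB() {
        lock.lock()
        try {
            val newProducer = KafkaProducer(...)
            producer.close()
            producer = newProducer
        } finally { lock.unlock() }
    }
}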
My question is whether the
ReentrantLock
solution is the best way to prevent method A and method B from using the
producer
object concurrently?
f
It should work. You can also use
synchronized
to make it a bit more readable and less error prone (dropping the
ReentrantLock
).
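For illustration, the `synchronized` variant could look roughly like the sketch below. `FakeProducer` and `SynchronizedProducerHolder` are hypothetical names, and the fake producer only stands in for the Kafka producer so the snippet runs without Kafka on the classpath. Passing the new producer into `methodB` is also an assumption made to keep the example self-contained.

```kotlin
// Hypothetical stand-in for the Kafka producer (not part of any real API).
class FakeProducer(val id: Int) {
    private var closed = false

    fun send(record: String): String {
        check(!closed) { "producer $id is closed" }
        return "producer-$id:$record"
    }

    fun close() {
        closed = true
    }
}

class SynchronizedProducerHolder(private var producer: FakeProducer) {
    // Private monitor object, so outside code cannot lock on it by accident.
    private val lock = Any()

    // Method A: the send runs while holding the monitor.
    fun methodA(record: String): String = synchronized(lock) {
        producer.send(record)
    }

    // Method B: close the old producer and swap in the new one under the same monitor.
    fun methodB(newProducer: FakeProducer) {
        synchronized(lock) {
            producer.close()
            producer = newProducer
        }
    }
}

fun main() {
    val holder = SynchronizedProducerHolder(FakeProducer(1))
    println(holder.methodA("a"))
    holder.methodB(FakeProducer(2))
    println(holder.methodA("b"))
}
```

Compared with the explicit `lock()` / `unlock()` pair, the block form releases the monitor automatically on every exit path, so there is no `finally` to forget.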
h
Thanks! I forgot to mention that
method A
is suspending, so
@Synchronized
cannot be applied here
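One option for the suspending case is the `Mutex` from `kotlinx.coroutines.sync`, which suspends instead of blocking a thread. The sketch below assumes that library is on the classpath. `StubProducer` and `MutexProducerHolder` are hypothetical names, with the stub standing in for the real Kafka producer.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the Kafka producer (not part of any real API).
class StubProducer(val id: Int) {
    private var closed = false

    fun send(record: String): String {
        check(!closed) { "producer $id is closed" }
        return "producer-$id:$record"
    }

    fun close() {
        closed = true
    }
}

class MutexProducerHolder(private var producer: StubProducer) {
    private val mutex = Mutex()

    // Method A: suspends (rather than blocks) while method B holds the mutex.
    suspend fun methodA(record: String): String = mutex.withLock {
        producer.send(record)
    }

    // Method B: close the old producer and swap in the new one under the same mutex.
    suspend fun methodB(newProducer: StubProducer) {
        mutex.withLock {
            producer.close()
            producer = newProducer
        }
    }
}

fun main(): Unit = runBlocking {
    val holder = MutexProducerHolder(StubProducer(1))
    println(holder.methodA("a"))
    holder.methodB(StubProducer(2))
    println(holder.methodA("b"))
}
```

Unlike `ReentrantLock`, this `Mutex` is not reentrant, so code that already holds it must not try to acquire it again.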