# coroutines
i
Hey, I need help implementing a generic class that executes the provided suspending call with synchronization. Logic for concurrent invocations:
1. The first call executes the provided lambda.
2. Subsequent calls wait for the suspended lambda (assuming a call is in progress), and all calls receive the same result.
3. The cache is cleared after all concurrent calls complete.
4. Future calls start the flow again from point 1.
Class
internal class SynchronizedExecutor<T>() {
  suspend operator fun invoke(call: suspend CoroutineScope.() -> T): T? {
    // How to implement the logic ?
  }
}
Usage
val synchronizedExecutor = SynchronizedExecutor<String>()

// Can be called by multiple threads at the same time
public suspend fun getCredentials(): String {
    return synchronizedExecutor {
        // I want to implement a caching mechanism where multiple concurrent calls
        // get the same result from a single execution of getCredentialsUseCase().
        getCredentialsUseCase()
    }
}
s
You can try something like
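internal class SynchronizedExecutor<T> {

    // Caveat: not guarded by a lock and never reset to null.
    private var deferred: Deferred<T>? = null

    suspend operator fun invoke(call: suspend CoroutineScope.() -> T): T {
        val resolved = when (val current = deferred) {
            null -> {
                // Caveat: coroutineScope returns only after its children finish,
                // so `deferred` is assigned once the call has already completed.
                coroutineScope { async { call() } }
                    .also { deferred = it }
            }
            else -> current
        }
        return resolved.await()
    }
}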
s
Will all callers always provide the same lambda? 🤔
s
Perhaps not the best implementation, but you get the idea with Deferred
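For reference, here is a minimal sketch of how the Deferred idea could be filled out to cover the four points from the question. It is not code from this thread. It assumes the executor owns its own scope (SupervisorJob plus Dispatchers.Default), guards the cached Deferred with a Mutex, and clears the cache when the shared call finishes; the name inFlight is made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

internal class SynchronizedExecutor<T> {
    // Assumption: the shared call runs in a scope owned by the executor, so
    // cancelling one caller does not cancel the call for the other callers.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val mutex = Mutex()

    // Hypothetical name: the call currently in progress, or null if there is none.
    private var inFlight: Deferred<T>? = null

    suspend operator fun invoke(call: suspend CoroutineScope.() -> T): T {
        val deferred = mutex.withLock {
            // Join the call in progress, or start a new one (points 1 and 2).
            inFlight ?: scope.async {
                try {
                    call()
                } finally {
                    // Clear the cache once the call is over (point 3), so the
                    // next invocation starts from scratch (point 4).
                    withContext(NonCancellable) {
                        mutex.withLock { inFlight = null }
                    }
                }
            }.also { inFlight = it }
        }
        // Every concurrent caller awaits the same Deferred; a failure in
        // the call is rethrown to each of them.
        return deferred.await()
    }
}
```

With this sketch you would keep one instance per kind of call, for example a SynchronizedExecutor<String>() used only by getCredentials(). Note that the scope is never cancelled here; if the executor has a shorter lifetime than the process, it would need some form of close().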
s
If you have the lambda available up front, I'd do it with a shared flow, like this:
class SynchronizedExecutor<T>(
  call: suspend () -> T,
  context: CoroutineContext = EmptyCoroutineContext
): AutoCloseable {
  private val scope = CoroutineScope(context)
  private val flow = call.asFlow().shareIn(scope, SharingStarted.WhileSubscribed())
  suspend operator fun invoke() = flow.first()
  override fun close() = scope.cancel()
}
i
Yes, all callers will always provide a lambda. I want to make this class universal, so that any method can be called inside the lambda.
s
So how will the SynchronizedExecutor know whether two given functions are "the same" or not?
i
No, but I will create a new instance of SynchronizedExecutor for each type of call.
d
Maybe something like this? https://pl.kotl.in/cJN5aAsLL