Can multiple different threads 'join' the same Job...
# coroutines
m
Can multiple different threads 'join' the same Job? I have a library which expects blocking calls to some 'authenticate' method. This method will then be called from multiple threads and invoke a 'refreshToken' suspend function (which has to be invoked in runBlocking because 'authenticate' is expected to be blocking), but if some of these calls happen simultaneously I only want 'refreshToken' to be called once for all of them. My idea is to create a Job when the first one hits, and then all the others could join this Job if it still exists. Is it a good approach or is there a better way?
(Sorry, writing this from my mobile)
z
Yep, any coroutine can call `join()` on any `Job` instance (whether or not those coroutines are running on different threads)
m
And runBlocking is still an "any coroutine" right?
w
Yes 🙂
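To illustrate the answer above, here is a minimal sketch in which several plain threads each use `runBlocking` to join one shared `Job`. The function name `joinFromThreads`, the counters, and the 200 ms delay are assumptions made up for this example; they are not from the thread.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: starts one job and lets `threadCount` threads wait for it.
// Returns (how often the job body ran, how many threads finished waiting).
fun joinFromThreads(threadCount: Int): Pair<Int, Int> {
    val scope = CoroutineScope(Dispatchers.Default)
    val runs = AtomicInteger(0)
    val finishedWaiters = AtomicInteger(0)

    // The single shared job; its body should run exactly once.
    val job: Job = scope.launch {
        delay(200)
        runs.incrementAndGet()
    }

    val threads = List(threadCount) { i ->
        thread(name = "waiter-$i") {
            // Each thread blocks in its own runBlocking until the shared job completes.
            runBlocking { job.join() }
            finishedWaiters.incrementAndGet()
        }
    }
    threads.forEach { it.join() }
    return runs.get() to finishedWaiters.get()
}
```

Calling `joinFromThreads(3)` should report one run of the job body and three finished waiters, since joining does not start or repeat the work.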
m
Thank you guys!
u
Just make sure your detection of whether a refresh is already running is not racy.
m
I've prepared something like this
```kotlin
/**
 * If it doesn't run, it is started.
 * If it runs, multiple observers might suspend waiting for the result.
 */
class ConnectableCoroutine<T>(
    private val scope: CoroutineScope,
    private val block: suspend () -> T
) {

    private var deferred: Deferred<T>? = null

    suspend fun connect(): T {
        println("CC: Started with deferred: $deferred at ${Thread.currentThread()}")
        if (deferred == null || deferred!!.isCompleted) {
            deferred = scope.async {
                println("CC: Perform at ${Thread.currentThread()}")
                block()
            }
        }
        deferred!!.join()
        return deferred!!.await()
    }

}
```
And it runs fine when used the "regular" way, i.e. in suspend functions. But it seems to be messed up when used in non-coroutine contexts with `runBlocking`. Or my rough test might be totally wrong, but I don't really have an idea for another one (I know the sleeps are bad, but right now I'm trying to simulate what okhttp does under the hood). The test that fails goes like this:
```kotlin
@Test
fun test2() = runBlockingTest {

    coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")

    val connectableCoroutine = ConnectableCoroutine(this) {
        Thread.sleep(1000)
        stringProvider.provideString()
    }

    lateinit var result1: String
    lateinit var result2: String
    lateinit var result3: String

    thread(name = "thread1") {
        result1 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(200) //let's make sure it's not a race condition
    thread(name = "thread2") {
        result2 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(2000)
    thread(name = "thread3") {
        result3 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(1500)

    assertEquals("a", result1)
    assertEquals("a", result2)
    assertEquals("b", result3)

}
```
`result2` = `b`, but it should be `a`, since I connected within the duration of the first job.
Hmm, it seems to work when I don't use the TestCoroutineScope at all, but instead go with
```kotlin
ConnectableCoroutine(CoroutineScope(Dispatchers.IO)) {
```
Then the job is performed on the right dispatcher (IO). When I leave `ConnectableCoroutine(this)`, it uses `thread1`. I guess it's `unconfined` for TestCoroutineDispatcher or something like that?
u
You consistently access `deferred` concurrently from multiple threads. For example, two threads might both see `deferred` as null. Then both would start a new coroutine and overwrite each other's jobs.
m
I should synchronize the access right?
u
True, but not with Java's/Kotlin's synchronize! And why do you join at all? I would expect await to wait for completion of the job
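One possible reading of this advice is to guard the check-and-start step with the coroutine `Mutex` from `kotlinx.coroutines.sync`, which suspends waiters instead of blocking their threads. The sketch below is an assumption about what was meant, not the author's final code; the names `SharedCall` and `sharedCallDemo` and the fake token strings are invented for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical variant of ConnectableCoroutine with a guarded check-and-start.
class SharedCall<T>(
    private val scope: CoroutineScope,
    private val block: suspend () -> T
) {
    private val mutex = Mutex()
    private var deferred: Deferred<T>? = null

    suspend fun connect(): T {
        // Only one caller at a time may inspect or replace `deferred`.
        val current = mutex.withLock {
            val existing = deferred
            if (existing != null && !existing.isCompleted) existing
            else scope.async { block() }.also { deferred = it }
        }
        // Await outside the lock so waiters do not serialize on the mutex.
        return current.await()
    }
}

// Three threads connect at nearly the same time through runBlocking.
fun sharedCallDemo(): Pair<List<String>, Int> {
    val calls = AtomicInteger(0)
    val shared = SharedCall(CoroutineScope(Dispatchers.IO)) {
        delay(300) // simulated refresh
        "token-${calls.incrementAndGet()}"
    }
    val results = ConcurrentLinkedQueue<String>()
    val threads = List(3) {
        thread { results.add(runBlocking { shared.connect() }) }
    }
    threads.forEach { it.join() }
    return results.toList() to calls.get()
}
```

In this sketch `await()` alone is enough to wait for the result, so no separate `join()` is needed. Failure handling is left out: if `block` throws, `async` in a scope with a regular `Job` would cancel that scope, so a real implementation may want a `SupervisorJob`.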
m
"not with Java's/Kotlin's synchronize"
Why not? This seems to do the trick in the test.
```kotlin
@Synchronized suspend fun connect(): T
```
The whole point of this is that there is this `ConnectableCoroutine` that will be accessed by many different threads simultaneously, and if one of them calls the `block()` then all of them should wait for the job to be finished instead of launching their own jobs. Is there a better way to achieve that?
(you are right that join doesn't make any difference here)
u
And `runBlocking` is single-threaded. Because sleep blocks that thread, you have hardly any concurrency at all. That's why case 2 is only called after the first one finished.
Try replacing sleep with delay
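The difference between `Thread.sleep` and `delay` on a single-threaded `runBlocking` can be seen in a small sketch like the following. The function `eventOrder` and the event labels are made up for this example.

```kotlin
import kotlinx.coroutines.*

// Runs two coroutines on runBlocking's single thread and records the order of events.
fun eventOrder(useDelay: Boolean): List<String> = runBlocking {
    val events = mutableListOf<String>() // only touched from the runBlocking thread

    // delay suspends and frees the thread; Thread.sleep blocks it.
    suspend fun pause() {
        if (useDelay) delay(100) else Thread.sleep(100)
    }

    coroutineScope {
        launch { events += "A start"; pause(); events += "A end" }
        launch { events += "B start"; pause(); events += "B end" }
    }
    events.toList()
}
```

With `useDelay = true`, coroutine B starts while A is suspended, so the two overlap. With `useDelay = false`, A holds the only thread for the whole sleep and B cannot start until A has ended, which matches the behaviour described for the failing test.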
m
I have another test which looks like this
```kotlin
@Test
fun test1() = runBlockingTest {

    coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")

    pauseDispatcher()

    val connectableCoroutine = ConnectableCoroutine(this) {
        delay(2000)
        stringProvider.provideString()
    }

    lateinit var result1: String
    lateinit var result2: String
    lateinit var result3: String

    launch {
        result1 = connectableCoroutine.connect()
    }

    advanceTimeBy(1000)
    launch {
        result2 = connectableCoroutine.connect()
    }

    advanceTimeBy(2000)
    launch {
        result3 = connectableCoroutine.connect()
    }

    advanceTimeBy(2500)

    assertEquals("a", result1)
    assertEquals("a", result2)
    assertEquals("b", result3)
}
```
u
And ..... ?
m
and it runs without any issues
The problem is that this `ConnectableCoroutine` is used in an okhttp.Authenticator, which requires the call to authenticate to be blocking. So I need to have a runBlocking call there - that's why I'm also testing with runBlocking and sleeps
u
Passes or fails?
m
passes
u
But there you can pass a dispatcher to `runBlocking`
m
ooooooh 😄
that sounds like what I was looking for
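A minimal sketch of passing a dispatcher to `runBlocking` from a blocking entry point might look like this. Both `refreshToken` and `authenticateBlocking` are hypothetical stand-ins here; the real okhttp `Authenticator` signature is not reproduced.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the real suspend function mentioned in the thread.
suspend fun refreshToken(): String {
    delay(50) // simulated network call
    return "token on ${Thread.currentThread().name}"
}

// Blocking entry point, as a blocking API would require.
// The calling thread still blocks, but the coroutine body runs on Dispatchers.IO.
fun authenticateBlocking(): String =
    runBlocking(Dispatchers.IO) { refreshToken() }
```

The caller waits for the result either way; the dispatcher argument only changes which threads execute the coroutine body.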
u
Or better: make your `block` suspending
m
I don't follow you - it is a suspend lambda
u
And use `withContext(Dispatchers.IO)`
`withContext(Dispatchers.IO) { Thread.sleep(2000) }`
Then you can stay on your single threaded test dispatcher
But forwarding time will not work with sleep
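The suggestion above can be sketched as follows: the blocking sleep is moved onto `Dispatchers.IO`, so the single `runBlocking` thread is free to start the second coroutine in the meantime. The function `interleavedWithIo` and the event labels are assumptions for this example, and plain `runBlocking` is used in place of the test dispatcher.

```kotlin
import kotlinx.coroutines.*

// Two coroutines share runBlocking's single thread; the blocking part runs on IO threads.
fun interleavedWithIo(): List<String> = runBlocking {
    val events = mutableListOf<String>() // only touched from the runBlocking thread

    coroutineScope {
        launch {
            events += "A start"
            withContext(Dispatchers.IO) { Thread.sleep(200) } // blocks an IO thread only
            events += "A end"
        }
        launch {
            events += "B start"
            withContext(Dispatchers.IO) { Thread.sleep(200) }
            events += "B end"
        }
    }
    events.toList()
}
```

Here B starts before A ends, because A is merely suspended while an IO thread sleeps on its behalf. As noted above, the sleep still takes real wall-clock time, so virtual-time helpers such as `advanceTimeBy` would not skip it.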
m
I need to grab some sleep, it's been 14 hours at the keyboard now and it's getting hard to understand all of this 😄 @uli thank you so much for your help!
u
Welcome. Good night