https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

Michal Klimczak

02/23/2021, 7:12 PM
Can multiple different threads 'join' the same Job? I have a library which expects blocking calls to some 'authenticate' method. This method will then be called from multiple threads and invoke a 'refreshToken' suspend function (which has to be invoked in runBlocking because 'authenticate' is expected to be blocking) but if some of these calls are called simultaneously I only want the 'refreshToken' to be called once for all of of them. My idea is to create a Job when the first one hits and them all the other could join this Job if it still exists. Is it a good approach or is there a better way?
(Sorry, writing this from my mobile)
z

Zach Klippenstein (he/him) [MOD]

02/23/2021, 7:13 PM
Yep, any coroutine can call
join()
on any
Job
instance (whether or not those coroutines are running on different threads)
❤️ 1
m

Michal Klimczak

02/23/2021, 7:24 PM
And runBlocking is still an "any coroutine" right?
w

wasyl

02/23/2021, 7:27 PM
Yes 🙂
☝️ 1
❤️ 1
m

Michal Klimczak

02/23/2021, 7:30 PM
Thank you guys!
u

uli

02/23/2021, 8:44 PM
Just make sure your detection, wether a refresh is already running is not racey.
m

Michal Klimczak

02/23/2021, 9:04 PM
I've prepared something like this
Copy code
/**
 * If it doesn't run, it is started.
 * If it runs, multiple observers might suspend waiting for the result.
 */
class ConnectableCoroutine<T>(
    private val scope: CoroutineScope,
    private val block: suspend () -> T
) {

    private var deferred: Deferred<T>? = null

    suspend fun connect(): T {
        println("CC: Started with deferred: $deferred at ${Thread.currentThread()}")
        if (deferred == null || deferred!!.isCompleted) {
            deferred = scope.async {
                println("CC: Perform at ${Thread.currentThread()}")
                block()
            }
        }
        deferred!!.join()
        return deferred!!.await()
    }

}
And it runs fine when used the "regular" way, i.e. in suspended functions. But it seems to be messed up when used in non-coroutine contexts with`runBlocking` . Or my rough test might be totally wrong, but I don't really have an idea for another (I know the sleeps are bad, but right now I'm trying to simulate what okhttp does under the hood). The test that fails goes like this:
Copy code
@Test
    fun test2() = runBlockingTest {

        coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")

        val connectableCoroutine = ConnectableCoroutine(this) {
            Thread.sleep(1000)
            stringProvider.provideString()
        }

        lateinit var result1: String
        lateinit var result2: String
        lateinit var result3: String

        thread(name = "thread1") {
            result1 = runBlocking { connectableCoroutine.connect() }
        }
        Thread.sleep(200) //let's make sure it's not a race condition
        thread(name = "thread2") {
            result2 = runBlocking { connectableCoroutine.connect() }
        }
        Thread.sleep(2000)
        thread(name = "thread3") {
            result3 = runBlocking { connectableCoroutine.connect() }
        }
        Thread.sleep(1500)

        assertEquals("a", result1)
        assertEquals("a", result2)
        assertEquals("b", result3)

    }
result2 =b
, but it should be
a
, since I connected within the duration of the first job.
hmmmm, it seems to work when I don;t use the testCoroutineScope at all, but instead go with
Copy code
ConnectableCoroutine(CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)) {
Then the job is performed on the right dispatcher (IO). When I leave the
ConnectableCoroutine(this)
, it uses the
thread1
. I guess it's
unconfined
for TestCoroutineDispatcher or sth like this?
u

uli

02/23/2021, 9:11 PM
You consistently access
deferred
concurrently from multiple threads. For example two threads might see
deferred
to be null. Then both would start a new coroutine and overwrite each other's jobs
m

Michal Klimczak

02/23/2021, 9:14 PM
I should synchronize the access right?
u

uli

02/23/2021, 9:17 PM
True, but not with Java's/Kotlin's synchronize! And why do you join at all? I would expect await to wait for completion of the job
m

Michal Klimczak

02/23/2021, 9:21 PM
not with Java's/Kotlin's synchronize
Why not? This seems to do the trick in the test.
Copy code
@Synchronized suspend fun connect(): T
The whole point of this is that there is this
ConnectableCoroutine
that will be accessed by many different threads simultaneously and if one of them calls the
block()
then all of them should wait for the job to be finished instead of launching their own jobs. Is there a better way to achieve that?
(you are right that join doesn't make any difference here)
u

uli

02/23/2021, 9:26 PM
And run blocking is single threaded. Because sleep will block that thread, you have hardly any concurrency at all. That's why case 2 is only called after the first one finished
Try replacing sleep with delay
m

Michal Klimczak

02/23/2021, 9:28 PM
I have another test which looks like this
Copy code
@Test
    fun test1() = runBlockingTest {

        coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")

        pauseDispatcher()

        val connectableCoroutine = ConnectableCoroutine(this) {
            delay(2000)
            stringProvider.provideString()
        }

        lateinit var result1: String
        lateinit var result2: String
        lateinit var result3: String

        launch {
            result1 = connectableCoroutine.connect()
        }

        advanceTimeBy(1000)
        launch {
            result2 = connectableCoroutine.connect()
        }

        advanceTimeBy(2000)
        launch {
            result3 = connectableCoroutine.connect()
        }

        advanceTimeBy(2500)

        assertEquals("a", result1)
        assertEquals("a", result2)
        assertEquals("b", result3)
    }
u

uli

02/23/2021, 9:30 PM
And ..... ?
m

Michal Klimczak

02/23/2021, 9:30 PM
and it runs without any issues
The problem is that this
ConnectableCoroutine
is used in an okhttp.Authenticator which requires the call to authenticate to be blocking. So I need to have a runBlocking call there - that's why I'm also testing with runBlocking and sleeps
u

uli

02/23/2021, 9:30 PM
Passes or fails?
m

Michal Klimczak

02/23/2021, 9:30 PM
passes
u

uli

02/23/2021, 9:31 PM
But there you can pass a dispatcher to run blocking
m

Michal Klimczak

02/23/2021, 9:32 PM
ooooooh 😄
that sounds like what I was looking for
u

uli

02/23/2021, 9:34 PM
Out better: Make your
block
suspending
m

Michal Klimczak

02/23/2021, 9:36 PM
I don't follow you - it is a suspend lambda
👍 1
u

uli

02/23/2021, 9:36 PM
And use withContext(Dispatchers.IO)
withContext(Dispatchers.IO) {Thread.sleep(2000)}
Then you can stay on your single threaded test dispatcher
But forwarding time will not work with sleep
m

Michal Klimczak

02/23/2021, 9:43 PM
I need to grab some sleep, it's been 14 hours at the keyboard now and it's getting hard to understand all of this 😄 @uli thank you so much for your help!
u

uli

02/23/2021, 9:59 PM
Welcome. Good night
5 Views