Michal Klimczak
02/23/2021, 7:12 PM
Michal Klimczak
02/23/2021, 7:13 PM
Zach Klippenstein (he/him) [MOD]
02/23/2021, 7:13 PM
`join()` on any `Job` instance (whether or not those coroutines are running on different threads)
Michal Klimczak
02/23/2021, 7:24 PM
wasyl
02/23/2021, 7:27 PM
Michal Klimczak
02/23/2021, 7:30 PM
uli
02/23/2021, 8:44 PM
Michal Klimczak
02/23/2021, 9:04 PM
/**
 * If it doesn't run, it is started.
 * If it runs, multiple observers might suspend waiting for the result.
 */
class ConnectableCoroutine<T>(
    private val scope: CoroutineScope,
    private val block: suspend () -> T
) {
    private var deferred: Deferred<T>? = null

    suspend fun connect(): T {
        println("CC: Started with deferred: $deferred at ${Thread.currentThread()}")
        if (deferred == null || deferred!!.isCompleted) {
            deferred = scope.async {
                println("CC: Perform at ${Thread.currentThread()}")
                block()
            }
        }
        deferred!!.join()
        return deferred!!.await()
    }
}
And it runs fine when used the "regular" way, i.e. from suspend functions. But it seems to break when used from non-coroutine contexts with `runBlocking`. Or my rough test might be totally wrong, but I don't really have an idea for another one (I know the sleeps are bad, but right now I'm trying to simulate what okhttp does under the hood). The test that fails goes like this:
@Test
fun test2() = runBlockingTest {
    coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")
    val connectableCoroutine = ConnectableCoroutine(this) {
        Thread.sleep(1000)
        stringProvider.provideString()
    }
    lateinit var result1: String
    lateinit var result2: String
    lateinit var result3: String
    thread(name = "thread1") {
        result1 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(200) // let's make sure it's not a race condition
    thread(name = "thread2") {
        result2 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(2000)
    thread(name = "thread3") {
        result3 = runBlocking { connectableCoroutine.connect() }
    }
    Thread.sleep(1500)
    assertEquals("a", result1)
    assertEquals("a", result2)
    assertEquals("b", result3)
}
`result2` = `b`, but it should be `a`, since I connected within the duration of the first job.
Michal Klimczak
02/23/2021, 9:09 PM
`ConnectableCoroutine(CoroutineScope(Dispatchers.IO)) {`
Then the job is performed on the right dispatcher (IO). When I leave the `ConnectableCoroutine(this)`, it uses `thread1`. I guess it's unconfined for `TestCoroutineDispatcher` or something like that?
uli
02/23/2021, 9:11 PM
`deferred` concurrently from multiple threads. For example, two threads might see `deferred` to be null. Then both would start a new coroutine and overwrite each other's jobs.
Michal Klimczak
02/23/2021, 9:14 PM
uli
02/23/2021, 9:17 PM
Michal Klimczak
02/23/2021, 9:21 PM
"not with Java's/Kotlin's synchronize"
Why not? This seems to do the trick in the test.
@Synchronized suspend fun connect(): T
The whole point of this is that there is this `ConnectableCoroutine` that will be accessed by many different threads simultaneously, and if one of them calls the `block()` then all of them should wait for the job to be finished instead of launching their own jobs. Is there a better way to achieve that?
Michal Klimczak
02/23/2021, 9:22 PM
uli
02/23/2021, 9:26 PM
uli
02/23/2021, 9:26 PM
Michal Klimczak
02/23/2021, 9:28 PM
@Test
fun test1() = runBlockingTest {
    coEvery { stringProvider.provideString() } returnsMany listOf("a", "b", "c")
    pauseDispatcher()
    val connectableCoroutine = ConnectableCoroutine(this) {
        delay(2000)
        stringProvider.provideString()
    }
    lateinit var result1: String
    lateinit var result2: String
    lateinit var result3: String
    launch {
        result1 = connectableCoroutine.connect()
    }
    advanceTimeBy(1000)
    launch {
        result2 = connectableCoroutine.connect()
    }
    advanceTimeBy(2000)
    launch {
        result3 = connectableCoroutine.connect()
    }
    advanceTimeBy(2500)
    assertEquals("a", result1)
    assertEquals("a", result2)
    assertEquals("b", result3)
}
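For reference, the race mentioned earlier in the thread (two threads both seeing `deferred` as null and each starting its own job) could be avoided by guarding the check-and-start step with a coroutine `Mutex` from kotlinx.coroutines. What follows is only a minimal sketch under that assumption, not necessarily what was settled on in this conversation; the class name `MutexConnectableCoroutine` and the `main` demo are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical variant of ConnectableCoroutine: same idea, but the
// "is a job already running?" check and the start of a new job happen
// atomically under a Mutex.
class MutexConnectableCoroutine<T>(
    private val scope: CoroutineScope,
    private val block: suspend () -> T
) {
    private val mutex = Mutex()
    private var deferred: Deferred<T>? = null // only read/written while holding mutex

    suspend fun connect(): T {
        // Pick the job to wait on while holding the lock...
        val current: Deferred<T> = mutex.withLock {
            val existing = deferred
            if (existing != null && !existing.isCompleted) {
                existing // a job is in flight: join it
            } else {
                scope.async { block() }.also { deferred = it } // start a new one
            }
        }
        // ...but wait for the result outside the lock, so other callers
        // can attach to the same job in the meantime.
        return current.await()
    }
}

fun main() = runBlocking {
    val calls = AtomicInteger(0)
    val shared = MutexConnectableCoroutine(CoroutineScope(Dispatchers.Default)) {
        delay(200) // simulated slow work
        calls.incrementAndGet()
    }
    // Several concurrent callers that all connect while the first job is running.
    val results = (1..5).map { async(Dispatchers.Default) { shared.connect() } }.awaitAll()
    println("results=$results, block ran ${calls.get()} time(s)")
}
```

From a blocking caller such as the `thread { ... }` bodies in `test2`, the call would still be wrapped as `runBlocking { shared.connect() }`. Passing a scope with its own dispatcher (for example `CoroutineScope(Dispatchers.IO)`, as tried above) keeps the shared job off the calling thread.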
uli
02/23/2021, 9:30 PM
Michal Klimczak
02/23/2021, 9:30 PM
Michal Klimczak
02/23/2021, 9:30 PM
`ConnectableCoroutine` is used in an `okhttp.Authenticator`, which requires the call to `authenticate` to be blocking. So I need to have a `runBlocking` call there. That's why I'm also testing with `runBlocking` and sleeps.
uli
02/23/2021, 9:30 PM
Michal Klimczak
02/23/2021, 9:30 PM
uli
02/23/2021, 9:31 PM
Michal Klimczak
02/23/2021, 9:32 PM
Michal Klimczak
02/23/2021, 9:32 PM
uli
02/23/2021, 9:34 PM
`block` suspending
Michal Klimczak
02/23/2021, 9:36 PM
uli
02/23/2021, 9:36 PM
uli
02/23/2021, 9:38 PM
uli
02/23/2021, 9:39 PM
uli
02/23/2021, 9:39 PM
Michal Klimczak
02/23/2021, 9:43 PM
uli
02/23/2021, 9:59 PM