Yoavya
05/30/2022, 1:52 PMConcurrentHashMap
with coroutines?
I see 2 potential issues, assuming that I want to compute a value computeIfAbsent
. I don’t think using getOrPut
is an option since it is not concurrent and the value may already exist:
1. If my compute function is suspending (can be runBlocking
with parallel computation to speed the process) then another coroutine (on the same thread) can run the same synchronized
code. I think that is a problem
2. If I am running on a dispatcher that runs multiple threads and my compute function is suspending again then I can find myself waking up in a different thread then the one that I started in (can that happen?). Will it break the synchronized block inside the ConcurrentHashMap
? I am not sure what happens here.
Is there anything that is coroutine safe equivalent to ConcurrentHashMap
or do you know if there is something planned to solve these issues? (if what I described are issues)Robert Williams
05/30/2022, 2:01 PMmappingFunction
to be blocking (they also recommend in the docs it should be short and simple)Robert Williams
05/30/2022, 2:02 PMRobert Williams
05/30/2022, 2:06 PMRobert Williams
05/30/2022, 2:08 PMjw
05/30/2022, 2:21 PMjw
05/30/2022, 2:22 PMjw
05/30/2022, 2:22 PMYoavya
05/30/2022, 3:14 PMYoavya
05/30/2022, 3:19 PMrunBlocking
can still be a problem since if inside it we call delay
or any other suspend function then another coroutine can still use this thread and the synchronized block inside the ConcurrentHashMap
would not block itRobert Williams
05/30/2022, 3:29 PMjw
05/30/2022, 3:31 PMRobert Williams
05/30/2022, 3:31 PMjw
05/30/2022, 3:32 PMRobert Williams
05/30/2022, 3:32 PMRobert Williams
05/30/2022, 3:33 PMjw
05/30/2022, 3:34 PMRobert Williams
05/30/2022, 3:41 PMRobert Williams
05/30/2022, 4:07 PMgetOrPut
might not be a terrible option. It's not fully atomic but does have some guarantees on a `ConcurrentMap`:
This method guarantees not to put the value into the map if the key is already there, but the defaultValue function may be invoked even if the key is already in the map.
Robert Williams
05/30/2022, 4:08 PMinline
so will work correctly from a suspend context (no runBlocking needed)Yoavya
05/30/2022, 4:09 PMjw
05/30/2022, 7:20 PMYoavya
05/31/2022, 7:17 AMrunBlocking
maybe you can explain what is happening and why it’s not behaving as expected
var count = 0
val set = mutableSetOf<Int>()
fun sus() = runBlocking { delay(100) }
@Synchronized
fun sync(i: Int) {
print("in sync $i")
if (i % 2 !in set) {
sus()
set.add(i % 2)
count++
}
print("in sync $i done")
}
fun main() {
val range = 1..10
runBlocking {
print("main")
range.map { i -> launch { sync(i) } }.joinAll()
}
println(count)
println(set)
}
fun print(msg: String) = println("${Thread.currentThread()} - $msg")
I am expecting that count would be 2 but it is 10. The output looks like this:
Thread[main,5,main] - main
Thread[main,5,main] - in sync 1
Thread[main,5,main] - in sync 2
Thread[main,5,main] - in sync 3
Thread[main,5,main] - in sync 4
Thread[main,5,main] - in sync 5
Thread[main,5,main] - in sync 6
Thread[main,5,main] - in sync 7
Thread[main,5,main] - in sync 8
Thread[main,5,main] - in sync 9
Thread[main,5,main] - in sync 10
Thread[main,5,main] - in sync 10 done
Thread[main,5,main] - in sync 9 done
Thread[main,5,main] - in sync 8 done
Thread[main,5,main] - in sync 7 done
Thread[main,5,main] - in sync 6 done
Thread[main,5,main] - in sync 5 done
Thread[main,5,main] - in sync 4 done
Thread[main,5,main] - in sync 3 done
Thread[main,5,main] - in sync 2 done
Thread[main,5,main] - in sync 1 done
10
[0, 1]
notice that sus
is a function that can be supplied by the user and sync
can be a java 3rd party lib that uses synchronized blocks like ConcurrentHashMap
isn’t that broken?uli
05/31/2022, 8:54 AMThis function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
uli
05/31/2022, 8:54 AMYoavya
05/31/2022, 8:56 AMRobert Williams
05/31/2022, 9:01 AMRobert Williams
05/31/2022, 9:01 AMYoavya
05/31/2022, 9:03 AMlaunch
inside a runBlocking
and wait for the result and it it won’t break inside a synchronized block?Yoavya
05/31/2022, 9:12 AMrunBlocking
? because then I can be called from a regular sequential function or from a launch coroutine scope. isn’t that a possible cause to many bugs when running some java libraries?Yoavya
05/31/2022, 9:25 AMRobert Williams
05/31/2022, 11:28 AM// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
Robert Williams
05/31/2022, 11:30 AMsync
and sus
will actually behave like suspend funs even though they're notRobert Williams
05/31/2022, 11:31 AMSynchronized
annotation is doing nothing because all calls are on the same thread (as in the logs)Robert Williams
05/31/2022, 11:33 AMuli
05/31/2022, 12:05 PMRobert Williams
05/31/2022, 12:14 PMRobert Williams
05/31/2022, 12:15 PMuli
05/31/2022, 12:21 PMRobert Williams
05/31/2022, 12:36 PMsus
runBlocking -> doesn't change anything
Passing Default to main
runBlocking -> code works correctly
Passing Default to both -> hangs foreverYoavya
05/31/2022, 12:37 PMpublic class TestSync {
public TestSync(Function<Integer, String> callFunc) {
this.callFunc = callFunc;
}
private final Function<Integer, String> callFunc;
public int count = 0;
public LinkedHashSet<Integer> set = new LinkedHashSet<>();
public synchronized void sync(int i) {
print("in sync " + i);
if (!set.contains(i % 2)) {
callFunc.apply(i);
set.add(i % 2);
count++;
}
print("in sync " + i + " done");
}
public void print(String msg) {
System.out.println(Thread.currentThread() + " - " + msg);
}
}
The kotlin coroutines code:
fun main() {
runBlocking(Dispatchers.Default) {
val testSync = TestSync { i ->
runBlocking {
delay(100)
"x + $i".also { println("${Thread.currentThread()} $it") }
}
}
(1..10).map { i ->
launch { testSync.sync(i) }
}.joinAll()
testSync.count.also(::println)
}
}
The output:
Thread[DefaultDispatcher-worker-2,5,main] - in sync 1
Thread[DefaultDispatcher-worker-2,5,main] x + 1
Thread[DefaultDispatcher-worker-2,5,main] - in sync 1 done
Thread[DefaultDispatcher-worker-11,5,main] - in sync 10
Thread[DefaultDispatcher-worker-11,5,main] x + 10
Thread[DefaultDispatcher-worker-11,5,main] - in sync 10 done
Thread[DefaultDispatcher-worker-10,5,main] - in sync 9
Thread[DefaultDispatcher-worker-10,5,main] - in sync 9 done
Thread[DefaultDispatcher-worker-9,5,main] - in sync 8
Thread[DefaultDispatcher-worker-9,5,main] - in sync 8 done
Thread[DefaultDispatcher-worker-8,5,main] - in sync 7
Thread[DefaultDispatcher-worker-8,5,main] - in sync 7 done
Thread[DefaultDispatcher-worker-7,5,main] - in sync 6
Thread[DefaultDispatcher-worker-7,5,main] - in sync 6 done
Thread[DefaultDispatcher-worker-6,5,main] - in sync 5
Thread[DefaultDispatcher-worker-6,5,main] - in sync 5 done
Thread[DefaultDispatcher-worker-5,5,main] - in sync 4
Thread[DefaultDispatcher-worker-5,5,main] - in sync 4 done
Thread[DefaultDispatcher-worker-4,5,main] - in sync 3
Thread[DefaultDispatcher-worker-4,5,main] - in sync 3 done
Thread[DefaultDispatcher-worker-3,5,main] - in sync 2
Thread[DefaultDispatcher-worker-3,5,main] - in sync 2 done
2
if I don’t use any dispatcher then the code behaves like before where everything was a mess,
if I use a dispatcher on the main function runBlocking or the launch inside then it works like soYoavya
05/31/2022, 12:38 PMYoavya
05/31/2022, 12:39 PMuli
05/31/2022, 12:43 PMRobert Williams
05/31/2022, 12:59 PMThe default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.
When CoroutineDispatcher is explicitly specified in the context, then the new coroutine runs in the context of the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another runBlocking, then this invocation uses the outer event loop.
Robert Williams
05/31/2022, 12:59 PMuli
05/31/2022, 1:02 PMThis function should not be used from a coroutine
If you do not know who calls you, don’t use it. It is safe in main and in tests.Robert Williams
05/31/2022, 2:15 PMRobert Williams
05/31/2022, 2:17 PMRobert Williams
05/31/2022, 2:20 PM