is there a problem using `ConcurrentHashMap` with...
# coroutines
y
Is there a problem using `ConcurrentHashMap` with coroutines? I see 2 potential issues, assuming that I want to compute a value with `computeIfAbsent`. I don’t think using `getOrPut` is an option since it is not concurrent and the value may already exist:
1. If my compute function is suspending (it can be `runBlocking` with parallel computation to speed up the process), then another coroutine (on the same thread) can run the same `synchronized` code. I think that is a problem.
2. If I am running on a dispatcher that runs multiple threads and my compute function is suspending again, then I can find myself waking up in a different thread than the one that I started in (can that happen?). Will it break the synchronized block inside the `ConcurrentHashMap`? I am not sure what happens here.
Is there anything that is a coroutine-safe equivalent to `ConcurrentHashMap`, or do you know if there is something planned to solve these issues (if what I described are issues)?
r
`computeIfAbsent` knows nothing about coroutines and so requires `mappingFunction` to be blocking (they also recommend in the docs it should be short and simple)
As you suggest, you'll need to use runBlocking to get into a coroutine at all, but then the thread is blocked so there's no notion of waking up or any other code (coroutine or otherwise) running in the same thread until the coroutine finishes
For point 2, that can happen if you use an Unconfined Dispatcher, but it won't affect the synchronized block for the same reason - the thread the synchronized block is running on is fully blocked by the runBlocking
So no issues, but coroutines aren't really providing much value here because the API requires blocking
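A minimal sketch of that blocking bridge, assuming the caller is ordinary blocking code and not itself a coroutine. The names `load`, `cache` and `lookupBlocking` are hypothetical and only here for illustration:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Hypothetical suspending loader standing in for the real computation.
suspend fun load(key: Int): String {
    delay(50)
    return "value-$key"
}

val cache = ConcurrentHashMap<Int, String>()

// The mapping function blocks the calling thread until load() completes,
// which is what computeIfAbsent expects from its mappingFunction.
fun lookupBlocking(key: Int): String =
    cache.computeIfAbsent(key) { k -> runBlocking { load(k) } }

fun main() {
    println(lookupBlocking(1))
}
```

The thread is held for the whole computation, so this only makes sense when blocking the caller is acceptable.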
j
Wrap your values in a Deferred
The compute if absent creates the async coroutine which is fast and then everyone calls await on it
And either they wait for the async operation or it's available already
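A rough sketch of the Deferred idea, under some assumptions: `DeferredCache` and `getOrCompute` are hypothetical names, and the scope that owns the `async` coroutines is supplied by the caller. `computeIfAbsent` only creates the coroutine (which is cheap), and every caller then awaits the same `Deferred`:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper: the map stores Deferred<V> instead of V.
class DeferredCache<K : Any, V>(private val scope: CoroutineScope) {
    private val map = ConcurrentHashMap<K, Deferred<V>>()

    // computeIfAbsent atomically registers one async computation per key;
    // await() suspends only if that computation has not finished yet.
    suspend fun getOrCompute(key: K, compute: suspend (K) -> V): V =
        map.computeIfAbsent(key) { k -> scope.async { compute(k) } }.await()
}

fun main() = runBlocking {
    val calls = AtomicInteger(0)
    val cache = DeferredCache<Int, String>(CoroutineScope(Dispatchers.Default + SupervisorJob()))
    val results = (1..10).map { i ->
        async {
            cache.getOrCompute(i % 2) { k ->
                calls.incrementAndGet()
                delay(100)
                "value-$k"
            }
        }
    }.awaitAll()
    println(calls.get()) // 2: one computation per distinct key
    println(results.toSet())
}
```

One thing this sketch does not handle: a computation that fails stays in the map as a failed `Deferred`, so a real implementation would probably want to evict it.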
y
The deferred thing actually sounds like it would work. Thanks
@Robert Williams the `runBlocking` can still be a problem since if inside it we call `delay` or any other suspend function then another coroutine can still use this thread and the synchronized block inside the `ConcurrentHashMap` would not block it
r
runBlocking blocks the thread, doesn't matter if the coroutines are suspended inside: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
j
If the map is called from a coroutine you don't want to call runBlocking in it
r
The Deferred thing could work really well, but note that you'll need to suspend for every Map.get(), which might make sense or might be a huge pain depending on what else you use the Map for
j
If the value is already available it will not suspend
r
But you still need to be in a suspend fun to be able to call await
Depends on your whole system design how much of a problem that is
j
I see, yes
r
This also means you couldn't make a generic `Map<T>` implementation which uses `Map<Deferred<T>>` internally 😞
Actually, depending on how expensive your map function is, `getOrPut` might not be a terrible option. It's not fully atomic but does have some guarantees on a `ConcurrentMap`:
```
This method guarantees not to put the value into the map if the key is already there, but the defaultValue function may be invoked even if the key is already in the map.
```
But most importantly it's `inline` so will work correctly from a suspend context (no runBlocking needed)
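A small sketch of what that could look like. The names `load`, `cache` and `lookup` are hypothetical; the only library call relied on is the standard `getOrPut` extension for `ConcurrentMap`:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

val cache = ConcurrentHashMap<Int, String>()

// Hypothetical suspending loader.
suspend fun load(key: Int): String {
    delay(50)
    return "value-$key"
}

// getOrPut is inline, so its lambda may call suspend functions
// as long as the call site is itself in a suspend context.
suspend fun lookup(key: Int): String = cache.getOrPut(key) { load(key) }

fun main() = runBlocking {
    println(lookup(1)) // computed by load()
    println(lookup(1)) // served from the map
}
```

Under contention `load` may run more than once for the same key, as the quoted docs say, but only the first value is stored and returned to everyone.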
y
Right, sounds good. Thank you @Robert Williams
j
It's just a containsKey+putIfAbsent
y
I wrote a small example that exhibits a strange behavior with `runBlocking`. Maybe you can explain what is happening and why it’s not behaving as expected
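```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

var count = 0
val set = mutableSetOf<Int>()

// Stands in for a user-supplied function that blocks on a suspending call.
fun sus() = runBlocking { delay(100) }

@Synchronized
fun sync(i: Int) {
    print("in sync $i")
    if (i % 2 !in set) {
        sus()
        set.add(i % 2)
        count++
    }
    print("in sync $i done")
}

fun main() {
    val range = 1..10
    runBlocking {
        print("main")
        range.map { i -> launch { sync(i) } }.joinAll()
    }
    println(count)
    println(set)
}

// Prefixes each message with the current thread.
fun print(msg: String) = println("${Thread.currentThread()} - $msg")
```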
I am expecting that count would be 2 but it is 10. The output looks like this:
```
Thread[main,5,main] - main
Thread[main,5,main] - in sync 1
Thread[main,5,main] - in sync 2
Thread[main,5,main] - in sync 3
Thread[main,5,main] - in sync 4
Thread[main,5,main] - in sync 5
Thread[main,5,main] - in sync 6
Thread[main,5,main] - in sync 7
Thread[main,5,main] - in sync 8
Thread[main,5,main] - in sync 9
Thread[main,5,main] - in sync 10
Thread[main,5,main] - in sync 10 done
Thread[main,5,main] - in sync 9 done
Thread[main,5,main] - in sync 8 done
Thread[main,5,main] - in sync 7 done
Thread[main,5,main] - in sync 6 done
Thread[main,5,main] - in sync 5 done
Thread[main,5,main] - in sync 4 done
Thread[main,5,main] - in sync 3 done
Thread[main,5,main] - in sync 2 done
Thread[main,5,main] - in sync 1 done
10
[0, 1]
```
Notice that `sus` is a function that can be supplied by the user and `sync` can be a Java 3rd-party lib that uses synchronized blocks like `ConcurrentHashMap`. Isn’t that broken?
u
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
```
This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
```
the problem is the launch -> runBlocking
y
I see
r
I always assumed this was a "should not" as in you shouldn't block when you're in a coroutine
If it's actually broken it should say "must not"
y
Can I call a `launch` inside a `runBlocking` and wait for the result, and it won’t break inside a synchronized block?
Another question: if I don’t know who called me, then I shouldn’t use `runBlocking`? Because then I can be called from a regular sequential function or from a launch coroutine scope. Isn’t that a possible cause of many bugs when running some Java libraries?
if I start from a coroutine scope (in Ktor for example) then I can never use runBlocking to pass a function to java library because it can break the same way the example above breaks
r
Still a bit confused by this but it does look like runBlocking will piggyback off the parent's coroutineContext if started from inside a coroutine
```kotlin
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
```
Which I think in your example means that `sync` and `sus` will actually behave like suspend funs even though they're not
Definitely the `Synchronized` annotation is doing nothing because all calls are on the same thread (as in the logs)
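The reason the same thread is never stopped by the lock is that JVM monitors are reentrant: a thread that already holds a monitor can enter it again. A tiny sketch without any coroutines (the names `lock` and `nested` are made up for illustration):

```kotlin
private val lock = Any()

// Re-enters the same monitor `depth` more times on the calling thread.
fun nested(depth: Int): Int = synchronized(lock) {
    if (depth == 0) 0 else 1 + nested(depth - 1)
}

fun main() {
    println(nested(3)) // prints 3; the thread never blocks on itself
}
```

So if several coroutines end up executing nested on one thread's stack, `synchronized` cannot keep them apart.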
If sync were written in Java I don't think this would happen because the coroutineContext wouldn't be automatically propagated (but I may be completely wrong on this 🤷 )
u
I guess you are wrong 🙂 I don’t think Kotlin will compile any coroutines support into non-suspend functions. You can let Android Studio decompile the code for you (->show bytecode -> decompile) to check. fun sync should decompile down to a no-frills Java method.
r
I definitely want to be wrong but I'm otherwise struggling to explain what's going on in the example code
Even before you get into the runBlocking there's multiple instances of sync running at the same time on the same thread
u
runBlocking without a dispatcher will use the blocked thread as its dispatcher. This is why all coroutines run on the same thread. Try passing the default dispatcher to `fun sus`’s runBlocking. As posted above, it might still misbehave, but at least you won’t be on the same thread
r
Passing Default to `sus` runBlocking -> doesn't change anything
Passing Default to `main` runBlocking -> code works correctly
Passing Default to both -> hangs forever
y
Looks like there might be an issue with the main thread dispatcher, since when I use a different dispatcher somewhere along that flow it starts to behave as expected. I modified the code a little to represent a Java lib. The Java lib code:
```java
import java.util.LinkedHashSet;
import java.util.function.Function;

public class TestSync {
    public TestSync(Function<Integer, String> callFunc) {
        this.callFunc = callFunc;
    }

    private final Function<Integer, String> callFunc;
    public int count = 0;
    public LinkedHashSet<Integer> set = new LinkedHashSet<>();

    // The callback runs while the monitor on this instance is held.
    public synchronized void sync(int i) {
        print("in sync " + i);
        if (!set.contains(i % 2)) {
            callFunc.apply(i);
            set.add(i % 2);
            count++;
        }
        print("in sync " + i + " done");
    }

    public void print(String msg) {
        System.out.println(Thread.currentThread() + " - " + msg);
    }
}
```
The kotlin coroutines code:
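```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking(Dispatchers.Default) {
        val testSync = TestSync { i ->
            runBlocking {
                delay(100)
                "x + $i".also { println("${Thread.currentThread()}  $it") }
            }
        }
        (1..10).map { i ->
            launch { testSync.sync(i) }
        }.joinAll()
        testSync.count.also(::println)
    }
}
```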
The output:
```
Thread[DefaultDispatcher-worker-2,5,main] - in sync 1
Thread[DefaultDispatcher-worker-2,5,main]  x + 1
Thread[DefaultDispatcher-worker-2,5,main] - in sync 1 done
Thread[DefaultDispatcher-worker-11,5,main] - in sync 10
Thread[DefaultDispatcher-worker-11,5,main]  x + 10
Thread[DefaultDispatcher-worker-11,5,main] - in sync 10 done
Thread[DefaultDispatcher-worker-10,5,main] - in sync 9
Thread[DefaultDispatcher-worker-10,5,main] - in sync 9 done
Thread[DefaultDispatcher-worker-9,5,main] - in sync 8
Thread[DefaultDispatcher-worker-9,5,main] - in sync 8 done
Thread[DefaultDispatcher-worker-8,5,main] - in sync 7
Thread[DefaultDispatcher-worker-8,5,main] - in sync 7 done
Thread[DefaultDispatcher-worker-7,5,main] - in sync 6
Thread[DefaultDispatcher-worker-7,5,main] - in sync 6 done
Thread[DefaultDispatcher-worker-6,5,main] - in sync 5
Thread[DefaultDispatcher-worker-6,5,main] - in sync 5 done
Thread[DefaultDispatcher-worker-5,5,main] - in sync 4
Thread[DefaultDispatcher-worker-5,5,main] - in sync 4 done
Thread[DefaultDispatcher-worker-4,5,main] - in sync 3
Thread[DefaultDispatcher-worker-4,5,main] - in sync 3 done
Thread[DefaultDispatcher-worker-3,5,main] - in sync 2
Thread[DefaultDispatcher-worker-3,5,main] - in sync 2 done
2
```
If I don’t use any dispatcher then the code behaves like before, where everything was a mess. If I pass a dispatcher to the main function's runBlocking or to the launch inside it, then it works as in the output above.
I can even use a single-thread dispatcher and it will still work as expected.
So the main thread dispatcher behaves differently and it’s unclear to me why
u
learning: don’t do it 😉 All the behaviour you find out experimentally will be undocumented and can change at any time. All we know is the quote above from the docs.
r
Actually there's a lot of documentation on runBlocking without an explicit Dispatcher:
```
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.
When CoroutineDispatcher is explicitly specified in the context, then the new coroutine runs in the context of the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another runBlocking, then this invocation uses the outer event loop.
```
It's definitely a valid use case so "don't do it" isn't really an explanation of what's going on
u
As stated explicitly, it is not valid to use it within a coroutine:
```
This function should not be used from a coroutine
```
If you do not know who calls you, don’t use it. It is safe in main and in tests.
r
As above, "should" here means bad practice rather than must not do (kotlin philosophy is to prefer compiler errors for must nots)
The point about the outer event loop is specifically to support nested runBlocking without deadlocks: https://github.com/Kotlin/kotlinx.coroutines/issues/860
I think the real issue here is just that classical synchronisation doesn't work so well with coroutines running on the same thread (see here and here)
Which might be a problem but I guess in your real example you won't be starting all the modification requests as coroutines on the same thread
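For comparison, a sketch of the earlier example rewritten with a coroutine-aware lock. This is not something proposed in the thread; it swaps `@Synchronized` for `Mutex` from kotlinx.coroutines, and it assumes `sync` can be turned into a suspend function (so it does not help when the locking lives inside a third-party Java library). The name `seen` is made up:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
val seen = mutableSetOf<Int>()
var count = 0

// Mutex is owned by a coroutine, not by a thread, so a second coroutine
// on the same thread suspends in withLock instead of walking in.
suspend fun sync(i: Int) {
    mutex.withLock {
        if (i % 2 !in seen) {
            delay(100) // suspends instead of nesting runBlocking
            seen.add(i % 2)
            count++
        }
    }
}

fun main() = runBlocking {
    (1..10).map { i -> launch { sync(i) } }.joinAll()
    println(count) // 2
}
```

Note that `Mutex` is not reentrant, so calling `sync` again from inside the locked block would hang.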