I am going crazy about concurrency in Kotlin. What...
# kotlin-native
j
I am going crazy about concurrency in Kotlin. What is the best practice for each collection type (Map, List, etc.) when I need thread-safe mutable collections? Ideally I don't want to use the Java concurrent lists if there are better ways of solving this. I think I have a nasty bug from CopyOnWriteArrayList not cleaning up some references, because it clones each time.
k
What is your use case?
e
you have posted in the Kotlin/Native channel, where the Java classpath doesn't exist so CopyOnWriteArrayList isn't available anyway
j
I have a List of elements containing anonymous lambdas for Bluetooth callbacks in Android (ScanCallback), and I think each lambda holds references to things, which causes some bugs. Anyway, I want to know the best way of doing this in Kotlin Native, if there is a good way :=
k
If this is for Android, why exactly are you using Kotlin native? Are you using the Android NDK?
j
Also, I want to know in general what the best practice is in Kotlin for each use case. I can't find good documentation on what's available and what I should use in each scenario. Feels like I need to refresh my knowledge of the best ways of doing things.
Wait, isn't Kotlin native for Kotlin questions? I am confused. Is Kotlin Native something else? 😄
If so, I apologize 😄
Which channel should I ask in?
e
what is Kotlin/Native: there is a link in the channel description
k
Kotlin native is for kotlin development that isn’t based on the JVM or web; it’s for LLVM based native application development.
j
Too many Kotlin things: Mobile, Multiplatform, Native, Wasm and whatnot 😄
Which channel is the correct one?
k
It depends on if you’re using coroutines or not. If not, then you’re not asking in the correct workspace altogether. You’d have the same bug in Java with shared mutable state. I suspect if you ask this question anyway and it only has to deal with Java-based concurrency APIs nobody will answer the question and you’ll get a not kotlin but kotlin colored reaction. If you are using coroutines then you should ask in #coroutines
j
I am using coroutines yes, but probably the wrong way 😄
k
Generally speaking, if I were in your shoes, I’d lean on Mutex or StateFlow to ensure atomic operations on a mutable list that can be accessed from multiple coroutines.
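As a rough illustration of the Mutex suggestion, here is a minimal sketch assuming kotlinx.coroutines is on the classpath. `GuardedList` is a hypothetical name invented for this example, not an existing library type; the idea is only that every read and write of the backing list happens while holding the same Mutex.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper: every access to the list goes through one Mutex.
class GuardedList<T> {
    private val mutex = Mutex()
    private val items = mutableListOf<T>()

    suspend fun add(item: T): Boolean = mutex.withLock { items.add(item) }

    suspend fun remove(item: T): Boolean = mutex.withLock { items.remove(item) }

    // Return a copy so callers never iterate the guarded list directly.
    suspend fun snapshot(): List<T> = mutex.withLock { items.toList() }
}

fun main(): Unit = runBlocking {
    val list = GuardedList<Int>()
    // coroutineScope waits for all 100 concurrent writers to finish.
    coroutineScope {
        repeat(100) { i -> launch(Dispatchers.Default) { list.add(i) } }
    }
    println(list.snapshot().size) // prints 100
}
```

Note that the functions are suspending, so this only helps callers that are already inside coroutines.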
j
For this particular case, however, the method itself does not use coroutines. I want an atomic, safe way of mutating collections, and I want to be sure what the best way of doing this in Kotlin is, regardless of JVM or coroutines: vanilla Kotlin 🙂
Doesn't kotlinx collections offer this?
e
kotlin.collections and kotlinx.collections.immutable are two different things. neither offers thread safety
although immutable collections are safe to read from multiple threads, the builders for constructing them aren't
j
It doesn't even have to be a list, actually; it can be a Map if that gives me better possibilities for bulletproof, thread-safe Kotlin APIs. This is a very generic question, I know, but the regular things I've been using (Flow, Mutex, Semaphores, or Concurrent* from Java) don't seem to help here.
t
I’m not an android person, but you could consider using something like this: https://github.com/sksamuel/aedile. A coroutine wrapper on top of caffeine.
j
Also, the list or collection itself should not mutate/clone itself in a way that leaves references that are never cleaned up, which I think is my initial bug.
e
that's more of a cache replacement, not the same thing
but in most cases, you should structure your application such that there is only one actor (one thread or one coroutine) that has direct access to the data structure
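One way to read the single-owner advice is sketched below, assuming kotlinx.coroutines. The command types and `launchListOwner` are hypothetical names made up for this example. A single coroutine owns the mutable list, and everything else talks to it through a Channel, so no lock is needed around the list itself.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical commands understood by the owning coroutine.
sealed interface ListCommand
data class Add(val item: String) : ListCommand
data class Remove(val item: String) : ListCommand
data class Snapshot(val reply: CompletableDeferred<List<String>>) : ListCommand

// Starts the single coroutine that owns the list; everyone else sends commands.
fun CoroutineScope.launchListOwner(): Channel<ListCommand> {
    val commands = Channel<ListCommand>(Channel.UNLIMITED)
    launch {
        val items = mutableListOf<String>() // never leaves this coroutine
        for (command in commands) {
            when (command) {
                is Add -> items.add(command.item)
                is Remove -> items.remove(command.item)
                is Snapshot -> command.reply.complete(items.toList())
            }
        }
    }
    return commands
}

fun main(): Unit = runBlocking {
    val commands = launchListOwner()
    commands.send(Add("a"))
    commands.send(Add("b"))
    commands.send(Remove("a"))
    val reply = CompletableDeferred<List<String>>()
    commands.send(Snapshot(reply))
    println(reply.await()) // prints [b]
    commands.close() // lets the owner coroutine finish
}
```

Because `trySend` on an unlimited channel does not suspend, a non-suspending callback could also post commands this way.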
j
Is this the one to use, as Kotlin doesn't seem to offer me this?
val items = Collections.synchronizedList(listOf<MyItem>())
@ephemient Yeah, I wish the Android SDK was composition-pattern driven, then I could be sure everything is atomic and safe. But I need to combine 🙂
e
no, that uses Java classes to bypass Kotlin's mutable/non-modifiable collection types
j
The ScanCallback is shared between my list items and Google's code in the Android Bluetooth scanner, which I am working with at the moment.
"but in most cases, you should structure your application such that there is only one actor (one thread or one coroutine) that has direct access to the data structure" Yes, the list itself only exists in one place, but I have thread issues within that one place because of how the Scanner in Android works with callbacks 😄
I need to keep the reference to a lambda to be able to stop the callback later on, and this is the core issue, I think. I probably have an unoptimized implementation from the past, which is what I am trying to fix.
r
You just don't try to modify the same data structure in the callback as well as in your internal code. If you are doing Bluetooth, consider using an abstraction like https://github.com/JuulLabs/kable which will abstract away such details somewhat. You'll get a native Kotlin Flow instead of using the callback directly i.e. the callback is not visible to you so you can't do the wrong thing 🙂
j
@rocketraman Yes, it's too late to replace the entire implementation with Kable, but yeah, I looked at it and refactoring is tempting 😄 It would most probably be easier to have everything as Flows, using callbackFlow, to avoid this problem.
However, Kable doesn't offer reconnection of existing devices, like Companion Device Manager and that kind of thing. It only helps me with one single connection. Handling x parallel Bluetooth devices is the core issue for sure; some things can only happen once and some can be parallelized. I'm already using a lot of Mutexes to deal with these kinds of issues.
But yeah, I hope androidx bluetooth and kotlinx collections will help us with these kinds of problems in the future 😄
r
A callbackFlow is exactly intended for this kind of use case. I would venture to say if there are a bunch of mutexes and stuff for this use case, that code needs some refactoring. Kable can deal with multiple devices -- but yeah, the reconnection logic is at the app level. Shouldn't need mutexes for that though -- I've done this recently with the devices being aggregated into a StateFlow<List<MyDevice>>, and that state, or some derivation of it, is used for whatever logic you need.
To give a quick example, you can combine that state with some kind of clock flow to do periodic checks for reconnection.
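A minimal sketch of that idea, assuming kotlinx.coroutines: `MyDevice`, `tickerFlow`, and `reconnectCandidates` are hypothetical names for illustration, and the real device model would carry more state. The device state is combined with a simple clock flow, so the list of devices needing reconnection is recomputed on every tick and on every state change.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical device model, reduced to what the example needs.
data class MyDevice(val id: String, val connected: Boolean)

// Hypothetical clock flow: emits immediately, then once per period.
fun tickerFlow(periodMillis: Long): Flow<Unit> = flow {
    while (true) {
        emit(Unit)
        delay(periodMillis)
    }
}

// Re-evaluated on every tick and on every change of the device state.
fun reconnectCandidates(
    devices: StateFlow<List<MyDevice>>,
    ticks: Flow<Unit>,
): Flow<List<MyDevice>> =
    combine(devices, ticks) { current, _ -> current.filterNot { it.connected } }

fun main(): Unit = runBlocking {
    val devices = MutableStateFlow(
        listOf(MyDevice("a", connected = true), MyDevice("b", connected = false))
    )
    val first = reconnectCandidates(devices, tickerFlow(1_000)).first()
    println(first.map { it.id }) // prints [b]
}
```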
e
which goes back to another common pattern: figure out the desired state machine rather than constructing one ad-hoc with scattered synchronization primitives
k
I agree. I very rarely use mutex and question when it's used in code review
j
The problem is that I need one callbackFlow for every single Bluetooth device, run under a parent supervisor or similar that keeps track of all of them, and I need to be able to cancel them (to trigger stop scan in the Android SDK) and also reconnect them. Kable's https://github.com/JuulLabs/kable/blob/main/core/src/androidMain/kotlin/BluetoothLeScannerAndroidScanner.kt looks like it creates one scanner per advertisement, but I'm not sure; I haven't checked how it would look in reconnection flows yet. I want the reconnection flows to happen in different ways from case to case: in some cases a regular start, in some cases a stop, in some cases stop + start, etc. And all of them should work in parallel, hence my thread issue. I agree with all of the above, but I can't do magic and make all the written code disappear 😄
But I guess refactoring everything to use only Flow / Coroutines is the only thing Kotlin offers me to solve this.
For a quick fix, would it make sense to wrap the list inside a MutableStateFlow? Like MutableStateFlow<List<MyItem>>, copying the list and adding or removing one element each time?
k
I mean the way I would approach this from what you’ve described is something like the following:
// Re-subscribes whenever the connection status flips; flatMapLatest cancels the previous inner flow.
fun BluetoothThingy.connectionFlow(deviceThingy: DeviceThingy, connectionStatus: Flow<Boolean>): Flow<Device> =
  connectionStatus.distinctUntilChanged().flatMapLatest { connected ->
    if (connected) connectionFlow(deviceThingy) else emptyFlow()
  }

private fun BluetoothThingy.connectionFlow(deviceThingy: DeviceThingy) = callbackFlow<Device> {
  val callback = /* register the Android SDK bluetooth callback */

  // Runs when the collector is cancelled or the flow is closed.
  awaitClose { unregister(callback) }
}
And then compose that for n + 1 devices
The above would cancel listeners and re-register them when connection status flips from on-to-off and off-to-on respectively.
You should try out the state flow. It might work as a bandaid if you don’t have time to refactor this.
j
"And then compose that for
n + 1 devices" Yeah, this is the complexity: what is the best way to compose all devices into one flow while still being able to orchestrate each individual device?
k
I mean each device could potentially take a different implementation of connectionStatus: Flow<Boolean> and then you could combine all of those flows into one stream
There’s lots of utility functions for doing exactly what you need
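As one example of such a utility, `merge` from kotlinx.coroutines turns a collection of flows into a single stream. The sketch below assumes each device has its own `Flow<Boolean>` status; `DeviceEvent` and `allDeviceEvents` are hypothetical names for this example. Each status is tagged with its device id before merging so the combined stream still says which device an event came from.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical event type: a status change tagged with its device id.
data class DeviceEvent(val deviceId: String, val connected: Boolean)

// Tags every per-device status with its id, then merges all flows into one.
fun allDeviceEvents(perDevice: Map<String, Flow<Boolean>>): Flow<DeviceEvent> =
    perDevice
        .map { (id, status) -> status.map { connected -> DeviceEvent(id, connected) } }
        .merge()

fun main(): Unit = runBlocking {
    val events = allDeviceEvents(
        mapOf("a" to flowOf(true, false), "b" to flowOf(true))
    ).toList()
    // Interleaving between devices is not guaranteed, so only the count is printed.
    println(events.size) // prints 3
}
```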
j
I need to be able to cancel each individual Flow 😄
But yeah, thanks for the input, I will see what works here. Appreciate it 🙂
r
I'd go the state flow way -- flows for each device can compose but at the end of the day you are affecting some kind of state machine for each device, and you need to keep track of all the state machines. This is your MutableStateFlow. The update function lets you update it atomically.
With Kable, you would call update from the peripheral state tracking flow, and from the advertisements flow.
j
Yeah, I am usually doing the StateFlow.update like:
scanningRequests.update {
    it + ScanRequest(
        config = config,
        scanCallback = scanCallback
    )
}
This is the way I usually prefer, but I can't control how all the code is written 🙂
So yeah, probably a list of Flows I can cancel, with each callbackFlow calling stopScan when it is cancelled 🙂
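A rough sketch of that last idea, assuming kotlinx.coroutines: `FakeScanner` is a made-up stand-in for the Android scanner API (it is not the real `BluetoothLeScanner`), and `ScanRegistry` is a hypothetical name. Each device gets its own Job collecting a callbackFlow, and cancelling that Job runs `awaitClose`, which is where the stop call goes. The registry's map is not thread-safe by itself; the sketch assumes it is only touched from one coroutine.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a callback-based scanner API.
class FakeScanner {
    val active = mutableSetOf<String>()

    fun startScan(deviceId: String, onResult: (String) -> Unit) {
        active += deviceId
        onResult("found $deviceId")
    }

    fun stopScan(deviceId: String) {
        active -= deviceId
    }
}

// Wraps the callback API; cancelling the collector triggers stopScan.
fun FakeScanner.scanFlow(deviceId: String): Flow<String> = callbackFlow {
    startScan(deviceId) { result -> trySend(result) }
    awaitClose { stopScan(deviceId) }
}

// One Job per device, so each scan can be cancelled on its own.
// Assumption: start/stop are only called from a single coroutine.
class ScanRegistry(private val scope: CoroutineScope, private val scanner: FakeScanner) {
    private val jobs = mutableMapOf<String, Job>()

    fun start(deviceId: String, onResult: (String) -> Unit) {
        jobs.getOrPut(deviceId) {
            scope.launch { scanner.scanFlow(deviceId).collect { onResult(it) } }
        }
    }

    suspend fun stop(deviceId: String) {
        jobs.remove(deviceId)?.cancelAndJoin()
    }
}

fun main(): Unit = runBlocking {
    val scanner = FakeScanner()
    val scans = ScanRegistry(this, scanner)
    scans.start("a") { println(it) }
    scans.start("b") { println(it) }
    delay(100)       // give both scans time to register
    scans.stop("a")  // unregisters only device a's callback
    println(scanner.active)
    scans.stop("b")  // stop the remaining scan so runBlocking can return
}
```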