ursus
01/02/2022, 4:04 PM
private fun SharedPreferences.onSharedPreferenceChangedFlow(): Flow<String> {
    return callbackFlow {
        val listener = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            trySend(key)
        }
        registerOnSharedPreferenceChangeListener(listener)
        awaitClose { unregisterOnSharedPreferenceChangeListener(listener) }
    }
}
This seems to work; however, I know there is one nasty behavior of SharedPreferences.registerOnSharedPreferenceChangeListener: it only keeps weak references, so the listener might get GC-ed, so I need to hold a hard reference to the listener.
How would I turn it into a field? Or rather, I think I need a proper CallbackFlow subclass, not just the builder, right?
And looking at the sources, everything is private / internal API
liminal
01/03/2022, 12:04 AM
Astronaut4449
01/03/2022, 6:15 AM
Florian Walther (live streaming)
01/03/2022, 3:11 PM
ursus
01/03/2022, 9:47 PM
farzad
01/04/2022, 9:32 AM
thana
01/04/2022, 10:50 AM
CoroutineScope interface anymore. Unfortunately, it doesn't say anything about the WHY, apart from saying that using the builder function would be more straightforward. Why is it more straightforward?
Norbi
01/04/2022, 12:51 PM
java.lang.reflect.Proxy for a Kotlin interface with suspend methods?
(I would call the proxy methods only from Kotlin code.)
Thanks.
Rak
01/04/2022, 2:49 PM
Aslo
01/04/2022, 6:30 PM
Elliot Barlas
01/04/2022, 6:39 PM
Victor Cardona
01/04/2022, 11:37 PM
ursus
01/05/2022, 4:28 AM
BUFFERED and expected for consumers to use buffer operator?
Paul Woitaschek
01/05/2022, 9:32 AM
private suspend fun testBody(scope: CoroutineScope) {
    val value = MutableStateFlow(1)
    val sharingJob = Job()
    val flow = value.shareIn(scope + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
    check(flow.first() == 1)
    value.value = 2
    check(flow.first() == 2)
    sharingJob.cancel()
}
@Test
fun withRunBlockingTest() = runBlockingTest {
    testBody(this)
}
@Test
fun withRunTest() = runTest {
    testBody(this)
}
This passes with runBlockingTest but fails with runTest.
It is possible to fix it by adding a runCurrent before calling flow.first a second time, but I think that’s not how it’s supposed to be, and it makes me very uncomfortable writing tests without having a runCurrent between every single line
Florian Walther (live streaming)
01/05/2022, 12:25 PM
SystemClock.elapsedRealtime in a test.
janvladimirmostert
01/05/2022, 8:41 PM
Florian Walther (live streaming)
01/06/2022, 12:05 PM
countDownInterval is 0 I want to skip the loop and just delay until the time is over. The problem is that delay is inexact (that's why I do the comparison with the SystemClock time). How could I achieve a (somewhat) exact delay?
janvladimirmostert
01/06/2022, 10:42 PM
class FileFlow(private val uri: String) {
    private data class FileBaton(
        val fileChannel: AsynchronousFileChannel,
        val flow: ProducerScope<ByteArray>,
        val buffer: ByteBuffer,
        val position: Long,
        val handleError: (message: String, e: Throwable?) -> Unit = { message, e ->
            flow.cancel(message, e)
            runBlocking(Dispatchers.IO) {
                fileChannel.close()
            }
        }
    )
    fun read(bufferSize: DataSize): Flow<ByteArray> = callbackFlow {
        val path = Path.of(uri)
        val baton = FileBaton(
            buffer = ByteBuffer.allocate(bufferSize.B),
            fileChannel = withContext(Dispatchers.IO) {
                AsynchronousFileChannel.open(path, StandardOpenOption.READ)
            },
            flow = this,
            position = 0,
        )
        baton.fileChannel.read(
            baton.buffer,
            baton.position,
            baton,
            object : CompletionHandler<Int, FileBaton> {
                override fun completed(read: Int, baton: FileBaton) {
                    if (read > 0) {
                        trySendBlocking(
                            baton.buffer.array().sliceArray(0 until read)
                        ).onFailure { e ->
                            baton.handleError(e?.message ?: "", e)
                        }
                        baton.buffer.rewind()
                        baton.fileChannel.read(
                            baton.buffer,
                            baton.position + read,
                            baton.copy(
                                position = baton.position + read
                            ),
                            this
                        )
                    } else {
                        baton.flow.channel.close()
                        runBlocking(Dispatchers.IO) {
                            baton.fileChannel.close()
                        }
                    }
                }
                override fun failed(e: Throwable?, baton: FileBaton) {
                    baton.handleError(e?.message ?: "", e)
                }
            }
        )
        awaitClose {
        }
    }
}
FileFlow("/home/.../blah.txt").read(250.kB).collect {
    print(String(it))
}
Chris Fillmore
01/07/2022, 12:05 PM
StateFlow<Job>, so that connection lifecycles can be observed, cancelled, reconnected, etc. I have an Android app which connects to several websocket endpoints on different protocols, including plain websockets, socketio, action cable, graphql subscriptions, so being able to define a common pattern like this would be useful to me. More in 🧵
Trevor Stone
01/07/2022, 7:09 PM
.stateIn(
    CoroutineScope(Dispatchers.Default),
    SharingStarted.WhileSubscribed(),
    null
)
and getting "Cannot start an undispatched coroutine in another thread DefaultDispatcher from current MainThread" as an error from runTest
rkechols
01/08/2022, 10:50 AM
Group object, which has some normal data to display (e.g. id), but also has a list of User objects (which each have their own data to display)
I'd of course like my database to have group data separated from the User data, since `User`s can exist without a group, with multiple groups, etc. So, each group in the db will simply list the relevant User IDs; I call this a ShallowGroup
I know how to get a flow to observe a single User, or how to get a flow to observe a single ShallowGroup but I can't figure out how to get from those a flow that produces full Group objects that reflect the changes to the inner User objects.
My best understanding of how it would work:
1. Get a flow of ShallowGroup objects.
2. In a map call on that flow, use the user IDs in that ShallowGroup snapshot to get a flow for each User, and:
3. use combine on the previous list of flows of Users to get a flow which produces `List<User>`/`Group`
4. ?? somehow flatten the Flow<Flow<Group>> to just a Flow<Group> ??
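The steps above can be sketched with flatMapLatest, which does the map of step 2 and the flattening of step 4 in one operator. This is only a minimal sketch under assumptions: FakeDb, observeUser, rename and the sample data are hypothetical stand-ins for the real database layer, and User, ShallowGroup and Group are reduced to the fields the example needs.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

data class User(val id: Int, val name: String)
data class ShallowGroup(val id: Int, val userIds: List<Int>)
data class Group(val id: Int, val users: List<User>)

// Hypothetical in-memory stand-in for the database (not a real API).
class FakeDb {
    val shallowGroup = MutableStateFlow(ShallowGroup(1, listOf(1, 2)))
    private val users = mapOf(
        1 to MutableStateFlow(User(1, "Ann")),
        2 to MutableStateFlow(User(2, "Bob")),
        3 to MutableStateFlow(User(3, "Cid")),
    )
    fun observeUser(id: Int): Flow<User> = users.getValue(id)
    fun rename(id: Int, name: String) { users.getValue(id).value = User(id, name) }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun observeGroup(db: FakeDb): Flow<Group> =
    // Each new ShallowGroup cancels the previous inner flow and starts a new one.
    db.shallowGroup.flatMapLatest { shallow ->
        val userFlows = shallow.userIds.map { db.observeUser(it) }
        if (userFlows.isEmpty()) {
            // combine over an empty list would never emit, so handle it explicitly.
            flowOf(Group(shallow.id, emptyList()))
        } else {
            combine(userFlows) { users -> Group(shallow.id, users.toList()) }
        }
    }

fun main() = runBlocking<Unit> {
    val db = FakeDb()
    println(observeGroup(db).first())   // group with users 1 and 2
    db.rename(2, "Bobby")
    println(observeGroup(db).first())   // same group, user 2 renamed
    db.shallowGroup.value = ShallowGroup(1, listOf(3))
    println(observeGroup(db).first())   // group now contains only user 3
}
```

Because flatMapLatest cancels the inner combine whenever a new ShallowGroup arrives, users that left the group stop being observed.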
Note that the contents of each ShallowGroup object received determine which User objects need to be observed.
Can someone point me to what the best way is to handle flows for nested objects like this?
iamthevoid
01/09/2022, 9:32 AM
Norbi
01/09/2022, 9:36 AM
Stylianos Gakis
01/09/2022, 4:19 PM
lifecycleOwner.repeatOnLifecycle(Lifecycle.State.RESUMED) {
    launch {
        val callback = LifecycleEventObserver { _: LifecycleOwner, event: Lifecycle.Event ->
            if (event == Lifecycle.Event.ON_PAUSE) {
                // do something on pause
            }
        }
        lifecycleOwner.lifecycle.addObserver(callback)
        try {
            awaitCancellation()
        } finally {
            lifecycleOwner.lifecycle.removeObserver(callback)
        }
    }
}
But this try/finally with the awaitCancellation() combination feels a bit weird. Is there a nicer way to do this in general?
dimsuz
01/09/2022, 7:09 PM
val externalFlow = flowOf(1, 3, 4)
val myFlow = flow<Int> {
    coroutineScope { externalFlow.collect { this@flow.emit(it + 8) } }
    emit(1)
    emit(99)
}
I've used coroutineScope, but I'm worried that it won't be cancelled when myFlow collection is cancelled or interrupted.
It's just that I've found the internal flowScope builder in coroutines.core and it says:
This builder is similar to coroutineScope with the only exception that it ties lifecycle of children and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
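On the cancellation worry: a plain collect (or emitAll) inside flow { } is a suspending call that runs in the collector's own coroutine, so it stops when the collection of myFlow is cancelled, and no extra scope is involved. A minimal sketch, reusing the names from the snippet above and swapping the coroutineScope wrapper for emitAll (a substitution for illustration, not the original code):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val externalFlow = flowOf(1, 3, 4)

val myFlow: Flow<Int> = flow {
    // Re-emit every upstream value shifted by 8; this suspends in the
    // collector's coroutine, so cancelling the collector stops it as well.
    emitAll(externalFlow.map { it + 8 })
    emit(1)
    emit(99)
}

fun main() = runBlocking<Unit> {
    println(myFlow.toList())          // [9, 11, 12, 1, 99]
    // take(2) aborts the collection early; the upstream collect ends with it.
    println(myFlow.take(2).toList())  // [9, 11]
}
```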
Can I achieve a similar effect for my case? Or is using coroutineScope like I did OK?
Stylianos Gakis
01/09/2022, 9:56 PM
suspendCancellableCoroutine over suspendCoroutine. If I've got a use case where all I need to do is something like:
suspend fun MediaPlayer.seekToPercent(
    @FloatRange(from = 0.0, to = 1.0) percentage: Float,
) = suspendCoroutine<Unit> { cont ->
    val callback = MediaPlayer.OnSeekCompleteListener {
        this.setOnSeekCompleteListener(null)
        cont.resume(Unit)
    }
    setOnSeekCompleteListener(callback)
    val positionToSeekTo = (duration.toFloat() * percentage).toInt()
    seekTo(positionToSeekTo)
}
Is there just no reason to use suspendCancellableCoroutine?
Basically I wonder if there are any other implications regarding the cancellability of suspendCoroutine and if it behaves in any different way to suspendCancellableCoroutine, or all that they do differently is that in a suspendCancellableCoroutine we have access to more than just resume(value: T) and resumeWithException(exception: Throwable)?
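One behavioral difference beyond the richer continuation API: a coroutine suspended in suspendCoroutine does not react to cancellation until the callback resumes it, while suspendCancellableCoroutine resumes with a CancellationException right away and lets you clean up via invokeOnCancellation. A minimal sketch of that, where FakePlayer and awaitSeek are hypothetical stand-ins (not MediaPlayer API) and the seek deliberately never completes:

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume

// Hypothetical stand-in for a player with a single seek-complete listener.
class FakePlayer {
    var listener: (() -> Unit)? = null
    fun seekTo(position: Int) { /* never calls the listener in this sketch */ }
}

suspend fun FakePlayer.awaitSeek(position: Int): Unit =
    suspendCancellableCoroutine { cont ->
        listener = {
            listener = null
            cont.resume(Unit)
        }
        // Runs if the coroutine is cancelled while suspended here.
        cont.invokeOnCancellation { listener = null }
        seekTo(position)
    }

fun main() = runBlocking<Unit> {
    val player = FakePlayer()
    val job = launch { player.awaitSeek(42) }
    yield()                           // let the child register its listener and suspend
    println(player.listener != null)  // true
    job.cancelAndJoin()               // returns promptly because the suspension is cancellable
    println(player.listener == null)  // true
}
```

With suspendCoroutine in place of suspendCancellableCoroutine, the cancelAndJoin() call in this sketch would wait forever, because nothing ever resumes the continuation.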
Reading the documentation doesn’t fill all the gaps in my head
Eli
01/09/2022, 10:49 PM
launch or async on the calling function. I'm trying to implement this as close to the example as I can, but no matter what I do, it doesn't seem to be executing the items in the flow concurrently, but only sequentially. Here's a simplified code snippet of what I'm attempting:
enum class Ordinal(val delay: Long) {
    FIRST(100),
    SECOND(200),
    THIRD(300),
    FOURTH(400),
    FIFTH(500),
    SIXTH(600),
    SEVENTH(700)
}
private fun ordinalFlow() = flow {
    for (ordinal in Ordinal.values()) {
        delay(ordinal.delay)
        emit(ordinal)
    }
}
@Test
fun asyncBufferFlowTest() {
    val blockTime = measureTimeMillis {
        runBlocking {
            val timer = measureTimeMillis {
                ordinalFlow()
                    .buffer()
                    .collect { println("Collected: $it") }
            }
            println("Done in $timer")
        }
    }
    println("Block time in $blockTime ms")
}
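For context on the snippet above: buffer() only lets the emitter and the collector run in separate coroutines; the emitter's for loop still waits out each delay one after another, so the total stays near the sum of the delays. A minimal sketch of one way to overlap the items themselves, by launching one coroutine per item inside channelFlow. The name concurrentOrdinalFlow is hypothetical, and the enum is shortened to three entries to keep the run short:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

enum class Ordinal(val delay: Long) { FIRST(100), SECOND(200), THIRD(300) }

// Each item gets its own coroutine, so the delays overlap instead of adding up.
fun concurrentOrdinalFlow(): Flow<Ordinal> = channelFlow {
    for (ordinal in Ordinal.values()) {
        launch {
            delay(ordinal.delay)
            send(ordinal)   // channelFlow allows sending from child coroutines
        }
    }
}

fun main() = runBlocking<Unit> {
    val elapsed = measureTimeMillis {
        concurrentOrdinalFlow().collect { println("Collected: $it") }
    }
    // Should be close to the longest single delay rather than the sum of all delays.
    println("Done in $elapsed ms")
}
```

Items arrive in completion order here, which happens to match declaration order only because the delays are increasing.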
Guilherme Almeida
01/10/2022, 11:50 AM
limitedParallelism slicing I am not sure I fully understand how it works. Looking at the example below I am launching Access #1 and a second later Access #2.
Using the accessDispatcher with limited parallelism of 1 I would think the Access #2 could only start after the Access #1 was finished. So I expected the prints to go 1,2,3 but they go 1,3,2 instead. Could anyone point out why this is the actual behaviour? I know this can be solved with something like a Mutex, but I am trying to understand the behaviour of these new sliced dispatchers 😄
private val mainScope = MainScope()
private val accessDispatcher = Dispatchers.IO.limitedParallelism(1)
public fun foo() {
    mainScope.launch {
        mainScope.launch(accessDispatcher + CoroutineName("Access #1")) {
            println("Step 1")
            delay(5.seconds)
            println("Step 2")
        }
        delay(1.seconds)
        mainScope.launch(accessDispatcher + CoroutineName("Access #2")) {
            println("Step 3")
        }
    }
}
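The 1,3,2 order follows from what limitedParallelism limits: how many coroutines execute at the same time, not how many are in flight. delay suspends Access #1 and frees the single slot, so Access #2 can run in the meantime. A minimal sketch contrasting that with a Mutex held across the suspension. The helper names maybeLocked and runSteps are hypothetical, the delay is shortened, and a CompletableDeferred replaces the one-second wait so the ordering does not depend on start-up timing:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.CopyOnWriteArrayList

// Runs the block under the mutex when one is given, otherwise runs it directly.
suspend fun maybeLocked(mutex: Mutex?, block: suspend () -> Unit) {
    if (mutex != null) mutex.withLock { block() } else block()
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun runSteps(mutex: Mutex?): List<Int> = coroutineScope {
    val accessDispatcher = Dispatchers.IO.limitedParallelism(1)
    val steps = CopyOnWriteArrayList<Int>()
    val firstStarted = CompletableDeferred<Unit>()

    launch(accessDispatcher) {              // Access #1
        maybeLocked(mutex) {
            steps.add(1)
            firstStarted.complete(Unit)
            delay(200)                      // suspends and frees the single slot
            steps.add(2)
        }
    }
    firstStarted.await()                    // Access #1 is now inside its block
    launch(accessDispatcher) {              // Access #2
        maybeLocked(mutex) { steps.add(3) }
    }
    steps                                   // coroutineScope waits for both children first
}

fun main() = runBlocking<Unit> {
    println(runSteps(mutex = null))     // should print [1, 3, 2]
    println(runSteps(mutex = Mutex()))  // should print [1, 2, 3]
}
```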
Florian Walther (live streaming)
01/10/2022, 1:15 PM
advanceTimeBy? For example, I'm using a fake time source so the actual advanced time doesn't matter.
Chris Fillmore
01/10/2022, 2:47 PM
Job state changes? (i.e. going from New -> Active -> Cancelled/Completed)
Joffrey
01/10/2022, 2:49 PM
New -> Active transition
Chris Fillmore
01/10/2022, 2:50 PM
Joffrey
01/10/2022, 2:52 PM
Chris Fillmore
01/10/2022, 2:52 PM
invokeOnStart
Joffrey
01/10/2022, 2:56 PM
Job API for this, though? It looks like your use case calls for a custom interface
Chris Fillmore
01/10/2022, 2:59 PM
gildor
01/10/2022, 3:45 PM
Chris Fillmore
01/10/2022, 3:46 PM
.join() will also start the job
Zach Klippenstein (he/him) [MOD]
01/10/2022, 8:47 PM
Chris Fillmore
01/10/2022, 9:11 PM
"You’re trying to model a separate lifecycle without a coroutine job?"
No. To give a concrete example, I already have this implemented for Apollo subscriptions. It looks roughly like:
class ApolloSubscription(
    private val client: ApolloClient,
    private val coroutineScope: CoroutineScope,
) {
    private val _job = MutableStateFlow(subscriptionJob())
    val job = _job.asStateFlow()
    private var subscriptionCall: ApolloSubscriptionCall? = null
    init {
        job
            .onEach {
                it.invokeOnCompletion {
                    subscriptionCall?.cancel()
                    subscriptionCall = null
                    if (/* should reconnect */) {
                        _job.value = subscriptionJob()
                    }
                }
            }
            .launchIn(coroutineScope)
    }
    private fun subscriptionJob() = coroutineScope.launch(start = CoroutineStart.LAZY) {
        suspendCancellableCoroutine {
            subscriptionCall = client.subscribe(...).apply {
                execute(object : ApolloSubscriptionCall.Callback {
                    // Callback methods here that handle connection/websocket events,
                    // and resume the continuation on completion/failure
                })
            }
        }
    }
}
val subscription = ApolloSubscription(...)
// Connect to the websocket
subscription.job.value.start()
// OR
subscription.job
    .onEach {
        it.start()
    }
    .launchIn(myClientScope)
Job is just meant to model, at a high level, whether or not there is an open connection, and its state. This seems in line with the definition for Job, from the docs:
Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
Zach Klippenstein (he/him) [MOD]
01/10/2022, 9:18 PM
"Job is just meant to model"
That was my point, that a coroutine job shouldn’t model anything other than a coroutine job
Chris Fillmore
01/10/2022, 9:22 PM
Zach Klippenstein (he/him) [MOD]
01/10/2022, 9:28 PM
Job type, even if that ends up just wrapping a coroutine job internally, to give yourself room to maneuver in the future.
Chris Fillmore
01/10/2022, 9:28 PM
Job is not stable for inheritance, according to the docs. That’s not something I want to pursue.
Zach Klippenstein (he/him) [MOD]
01/10/2022, 9:29 PM
ApolloSubscription is responsible for creating, cancelling, and recreating subscription calls, but not starting them. Why the asymmetry?
Chris Fillmore
01/10/2022, 9:31 PM
StateFlow<Job> just allows client code to observe an open connection as it changes state, reconnects, etc. This seems unremarkable to me.
My original ask was whether I could observe a Job going from New -> Active state.
Zach Klippenstein (he/him) [MOD]
01/10/2022, 9:33 PM
Chris Fillmore
01/10/2022, 9:33 PM
Zach Klippenstein (he/him) [MOD]
01/10/2022, 9:42 PM
ApolloSubscription uses coroutines to manage calls, and how it does so, is an implementation detail of that class) – this is a pretty well-documented design best practice.
But this whole discussion is moot anyway: If you need to observe when a coroutine job is started, I don’t think there are any hooks for that. You’ll have to do that some other way.
Chris Fillmore
01/10/2022, 9:57 PM
interface StatefulConnection {
    fun connect()
    fun disconnect()
    val state: StateFlow<State>
    sealed interface State {
        object Initializing : State
        object Connecting : State
        object Connected : State
        // ... others ...
    }
}
I find this creates a lot of intermediate state and is a risk for bugs. What I really want to know is if the connection is still alive or if there was an error, and I want to be able to cancel the Job. Tracking the Job that’s running the socket connection does this just fine. It doesn’t feel like shoehorning at all. (I’ve done plenty of shoehorning before… this doesn’t feel like it, though I could be wrong.)
// Working from memory here...
suspend fun HttpClient.wss(request: HttpRequest, block: suspend CoroutineScope.() -> Unit)
block, which I found I liked.
launch { /* entire lifetime of your connection is contained here */ }