ursus
01/02/2022, 4:04 PM
private fun SharedPreferences.onSharedPreferenceChangedFlow(): Flow<String> {
    return callbackFlow {
        val listener = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            trySend(key)
        }
        registerOnSharedPreferenceChangeListener(listener)
        awaitClose { unregisterOnSharedPreferenceChangeListener(listener) }
    }
}
seems to work, however I do know there is one nasty behavior of `SharedPreferences.registerOnSharedPreferenceChangeListener`: it only keeps weak references, so the listener might get GC-ed, so I need to hold a hard reference to the listener.
How would I turn it into a field? Or rather, I think I need a proper CallbackFlow subclass, not just use the builder, right?
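A minimal sketch of why the builder may already be enough: the `awaitClose` lambda captures the local `listener`, so the listener should stay strongly reachable for as long as the flow is being collected. `KeyListener`, `KeySource` and `InMemoryKeySource` are hypothetical stand-ins for the `SharedPreferences` register/unregister pair so the snippet runs on a plain JVM; they are not Android APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for SharedPreferences and its change listener.
fun interface KeyListener { fun onChanged(key: String) }

interface KeySource {
    fun register(listener: KeyListener)
    fun unregister(listener: KeyListener)
}

fun KeySource.changes(): Flow<String> = callbackFlow {
    // The awaitClose lambda below captures this local, so the suspended
    // producer coroutine keeps a strong reference to it while collecting.
    val listener = KeyListener { key -> trySend(key) }
    register(listener)
    awaitClose { unregister(listener) }
}

// Simple in-memory source used only to exercise the flow.
class InMemoryKeySource : KeySource {
    private val listeners = mutableListOf<KeyListener>()
    val listenerCount: Int get() = listeners.size
    override fun register(listener: KeyListener) { listeners += listener }
    override fun unregister(listener: KeyListener) { listeners -= listener }
    fun notifyChanged(key: String) = listeners.toList().forEach { it.onChanged(key) }
}

fun main() = runBlocking {
    val source = InMemoryKeySource()
    val collected = async { source.changes().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until the listener is registered
    source.notifyChanged("a")
    source.notifyChanged("b")
    println(collected.await())
}
```

If an explicit field is still preferred, the same listener could be stored in a property of a small wrapper class instead; the builder itself does not need to be subclassed for that.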
And looking at sources, everything is private / internal api
liminal
01/03/2022, 12:04 AM
Astronaut4449
01/03/2022, 6:15 AM
Florian Walther (live streaming)
01/03/2022, 3:11 PM
ursus
01/03/2022, 9:47 PM
farzad
01/04/2022, 9:32 AM
thana
01/04/2022, 10:50 AM
`CoroutineScope` interface anymore. unfortunately it doesn't say anything about the WHY despite using the builder function would be more straightforward. why is it more straightforward?
Norbi
01/04/2022, 12:51 PM
`java.lang.reflect.Proxy` for a Kotlin interface with `suspend` methods?
(I would call the proxy methods only from Kotlin code.)
Thanks.
Rak
01/04/2022, 2:49 PM
Aslo
01/04/2022, 6:30 PM
Elliot Barlas
01/04/2022, 6:39 PM
Victor Cardona
01/04/2022, 11:37 PM
ursus
01/05/2022, 4:28 AM
`BUFFERED` and expected for consumers to use `buffer` operator?
Paul Woitaschek
01/05/2022, 9:32 AM
private suspend fun testBody(scope: CoroutineScope) {
    val value = MutableStateFlow(1)
    val sharingJob = Job()
    val flow = value.shareIn(scope + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
    check(flow.first() == 1)
    value.value = 2
    check(flow.first() == 2)
    sharingJob.cancel()
}

@Test
fun withRunBlockingTest() = runBlockingTest {
    testBody(this)
}

@Test
fun withRunTest() = runTest {
    testBody(this)
}
This passes with runBlockingTest but fails with runTest.
It is possible to fix it by adding a runCurrent before calling flow.first a second time, but I think that's not how it’s supposed to be, and it makes me very uncomfortable writing tests without having a runCurrent between every single line.
Florian Walther (live streaming)
01/05/2022, 12:25 PM
`SystemClock.elapsedRealtime` in a test.
janvladimirmostert
01/05/2022, 8:41 PM
Florian Walther (live streaming)
01/06/2022, 12:05 PM
`countDownInterval` is 0 I want to skip the loop and just delay until the time is over. The problem is that delay is inexact (that's why I do the comparison with the SystemClock time). How could I achieve a (somewhat) exact delay?
janvladimirmostert
01/06/2022, 10:42 PM
class FileFlow(private val uri: String) {
    private data class FileBaton(
        val fileChannel: AsynchronousFileChannel,
        val flow: ProducerScope<ByteArray>,
        val buffer: ByteBuffer,
        val position: Long,
        val handleError: (message: String, e: Throwable?) -> Unit = { message, e ->
            flow.cancel(message, e)
            runBlocking(Dispatchers.IO) {
                fileChannel.close()
            }
        }
    )

    fun read(bufferSize: DataSize): Flow<ByteArray> = callbackFlow {
        val path = Path.of(uri)
        val baton = FileBaton(
            buffer = ByteBuffer.allocate(bufferSize.B),
            fileChannel = withContext(Dispatchers.IO) {
                AsynchronousFileChannel.open(path, StandardOpenOption.READ)
            },
            flow = this,
            position = 0,
        )
        baton.fileChannel.read(
            baton.buffer,
            baton.position,
            baton,
            object : CompletionHandler<Int, FileBaton> {
                override fun completed(read: Int, baton: FileBaton) {
                    if (read > 0) {
                        trySendBlocking(
                            baton.buffer.array().sliceArray(0 until read)
                        ).onFailure { e ->
                            baton.handleError(e?.message ?: "", e)
                        }
                        baton.buffer.rewind()
                        baton.fileChannel.read(
                            baton.buffer,
                            baton.position + read,
                            baton.copy(
                                position = baton.position + read
                            ),
                            this
                        )
                    } else {
                        baton.flow.channel.close()
                        runBlocking(Dispatchers.IO) {
                            baton.fileChannel.close()
                        }
                    }
                }

                override fun failed(e: Throwable?, baton: FileBaton) {
                    baton.handleError(e?.message ?: "", e)
                }
            }
        )
        awaitClose {
        }
    }
}

FileFlow("/home/.../blah.txt").read(250.kB).collect {
    print(String(it))
}
Chris Fillmore
01/07/2022, 12:05 PM
`StateFlow<Job>`, so that connection lifecycles can be observed, cancelled, reconnected, etc. I have an Android app which connects to several websocket endpoints on different protocols, including plain websockets, socketio, action cable, graphql subscriptions, so being able to define a common pattern like this would be useful to me. More in 🧵
Trevor Stone
01/07/2022, 7:09 PM
.stateIn(
    CoroutineScope(Dispatchers.Default),
    SharingStarted.WhileSubscribed(),
    null
)
and getting `Cannot start an undispatched coroutine in another thread DefaultDispatcher from current MainThread` as an error from runTest
rkechols
01/08/2022, 10:50 AM
`Group` object, which has some normal data to display (e.g. `id`), but also has a list of `User` objects (which each have their own data to display)
I'd of course like my database to have group data separated from the `User` data, since `User`s can exist without a group, with multiple groups, etc. So, each group in the db will simply list the relevant `User` IDs; I call this a `ShallowGroup`
I know how to get a flow to observe a single `User`, or how to get a flow to observe a single `ShallowGroup`, but I can't figure out how to get from those a flow that produces full `Group` objects that reflect the changes to the inner `User` objects.
My best understanding of how it would work:
1. Get a flow of `ShallowGroup` objects.
2. In a `map` call on that flow, use the user IDs in that `ShallowGroup` snapshot to get a flow for each `User`, and:
3. use `combine` on the previous list of flows of `Users` to get a flow which produces `List<User>`/`Group`
4. ?? somehow flatten the `Flow<Flow<Group>>` to just a `Flow<Group>` ??
Note that the contents of each `ShallowGroup` object received determine which `User` objects need to be observed.
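One common way to do the flattening in step 4 is `flatMapLatest`, which switches to a fresh inner flow whenever a new `ShallowGroup` arrives and cancels the previous one. The sketch below assumes hypothetical `User`, `ShallowGroup` and `Group` data classes and a hypothetical `observeUser` lookup; none of these shapes come from the original message.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical model classes, for illustration only.
data class User(val id: Int, val name: String)
data class ShallowGroup(val id: Int, val userIds: List<Int>)
data class Group(val id: Int, val users: List<User>)

@OptIn(ExperimentalCoroutinesApi::class)
fun observeGroup(
    shallowGroups: Flow<ShallowGroup>,
    observeUser: (Int) -> Flow<User>, // hypothetical per-user flow lookup
): Flow<Group> = shallowGroups.flatMapLatest { shallow ->
    if (shallow.userIds.isEmpty()) {
        // combine over an empty list emits nothing, so handle that case explicitly.
        flowOf(Group(shallow.id, emptyList()))
    } else {
        // Re-emit a full Group whenever any of the observed users changes.
        combine(shallow.userIds.map(observeUser)) { users ->
            Group(shallow.id, users.toList())
        }
    }
}

fun main() = runBlocking {
    val users = mapOf(
        1 to MutableStateFlow(User(1, "a")),
        2 to MutableStateFlow(User(2, "b")),
    )
    val groups = observeGroup(flowOf(ShallowGroup(7, listOf(1, 2)))) { users.getValue(it) }
    println(groups.first())
}
```

`flatMapLatest` fits here because the set of users to observe depends on the latest `ShallowGroup`; `flatMapMerge` or `flatMapConcat` would keep observing users from stale snapshots.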
Can someone point me to what the best way is to handle flows for nested objects like this?
iamthevoid
01/09/2022, 9:32 AM
Norbi
01/09/2022, 9:36 AM
Stylianos Gakis
01/09/2022, 4:19 PM
lifecycleOwner.repeatOnLifecycle(Lifecycle.State.RESUMED) {
    launch {
        val callback = LifecycleEventObserver { _: LifecycleOwner, event: Lifecycle.Event ->
            if (event == Lifecycle.Event.ON_PAUSE) {
                // do something on pause
            }
        }
        lifecycleOwner.lifecycle.addObserver(callback)
        try {
            awaitCancellation()
        } finally {
            lifecycleOwner.lifecycle.removeObserver(callback)
        }
    }
}
But this `try/finally` with the `awaitCancellation()` combination feels a bit weird. Is there a nicer way to do this in general?
dimsuz
01/09/2022, 7:09 PM
val externalFlow = flowOf(1,3,4)
val myFlow = flow<Int> {
    coroutineScope { externalFlow.collect { this@flow.emit(it + 8) } }
    emit(1)
    emit(99)
}
I've used `coroutineScope`, but I'm worried that it won't be cancelled when `myFlow` collection is cancelled or interrupted.
It's just that I've found internal `flowScope` builder in `coroutines.core` and it says:
This builder is similar to coroutineScope with the only exception that it ties lifecycle of children and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
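For the snippet above, a scope is probably not needed at all: `collect` is an ordinary suspending call running in the collector's own coroutine, so cancelling the collection of `myFlow` also cancels the nested collection. A minimal sketch using `emitAll`, with the same values as above:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val externalFlow = flowOf(1, 3, 4)

val myFlow: Flow<Int> = flow {
    // Re-emit the transformed upstream values; no extra scope is required
    // because no new coroutine is launched here.
    emitAll(externalFlow.map { it + 8 })
    emit(1)
    emit(99)
}

fun main() = runBlocking {
    println(myFlow.toList()) // prints [9, 11, 12, 1, 99]
    // take(2) stops the collection early, including the nested one.
    println(myFlow.take(2).toList()) // prints [9, 11]
}
```

A `coroutineScope` (or `channelFlow`) only becomes relevant once the builder launches child coroutines, for example to collect several flows concurrently.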
Can I achieve similar effect for my case? Or using `coroutineScope` like I did is ok?
Stylianos Gakis
01/09/2022, 9:56 PM
`suspendCancellableCoroutine` over `suspendCoroutine`. If I got a use case where all I need to do is something like:
suspend fun MediaPlayer.seekToPercent(
    @FloatRange(from = 0.0, to = 1.0) percentage: Float,
) = suspendCoroutine<Unit> { cont ->
    val callback = MediaPlayer.OnSeekCompleteListener {
        this.setOnSeekCompleteListener(null)
        cont.resume(Unit)
    }
    setOnSeekCompleteListener(callback)
    val positionToSeekTo = (duration.toFloat() * percentage).toInt()
    seekTo(positionToSeekTo)
}
Is there just no reason to use `suspendCancellableCoroutine`?
Basically I wonder if there are any other implications regarding the cancellability of `suspendCoroutine` and if it behaves in any different way to `suspendCancellableCoroutine`, or all that they do differently is that in a `suspendCancellableCoroutine` we have access to more than just `resume(value: T)` and `resumeWithException(exception: Throwable)`?
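One behavioural difference worth noting: a coroutine suspended in `suspendCoroutine` stays suspended when its `Job` is cancelled, until the callback eventually resumes it, whereas `suspendCancellableCoroutine` resumes promptly with a `CancellationException` and offers `invokeOnCancellation` for cleanup. The sketch below uses a hypothetical `FakePlayer` (not `MediaPlayer`) whose seek never completes, to show the listener being cleared on cancellation.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume

// Hypothetical callback-based API standing in for a media player.
class FakePlayer {
    var onSeekComplete: (() -> Unit)? = null
    fun seekTo(position: Int) { /* in this sketch the seek never completes */ }
}

suspend fun FakePlayer.awaitSeek(position: Int): Unit =
    suspendCancellableCoroutine { cont ->
        onSeekComplete = {
            onSeekComplete = null
            cont.resume(Unit)
        }
        // Runs if the coroutine is cancelled while suspended here.
        cont.invokeOnCancellation { onSeekComplete = null }
        seekTo(position)
    }

fun main() = runBlocking {
    val player = FakePlayer()
    val job = launch { player.awaitSeek(10) }
    yield() // let the child start and suspend
    job.cancelAndJoin() // returns promptly; with suspendCoroutine this would hang
    println("listener cleared: ${player.onSeekComplete == null}")
}
```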
Reading the documentation doesn’t fill all the gaps in my head
Eli
01/09/2022, 10:49 PM
`launch` or `async` on the calling function. I'm trying to implement this as close to the example as I can, but no matter what I do, it doesn't seem to be executing the items in the flow concurrently, but only sequentially. Here's a simplified code snippet of what I'm attempting:
enum class Ordinal(val delay: Long) {
    FIRST(100),
    SECOND(200),
    THIRD(300),
    FOURTH(400),
    FIFTH(500),
    SIXTH(600),
    SEVENTH(700)
}

private fun ordinalFlow() = flow {
    for (ordinal in Ordinal.values()) {
        delay(ordinal.delay)
        emit(ordinal)
    }
}

@Test
fun asyncBufferFlowTest() {
    val blockTime = measureTimeMillis {
        runBlocking {
            val timer = measureTimeMillis {
                ordinalFlow()
                    .buffer()
                    .collect { println("Collected: $it") }
            }
            println("Done in $timer")
        }
    }
    println("Block time in $blockTime ms")
}
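In the snippet above the emitter loop is itself sequential, so the delays add up (2800 ms in total) regardless of `buffer()`, which only lets the emitter and the collector run in separate coroutines. A minimal sketch of one way to make the items themselves concurrent is `channelFlow` with one `launch` per item; the enum is trimmed to three entries to keep it short, and `concurrentOrdinalFlow` is a name made up for this example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

enum class Ordinal(val delay: Long) { FIRST(100), SECOND(200), THIRD(300) }

fun concurrentOrdinalFlow(): Flow<Ordinal> = channelFlow {
    for (ordinal in Ordinal.values()) {
        // Each item waits in its own coroutine, so the delays overlap.
        launch {
            delay(ordinal.delay)
            send(ordinal)
        }
    }
    // channelFlow completes once all launched children have finished.
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        concurrentOrdinalFlow().collect { println("Collected: $it") }
    }
    // Expected to be close to the longest single delay rather than the sum.
    println("Done in $elapsed ms")
}
```

Items arrive in order of completion, not in source order, which is the usual trade-off of this approach.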
Guilherme Almeida
01/10/2022, 11:50 AM
`limitedParallelism` slicing I am not sure I fully understand how it works. Looking at the example below I am launching `Access #1` and a second later `Access #2`.
Using the `accessDispatcher` with limited parallelism of 1 I would think the `Access #2` could only start after the `Access #1` was finished. So I expected the prints to go 1,2,3 but they go 1,3,2 instead. Could anyone point out why this is the actual behaviour? I know this can be solved with something like a Mutex, but I am trying to understand the behaviour of these new sliced dispatchers 😄
private val mainScope = MainScope()
private val accessDispatcher = Dispatchers.IO.limitedParallelism(1)

public fun foo() {
    mainScope.launch {
        mainScope.launch(accessDispatcher + CoroutineName("Access #1")) {
            println("Step 1")
            delay(5.seconds)
            println("Step 2")
        }
        delay(1.seconds)
        mainScope.launch(accessDispatcher + CoroutineName("Access #2")) {
            println("Step 3")
        }
    }
}
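A likely explanation for the 1,3,2 order: `limitedParallelism(1)` limits how many coroutines *execute* on the dispatcher at once, and a coroutine suspended in `delay` is not executing, so the dispatcher is free to run `Access #2` in the meantime. The sketch below contrasts that with a `Mutex` held across the suspension. It uses `runBlocking` and `Dispatchers.Default` instead of `MainScope` so it runs off Android, shorter delays, and a made-up `runSteps` helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.Collections

@OptIn(ExperimentalCoroutinesApi::class)
fun runSteps(useMutex: Boolean): List<String> = runBlocking {
    val dispatcher = Dispatchers.Default.limitedParallelism(1)
    val mutex = Mutex()
    val steps = Collections.synchronizedList(mutableListOf<String>())

    // Optionally hold the mutex for the whole block, including suspensions.
    suspend fun access(block: suspend () -> Unit) {
        if (useMutex) mutex.withLock { block() } else block()
    }

    val first = launch(dispatcher) {
        access {
            steps.add("Step 1")
            delay(500) // suspends and frees the single dispatcher slot
            steps.add("Step 2")
        }
    }
    delay(100)
    val second = launch(dispatcher) {
        access { steps.add("Step 3") }
    }
    joinAll(first, second)
    steps.toList()
}

fun main() {
    println(runSteps(useMutex = false)) // order as in the question: 1, 3, 2
    println(runSteps(useMutex = true))  // mutex serializes the blocks: 1, 2, 3
}
```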
Florian Walther (live streaming)
01/10/2022, 1:15 PM
`advanceTimeBy`? For example, I'm using a fake timesource so the actual advanced time doesn't matter.
Chris Fillmore
01/10/2022, 2:47 PM
`Job` state changes? (i.e. going from New -> Active -> Cancelled/Completed)