christophsturm
11/29/2021, 8:59 AMMaciek
11/29/2021, 3:09 PMcallbackFlow
with awaitClose
work here?
fun CoroutineScope.onCancel(action: () -> Unit) {
callbackFlow<Unit> {
awaitClose(action)
}.launchIn(this)
}
mcpiroman
11/29/2021, 5:59 PMval focusRequests = MutableSharedFlow<Unit>(onBufferOverflow = BufferOverflow.DROP_LATEST, extraBufferCapacity = 1)
focusRequests.tryEmit(Unit) // on frontend
focusRequests.collect { ... } // on backend
but I feel like this is an abuse of the SharedFlow. Is there more specific construct I should use in such cases?frankelot
11/29/2021, 7:01 PMactor
of buffer 1 that DROPS_OLD when the buffer is overflown. I’ve read the docs but I’m not sure I get it
Is this it?
actor<Int>(capacity = Channel.CONFLATED)
natario1
11/30/2021, 10:30 AMQueue<In, Out>(process: suspend (In) -> Out, onBufferOverflow, capacity)
where I can send input and suspend until output is ready: val out = queue.send(in)
, use trySend
and so on. I'm curious if it's just me, do you ever find yourself needing something similar and what do you use?Anvith
11/30/2021, 12:33 PMscana
12/01/2021, 3:23 PMMutableStateFlow
would only emit last item in this example (running from Kotlin scratch file)?
val x = MutableStateFlow("Test")
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
x.collect {
println(it)
}
}
x.value = "First"
x.value = "Second"
x.value = "Fourth"
println("Waiting...")
Thread.sleep(1000)
Result:
Waiting...
Fourth
ansman
12/01/2021, 10:29 PMTestCoroutineScheduler
? I see TestCoroutineScheduler
being used but delays are still being skipped which is super confusing to me as I don’t see the point of it if delays aren’t skippedSergio Crespo Toubes
12/02/2021, 10:45 AMprivate val myLocationsChannel = Channel<MyLocation>(Channel.BUFFERED)
val myLocationsFlow = myLocationsChannel.receiveAsFlow()
How can i send an error to the channel? For example when i havent location permissions.
ThanksTim Malseed
12/02/2021, 11:28 AMdata class Flag<T : Any>(val key: String, val description: String, val defaultValue: T)
class FlagManager() {
private val flagMap = mutableMapOf<Flag<out Any>, MutableStateFlow<Any>>()
fun registerFlag(flag: Flag<Any>) {
if (flagMap.containsKey(flag)) {
throw IllegalStateException("Flag cannot be registered more than once")
}
flagMap[flag] = MutableStateFlow(flag.defaultValue)
}
fun <T : Any> getFlagState(flag: Flag<T>): StateFlow<T> {
if (!flagMap.containsKey(flag)) {
throw IllegalStateException("Flag not registered")
}
return (flagMap[flag] as MutableStateFlow<T>).asStateFlow()
}
fun <T : Any> updateFlagState(flag: Flag<T>, value: T) {
if (!flagMap.containsKey(flag)) {
throw IllegalStateException("Flag not registered")
}
(flagMap[flag] as MutableStateFlow<T>).value = value
}
}
ursus
12/02/2021, 2:26 PMclass Syncer {
private val scope = CoroutineScope(SupervisorJob() + <http://Dispatcher.Io|Dispatcher.Io>)
private val _state = MutableStateFlow<State>(Idle)
val state: Flow<State> get() = _state
fun sync() {
scope.launch {
_state.value = Syncing
someSuspendingApiAndDbStuff()
_state.value = Synced
}
}
}
I want to test if I see correct state
emissions if I call sync()
Leonardo Borges
12/02/2021, 3:04 PMmockK
.
class MyUseCase {
operator suspend fun invoke(val someId: UUID) = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
val response = someAPI.requestDataById(someId)
// process the response...
}
}
Using JUnit5, there are a few exceptions I want to test, such as below:
@Test
fun testCustomExceptionIsThrown() {
coEvery { someAPI.requestDataById(someId) returns someData }
val thrown = assertThrows<CustomException> {
runBlocking { myUseCase(someId) }
}
assertEquals("customExceptionMessage", thrown.message)
}
This is always returning an JobCancellationException: Parent job is Completed;
and I actually can't figure out why.
Also, is this the right way to use those libraries?ursus
12/03/2021, 1:08 AMyschimke
12/03/2021, 12:06 PMsuspend fun getX() : X
fun getXFlow(): Flow<X>
val a = getX()
val b = getXFlow().first()
Obviously the implementations may be different (one off query vs query and subscribe). But can the implementation optimise the call to first() which is the when the subscription happens and short circuit it? Othewise it's tempting to put both variants on most repository methods. But it would be nice if there was an expectation that first() should optimise to the suspend fun case.ursus
12/03/2021, 1:19 PMclass Syncer(private val db: Db) {
private val scope = CoroutineScope(SupervisorJob + Dispatchers Io)
fun sync() {
scope.launch {
...
db.insertSomething()
}
}
}
I'd like to assert that db.readSomething()
reads what it should after I call Syncer.sync()
There is nothing to suspend on to know when the sync actually finished (like some status flow)
Is injecting Dispatchers.Undispatched
instead of Io
to make it synchronous, so I can assert right after sync()
returns, idiomatic?
Is there something better?Victor Rendina
12/03/2021, 5:12 PMdimsuz
12/03/2021, 5:35 PMFlow<T>
what would be the most idiomatic way to to the following:
someFlow.collect { v ->
process(v)
if (isVeryFirstItem) {
someSharedFlow.emit("received first item")
}
}
I.e. I want to do a side-effect only once after receiving the first emission. I could make isVeryFirstFirstItem
some kind of a volatile or AtomicBoolean, but somehow I don't like this.David W
12/03/2021, 10:05 PMfun main() {
val scope = CoroutineScope(Job())
scope.launch { println("launch") }
}
fun main() {
val scope = CoroutineScope(Job())
scope.launch { println("launch") }
scope.launch { println("launch") }
}
David W
12/03/2021, 11:14 PMval someStateFlow: StateFlow<Unit> = ...
GlobalScope.launch(Dispatchers.Default) { someStateFlow.collect { ... } }
David W
12/03/2021, 11:30 PMrefreshScope = CoroutineScope(Job())
refreshScope.launch(Dispatchers.Default) {
println("Clicked Refresh button pntln.")
}
but this produces text?
refreshScope = CoroutineScope(Job())
refreshScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
println("Clicked Refresh button pntln.")
}
The only difference is the Dispatcher (IO works, Default doesn't).
The issue is somewhere else in my code, I expect, and there's far too much to include; both code snippets work in a vacuum. I'm just hoping for a direction to start looking.tseisel
12/04/2021, 8:52 PMSharedFlow
, which has no close
method...Ayfri
12/06/2021, 6:19 AMursus
12/07/2021, 3:14 AM<http://Dispatchers.IO|Dispatchers.IO>.limitedParallelism(1)
does it guaranteed I'll get the same thread always?
(Its for testing purposes, where I want to replace Main dispatcher with something single threaded)MRSasko
12/07/2021, 7:44 AMfun fetchInfo(): Flow<State<SportInfo>> =
flow {
val result = memoryInfoResult
if (result != null) {
emit(result)
} else {
fetchSportInfo()
.onEach { result ->
when (result) {
is Data -> {
memoryInfoResult = result
emit(result)
}
is Error, is Loading -> {
emit(result)
}
}
}
}
}
Mikołaj Karwowski
12/07/2021, 11:22 AM@Test
fun advanceTimeTest() = runBlockingTest {
foo()
advanceTimeBy(2_000) // advanceTimeBy(2_000) will progress through the first two delays
println("Main 1")
// virtual time is 2_000, next resume is at 2_001
advanceTimeBy(2) // progress through the last delay of 501 (note 500ms were already advanced)
println("Main 2")
// virtual time is 2_0002
}
fun CoroutineScope.foo() {
launch {
println("Foo 1")
delay(1_000) // advanceTimeBy(2_000) will progress through this delay (resume @ virtual time 1_000)
// virtual time is 1_000
println("Foo 2")
delay(500) // advanceTimeBy(2_000) will progress through this delay (resume @ virtual time 1_500)
// virtual time is 1_500
println("Foo 3")
delay(501) // advanceTimeBy(2_000) will not progress through this delay (resume @ virtual time 2_001)
// virtual time is 2_001
}
}
The result is:
Foo 1
Foo 2
Foo 3
Main 1
Main 2Shouldn't it be: Foo1, Foo2, Main1, Foo3, Main2 ?
Pedro Alberto
12/07/2021, 2:24 PMval <T> MutableStateFlow<T>.readOnly
get() = this as StateFlow<T>
now I realised there is a function called asStateFlow()
but when you go into asStateFlow the function does something different
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
ReadonlyStateFlow(this, null)
mkrussel
12/07/2021, 5:13 PMResult
.
I can see that the body of the suspend function is returning what I expect. The toString
return Success(null)
or something like that. The caller of the suspend function is then getting an object with a toString of Success(Success(null))
.
When I return that from another suspend function it does not get wrapped a third time and still logs as Success(Success(null))
.
This is running on Android with Kotlin 1.5.31
and coroutines 1.5.2-native-mt
groostav
12/07/2021, 8:58 PMdimsuz
12/08/2021, 12:57 PMinterface Service {
fun start(): Job
}
val job = myServiceInstance.start()
job.cancel()
is it ok to implement start like
fun start() { val scope = createScope(); return scope.coroutineContext[Job] }
Or is there some better way?dimsuz
12/08/2021, 2:46 PMFlow.collectIndexed
and it's implemented like this:
collect(object : FlowCollector<T> {
private var index = 0
override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
})
I'm curious, why is this thread-safe? Can't it happen so that index
is initialized on one thread and incremented in another? (if some withContext
will somehow be used by client)
Or are there some invariants which are at play here?dimsuz
12/08/2021, 2:46 PMFlow.collectIndexed
and it's implemented like this:
collect(object : FlowCollector<T> {
private var index = 0
override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
})
I'm curious, why is this thread-safe? Can't it happen so that index
is initialized on one thread and incremented in another? (if some withContext
will somehow be used by client)
Or are there some invariants which are at play here?louiscad
12/08/2021, 4:29 PMdimsuz
12/08/2021, 4:42 PM