Jérémy CROS
06/23/2023, 12:50 PMcombine
operator but it doesn’t quite fit here
Any idea how we could do that? Thanks! 🙏Sam Stone
06/28/2023, 7:33 AMemptyFlow<T>()
returns a flow that can’t be emitted to, can’t be collected, neither, or is equivalent to flowOf<T>()
(just like how listOf<T>()
is in usage equivalent to emptyList<T>()
) but can be emitted to/collected from? The correct answer is: can’t be collected. I think this is counter-intuitive.Erfannj En
07/04/2023, 2:46 PMArun Joseph
07/04/2023, 6:38 PMOsman Saral
07/27/2023, 2:48 PMflow {}
block form another function? What I'm trying to do is this:
private val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
private val exceptionFlow = MutableSharedFlow<Throwable>()
fun initFlow(scope: CoroutineScope) = flow {
scope.launch {
exceptionFlow.collect {
print("throwing")
throw it //throw here to trigger retry
}
}
emit(1)
delay(2000L)
emit(2)
}.retry(2) { cause ->
println("retrying cause: $cause")
true
}.catch {
println("caught $it")
emit(3)
}
suspend fun retry(throwable: Throwable) {
exceptionFlow.emit(throwable)
}
fun main() {
runBlocking {
initFlow(scope).collect {
println(it)
}
delay(500L)
retry(IllegalStateException("error"))
}
}
This prints
1
2
throwing
the exception is not caught and the app doesn't crash. Why doesn't it work and what's the best way to implement something like this?s3rius
08/06/2023, 9:23 AMFlow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>>
. It basically acts like List's chunked
operation: it collects emissions until count
is reached, then emits them as a List<T>
. It includes an optional timeout
after which remaining items are emitted anyway.
Background: I want to write a logger which persists log output in a file. But since there can be dozens of log emissions per second, I want to write them in batches. timeout
is used to ensure that logs are guaranteed to be written after a few seconds at the most.
I would love to have some feedback on
1. Could this be (easily) build with existing flow functions?
2. Is my implementation proper? Covers all cases and works under all circumstances? Did I miss something?
The code and a series of tests to validate functionality can be found in this pastebin: https://pastebin.com/1reJivVLgalex
08/15/2023, 12:17 PMAhmad Dudayef
08/23/2023, 3:12 AMColton Idle
08/24/2023, 3:30 PMKevin Worth
09/05/2023, 3:28 PMflow.catch
that can offer some pointers?
Given a StateFlow, how can I force my unit test to exercise my .catch
? It appears the unit test framework is designed to suppress all `Exception`s other than those that the test is expecting (i.e. assertThrows
). I don’t want to assert an exception is thrown, I want my flow.catch{ … }
to catch it, but I believe the framework is catching it instead.
val uiState: StateFlow<UiState> = myRepository.myModels
.map<List<MyModel>, UiState>(::Success)
.catch {
// TRYING TO OBSERVE THIS IN TEST
emit(UiState.Error(it))
}
.stateIn(...)
Maybe I’ve gone in the entirely wrong direction?
_
EDIT: Looks like @Test(expected = Exception::class)
is helpful to get passed “Suppressed: Exception”, but now I still have the issue of my .catch
apparently not running because I don’t see an emit.Giang
09/07/2023, 12:11 PMgabfssilva
09/25/2023, 1:36 PMabbic
09/26/2023, 12:03 PM@Singleton
class AppEventBus @Inject constructor() {
private val internalSharedFlow =
MutableSharedFlow<AppEvent>()
val allEvents: Flow<AppEvent> get() = internalSharedFlow
inline fun <reified T: AppEvent> observe(): Flow<T> =
allEvents.filterIsInstance()
fun tryPostEvent(event: AppEvent) {
internalSharedFlow.tryEmit(event)
}
suspend fun postEvent(event: AppEvent) {
internalSharedFlow.emit(event)
}
}
I threw it together in 5 minutes so im sure i can make many improvements, bust specifically i was wondering, given that this class will be a singleton, can I rely on the flow staying alive on its own for the lifecycle of the program? sorry if its a strange question, i am probably worrying about nothingKevin Worth
09/29/2023, 3:22 PMonSubscribe
of SharedFlow
as it relates to onStart
of Flow
. First question would be, is there a scenario where onStart
gets called only once and then onSubscribe
is called for each new subscriber and/or re-subscriber? See 🧵 for more details. Any help would be appreciated.Osman Saral
10/06/2023, 9:06 AMSharedFlow
. I was launching a coroutine using CoroutineScope(coroutineContext).launch { }
but I've read that it's not a goo practice. It's suggested to make function a CoroutineScope
extension instead. But it's necessary for me to make that function suspend. How can I implement a timeout mechanism for a suspend function which needs to return the flow?fjoglar
10/19/2023, 9:50 PM@OptIn(ExperimentalCoroutinesApi::class)
class EventRepositoryTest {
private val eventService = EventServiceStub()
private val eventCache = EventCacheStub()
private val eventRepository = EventRepository(
localCache = eventCache,
remoteService = eventService,
)
@Test
fun test_deliversRemoteEvents_onNonValidCache() = runTest {
val remoteEvents = makeEvents(1)
val receivedEvents = mutableListOf<List<Event>>()
backgroundScope.launch(UnconfinedTestDispatcher()) {
eventRepository.load(anyFilter().toList(receivedEvents)
}
eventService.succeedsWithEvents(remoteEvents)
eventCache.failsWithNonValidCache()
assertEquals(remoteEvents, receivedEvents.first())
}
// region Helpers
private class EventServiceStub : EventService {
private var result: Result<List<Event>>? = null
override suspend fun load(filter: EventFilter): Result<List<Event>> {
return result!!
}
fun succeedsWithEvents(remoteEvents: List<Event>) {
result = Result.success(remoteEvents)
}
}
private class EventCacheStub : EventCache {
private val flow = MutableSharedFlow<List<Event>?>()
override fun load(filter: EventFilter): Flow<List<Event>?> = flow
suspend fun succeedsWithEvents(events: List<Event>?) {
flow.emit(events)
}
suspend fun failsWithNonValidCache() {
flow.emit(null)
}
override suspend fun save(events: List<Event>, filter: EventFilter) {
flow.emit(events)
}
}
// endregion
}
Here I am expecting to receive remoteEvents
when the localCache
is not valid, which is when it returns null
. Actually, EventCacheStub emits a null value when calling failsWithNonValidCache
but it does not emit nothing and completes after calling the save
method from the repository implementation.
Here is my repository implementation:
class EventRepository(
private val localCache: EventCache,
private val remoteService: EventService,
) : EventLoader {
override fun load(filter: EventFilter): Flow<List<Event>> {
return localCache.load(filter)
.onEach {
if (it == null) {
remoteService.load(filter).onSuccess { localCache.save(it, filter)}
}
}
.filterNotNull()
}
}
Thanks gratitude thank youabbic
10/23/2023, 9:16 AMFlow<String>
becomes Flow<Result<String>>
for example)?
are there any immediate complications from this approach? Example implementation coming soon in commentsJustin Breitfeller
10/25/2023, 7:27 PMcollect
was canceled, the flow would be canceled as well. What am I missing?Slackbot
12/07/2023, 1:32 PMgalex
12/13/2023, 3:36 PM.timeout()
is called after each emit, so I made the following function:
/**
* If the first element takes too long to emit, the [onTimeout] callback will be called without disturbing the current flow
*/
fun <T> Flow<T>.timeoutFirst(duration: Long, scope: CoroutineScope, onTimeout: () -> Unit): Flow<T> = flow {
var emitted = false
scope.launch {
delay(duration)
if (!emitted) {
onTimeout()
}
}
collect {
emitted = true
emit(it)
}
}
First question:
Is there a way to get the coroutineScope where the flow will be collected so that I don't need to pass a scope?
I was using coroutineScope {}
before instead of scope.launch
but then I couldn't get it to work in unit tests under runTest
Second question:
Instead of a callback, could I use the throwing mechanism as timeout
does?
Thanks in advance! 😊KotlinLeaner
01/05/2024, 3:33 PMSharedFlows
and I'm trying to get only the latest value emitted by the flow. I've tried using the last()
operator, but it's not working as expected.
main
fun main(): Unit = runBlocking {
val eventBus = EventBus()
launch {
val latestValue = eventBus.value.lastOrNull()
if (latestValue != null) {
println("Latest value: $latestValue")
}
}
launch {
repeat(5) {
eventBus.increment(it)
}
}
launch {
delay(5.seconds)
(5..10).forEach {
eventBus.increment(it)
}
}
}
Josh Skeen
01/08/2024, 3:43 PMprivate val _mySharedFlow = MutableSharedFlow<SomeEvent>(
extraBufferCapacity = 10,
onBufferOverflow = SUSPEND,
)
We often run into debate in code review about choosing the correct value for extraBufferCapacity when MSF is used with tryEmit, the thought being “we could lead to a large amount of memory usage with some high volume of events that havent been collected yet”. The question we have is what is the best way to choose
this value or should we consider an alternate approach? It seems arbitrary to us to have to pick a number out of thin air - do you recommend calling tryEmit and using whether it returns false and logging in this case (according to the docs , this only occurs with SUSPEND of onBufferOverflow, correct?) or some other way of choosing - or is this concern misguided?Garret Yoder
01/18/2024, 5:35 PMdoubov
01/24/2024, 10:42 PMfun <T> Flow<T>.throttleLatest(throttlePeriodInMillis: Int) = flow {
coroutineScope {
val context = coroutineContext
var nextMillis = TimeSource.Monotonic.markNow()
var delayPost: Deferred<Unit>? = null
collect {
val current = TimeSource.Monotonic.markNow()
if (nextMillis < current) {
nextMillis = current.plus(throttlePeriodInMillis.milliseconds)
emit(it)
delayPost?.cancel()
} else {
val delayNext = nextMillis
delayPost?.cancel()
delayPost = async(Dispatchers.Default) {
delay(nextMillis - current)
if (delayNext == nextMillis) {
nextMillis = TimeSource.Monotonic.markNow().plus(throttlePeriodInMillis.milliseconds)
withContext(context) {
emit(it)
}
}
}
}
}
}
}
And the corresponding unit test:
@Test
fun test() = runTest {
val count = AtomicInt(0)
val expectedResults = mapOf(
1 to 1,
2 to 7,
3 to 10
)
channelFlow {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
for (i in 1..10) {
send(i)
delay(75)
}
}
}
.throttleLatest(500)
// 1 2 3 4 5 6 7 8 9 10
// __0 _75 150 225 300 375 450 525 600 675 750 825 900 975 1050
// 1 7 10
.onEach { count.incrementAndGet() }
.test {
for (i in 1..3) {
assertEquals(expectedResults[i], awaitItem())
}
awaitComplete()
assertEquals(3, count.get())
}
}
Which works fine on Android/jvm, but fails on Native (iOS).
Error
kotlin.AssertionError: Expected <7>, actual <4>.
Does time move faster on iOS? 😂
Can anyone give me pointers as to why this might be happening/what I'm doing wrong?galvas
02/05/2024, 10:05 AMSharedFlow
, but I was wondering if Flows are the correct API to represent this? Or if this is a problem other people have thought about before or not? There is no interface that indicates that a flow is hot right? Would it make more sense to my implementation to be a Channel
for example?galvas
02/05/2024, 10:08 AMAndrew K
02/23/2024, 12:34 AMKV
03/04/2024, 1:10 PMKotlinLeaner
03/05/2024, 12:01 PMSharedFlow
? I created this class for navigating the screens based on response. This code is just basic example.
class Navigator : NavigationHandler {
private val _destination: MutableSharedFlow<ScreenName> = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override val destination: SharedFlow<ScreenName> = _destination.asSharedFlow()
override fun navigate(navigationDestination: ScreenName) {
_destination.tryEmit(navigationDestination)
}
}
Djuro
03/06/2024, 12:13 PMMutableStateFlow<T>.update/getAndUpdate/updateAndGet
, is this real atomicity (only one coroutine can read the state at a time when used and update it, meaning any
other coroutine using .update
for example won't be able to get/update the value until the one accessing the resource finishes.
Which statement is the correct one?
1. While 1 coroutine is using .update
on a given state the next one that tries to do so will be waiting
2. It is possible for state to be updated while .update
is being executed meaning a race condition
This is implementation of the state flow
public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
while (true) {
val prevValue = value
val nextValue = function(prevValue)
if (compareAndSet(prevValue, nextValue)) {
return
}
}
}
It is obvious from its implementation that while function(prevValue)
is being executed, value can change in another coroutine and it is also written in docs of StateFlow: function may be evaluated multiple times, if value is being concurrently updated.
Question is, why would this be desired behaviour? Why not lock the resource(state) from the very begining until the update is finished? This can cause an infinite loop also if race condition occurs. Update is then not a classical critical section, but compareAndSet
is instead. What am I missing?