Circusmagnus
12/23/2020, 12:35 PM
isFull() function on SendChannel, but that is deprecated without replacement
jean
12/23/2020, 1:09 PM
@get:Rule
val coroutineTestRule = CoroutineTestRule()

class StateFlowTestClass(
    coroutineDispatcher: CoroutineDispatcher = Dispatchers.Main
) {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state

    init {
        CoroutineScope(coroutineDispatcher).launch {
            blockingTask()
        }
    }

    private suspend fun blockingTask() {
        delay(10.seconds)
        _state.value = 1
    }
}

@Test
fun `test on StateFlow`() = coroutineTestRule.runBlockingTest {
    advanceTimeBy(11.seconds.toLongMilliseconds())
    val value = StateFlowTestClass(TestCoroutineDispatcher()).state.value
    Assert.assertEquals(1, value)
}
When I run `test on StateFlow` I get the following error: java.lang.AssertionError: Expected:1, Actual:0. How am I supposed to fix this? And when there is no delay but just a bunch of code that takes time to execute, is there a way to wait for it to finish?
Nikky
12/23/2020, 10:28 PM
val deferred = async { ... }
while (!deferred.isCompleted) {
    delay(10)
}
return deferred.getCompleted()
except since I am not in a coroutine I cannot use delay outside of it..
Christopher Elías
12/25/2020, 11:37 PM
Robert Jaros
12/26/2020, 12:23 AM
ReceiveChannel and then pass the channel somewhere else in my app. Is the following approach a good solution (in terms of resource consumption and performance)?
1. Transform the channel to a Flow with consumeAsFlow()
2. Use a bunch of different flow operations (map, mapNotNull etc.)
3. Transform the Flow back to a ReceiveChannel with produceIn()
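A minimal sketch of the three steps above, assuming kotlinx.coroutines is on the classpath. The helper name transformChannel and the sample operators are made up for illustration. One thing the sketch makes visible: produceIn needs a CoroutineScope and starts a coroutine to feed the new channel, so the round trip costs one extra coroutine and one extra channel.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the thread): channel -> Flow -> channel.
fun transformChannel(source: ReceiveChannel<Int>, scope: CoroutineScope): ReceiveChannel<String> =
    source.consumeAsFlow()                            // 1. wrap the channel; the flow can be collected only once
        .mapNotNull { if (it % 2 == 0) it else null } // 2. ordinary Flow operators
        .map { "even: $it" }
        .produceIn(scope)                             // 3. a coroutine in `scope` feeds the new channel

fun main() = runBlocking {
    val source = produce<Int> { for (i in 1..6) send(i) }
    val result = transformChannel(source, this).toList()
    println(result) // [even: 2, even: 4, even: 6]
}
```

If the receiving side can accept a Flow instead of a ReceiveChannel, step 3 can be dropped and the extra coroutine avoided.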
Andrew Gazelka
12/28/2020, 12:39 AM
shareIn alt which evaluates lazily? i.e.,
aFlow = flow(1,2,3).onEach{ println("A $it") }
shared = aFlow.lazyCache()
shared.take(1).forEach{ println("B $it")}
shared.take(2).forEach{ println("C $it")}
would print "A1 B1 A2 C2"
… or something to that effect
Andrew Gazelka
12/28/2020, 1:15 AM
christophsturm
12/30/2020, 4:27 PM
andylamax
12/31/2020, 1:30 PM
Marc Knaup
01/01/2021, 4:10 PM
withContext(scope1.coroutineContext) {
    withContext(scope2.coroutineContext) {
        doSomeWork()
    }
}
Similarly with `Job`:
val context2 = scope1.coroutineContext + SupervisorJob()
withContext(scope1.coroutineContext) {
    withContext(context2) {
        doSomeWork()
    }
}
Would the cancellation of one context affect the other context?
Is that the proper way to do that?
Peter Ertl
01/02/2021, 11:16 PM
flow.batch(size=20).collect { items -> /* do some batched operation e.g. a database insert */ }
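One way such an operator could look, as a rough sketch only. The name batch mirrors the call in the message above and is not an existing kotlinx.coroutines function. This version groups purely by size and emits a final partial list when the upstream completes. It has no time-based flush.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical operator: collects upstream values into lists of at most `size` elements.
fun <T> Flow<T>.batch(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    var buffer = ArrayList<T>(size)
    collect { value ->
        buffer.add(value)
        if (buffer.size == size) {
            emit(buffer)
            buffer = ArrayList(size) // fresh list, so an emitted batch is never mutated later
        }
    }
    if (buffer.isNotEmpty()) emit(buffer) // remaining partial batch
}

fun main() = runBlocking {
    (1..5).asFlow().batch(size = 2).collect { items -> println(items) }
    // [1, 2]
    // [3, 4]
    // [5]
}
```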
Brais Gabin
01/03/2021, 11:19 AM
Stream returned by Files.walk(Path) to a Flow. I could use the extension function consumeAsFlow() but Files.walk(Path) blocks the calling thread. So, what's the correct way to do it? I implemented this but it doesn't feel right:
flow<Path> {
    Files.walk(path)
        .forEach { path -> runBlocking { emit(path) } }
}
    .flowOn(Dispatchers.IO)
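A possible alternative to the snippet above that avoids runBlocking, offered as a sketch rather than a definitive answer. The helper name walkAsFlow is made up. It relies on the fact that a plain for loop over the stream's iterator runs directly inside the flow builder's suspending lambda, so emit can be called without a bridge.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: walks `root` lazily and emits every path it visits.
fun walkAsFlow(root: Path): Flow<Path> = flow {
    // `use` closes the Stream even when the collector cancels or fails.
    Files.walk(root).use { stream ->
        for (path in stream.iterator()) emit(path)
    }
}.flowOn(Dispatchers.IO) // the blocking directory walk runs on the IO dispatcher

fun main() = runBlocking {
    val dir = Files.createTempDirectory("walk-demo")
    Files.createFile(dir.resolve("a.txt"))
    println(walkAsFlow(dir).toList().size) // the directory itself plus one file
}
```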
And, as a side question, is there any library that implements the filesystem API as proper suspend functions?
João Eudes Lima
01/04/2021, 2:14 AM
Tim Malseed
01/04/2021, 6:23 AM
Actor, and I'd like to be able to remove/cancel an operation when a new one comes in, depending on the progress of the previous operation.
Rainer Schlonvoigt
01/04/2021, 11:06 AM
val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val scope = CoroutineScope(dispatcher)
for (i in 1..5) {
    scope.launch {
        println(i)
    }
}
rtsketo
01/04/2021, 1:05 PM
Main.immediate isn't the default dispatcher of MainScope()? Is there any disadvantage in using immediate?
Mark
01/04/2021, 1:17 PM
suspend fun foo(progressLambda: (Long) -> Unit) and I want to convert this to a function returning a StateFlow<State> where State is a sealed class providing Progress, Success etc. Initially I thought of using callbackFlow but that seems to be more for functions that run asynchronously. Is there a more appropriate way?
Kshitij Patil
01/05/2021, 11:53 AM
StateFlow and after changing the state to the desired object, I immediately set it back to null. This already sounds like bad practice, so could someone guide me on which coroutine Flow/Channel I should use?
Yevhenii Datsenko
01/05/2021, 12:35 PM
A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).
That means that we might lose some emitted values. To tackle this problem I created the following function:
/**
 * This function emits the value once the receiver shared flow gets its first subscriber.
 * It helps to avoid losing events while the view is not subscribed to the shared flow.
 */
suspend fun <T> MutableSharedFlow<T>.emitLazily(value: T) {
    subscriptionCount.first { it > 0 }
    emit(value)
}
The question is: will the first function throw a NoSuchElementException in this case? As far as I understand, it should not 😄) Because the shared flow never completes, and we cannot convert it into a completing one without adding a subscriber.
It would be extremely helpful to hear your thoughts))
Colton Idle
01/05/2021, 1:36 PM
val tickerChannel = ticker(delayMillis = 10 * 60_000, initialDelayMillis = 0)
repeat(10) {
    tickerChannel.receive()
    println("TIMER SUCCESS")
}
I want it to repeat forever though. I'm probably missing something dumb, but can I signify forever somehow, or do I have to use Integer.MAX_VALUE?
myanmarking
01/05/2021, 1:45 PM
Rob
01/06/2021, 12:34 AM
jeff
01/06/2021, 4:52 PM
ursus
01/07/2021, 1:30 AM
private val state = MutableStateFlow(IDLE)

fun doSomething() {
    if (state.value == LOADING) return
    state.value = LOADING
    scope.launch {
        try {
            actuallyDoSomething()
            state.value = SUCCESS
        } catch (ex: Exception) {
            state.value = ERROR
        }
    }
}
i.e. this is not guaranteed to be only a single execution, right? I need to make reading & writing of LOADING synchronized, correct?
Rob
01/07/2021, 2:36 PM
List<Flow<T>>? Something that would have this signature (List<Flow<T>>) -> Flow<List<T>>? I'm updating a RecyclerView using AsyncListDiffer.submitList().
leandro
01/07/2021, 3:29 PM
SendChannel backed by a RENDEZVOUS implementation, but wondering if MutableSharedFlow (with replay=0) would be better (I assume due to less overhead?!)
tylerwilson
01/07/2021, 5:15 PM
Lukas Lechner
01/08/2021, 12:51 PM
Ayomide
01/09/2021, 6:01 PM
SharedFlow - I'm trying to figure out how to keep the web socket session (DefaultClientWebSocketSession) running in the background so that I can collect the SharedFlow messages and display them on screen as they stream in.
I tried using coroutineScope.launch{} but it seems like the web socket session would stop as soon as the coroutine scope block ended. Not sure what to do to make sure the process runs forever in the background
timrijckaert
01/10/2021, 4:51 PM
MutableStateFlow that allows duplicate events to be sent.
Basically mitigate the default distinctUntilChanged behavior?
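One commonly suggested option, sketched here under assumptions: a MutableSharedFlow with replay = 1 and DROP_OLDEST keeps the latest value for new subscribers much like a StateFlow does, but performs no equality check, so repeated values reach collectors. The factory name duplicateFriendlyState is made up for illustration. Two differences to keep in mind: there is no value property (the latest value is in replayCache), and a slow collector can still miss intermediate values.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical factory: state-like shared flow that does not drop equal values.
fun <T> duplicateFriendlyState(initial: T): MutableSharedFlow<T> =
    MutableSharedFlow<T>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .also { it.tryEmit(initial) } // seed the replay cache with the initial value

// Emits 1, 1, 2 to a single collector and returns everything it saw.
fun collectDemo(): List<Int> = runBlocking {
    val events = duplicateFriendlyState(0)
    val received = mutableListOf<Int>()
    val job = launch { events.collect { received += it } }
    yield() // let the collector subscribe and take the replayed initial value
    for (v in listOf(1, 1, 2)) {
        events.emit(v)
        yield() // give the collector a turn before the next emission
    }
    job.cancel()
    received
}

fun main() {
    println(collectDemo()) // [0, 1, 1, 2]
}
```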