groostav
06/17/2019, 9:52 AM
I have a ReceiveChannel<String> that I'm looking to conflate over a duration of time, so that I can reduce the number of messages to a known rate. In Tomas Mikula's ReactFX this is successionEnds(duration). Any idea what the easiest way to get this is?
Amanjeet Singh
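For groostav's question about conflating a ReceiveChannel<String> over a quiet period, here is a minimal sketch. It assumes a kotlinx.coroutines version that has `Flow.debounce` (still marked `@FlowPreview`); the extension name `successionEnds` is hypothetical and only borrowed from the ReactFX operator mentioned in the question.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper named after the ReactFX operator; not part of kotlinx.coroutines.
@OptIn(FlowPreview::class)
fun ReceiveChannel<String>.successionEnds(quietMillis: Long): Flow<String> =
    consumeAsFlow().debounce(quietMillis)

// Sends a burst ("a", "b"), pauses, sends "c", and returns what survives debouncing.
fun demo(): List<String> = runBlocking {
    val messages = Channel<String>(Channel.UNLIMITED)
    launch {
        messages.send("a")
        messages.send("b")   // supersedes "a": it arrives within the quiet period
        delay(500)
        messages.send("c")
        messages.close()     // completion flushes the pending value
    }
    messages.successionEnds(quietMillis = 200).toList()
}

fun main() {
    println(demo())
}
```

If the goal is a fixed output rate rather than "emit once the burst ends", `Flow.sample(periodMillis)` may be the closer fit; which one matches the ReactFX behaviour best is a judgment call.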
06/17/2019, 1:01 PM
Melih Aksoy
06/17/2019, 1:55 PM
flow, but I'm having trouble: the test finishes before all the emits happen.
The following code works fine if I add a delay after source.stateData.testObserve
@Test
@ExperimentalCoroutinesApi
fun `should update state after load initial`() {
    val initialParams = mockk<PageKeyedDataSource.LoadInitialParams<Int>>(relaxed = true)
    val initalCallback = mockk<PageKeyedDataSource.LoadInitialCallback<Int, String>>(relaxed = true)
    runBlocking {
        // Fake loading
        source.loadInitial(initialParams, initalCallback)
        source.stateData.testObserve {
            it shouldBeInstanceOf Result.State::class
        }
    }
}
I don’t think this is the right way. How do I test a flow correctly? And while we’re at it, is there a way to verify that a flow is collected, collected n times, etc.?
tseisel
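One way to approach Melih Aksoy's question about testing a flow is to collect it to completion inside `runBlocking`, so the test cannot finish before every emit has happened. The sketch below also counts collections. `StateSource` and its members are hypothetical stand-ins, not the paging data source from the test above.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical source under test: counts how many times its flow is collected.
class StateSource {
    val collections = AtomicInteger()

    fun states(): Flow<String> = flow {
        collections.incrementAndGet()   // runs once per collector, since the flow is cold
        emit("Loading")
        emit("Loaded")
    }
}

// toList() suspends until the flow completes, so no extra delay is needed.
fun collectStates(source: StateSource): List<String> = runBlocking {
    source.states().toList()
}

fun main() {
    val source = StateSource()
    check(collectStates(source) == listOf("Loading", "Loaded"))
    check(source.collections.get() == 1)
    collectStates(source)
    check(source.collections.get() == 2)
}
```

This only works as written for flows that complete; for an endless flow, something like `take(n).toList()` would be needed to bound the collection.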
06/17/2019, 2:07 PM
flow<String> {
    emit("Foo")
}
and
channelFlow<String> {
    send("Foo")
}
Apart from the receiver, which is FlowCollector in the first and ProducerScope in the second.
gotoOla
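Regarding tseisel's comparison of the two builders, one practical consequence of the different receivers can be sketched as follows: `ProducerScope` is also a `CoroutineScope`, so a `channelFlow` block may send from coroutines it launches, whereas `flow` expects `emit` to be called from the collecting coroutine. The function names here are illustrative only.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun viaFlow(): Flow<String> = flow {
    emit("Foo")              // receiver is FlowCollector<String>
}

fun viaChannelFlow(): Flow<String> = channelFlow {
    launch { send("Foo") }   // allowed: ProducerScope<String> is a CoroutineScope
    send("Bar")
}

fun main() = runBlocking<Unit> {
    println(viaFlow().toList())
    // Sorted because the relative order of the two sends is not guaranteed.
    println(viaChannelFlow().toList().sorted())
}
```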
06/17/2019, 3:18 PM
ursus
06/19/2019, 12:06 AM
Allan Wang
06/19/2019, 1:09 AM
Why does CoroutineScope.cancel(cause: CancellationException? = null) return Unit instead of Nothing?
Is there a difference if we don't have a non-cancellation scope? When should we use cancel vs throw CancellationException?
The question comes down to job.cancel vs throw
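A small sketch that may help with the `cancel` vs `throw` question: `cancel()` only marks the job as cancelled and returns, so the code after it keeps running until the next suspension point, while `throw CancellationException()` leaves the block immediately. That is consistent with `cancel()` returning Unit rather than Nothing. The function name `demo` and the log strings are illustrative.

```kotlin
import kotlinx.coroutines.*

fun demo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        cancel()                   // CoroutineScope.cancel(): marks the job cancelled, then returns
        log += "after cancel()"    // still runs: no suspension point has been reached yet
        try {
            delay(10)              // first suspension point: throws CancellationException
            log += "after delay"   // never reached
        } finally {
            log += "cleanup"
        }
    }
    job.join()
    log += "cancelled=${job.isCancelled}"
    log
}

fun main() {
    println(demo())
}
```

If the remaining code in the block must not run at all, throwing (or returning) right after `cancel()` is the usual way to stop it.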
ursus
06/19/2019, 1:48 AM
private val _state = ConflatedBroadcastChannel<SyncStatus>()
val state: ReceiveChannel<SyncStatus> get() = _state.openSubscription()
fun forceSync() {
    _state.offer(SyncStatus.SYNC)
    GlobalScope.launch {
        val result = if (load()) SyncStatus.IDLE else SyncStatus.FAILED
        _state.offer(result)
    }
}
How would you handle this if forceSync() took a parameter? A channel per set of params would be needed @gildor
Joan Colmenero
06/19/2019, 12:53 PM
mockito and suspend fun? I remember that when I did unit testing with RxJava I had to create a Rule and things like that. Do I have to do something special to make it work?
Note: I'm using MVP, not MVVM; I don't know if that matters.
blakelee
06/19/2019, 6:20 PM
basher
06/19/2019, 6:32 PM
Patrick Jackson
06/19/2019, 7:21 PM
delay() and then execute some code. Has anyone experienced this before? I can post some code later if needed...
Andrew Gazelka
06/19/2019, 7:59 PM
collect {})
Andrew Gazelka
06/19/2019, 9:30 PM
val hotFlow = baseFlow.conflate().broadcastIn(this).asFlow()
?
@FlowPreview
@ExperimentalCoroutinesApi
fun main() = runBlocking<Unit> {
    val baseFlow = (1..10).asFlow().delayEach(1_000)
    val hotFlow = baseFlow.conflate().broadcastIn(this).asFlow()
    launch {
        hotFlow.collect {
            println("A: $it")
        }
    }
    delay(5_000)
    launch {
        hotFlow.collect {
            println("B: $it")
        }
    }
}
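For readers on later kotlinx.coroutines releases: `shareIn` (added in 1.4, so not available when this was posted) covers the same "one upstream, several collectors" idea as the `broadcastIn(...).asFlow()` chain above. The following is a sketch under that assumption, not a drop-in replacement; timings are shortened, `onEach { delay(...) }` stands in for `delayEach`, and `demo` and `sharingScope` are illustrative names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun demo(): Pair<List<Int>, List<Int>> = coroutineScope {
    // Separate scope for the sharing coroutine so it can be cancelled explicitly.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val baseFlow = (1..5).asFlow().onEach { delay(200) }
    val hotFlow = baseFlow.shareIn(sharingScope, SharingStarted.Eagerly, replay = 1)

    val a = async { hotFlow.take(5).toList() }   // subscribes before the first emission
    delay(500)
    val b = async { hotFlow.take(2).toList() }   // late subscriber: starts from the replayed value
    val result = a.await() to b.await()
    sharingScope.cancel()                        // a SharedFlow never completes on its own
    result
}

fun main() = runBlocking<Unit> {
    val (a, b) = demo()
    println("A: $a")
    println("B: $b")
}
```

Collector A sees every value, while B joins mid-stream and only sees what is emitted from that point on (plus the single replayed value), which is the hot behaviour the original snippet was after.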
Vlad
06/20/2019, 11:11 AM
uhe
06/20/2019, 11:34 AM
GlobalScope.launch {
    flowOf(1,2,3,4,5)
        .delayEach(500)
        .map { it * it }
        .collect { println(it) } // Type mismatch: inferred type is () -> ??? but FlowCollector<Int> was expected
}
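A likely cause of the type mismatch above, stated as an assumption since the reply is not preserved here: in kotlinx.coroutines releases of that period, the lambda form of `collect` was an extension function that had to be imported from `kotlinx.coroutines.flow`, and without the import the compiler only saw the member that takes a `FlowCollector`. A sketch with the wildcard import, using `onEach { delay(...) }` as a stand-in for `delayEach`:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*   // on older releases this also brings the collect { } extension into scope
import kotlinx.coroutines.runBlocking

fun squares(): List<Int> = runBlocking {
    val out = mutableListOf<Int>()
    flowOf(1, 2, 3, 4, 5)
        .onEach { delay(50) }      // stand-in for delayEach(500), shortened for the demo
        .map { it * it }
        .collect { out += it }
    out
}

fun main() {
    println(squares())
}
```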
Alexjok
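06/20/2019, 12:52 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.random.Random

val channel = Channel<Int>(1)
fun main() = runBlocking<Unit> {
    launcher().start()
    test().start()
}
// Producer: sends a value, then waits a random time before the next one.
fun CoroutineScope.launcher() = launch(start = CoroutineStart.LAZY) {
    var i = 0 // counter assumed: i was undefined in the snippet as posted
    while (isActive) {
        channel.send(i++)
        delay(Random.nextLong(100, 6000))
    }
}
// Consumer: handles every value received from the channel.
fun CoroutineScope.test() = launch(start = CoroutineStart.LAZY) {
    while (true) {
        doSomething(channel.receive())
    }
}
fun doSomething(value: Int) {
    println(value)
}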
eygraber
06/20/2019, 3:14 PM
Paul N
06/21/2019, 2:51 PM
tulio
06/21/2019, 4:57 PM
streetsofboston
06/21/2019, 5:36 PM
Flow with and without a buffer, in this example:
val flow = flow {
    for (i in 1..100) {
        emit(1)
        delay(200)
    }
}.flowOn(Dispatchers.Main) // try replacing it with Dispatchers.IO...
MainScope().launch {
    flow.collect {
        println(it)
        delay(1000)
    }
}
This prints out an item every 1200 milliseconds or so. This makes sense.
When using buffer(), I can speed up the collecting a bit and print out an item every 1000 milliseconds or so. This also makes sense to me.
However, after just replacing .flowOn(Dispatchers.Main) with .flowOn(Dispatchers.IO), the print-outs now happen every 1000 milliseconds as well. I expected the interval to still be 1200 milliseconds.
Is it expected that, even without a buffer (with a size > 0), the flow now acts as if it has a buffer when different dispatchers are used like this?
streetsofboston
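On the `flowOn` observation: the kotlinx.coroutines documentation notes that `flowOn` uses the same buffering mechanism as `buffer()` when it has to change the dispatcher, which would explain the 1000 ms interval. The sketch below tries to make that visible by counting how many items the producer has emitted by the time the collector finishes handling the first one. The function name and the shortened timings are illustrative, and `Dispatchers.Default` stands in for `Dispatchers.IO`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Returns how many items were emitted upstream while the collector was still busy with item 1.
// Pass null to keep the producer in the collector's context (no dispatcher change).
fun emittedBeforeFirstItemIsDone(upstream: CoroutineDispatcher?): Int = runBlocking {
    val emitted = AtomicInteger()
    var source: Flow<Int> = flow {
        for (i in 1..5) {
            emitted.incrementAndGet()
            emit(i)
            delay(20)
        }
    }
    if (upstream != null) source = source.flowOn(upstream)

    var snapshot = 0
    source.collect { value ->
        if (value == 1) {
            delay(300)                 // slow collector
            snapshot = emitted.get()   // how far ahead did the producer get?
        }
    }
    snapshot
}

fun main() {
    println("same context: " + emittedBeforeFirstItemIsDone(null))
    println("flowOn(Dispatchers.Default): " + emittedBeforeFirstItemIsDone(Dispatchers.Default))
}
```

Without a dispatcher change the producer is suspended inside `emit` until the collector is done, so only one item has been emitted; with the dispatcher change the producer runs ahead into the channel that `flowOn` introduces.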
06/22/2019, 5:08 PM
Robert Jaros
06/22/2019, 10:01 PM
Loránd
06/23/2019, 11:39 AM
serebit
06/24/2019, 4:31 AM
suspend operator fun get not allowed? I assume there's reasoning for them
Allan Wang
06/24/2019, 9:24 AM
tylerwilson
06/24/2019, 6:28 PM
Alexjok
06/25/2019, 12:35 PM
Bob Glamm
06/25/2019, 2:36 PM
fun foo(): (suspend () -> Int) = { 5 }
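The surrounding text of Bob Glamm's message is not preserved, so the following is only a sketch of how a function with that signature can be used: `foo()` itself is an ordinary function that returns a suspending lambda, and only invoking the returned value requires a coroutine or another suspend function.

```kotlin
import kotlinx.coroutines.runBlocking

fun foo(): (suspend () -> Int) = { 5 }

fun main() = runBlocking<Unit> {
    val block = foo()    // no coroutine is started here; this is just a value
    println(block())     // invoking it is a suspending call
}
```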
rook
06/25/2019, 4:06 PM
launch
rook
06/25/2019, 4:06 PM
launch
louiscad
06/25/2019, 4:35 PM
GlobalScope, or a scope that doesn't have an explicitly defined dispatcher? Or a dispatcher that is not Dispatchers.Main? Looking for that should help.
rook
06/25/2019, 4:44 PM
GlobalScope for a couple of things at my repository layer
louiscad
06/25/2019, 4:48 PM
Dispatchers.Default by default (hence the name).
Cmd+Shift+F (Search in Path) and Cmd+click (go to declaration or find usages) are my favorite "static analysis tools". I'm not aware of anything coroutines-specific. You can still put a CoroutineName into the scope's context, and log it just before the crashing code.
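The CoroutineName suggestion could look roughly like this. `describeContext`, `repositoryScope`, and the name "repository-sync" are hypothetical; the idea is just to log the name and dispatcher right before the code that crashes when it runs off the main thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Returns a short label for the coroutine that calls it: its name and its dispatcher.
suspend fun describeContext(): String {
    val ctx = currentCoroutineContext()
    val name = ctx[CoroutineName]?.name ?: "unnamed"
    val dispatcher = ctx[ContinuationInterceptor]
    return "$name on $dispatcher"
}

fun main() = runBlocking<Unit> {
    // Hypothetical repository-layer scope with an explicit name and dispatcher.
    val repositoryScope = CoroutineScope(Dispatchers.Default + CoroutineName("repository-sync"))
    repositoryScope.launch {
        println(describeContext())   // log just before the suspicious code
    }.join()
    repositoryScope.cancel()
}
```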