alexsullivan114
04/02/2020, 7:20 PM
`Channel` that I'm converting to a flow and listening to. As soon as the channel is created, I send an initial default value through it. Right before that happens, I start observing the flow using the onEach pattern. The flow, however, does not receive the initial value, presumably because there's an asynchronous launch involved. Does anyone have any idea how to get around this issue?
Javier
04/02/2020, 8:16 PM
ursus
04/02/2020, 9:57 PM
suspend fun someAction() = coroutineScope {
    ...
}
Orhan Tozan
04/02/2020, 11:25 PM
`channelFlow {}` instead of `flow {}` when I want to launch a coroutine inside of the block?
Darren Gu
04/03/2020, 3:31 AM
Sam Garfinkel
04/03/2020, 4:51 PM
zak.taccardi
04/03/2020, 8:49 PM
`TestCoroutineScope` to control when an `actor` coroutine processes messages. I did the following:
1. Constructed `actor` with `TestCoroutineScope` via `runBlockingTest { .. }`
2. Used `testCoroutineScope.pauseDispatcher()`
3. Called `actor.send(message)`
And without ever calling `resumeDispatcher()`, the `actor` receives and processes the message. Is this intentional?
Marc Knaup
04/04/2020, 10:16 AM
`BroadcastChannel`?
I want an event flow where the sender is suspended until all collectors have processed an event.
Even the smallest-capacity `BroadcastChannel` is still non-blocking :(
private val _events = BroadcastChannel<MyEvent>(1) // for sending
val events = _events.asFlow() // for collecting
Marc Knaup
04/04/2020, 12:04 PM
`send(…)` will block until all flows have processed the sent element.
Is there something like that already?
Orhan Tozan
04/04/2020, 12:07 PM
fun messagesFromConversation(conversationId: String): Flow<List<Message>> = channelFlow {
    val messageHistory = getMessageHistory()
    offer(messageHistory)
    val accumulatedMessages = messageHistory.toMutableList()
    val sentMessages: Flow<Message> = sentMessagesFromConversation(conversationId)
    sentMessages.collect { sentMessage ->
        accumulatedMessages.add(0, sentMessage)
        offer(accumulatedMessages)
    }
}
but I'm getting a weird issue: on every collect of the sent messages, the accumulated messages do not represent the full list. I think it's some sort of mutable-state-sharing concurrency issue, but I have no idea how to better approach it. Do I need some sort of ConcurrentMutableList?
bod
04/04/2020, 3:17 PM
`suspendCancellableCoroutine` I sometimes see code that calls `cont.resume(result)`, but when I try that, it's looking for an `onCancellation` argument, so I need to do `cont.resume(result) {}` instead. Ideas why? (May be relevant: I'm in Kotlin-js).
Shan
04/05/2020, 12:01 AM
Jeevan Deep Singh
04/05/2020, 9:40 AM
gammanik
04/05/2020, 9:44 AM
Maciek
04/06/2020, 5:07 AM
`throttleFirst` from RxJava? So after emitting a value, block future emissions for a given duration.
Andrey Stepankov
04/06/2020, 2:34 PM
@Test
fun test_ConflatedBroadcastChannel_runBlocking() {
    runBlocking {
        val channel = ConflatedBroadcastChannel<Int>()
        val collect = async {
            channel.asFlow().collect {
                println("collected $it")
            }
        }
        val send = async {
            (0 until 5).forEach {
                println("send $it")
                channel.send(it)
            }
            channel.cancel()
        }
        listOf(collect, send).awaitAll()
    }
    // console out
    // send 0
    // send 1
    // send 2
    // send 3
    // send 4
    // collected 0
    // collected 4
}
@Test
fun test_ConflatedBroadcastChannel_runBlockingTest() {
    runBlockingTest {
        val channel = ConflatedBroadcastChannel<Int>()
        val collect = async {
            channel.asFlow().collect {
                println("collected $it")
            }
        }
        val send = async {
            (0 until 5).forEach {
                println("send $it")
                channel.send(it)
            }
            channel.cancel()
        }
        listOf(collect, send).awaitAll()
    }
    // console out
    // send 0
    // collected 0
    // send 1
    // collected 1
    // send 2
    // collected 2
    // send 3
    // collected 3
    // send 4
    // collected 4
}
Jorge Recio
04/07/2020, 5:52 AM
IntelliJ IDEA 2019.3.4 (Community Edition)
Build #IC-193.6911.18, built on March 17, 2020
Runtime version: 11.0.6+8-b520.43 x86_64
VM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
macOS 10.15.4
Can anyone help me?
zak.taccardi
04/08/2020, 12:16 AM
`userId` in a `Flow<State>` where the `userId` changes over time?
data class State(
    val userId: String, // the current user may change over time
    val isHungry: Boolean, // user may become hungry over time
    val currentTime: Int // hot code path, changes frequently
)
fun CoroutineScope.handleFirstStateForGoodPeople(states: Flow<State>) {
    states
        // when EACH user has `isHungry==true`
        // I want to call a function
        // but only the FIRST time this state is reached PER user (distinct by `userId`)
        .onEachInternal { state -> callSomeFunction() }
        .launchIn(this)
}
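One possible approach, as a minimal sketch: remember which user IDs have already been seen hungry and let a state through only the first time its `userId` is added to that set. The operator name `firstHungryPerUser` is hypothetical, and `State` is repeated here only so the snippet is self-contained. The sketch assumes a user should never trigger again, even after becoming not hungry and then hungry once more.

```kotlin
import kotlinx.coroutines.flow.*

data class State(val userId: String, val isHungry: Boolean, val currentTime: Int)

// Hypothetical operator: emits a state only the first time each userId is seen hungry.
fun Flow<State>.firstHungryPerUser(): Flow<State> = flow {
    // Grows with the number of distinct users; local to each collection of the flow.
    val seenUserIds = mutableSetOf<String>()
    collect { state ->
        // add() returns false when the id is already present, so repeats are dropped.
        if (state.isHungry && seenUserIds.add(state.userId)) emit(state)
    }
}
```

With this in place, the chain could read `states.firstHungryPerUser().onEach { callSomeFunction() }.launchIn(this)`.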
wasyl
04/08/2020, 7:49 AM
`DataFlow` (/`StateFlow`) or `share()` operator for example, but there are many more. It’d be nice to see what improvements are planned and what the initial version targets are. I don’t even know how to filter YouTrack to find related issues 🤔
liminal
04/08/2020, 12:26 PM
`map` and `filter` nesting should not be necessary. Any ideas how to improve it?
filterByStarCountChannel.asFlow()
    .flatMapLatest { minStarCount ->
        userReposRepository.getUserRepos(username)
            .map { repoList ->
                repoList.filter { it.stars >= minStarCount.stars }
            }
    }
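One way to flatten the nesting, sketched under the assumption that `getUserRepos(username)` returns a `Flow<List<Repo>>`, is `combine`. The `Repo` and `MinStarCount` types and the `filteredRepos` function below are hypothetical stand-ins. Note the behaviour is not identical: `combine` collects the repo flow once, whereas `flatMapLatest` re-subscribes to it every time the minimum star count changes.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the types used in the snippet above.
data class Repo(val name: String, val stars: Int)
data class MinStarCount(val stars: Int)

// Re-filters the latest repo list whenever either the list or the threshold changes.
fun filteredRepos(
    minStarCounts: Flow<MinStarCount>,
    repos: Flow<List<Repo>>
): Flow<List<Repo>> =
    minStarCounts.combine(repos) { minStarCount, repoList ->
        repoList.filter { it.stars >= minStarCount.stars }
    }
```

If re-fetching the repos on every filter change is actually wanted, the original `flatMapLatest` version is the one that does that.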
Stephen Edwards
04/08/2020, 2:11 PM
`Continuation` of a suspending function to later inject values (`.resume()`) on it, to facilitate mocking in a pattern similar to mocking a function returning a `Single`/`Observable` with an `RxRelay` that is exposed to the test?
LastExceed
04/08/2020, 2:50 PM
`CompletableFuture<T>` to `Deferred<T>` using kotlinx.coroutines?
Michael Friend
04/08/2020, 4:11 PM
@Test
fun eternalTest() = testCoroutineRule.testDispatcher.runBlockingTest {
    launch {
        while (isActive) {
            delay(1000)
        }
    }
    Assert.fail()/throw Exception/verify(...)
}
My expectation is that the exception from an assertion failure or otherwise would cancel the testCoroutineScope, which then cancels all children, including the code inside launch. What actually happens is that the job in the launch is never cancelled and continues forever, causing runBlockingTest to hang forever. Unless the coroutine test library is injecting a `SupervisorJob` into testCoroutineScope somewhere, the cancellation should be propagated. Am I missing something, and if not, does anyone know of a workaround?
ubu
04/08/2020, 6:08 PM
`Flow.withLatestFrom(other: Flow<B>, transform: suspend (A,B) -> R)` that takes several `other` Flows? I need values emitted downstream only when the source flow changes.
Marc Knaup
04/08/2020, 6:28 PM
`Flow` more readable? 🤔
Also, is there a shorter way to “initialize” `distinctUntilChanged` with a value that it doesn’t emit? (I use `.onStart().distinctUntilChanged().drop()` here.)
ubu
04/09/2020, 9:13 AM
fun <A, B, C : Any, R> Flow<A>.withLatestFrom(
    other: Flow<B>,
    another: Flow<C>,
    transform: suspend (A, B, C) -> R
): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val latestC = AtomicReference<C?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        launch {
            try {
                another.collect { latestC.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        collect { a: A ->
            val b = latestB.get()
            val c = latestC.get()
            if (b != null && c != null) {
                emit(transform(a, b, c))
            }
        }
    }
}
Hi guys. Inspired by this implementation by @elizarov, I needed to re-implement this operator to accept several streams. Are there any problems inherent to this implementation? The two `launch` blocks bother me somehow.
Paulius Ruminas
04/09/2020, 9:19 AM
`TestCoroutineScope` for my test but it loops forever. Simplified test case:
@Test
fun a() = runBlockingTest {
    launch {
        while (true) {
            delay(60_000)
            println("called")
        }
    }
    advanceTimeBy(70_000)
}
What am I missing?
Erik
04/09/2020, 2:00 PM
`NonCancellable` (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-non-cancellable.html) coroutine context? Just before my UI finishes (Android activity `onDestroy`), I want to complete a certain async task and be sure that it finishes before the coroutine scope's (`viewModelScope`) cancellation completes. So on the scope I call `scope.launch(NonCancellable) { /* Do async task that must complete */ }`.
If that task takes long enough for the scope to be cancelled before the task completes, then this will not cancel the task, but let it run. So the scope will not complete until this task finishes.
Did I just introduce a bug or make use of a feature?
Are there safer alternatives that guarantee task completion?
(🤖 bonus question: in theory, if that task takes, say, a minute, or an hour, what would eventually happen to my process?)
Luis Munoz
04/09/2020, 7:08 PM
CLOVIS
04/10/2020, 12:48 PM
suspend fun <T> cache(requests: ReceiveChannel<Pair<Int, SomeKindOfFuture<T>>>) {
    val cache = mutableMapOf<Int, T>()
    for (request in requests) {
        ...
    }
}
But now I'm stuck. What I would like is that if the value is not in the cache, an expensive calculation is started, but I don't want the `cache` function to block or suspend: once the calculation is started, I still want the cache to be able to reply to other requests, so I can't do something like:
if (request.first in cache)
    request.second.complete(cache[request.first])
else {
    cache.add(request.first, launch { someCalculation(request.second) }.await())
}
I'm starting to think the best solution would be for the cache to receive either a 'get' or a 'set' request. When it receives a 'get' for a value that does not exist, it would create a new coroutine that does the calculation and then sends a 'set' request, so the cache could continue processing requests in the meantime. What do you think?
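An alternative worth considering, sketched here under assumptions, is to store a `Deferred` per key instead of the finished value. A lookup then returns immediately, a missing key starts the calculation once, and every later request for that key awaits the same `Deferred`, so no separate 'set' message is needed. The class name `DeferredCache` and its `compute` parameter are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical cache that stores the in-flight Deferred rather than the finished value,
// so a lookup never waits for a calculation to finish.
class DeferredCache<K, V>(
    private val scope: CoroutineScope,
    private val compute: suspend (K) -> V
) {
    private val entries = mutableMapOf<K, Deferred<V>>()

    // Not thread-safe: intended to be called from a single coroutine,
    // for example the loop that reads requests from the channel.
    fun get(key: K): Deferred<V> =
        entries.getOrPut(key) { scope.async { compute(key) } }
}
```

Inside the request loop, something like `launch { request.second.complete(cache.get(request.first).await()) }` would keep the loop free to handle the next request while the value is being calculated.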