Tolriq
03/10/2020, 8:15 AMLuis Munoz
03/10/2020, 3:07 PMZach Klippenstein (he/him) [MOD]
03/10/2020, 5:37 PMMohamed Ibrahim
03/10/2020, 5:40 PMniqo01
03/10/2020, 6:25 PMJoe
03/10/2020, 6:26 PMchannel.consumeAsFlow()
• Send a message to the channel that results in another coroutine that sends another message to the channel that results in decrementing a CountDownLatch
• In the main thread, await the CountDownLatch
With 1.3.3, the await took ~180ms. With 1.3.4 its taking ~1.4 seconds. Not sure how this affects the production code yet, but is there a change in 1.3.4 that would explain this?azabost
03/11/2020, 10:58 AMFlow.launchIn()
says it is “a shorthand for `scope.launch { flow.collect() }`” but when I compare behaviors in unit tests it is a little different.
Having a simple flow:
private val integersFlow = flow {
var i = 1
while(true) {
delay(100)
emit(i++)
}
}
and using test scope and dispatcher:
private val observingScopeJob1 = SupervisorJob()
private val observingScope1 = CoroutineScope(observingScopeJob1)
private val testDispatcher = TestCoroutineDispatcher()
When I do the following:
@Test
fun `should receive 10 integers - collect`() = runBlocking {
var observedIntegers = 0
observingScope1.launch(testDispatcher) {
integersFlow.collect {
observedIntegers++
}
}
observedIntegers.shouldEqual(0)
testDispatcher.advanceTimeBy(1000)
observedIntegers.shouldEqual(10)
Unit
}
the test passes ✅
but when I convert it into RxJava-style like this:
@Test
fun `should receive 10 integers - launchIn`() = runBlocking {
var observedIntegers = 0
integersFlow
.onEach { observedIntegers++ }
.flowOn(testDispatcher)
.launchIn(observingScope1)
observedIntegers.shouldEqual(0)
testDispatcher.advanceTimeBy(1000)
observedIntegers.shouldEqual(10)
Unit
}
the test fails ❎
unless I add an additional delay(100)
before testDispatcher.advanceTimeBy(1000)
which seems very weird to me. Like… why is that needed to delay the main thread in unit test to allow the collecting coroutine to be launched at all?tseisel
03/11/2020, 4:44 PMCoroutineScope
that is a child of another ?
What I tried:
val parentScope = ...
val childScope = parentScope + SupervisorJob()
parentScope.cancel()
parentScope.isActive // false
childScope.isActive // true (!!)
It seems that cancelling the parent does not cancel its children. I guess I'm missing something here...Luis Munoz
03/11/2020, 8:24 PMgroostav
03/11/2020, 9:16 PMtextField.debouncedAsTyping(): Flow<Change<String, String>>
seemed like such a good idea, but back pressure/cancellation is kicking my butt.rrva
03/11/2020, 9:50 PMMikael Alfredsson
03/12/2020, 8:45 AMOfir Bar
03/12/2020, 8:57 AMvineethraj49
03/12/2020, 11:18 AMGlobalScope.async
does it gotoOla
03/12/2020, 1:32 PMclass MyQueueProcessor: CoroutineScope by CouroutineScope(<http://Dispatchers.IO|Dispatchers.IO>) {
suspend fun start() {
pollMessages()
processors.addAll(
(1..5).map {
launchMessageProcessor()
}
)
}
suspend fun pollMessages() = launch {
while(true) {
val messages = //... get messages from a queue
internalChannel.send(messages)
}
}
suspend fun launchMessageProcessor() = launch {
for (messages in internalChannel) {
// process each 'messages'
}
}
}
but for both pollMessages and launchMessageProcessor I get "Ambiguous coroutineContext due to CoroutineScope receiver of suspend function". How should I interpret this?
Is it because suspend fun start() is public and you inherit the coroutineContext from the caller? Since I extend CoroutineScope, the launch and async stuff I use should run on that scope with that dispatcher (IO) right?rahulrav
03/12/2020, 3:44 PMShawn Karber_
03/12/2020, 8:19 PMcharlesmuchene
03/13/2020, 1:31 PMmyanmarking
03/13/2020, 6:32 PMMohamed Ibrahim
03/13/2020, 8:48 PMflow {…}
builder, So I’m creating a watcher inside it here is the code, my question is how I clear inner listeners when flow canceled ?
fun EditText.textChangesFlow(): Flow<TextViewTextChangeEvent> {
flow<TextViewTextChangeEvent> {
val watcher = object : TextWatcher {
override fun afterTextChanged(s: Editable?) {
emit(TextViewTextChangeEvent(s.toString()))
}
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
}
override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
}
}
addTextChangedListener(watcher)
//how to call this when flow is canceled or finished
//removeTextChangedListener(watcher)
}
}
Andrew
03/14/2020, 4:13 AMMohamed Ibrahim
03/14/2020, 3:05 PMlaunchIn()
?JP
03/16/2020, 9:37 AMRun the code and check the log; we can see that all the coroutines still run on the main UI thread. We haven’t yet employed multithreading in any way, but already we have the benefits of running coroutines concurrently!
It’s very easy for us to change this code to run “contributors” coroutines on different threads from the common thread pool. Specifyas the context argument for theDispatchers.Default
function:async
async(Dispatchers.Default) { ... }```
I’m sort of confused. If I don’t specify a dispatcher in the argument for
async
function, isn’t the default dispatcher Dispatchers.Default
? Yet, I did see the logs change
from
147 [AWT-EventQueue-0] INFO Contributors - Clearing result
2251 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 54 repos
2678 [AWT-EventQueue-0 @coroutine#7] INFO Contributors - kotlin-benchmarks: loaded 7 contributors
...
to
164 [AWT-EventQueue-0] INFO Contributors - Clearing result
2172 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 54 repos
2527 [DefaultDispatcher-worker-5 @coroutine#10] INFO Contributors - anko-example: loaded 2 contributors
2621 [DefaultDispatcher-worker-5 @coroutine#9] INFO Contributors - kotlinx.html: loaded 15 contributors
...
But when I read the comments in the source code of Builders.common.kt
, it was stated like this:
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [context] element.
and in Dispatchers.kt
, it was stated:
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
* by this dispatcher is equal to the number of CPU cores, but is at least two.
* Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel.
*/
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
Could someone explain what I’m missing?JP
03/16/2020, 12:31 PMAndrey Stepankov
03/16/2020, 5:04 PMJamie Taylor
03/16/2020, 5:13 PMfun main() = runBlocking<Unit> {
val a = newSingleThreadContext("a")
val b = newSingleThreadContext("b")
launch {
val set = mutableSetOf<Int>()
repeat(100000) {
withContext(a) {
set.add(it)
}
withContext(b) {
set.remove(it)
}
}
println(set.size)
}
}
I'd have thought it wouldn't be since set is being modified on multiple threads and isn't thread safe. On the other hand I can't see anything in the docs warning me not to do this and I haven't managed to produce any errors by doing seemingly unsafe operations.Steve
03/16/2020, 6:19 PMManuel Vivo
03/17/2020, 9:15 AMhttps://www.youtube.com/watch?v=IQf-vtIC-Uc&list=PLWz5rJ2EKKc_T0fSZc9obnmnWcjvmJdw_&index=5▾
myanmarking
03/17/2020, 11:16 AMmyanmarking
03/17/2020, 1:07 PMpublic fun <T> Flow<T>.onEmpty(action: suspend FlowCollector<T>.() -> Unit): Flow<T> {
var valueCount: Int = 0
return flow {
collect { valueCount += 1; emit(it) }
if(valueCount == 0) action()
}
}
does this makes sense in terms of implementation ?myanmarking
03/17/2020, 1:07 PMpublic fun <T> Flow<T>.onEmpty(action: suspend FlowCollector<T>.() -> Unit): Flow<T> {
var valueCount: Int = 0
return flow {
collect { valueCount += 1; emit(it) }
if(valueCount == 0) action()
}
}
does this makes sense in terms of implementation ?elizarov
03/17/2020, 1:48 PMmyanmarking
03/17/2020, 1:49 PMelizarov
03/17/2020, 1:51 PM