Astronaut4449
02/19/2020, 6:55 PM
The Channel constructor takes a capacity argument of type Int. capacity is also used to define the type of the channel through magic constants such as RENDEZVOUS=0, BUFFERED=-2, UNLIMITED=Int.MAX_VALUE.
val c1 = Channel<Int>(capacity = 15)
val c2 = Channel<Int>(capacity = RENDEZVOUS) // dislike
Kotlin has the great feature of sealed classes. Wouldn't it be better to have an API like:
// Limited(capacity: Int) and object Rendezvous belong to sealed class ChannelType
val c1 = Channel<Int>(type = Limited(15))
val c2 = Channel<Int>(type = Rendezvous) // Rendezvous is an object here
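A rough sketch of what the proposed sealed class could look like, assuming it only wraps the existing Int-based capacity. ChannelType, its subclasses and toCapacity() are hypothetical names for illustration and are not part of kotlinx.coroutines; the numeric values are the ones quoted in the message above.

```kotlin
// Hypothetical types sketching the API proposed above; none of these names
// exist in kotlinx.coroutines.
sealed class ChannelType {
    data class Limited(val capacity: Int) : ChannelType() {
        init {
            require(capacity > 0) { "capacity must be positive, was $capacity" }
        }
    }
    object Rendezvous : ChannelType()
    object Buffered : ChannelType()
    object Unlimited : ChannelType()
}

// Maps each type to the Int the existing capacity parameter expects.
// The constants mirror the values quoted in the message above.
fun ChannelType.toCapacity(): Int = when (this) {
    is ChannelType.Limited -> capacity
    ChannelType.Rendezvous -> 0
    ChannelType.Buffered -> -2
    ChannelType.Unlimited -> Int.MAX_VALUE
}

fun main() {
    println(ChannelType.Limited(15).toCapacity())
    println(ChannelType.Rendezvous.toCapacity())
}
```

With something like this, a thin wrapper could forward type.toCapacity() to the existing constructor, so call sites would never see the magic numbers.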
kevinherron
02/19/2020, 10:20 PM
Jakub Gwóźdź
02/20/2020, 9:44 AM
fun <T, R> runConcurrently(nThreads: Int, params: Collection<T>, op: suspend (T) -> R): List<R> {
    return runBlocking {
        Executors.newFixedThreadPool(nThreads).asCoroutineDispatcher().use { pool ->
            params.map { p -> async(pool) { op(p) } }
        }.awaitAll()
    }
}
and it works well when I DON'T use any yielding call inside my op(), e.g.:
runConcurrently(3, (1..8).toList()) { param ->
    counter.increase()
    val sleepTime = 1000L / param
    println("Started $param in thread ${Thread.currentThread().name}, counter is $counter, sleepTime is $sleepTime")
    Thread.sleep(sleepTime)
    println("Stopping $param in thread ${Thread.currentThread().name}")
    counter.decrease()
}
But as soon as I call any suspending function inside, e.g. change Thread.sleep() to delay(), it switches the rest of the execution to DefaultExecutor, and my pool no longer limits anything. What am I doing wrong? And what is the current best way to limit the thread pool? I know newFixedThreadPool is deprecated, but there are no alternatives AFAIK.
magnumrocha
02/20/2020, 4:33 PM
[...] channelFlow or callbackFlow) without sending an error to the consumers?
magnumrocha
02/20/2020, 4:35 PM
[...] channelFlow, even if I don't pass an Exception, internally the flow will throw a ClosedSendChannelException
Joe
02/20/2020, 11:44 PM
[...] java.util.stream.Stream.consumeAsFlow() to process a stream as a flow. Sometimes it appears that the finally { stream.close() } block is not getting called (possibly on certain cancellation / error conditions?) (confirmed by duplicating the StreamFlow class and adding a log statement), although if I add an onCompletion {} to the returned flow, that block does get called even when the finally gets skipped. Is there anything that can explain this behavior? I will probably move back to using onCompletion to close the Stream in the meantime, though.
Chantry Cargill
02/21/2020, 2:12 PM
@GetMapping("/things")
suspend fun getThings() = coroutineScope {
    val thing = async {
        suspendingCall()
    }
    val anotherThing = async {
        suspendingCallAgain()
    }
    listOf(thing.await(), anotherThing.await())
}
Leon K
02/21/2020, 2:18 PM
[...] suspend fun and a non-suspending function. I cannot just put @Synchronized on the blocking one and use Mutex on the suspending one, as these don't exclude each other, so there is still the possibility of two concurrent accesses.
It seems as though Mutex is not usable from non-suspending code, and @Synchronized doesn't help with `suspend fun`s.
What's the best approach here?
rkeazor
02/22/2020, 9:15 PM
Brendan Weinstein
02/24/2020, 9:10 AM
class StateMachine(val scope: CoroutineScope) {
    private val inputActions: BroadcastChannel<Rankings.Action> = BroadcastChannel(Channel.BUFFERED)
    private var isInitialized = atomic(false)
    init {
        scope.launch {
            inputActions.asFlow()
                // onStart hook is called just before collect gets called
                .onStart { isInitialized.value = true }
                .collect { action ->
                    // handle action
                }
            // CANNOT CALL FUNCTION HERE BECAUSE `collect` SUSPENDS
        }
    }
    fun dispatchAction(action: Rankings.Action) = scope.launch {
        while (!isInitialized.value) {
        }
        inputActions.send(action)
    }
}
Michael Friend
02/24/2020, 6:33 PM
momrak
02/25/2020, 7:02 AM
fun handleDelete(extIds: List<String>) {
    val ids = someClient1.getInternalIds(extIds) // get deleteable IDs
    try {
        runBlocking(Dispatchers.IO) { // use IO dispatcher as someClient2 makes a call to some remote service.
            ids.forEach { launch { someClient2.deleteId(it) } }
        }
    } catch (e: Exception) {
        handleException(e)
    }
}
and I want to test it. My team uses Mockito and not Mockk, if that makes any difference.
Maybe the test does not make sense, but as I am not very familiar with coroutines, I want to test that these run correctly in parallel and that they exit the runBlocking block after they're all done. So I was thinking of mocking the responses from someClient2 so that, say, I have extIds = listOf("123", "456") and the calls take 500ms and 750ms respectively; I want to check that in total this function does not take much more than ~750ms, rather than (500+750)=1250ms.
Is this possible using Mockito? And is it actually a useful test? 🤔
Slackbot
02/25/2020, 11:19 PM
ursus
02/26/2020, 3:52 AM
Jason
02/26/2020, 10:33 AM
[...] rxJava
Single.timer(INTERVAL_SECONDS, TimeUnit.SECONDS)
    .flatMap { /* call API */ }
    .repeat()
    .subscribeOn(Schedulers.io())
I heard someone mention using flow. It looks like
while (true) {
    // call
    // delay(intervalMili)
}
So I don't think it looks good.
bod
02/26/2020, 2:01 PM
[...] suspend lambda. I think there are 2 ways I can do this:
• have 2 different functions: something like handle(onSuccess: () -> Unit) and handleSuspend(onSuccess: suspend () -> Unit)
• just have the suspend version, which can also accept a regular (non-suspend) lambda
My question is: is there a performance penalty (or other downside) to the second option?
LastExceed
02/26/2020, 2:10 PM
[...] suspend?
Animesh Sahu
02/26/2020, 2:21 PM
val handler = CoroutineExceptionHandler { context, exception ->
    Log.d("exception", exception.message ?: "", exception)
    suspensionFunc() // <--- This part, e.g. for making a network request.
}
What I thought of:
1. Wrap the context with a CoroutineScope() and then launch with it.
2. Call it using runBlocking {}, but that isn't the best idea because it could hurt app performance, especially if run on the Main thread.
3. Use GlobalScope.
4. Use GlobalScope, but launch on the IO dispatcher.
Esa
02/27/2020, 8:58 AM
Maurice Jouvet
02/27/2020, 9:44 AM
val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")
CoroutineScope(entryCoroutineContext).launch {
    val syncEntries = syncEntryDao.findAll()
    syncEntries.forEach { syncEntry ->
        Timber.d("Got this data to sync: $syncEntry") // All entries are logged here; I expected them 5 at a time
        syncEntry.locked = true
        syncEntryDao.update(syncEntry)
        val response = webServices.syncEntriesCoroutine(clientName, syncEntry.date, syncEntry.blueAppId, syncEntry.offset, syncEntry.limit)
        if (response.isSuccessful) {
            ...
        }
    }
}
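One way to read the snippet above: the forEach runs inside a single coroutine, so the entries are handled one after another and the pool size alone does not group them into fives. Below is a minimal sketch, assuming the goal is to process entries in groups of five and to wait for each group before starting the next. processInBatches is a hypothetical helper, and the delay only stands in for the network call.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: splits items into batches of batchSize, runs the items
// of one batch concurrently, and waits for the whole batch before starting
// the next one. Results keep the order of the input list.
suspend fun <T, R> processInBatches(
    items: List<T>,
    batchSize: Int,
    block: suspend (T) -> R
): List<R> = coroutineScope {
    items.chunked(batchSize).flatMap { batch ->
        batch.map { item -> async { block(item) } }.awaitAll()
    }
}

fun main() = runBlocking {
    val results = processInBatches((1..12).toList(), batchSize = 5) { n ->
        delay(10) // stands in for the network call
        n * 2
    }
    println(results)
}
```

Batching is the simplest option but lets a slow entry hold up its whole group; a Semaphore from kotlinx.coroutines.sync with five permits would be an alternative if a sliding window is preferred.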
corneil
02/27/2020, 11:55 AM
Vlad
02/28/2020, 10:49 AM
[...] delay(Long.MAX_VALUE) or Mutex(true).run { lock() } or something else?
KotlinIsMyFav
02/28/2020, 5:44 PM
KotlinIsMyFav
02/28/2020, 10:06 PM
David Glasser
02/29/2020, 2:43 AM
fun listFiles(bucket: String, prefix: String): Flow<Blob> = flow {
    var page = withContext(Dispatchers.IO) {
        gcs.list(bucket, BlobListOption.prefix(prefix))
    }
    while (true) {
        emit(page)
        if (!page.hasNextPage()) {
            break
        }
        page = withContext(Dispatchers.IO) { page.nextPage }
    }
}.flatMapConcat { page ->
    page.values.asFlow()
}
but I don't know if that is right
Vsevolod Kaganovych
03/01/2020, 8:41 AM
[...] RxJava previously: the presentation layer knows nothing about RxJava, we only pass callbacks from UseCases with Success or Error. But now we are switching to Flow, and here's what I need to understand: I don't want a ViewModel to know anything about the Flow, so I pass a Job to the ViewModel to unsubscribe from it, plus callbacks according to state, and use the MainScope() function in every UseCase, so I don't use a viewModelScope. I need to know your thoughts about it. By doing this I'm sure that if I need to switch in the future to a framework other than coroutines Flow, I could do it pretty smoothly.
Joan Colmenero
03/02/2020, 12:42 PM
val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
because if, for instance, one.await() crashes, what will happen? Or if both crash, is there any way to handle these errors? Or, if I just want to continue execution and print the values instead of adding them, is there any way to print the one that did not crash and skip the one that did?
Mohamed Ibrahim
03/02/2020, 3:52 PM
[...] Flow<Product> which emits items one by one; how do I collect those into a list so I can receive a list in collect()?
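A minimal sketch of one option, assuming the flow is finite: the toList() terminal operator from kotlinx.coroutines.flow gathers every emitted item into a List. The Product class and collectProducts function are hypothetical stand-ins for the types in the question.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Product type mentioned above.
data class Product(val name: String)

// toList() is a terminal operator: it collects the flow and returns only once
// the flow completes, so it suits finite flows.
suspend fun collectProducts(products: Flow<Product>): List<Product> = products.toList()

fun main() = runBlocking {
    val products = flowOf(Product("a"), Product("b"), Product("c"))
    println(collectProducts(products).map { it.name })
}
```

For a flow that never completes, toList() would suspend indefinitely, so a bounded operator such as take(n) before it would be needed.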
Luis Munoz
03/02/2020, 4:07 PM
Joan Colmenero
03/03/2020, 10:12 AM
[...] withContext from my viewModel and I'm planning to put it in the usecase / repository, something like this
launch {
    updateView()
    withContext(Dispatchers.IO) {
        doLogin(username, password)
    }
    updateView()
}
This is how I'm doing it now ^
Then if I want to remove the withContext(Dispatchers.IO) I can do it like this
suspend fun doLogin(username: String, password: String) = Dispatchers.IO { repo.doLogin(username,password)... }
So, is there any way to test it correctly?
I'm using mockK, so I know coEvery is going to work, but is it safe not to use Dispatchers.Unconfined for testing purposes?
Also, do you think it's a good idea not to change context in the viewModel and then just do this
launch {
    updateView()
    doLogin(username, password)
    updateView()
}
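A sketch of one common way to keep this testable, assuming the dispatcher is passed in rather than hard-coded: the use case defaults to Dispatchers.IO and a test supplies Dispatchers.Unconfined or a test dispatcher. LoginRepository and LoginUseCase are hypothetical names, not taken from the project being discussed.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical repository interface standing in for `repo` above.
interface LoginRepository {
    fun doLogin(username: String, password: String): Boolean
}

// The dispatcher is a constructor parameter so tests can swap it out.
class LoginUseCase(
    private val repo: LoginRepository,
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun doLogin(username: String, password: String): Boolean =
        withContext(ioDispatcher) { repo.doLogin(username, password) }
}

fun main() = runBlocking {
    val fakeRepo = object : LoginRepository {
        override fun doLogin(username: String, password: String) = password == "secret"
    }
    // In a test, pass Dispatchers.Unconfined (or a test dispatcher) instead of the default.
    val useCase = LoginUseCase(fakeRepo, Dispatchers.Unconfined)
    println(useCase.doLogin("joan", "secret"))
}
```

With this shape the viewModel can call doLogin directly from launch without its own withContext, since the use case takes care of switching context.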
streetsofboston
03/03/2020, 12:35 PM
Joan Colmenero
03/03/2020, 2:00 PM
[...] Dispatchers.IO instead of Dispatchers.Unconfined?
tjohnn
03/03/2020, 7:45 PM