Joan Colmenero
10/27/2019, 8:16 PM
class ContentPresenter(
private val useCase: ContentUseCase,
private val contextPool: CoroutineContextProvider = CoroutineContextProvider()
) {
....
open class CoroutineContextProvider {
open val Main: CoroutineContext by lazy { UI }
open val IO: CoroutineContext by lazy { CommonPool }
}
Then inside of each presenter I'm doing
launch(contextPool.Main) {
val content = withContext(contextPool.IO) {
repository.requestContent()
}
view.displayContent(content)
}
On every presenter I'm passing the CoroutineContextProvider in order to test it with unit tests; for testing I have the TestCouroutineContextProvider, which uses Unconfined. The thing is, are you guys using something like an AbstractPresenter that holds the Job, so that the presenter only has to call into it? I'd like to abstract it a little bit, so that I could use my presenter without passing the CoroutineContextProvider via parameter and still use it in unit tests. Could you help me out with this?
Mani
10/28/2019, 1:04 PM
slackIncomingHook.sendText(SlackMessage("Attempting RTS for son's of length: ${stockOutwardNoteIds.size}, warehouseId: ${warehouse.id()}"))
stockOutwardNoteIds.map {
async {
warehousing.packStockOutwardNote(warehouse.id(), it)
freshRTSHelper.markRTSForFreshOrder(warehouseId = warehouse.id(), stockOutwardNoteId = it)
}
}.awaitAll()
sdeleuze
10/28/2019, 1:51 PM
await() on the returned Promise) in JavaScript callbacks like onclient = { }? cc @Vsevolod Tolstopyatov [JB]
itnoles
10/28/2019, 8:13 PM
Joan Colmenero
10/28/2019, 9:42 PM
open class CoroutineContextProvider {
open val Main: CoroutineContext by lazy { UI }
open val IO: CoroutineContext by lazy { CommonPool }
}
Is the same as:
open class CoroutineContextProvider {
open val main: CoroutineContext by lazy { Dispatchers.Main }
open val IO: CoroutineContext by lazy { Dispatchers.IO }
}
I was using experimental coroutines and now I have to update them.
Marcin Gryszko
10/29/2019, 6:20 AM
SuspendLambda resumes its execution after the last suspension point when invokeSuspend is called again? Is the magic field L$0 inside SuspendLambda responsible for this?
I'm working with a dead-simple snippet to understand the inner workings:
fun main() {
val i = runBlocking {
delay(1000)
1
}
println(i)
}
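A rough way to picture the resumption question above is a hand-written state machine. This is only a sketch under assumptions, not the real compiler output: HandWrittenLambda, parked, and the fake "delay" are hypothetical names made up for illustration, and only the Kotlin standard library is used. In generated code it is the integer label field that records which suspension point to continue from, while fields such as L$0 hold local values that have to survive a suspension.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

// Hypothetical stand-in for the SuspendLambda generated for `{ delay(1000); 1 }`.
class HandWrittenLambda(private val completion: Continuation<Int>) : Continuation<Unit> {
    override val context: CoroutineContext = EmptyCoroutineContext

    // Index of the next state to run; written before every suspension.
    var label = 0
        private set

    // Stand-in for delay(): it only stores the continuation, no timer is scheduled.
    var parked: Continuation<Unit>? = null
        private set

    fun invokeSuspend(): Any? = when (label) {
        0 -> {
            label = 1            // remember where to continue next time
            parked = this        // the fake "delay" keeps a reference for later
            COROUTINE_SUSPENDED  // unwind the stack without producing a value
        }
        1 -> {
            label = 2
            1                    // value of the lambda's last expression
        }
        else -> error("Coroutine already completed")
    }

    // Every resumption re-enters invokeSuspend, which jumps to the saved label.
    override fun resumeWith(result: Result<Unit>) {
        result.getOrThrow()
        val outcome = invokeSuspend()
        if (outcome !== COROUTINE_SUSPENDED) completion.resume(outcome as Int)
    }
}

fun main() {
    var result: Int? = null
    val lambda = HandWrittenLambda(Continuation(EmptyCoroutineContext) { result = it.getOrThrow() })
    lambda.resume(Unit)                        // first entry: runs state 0 and suspends
    println("label=${lambda.label}, result=$result")
    lambda.parked?.resume(Unit)                // what a timer would do after the delay
    println("label=${lambda.label}, result=$result")
}
```

The first println reports label=1 with no result yet; after the parked continuation is resumed, the second reports label=2 and result=1.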
groostav
10/29/2019, 9:26 PM
coroutineContext with a finalize that can detect an abandoned coroutine? Maybe print a warning?
James
10/29/2019, 11:08 PM
Paulius Ruminas
10/30/2019, 8:14 AM
_location channel was closed, and we are not sure how this can happen, since _location is a private member and there is no call to _location.close(). What are we missing?
Dagger
@Provides
@Singleton
fun gpsLocationService(): GpsLocationService = GpsLocationService(
context = context,
coroutineContext = Dispatchers.Default
)
Service
class GpsLocationService(
private val context: Context,
override val coroutineContext: CoroutineContext
) : CoroutineScope {
private val logger = Logger.get(this::class)
private val _location = BroadcastChannel<Location>(1)
private val interval: Millis = GpsUpdateInterval.High.interval
private val fusedLocationClient = LocationServices.getFusedLocationProviderClient(context)
private val locationCallback: LocationCallback = object : LocationCallback() {
override fun onLocationResult(locationResult: LocationResult) {
if (GpsHelper.isGpsEnabled(context)) {
logger.trace("Location acquired (${locationResult.lastLocation}).")
_location.sendBlocking(locationResult.lastLocation)
} else {
logger.trace("GPS is disabled, ignoring location change.")
}
}
}
suspend fun requestLocation(): Location? = _location.openSubscription().consume {
val location = withTimeoutOrNull(15.seconds) { receiveOrNull() }
location
}
}
Nikky
10/30/2019, 10:17 AM
Manuel Vivo
10/30/2019, 10:55 AM
private val channel = ConflatedBroadcastChannel<T> or private val channel = BroadcastChannel<T>(Channel.CONFLATED)?
AJ Alt
10/30/2019, 6:24 PM
CoroutineScope.future or rxSingle functions from the integrations libraries. Those functions are all implemented with AbstractCoroutine, which is marked as internal. Is there a recommended way to implement that type of functionality without internal classes, or is there no way around internal APIs at the moment?
Paul Woitaschek
10/30/2019, 8:55 PM
@Test
fun experiment() = runBlockingTest {
PublishProcessor.create<Unit>()
.replay(1)
.refCount()
.asFlow()
.first()
}
This fails with java.lang.IllegalStateException: This job has not completed yet oO
Can someone explain this?
danny
10/30/2019, 11:54 PM
louiscad
10/31/2019, 7:00 AM
combine.
Khan
10/31/2019, 9:47 AM
Alexjok
11/01/2019, 9:07 AM
suspend fun test() {
async { someWork() }
}
How do I call async inside a suspend fun?
Jonathan Mew
11/01/2019, 9:52 AM
runBlocking(Dispatchers.IO) {
try {
thirdParty.doNetworkStuff()
} catch (e: Exception) {
log.warn("recoverable error", e)
}
}
Robert Jaros
11/01/2019, 1:24 PM
launch to GlobalScope.launch the exception is printed twice:
https://pl.kotl.in/13sfoRVtk
Shouldn't the unhandled exception cancel the job in both cases?
Adam Powell
11/01/2019, 2:06 PM
1 3 2 given that withContext returns the T from the block, which would be the Unit from executing println(2) first?
ubu
11/01/2019, 11:56 PM
Flow emitting 1, 2, 3. Is there a way to transform this flow to emit: [1], [1,2], [1,2,3]. I know there’s this fold operator, but it’s terminal. In my case I need a value to be emitted on each accumulation step. Is there any appropriate operator out-of-the-box?
Nav Singh
11/02/2019, 4:51 AM
alexsullivan114
11/02/2019, 8:33 PM
init of a view model of mine may look like this:
init {
myObservable
.filter { whatever() }
.map { whatever() }
.subscribe { pushToMySubject() }
myOtherObservable
.filter { whatever() }
.map { whatever() }
.subscribe { pushToMyOtherSubject() }
myThirdObservable
.filter { whatever() }
.map { whatever() }
.subscribe { pushToMyThirdSubject() }
}
But when using flow, we can only call collect (which, if I'm not mistaken, is analogous to subscribe in RxJava) from within a coroutine context, which makes sense. But, as far as I can tell, calling collect blocks that coroutine context (my terminology might be garbage here), so you can't really have the above setup, because we wouldn't execute the last two subscribes until the first observable completes.
So two questions:
1. Is my understanding correct?
2. What's the preferred path to handle this sort of scenario?
Khan
11/02/2019, 11:56 PM
Kroppeb
11/03/2019, 2:09 AM
Deferred<V> but only fail if both fail. Would this work/is there a better way to do it?
scottiedog45
11/03/2019, 3:22 PM
val scope = CoroutineScope(errorHandler)
scope.launch {
throw SomeSillyException()
}
val scope = CoroutineScope()
scope.launch(errorHandler) {
throw SomeSillyException()
}
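The two placements above can be compared with a small sketch. It makes several assumptions: kotlinx.coroutines is on the classpath, the empty CoroutineScope() call stands for a scope created with a plain Job() (the factory function requires a context argument), and the handler helper and SomeSillyException definition are hypothetical stand-ins for the ones in the snippets.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical exception type standing in for the one in the snippets.
class SomeSillyException : RuntimeException("silly")

fun main() = runBlocking {
    // Thread-safe list, because the handlers run on Dispatchers.Default threads.
    val seen = CopyOnWriteArrayList<String>()

    // Hypothetical helper: builds a handler that records who caught the exception.
    fun handler(tag: String) = CoroutineExceptionHandler { _, e ->
        seen += "$tag caught ${e::class.simpleName}"
    }

    // Variant 1: the handler lives in the scope's context and is inherited by launch.
    val scopeWithHandler = CoroutineScope(handler("scope"))
    scopeWithHandler.launch { throw SomeSillyException() }.join()

    // Variant 2: plain scope, handler passed to the root launch itself.
    val plainScope = CoroutineScope(Job())
    plainScope.launch(handler("launch")) { throw SomeSillyException() }.join()

    println(seen)
}
```

Under these assumptions both entries should end up in the list, since in either case the handler is part of the context of the root coroutine that fails. The practical difference is scope: the first form covers every coroutine launched in that scope, the second only that one launch.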
william
11/03/2019, 6:19 PM
Vincent Williams
11/03/2019, 7:25 PM
collect { } in the unit test and then verify that the correct item comes through, however this throws IllegalStateException: This job has not completed yet
Stephan Schroeder
11/04/2019, 11:17 AM
fun <T, R> Iterable<T>.pmap(
exec: ExecutorService,
transform: (T) -> R
): List<R>
which uses an instance of ExecutorService to do all the transformations in parallel. An implementation of this is given e.g. here: https://stackoverflow.com/questions/34697828/parallel-operations-on-kotlin-collections/35638609#35638609
The problem is that I want to continue to work on the Rs as soon as they’re done, so List<R> is the wrong collection class (because a list will not be returned until all transformations are done). First I was thinking Sequence<R> would be the answer for this. But now I think it isn’t. Firstly, I couldn’t even make it work, because I can’t simply modify the code to return a Sequence<R>:
= sequence {
....
for (item in this) {
exec.submit { yield transform(item) }
}
....
}
is actually a compile-time error, because I can’t call yield from a “sub-lambda”. But also a Sequence is supposed to be lazy, so you don’t start computing the next value until it’s requested. So what is the correct collection-like class?
Is this what coroutines’ Flow is for? I guess I could also use a Java Stream, but that just seems so unidiomatic.
UPDATE:
So I tried the flow-builder but it runs into the same problem as with Sequence:
fun <T, R> pFlowMap(
items: Iterable<T>,
exec: ExecutorService,
transform: (T) -> R): Flow<R> = flow {
for (item in items) {
exec.submit { emit(transform(item)) }
}
exec.shutdown()
exec.awaitTermination(1, TimeUnit.DAYS)
}
The emit is flagged with “Suspension functions can be called only within coroutine body”. So I tried wrapping that into a `runBlocking`:
runBlocking {
for (item in items) {
exec.submit { emit(transform(item)) }
}
}
but that doesn’t change a thing.
aerb
11/04/2019, 1:23 PM