Dariusz Kuc
01/20/2021, 5:54 PMChannel
based pipelines and was wondering whether you have any suggestions to make it better.
NOTE: for this exercise want to stick to channels only
Consider following basic example - fetch list of items and for each one of them fetch additional information (details + reviews) that will be used to final result
val result = mutableListOf()
val summaries = retrieveSummaries()
for (summary in summaries) {
val details = retrieveDetails(summary.id)
val reviews = retrieveReviews(summary.id)
result.add(buildResult(summary, details, reviews)
}
return result
I could easily parallelize the above by doing something like
summaries.map { summary ->
async {
val details = async { retrieveDetails(summary.id) }
val reviews = async { retrieveReviews(summary.id) }
buildResult(summary, details.await(), reviews.await())
}
}
Currently trying to figure out what would be the "best" way to achieve the same using Channel
. "Simplest" would be to do a sequence of channels
val summaries: ReceiveChannel<Summary> = summariesChannel()
val detailsAndSummaries: ReceiveChannel<Pair<Summary, Detail>> = detailsAndSummariesChannel(summaries)
val result: ReceiveChannel<Result> = resultChannel(detailsAndSummaries)
Looks like BroadcastChannel
is marked obsolete but I guess logically it would make sense to broadcast summaries and then send both details and reviews from separate channels but I am unsure how to read from multiple channels at once (i.e. buildResult
would need results from 3 channels). Any ideas/suggestions?Marco Righini
01/20/2021, 10:53 PMclass CoroutinesReactiveTest {
private val dep = mock<Dep>()
private val underTest = UnderTest(dep)
@Test
fun `Test someOtherMethod called`() = runBlocking {
underTest.methodToTest().test()
verify(dep).someMethod() // Commenting this line test passes
verify(dep).suspendingFun()
verify(dep).someOtherMethod()
}
}
class Dep {
fun someMethod() {
println("someMethod called")
}
suspend fun suspendingFun() {
delay(100)
println("suspendingFun called")
}
fun someOtherMethod() {
println("someOtherMethod called")
}
}
class UnderTest(private val dep: Dep) {
fun methodToTest(): Completable {
return Completable.fromCallable { dep.someMethod() }
.andThen(rxCompletable { dep.suspendingFun() })
.andThen(Completable.fromCallable { dep.someOtherMethod() })
}
}
commenting the first verify test passesJoão Eudes Lima
01/21/2021, 4:18 AMeygraber
01/21/2021, 9:30 PMCancellationException
should I rethrow it?Javier
01/21/2021, 10:01 PMfilterNotNull
a MutableStateFlow
and keep returning a StateFlow
instead of a Flow
? It is forcing me to add an stateIn
to retransform the Flow
into StateFlow
william
01/22/2021, 12:08 AMMutableSharedFlow
is there any consequence for choosing a buffer of Int.MAX_VALUE
? i.e. does it allocate an array of that size or something?
often i would like some buffer to exist but don't have a definitive size in mind so i default to max value rather than something arbitrary between 0 .. MAX_VALUE
Jason
01/22/2021, 7:16 AMLog: Before delay
Log: Init Status: true
Log: Job Status: true
getUserInfo()
will not be called at all ??? What’s happen with flow ?
interface InitRepository {
val initFinish: Flow<Boolean>
}
suspend fun startApp() {
checkStatus()
waitInitializeDone() -> App is stuck on waiting here... Forever waiting
getUserInfo()
}
suspend fun waitInitializeDone() {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
val waitJob = launch {
Log.d("Before delay")
delay(10000L)
throw TimeoutException("Init process timeout")
}
initRepository.initFinish
.onEach { initFinish ->
Log.d("Init Status: $initFinish")
Log.d("Job Status: ${waitJob.isActive}")
if (initFinish && waitJob.isActive) {
waitJob.cancel()
}
}
.launchIn(this)
}
}
christophsturm
01/22/2021, 11:34 AMreturn@coroutineScope withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
Files.readAllBytes(path)
}
Chris Fillmore
01/22/2021, 2:35 PMFunkyMuse
01/25/2021, 10:43 AMprotected val loadingState = BroadcastChannel<Boolean>(Channel.BUFFERED)
val actionReceiver = loadingState.asFlow()
leandro
01/25/2021, 4:06 PMList<Flow<A>>
to Flow<List<A>>
?
I’m currently applying the following:
.flatMapLatest { items: List<Flow<A>> ->
combineTransform(items) {
emit(it.toList())
}
}
but am wondering if this is common enough and/or is handled in a better way with a different operator.ursus
01/25/2021, 6:37 PMgbaldeck
01/26/2021, 1:52 AMursus
01/26/2021, 5:56 AMWukongRework.exe
01/26/2021, 7:17 AMimport kotlinx.coroutines.*
fun main() = runBlocking {
launch {
delay(100)
println("@ launch")
}
withContext(newSingleThreadContext("new Context")) {
delay(200)
println("@ withContext")
}
println("@ runBlocking")
}
// Outputs
@ launch
@ withContext
@ runBlocking
but
import kotlinx.coroutines.*
fun main() = runBlocking {
launch {
delay(100)
println("@ launch")
}
launch(newSingleThreadContext("new Context")) {
delay(200)
println("@ withContext")
}
println("@ runBlocking")
}
// Outputs
@ runBlocking
@ launch
@ withContext
could someone please explain this?Marc Knaup
01/26/2021, 4:28 PMDefault
dispatcher? Or do you you accept that the current dispatcher (whatever it is) is potentially blocked for a few milliseconds?
I have the habit of wrapping such code in withContext(Dispatchers.Default) { … }
, but I’m not actually sure if that’s necessary. Or maybe it’s even a bad idea.Eugen Martynov
01/26/2021, 5:28 PMLucien Guimaraes
01/26/2021, 11:16 PM//Inside UseCase
suspend fun state(): SharedFlow<HomeState> = locationInteractor
.canGetLocations
.flatMapLatest { ... }
.flowOn(ioDispatcher)
.shareIn(
scope = CoroutineScope(Job() + ioDispatcher),
started = SharingStarted.Eagerly,
replay = 1,
)
//Inside ViewModel
suspend fun success(): Flow<List<HikeCard>> = homeInteractor
.state()
.filterIsInstance<HomeInteractor.HomeState.Success>()
.map { ... }
suspend fun error(): Flow<ErrorMessages> = homeInteractor
.state()
.filterIsInstance<HomeInteractor.HomeState.Error>()
.map { ... }
//Inside View
outputs.success().collect { ... } // the only one collected because of order
outputs.error().collect { ... } // never collected, except if moved above previous collector
Am I missing / misunderstanding something? Thanks for your help!Simon Lin
01/27/2021, 7:33 AMVivek Sharma
01/27/2021, 2:49 PMlaunch {
doingNetworkCall() // this is suspend function
}
// doing some work on main thread here like loading
so assume doingNetworkCall() is taking some time, till that time that function get suspended OR it just works in background thread and on completion it returns like callback and do other work
and if this work is getting suspended, then when it is getting resumed to achieve desired outputZach Klippenstein (he/him) [MOD]
01/27/2021, 2:57 PMMutableSharedFlow
are unclear, and I haven’t had enough coffee this morning to figure it out from looking at the code: if there’s no buffer and no replay, is there any difference between DROP_OLDEST
and DROP_LATEST
?
I don’t think there should be, since if there’s no buffer, and no subscriber waiting for a value at the time of emission, the only thing to do is drop the value and there’s no concept of the “oldest” or “latest” value, just “the value trying to be emitted”.Vivek Sharma
01/27/2021, 4:59 PMJob from coroutine scope
is printed first, not Job1
, why is that?
fun main() = runBlocking<Unit> {
launch {
println("Job 1")
}
launch {
println("Job 2")
}
launch {
println("Job 3")
}
coroutineScope {
launch {
println("Job from launch inside coroutine scope")
}
println("Job from coroutine scope ")
}
}
neworldlt
01/27/2021, 6:48 PMval flowA = (1..10).asFlow()
val flowB = flowOf("a","b", "c", "d", "e")
Result I want to get is: 1, 2, a, 3, 4, b, 5, 6, c, 7, 8, d, 9, 10, e
My current solution is to use ReceiveChannel, but it does not look very idiomatic:
launch {
val channelB = flowB.produceIn(this)
flowA.withIndex().transform { item ->
emit(item)
if (item.index > 0 && item.index % 2 == 1)
emit(flowB.receive())
}.collect { /* something useful */ }
}
Jgafner
01/28/2021, 7:34 AMSimon Lin
01/29/2021, 3:34 AM... using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread.So how does it do not to blocking thread but using same thread? When does it really switch another thread? (or never?) --- Edit: The key is
Thread pool
?Slackbot
01/29/2021, 9:27 AMEugen Martynov
01/29/2021, 9:30 AMprivate val testCoroutineDispatcher = TestCoroutineDispatcher()
private val testCoroutineScope = TestCoroutineScope(testCoroutineDispatcher)
I try to run suspend function in it
fun runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) =
testCoroutineScope.runBlockingTest { block() }
And particular is
dispatcherRule.runBlockingTest {
val msg = Gallery.Effects.LoadGallery.run.invoke(this, Gallery.Deps(photoGalleryManager))
assertThat(msg).isEqualTo(Gallery.Msg.FilesLoaded(failure))
}
I’m getting
This job is not completed yet
Even if I change it to:
private val testCoroutineScope = TestCoroutineScope(testCoroutineDispatcher + Job())
I still get that exception, what can I do?
If I run test inside the global scope then test passNiklas Gürtler
01/29/2021, 10:54 AMclass MyActivity {
var coroJob : Job? = null
val coroScope = MainScope()
// Start my coroutine, may be called from any thread
fun startCoro () {
// coroJob should only be accessed from main thread
coroutineScope.launch (Dispatchers.Main) {
if (coroJob == null) {
coroJob = coroutineScope.launch (<http://Dispatchers.IO|Dispatchers.IO>) {
// Do the actual work here ...
withContext(Dispatchers.Main) {
coroJob = null
}
}
}
}
}
}
But reducing the boiler plate?Niklas Gürtler
01/29/2021, 10:59 AMval coroJob = AtomicReference<Job> ()
/// Start my coroutine, may be called from any thread
fun startCoro () {
val coro = coroutineScope.launch(<http://Dispatchers.IO|Dispatchers.IO>, start = LAZY) {
// Do the actual work here ...
coroJob.set(null)
}
// If coroJob isn't null, do nothing.
if (coroJob.compareAndSet(null, coro))
coro.start()
}
which avoids the explicit context switches and should still be thread-safe. Is there a more idiomatic and safe way to do that?christophsturm
01/30/2021, 10:16 AMdeferredList.mapResolved{deferredResult -> ... }
christophsturm
01/30/2021, 10:16 AMdeferredList.mapResolved{deferredResult -> ... }
suspend fun bla(deferred: List<Deferred<Context>>) {
coroutineScope {
deferred.forEach {
launch {
val context = it.await()
println(
context.summary()
)
}
}
}
}
Arslan Armanuly
01/30/2021, 1:57 PMval singleThreadedContext = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
and then
withContext(singleThreadedContext) {
println(context.summary())
}
christophsturm
01/30/2021, 2:27 PMArslan Armanuly
01/30/2021, 3:49 PMval mutex = Mutex()
mutex.withLock { /* code */ }
christophsturm
01/30/2021, 5:36 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 5:36 PMif i do it like this, is there a way to be sure that my launch blocks don’t interrupt each other?
You mean you want the prints to be executed in the order of the list, regardless of which order the deferreds complete in?
christophsturm
01/30/2021, 5:39 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 5:40 PMchristophsturm
01/30/2021, 5:41 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 5:41 PMchristophsturm
01/30/2021, 5:41 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 5:41 PMchristophsturm
01/30/2021, 5:45 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 5:46 PMchristophsturm
01/30/2021, 5:48 PMZach Klippenstein (he/him) [MOD]
01/30/2021, 8:05 PMasSequence()
call on contextInfos
looks redundant though, List
has forEach
as wellchristophsturm
01/30/2021, 10:03 PM