rkeazor
01/23/2020, 7:13 AMubu
01/23/2020, 11:11 AMViewModel
, I tend to use ConflatedBroadcastChannel
(fooChannel
) as some kind of reactive field, which I transform to Flow
(fooFlow
), which I hold also as a field of this ViewModel
when needed, in order to observe it, combine streams, etc. For me, it’s like using BehaviorSubject
from RxJava
. Sometimes I need to access the current value of this channel to perform some fire-and-forget operation . What is then more correct, less error-prone:
fooFlow.take(1).collect { performSomeSuspendableOperation(it) }
or
performSomeSuspendableOperation(fooChannel.value)
Thanks guys!koufa
01/23/2020, 2:56 PMJan
01/24/2020, 12:06 PMviralshah
01/24/2020, 6:09 PMgotoOla
01/24/2020, 10:05 PMtaer
01/24/2020, 10:54 PMprivate suspend fun poll2() {
while (true) {
consumer.poll(Duration.ofMillis(500))
.forEach {
channel.send("ping")
}
}
}
}
Channel is a kotlin Channel. If consumer.poll returns empty, the foreach doesn't execute. So we don't ever execute any suspendable code, so this loops forever, not giving back the thread to the coroutine. If I add a delay(1)
immediately preceeding the poll, then other coroutines get a chance to run. Is there a cheaper alternative to an arbitrary 1ms delay?
BTW, this is an attempt to wrap a Kafka consumer output into a easier to consume Kotlin channel.ursus
01/26/2020, 4:48 PMPaul Woitaschek
01/27/2020, 7:00 AMEsa
01/27/2020, 8:32 AMtaskChannel
, a resultChannel
, a producer and multiple consumers. The producer is lightning fast and the consumers need a bit of time. Once the producer is done, it closes the taskChannel.
launch {
tasks.forEach {
taskChannel.send(it)
}
taskChannel.close()
}
Then this is the consumer side of things.
launch {
repeat (10) {
launch {
for (task in taskChannel) {
val result = process(task)
resultChannel.send(result)
}
}
}
}
This resultChannel undergoes one final step, which is logging the results:
for (result in resultChannel) {
log(result)
}
resultChannel.close()
My question is really just if I’ve closed the channels at the correct times etc. Also, is this a correct usage of the channels, or have I created some antipatterns or some such here? Any replies appreciated. It’s my first time looking into channels, and I like the way they let me start all three processes (production, consumption and logging) at the same timegotoOla
01/27/2020, 9:45 AMmingkangpan
01/27/2020, 1:49 PMlaunch(Dispatchers.Unconfined)
and
launch(start = CoroutineStart.UNDISPATCHED)
?
afaiu both will excute job immediatley in the same thread right?tseisel
01/27/2020, 3:07 PMDispatchers.Unconfined
? What kind of use-case does it solve ?marcinmoskala
01/27/2020, 3:59 PMContinuationInterceptor
returning my custom continuation. Though cannot find an event on coroutine suspension. interceptContinuation
is called much less often than resumeWith
.dimsuz
01/27/2020, 10:24 PMlaunch
like so:
myScope.launch {
suspendingFunction1()
someFlow
.onEach { suspendingFunction2() }
.collect {
someChannel.send(Unit) // 'send' is also a suspending fun
}
}
Will each of them block? (inside this coroutine)lupajz
01/27/2020, 10:29 PMparentScope.launch { sub.consumeEach { ... } }
parentScope.launch { sub.consumeEach { ... } }
parentScope.launch(NonCancellable) { sub.consumeEach { ... } }
Now the last one job has extra NonCancellable context, am I correct to assume that the last subscriber won't closed if I would cancel the parentScope ?paulex
01/28/2020, 9:26 AMfun main(args:Array<String>) = runBlocking<Unit>{
InternalLoggerFactory.setDefaultFactory(Log4JLoggerFactory.INSTANCE)
val server = AppServer(this)
server.run()
}
I want to main thread to stay alive until a shutdown...Timur Atakishiev
01/28/2020, 10:04 AMsomeList.forEach {
GlobalScope.launch {
if (validator.validate(it)){
someStorage.add(it)
}
}
}
Hi guys, I am expecting that validate function is going to be excecuted in N numbers(N = size of someList) of coroutines. hewever, my application is working sequentialy, first it is validating first element, second element and so on. Should it work sequentially? Or am I doing something wrong?dimsuz
01/28/2020, 11:51 AMSingle
in coroutines? for example Single.fromCallable { doNetworkRequest() }
would emit only upon subscription. Should I just use Flow
which calls a suspending function inside? or is this too complex?Paul Woitaschek
01/29/2020, 3:39 PMAbhishek Bansal
01/29/2020, 3:54 PMSocketTimeOutExceptions
which is fine. I have all my API calls wrapped in try.. catch()
block like this
try {
viewModelScope.launch {
apiService.doCall()
// do other stuff
}
} catch(e: Exception) {}
and in case of flow I am using .catch()
operator for catching exceptions. My problem is my app randomly crashes with crashstack looking like this.
DefaultDispatcher-worker-1
Process: <appId>, PID: 6524
<http://java.net|java.net>.SocketTimeoutException: timeout
at okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:677)
at okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:685)
at okhttp3.internal.http2.Http2Stream.takeHeaders(Http2Stream.java:154)
at okhttp3.internal.http2.Http2ExchangeCodec.readResponseHeaders(Http2ExchangeCodec.java:136)
at okhttp3.internal.connection.Exchange.readResponseHeaders(Exchange.java:115)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:94)
This does not happen with all timeouts but happen at random. It seem like there is something is off with coroutine setup/usage/exception handling. Any pointers will help. Thanks!myanmarking
01/29/2020, 4:23 PMmyanmarking
01/29/2020, 4:29 PM@Test
fun `test try-catch outside launch is useless`() = runBlockingTest {
var result: Int = 0
try {
launch {
throw IllegalStateException()
result = 1
}
} catch (e: Exception) {
result = 2
}
Assert.assertTrue(result == 0)
}
@Test
fun `test try-catch inside launch catch the exception`() = runBlockingTest {
var result: Int = 0
launch {
try {
throw IllegalStateException()
result = 1
} catch (e: Exception) {
result = 2
}
}
Assert.assertTrue(result == 2)
}
dimsuz
01/29/2020, 5:24 PMsbyrne
01/29/2020, 6:51 PMCLOVIS
01/29/2020, 9:13 PMdiesieben07
01/30/2020, 9:09 PMJob
with a parent (parent is long-running), is it a memory/resource leak to not complete or cancel the Job? Will the parent hold on to it and prevent it from being garbage collected?rook
01/30/2020, 9:34 PMFlow
to an Rx construct. Unfortunately, this never seems to complete.Dave Jensen
01/31/2020, 12:21 AMMatt
01/31/2020, 5:03 AMMatt
01/31/2020, 5:03 AMoctylFractal
01/31/2020, 5:05 AMMatt
01/31/2020, 5:07 AMprivate suspend fun parseLeaves(leaves: List<Leaf>){
coroutineScope {
// launches all leaves into seperate coroutines to be dealt with as they can
leaves.forEach { launch { analyzeBlock(it) } }
}
}
Is how Im launching my coroutinesoctylFractal
01/31/2020, 5:08 AMMatt
01/31/2020, 5:08 AMMatt
01/31/2020, 5:10 AM