Olekss
02/12/2018, 12:28 PMgildor
02/14/2018, 8:22 AMval channel = produce {
delay(500) // only one value with delay, but can be wrapped to some loop)
send("some value")
}
for (value in channel) {
//Do something
}
Depends on your case of course. What exactly do you want to do?bj0
02/14/2018, 9:14 PMCompletableDeferred
was completely internal, I was using return deferred.await()
and object.onCallback {deferred.complete(it)}
koufa
02/15/2018, 10:37 AMleandrodev
02/15/2018, 12:54 PMConflatedBroadcastChannel
, but discarding the event when it's broadcasted. Is there any channel implementation that is capable of that?chirag
02/16/2018, 7:57 AMclass DataAccess(val id:String) {
private val fooAsync = async { fetchDataFromTableA(id) }
private val barAsync = async { fetchDataFromTableB(id) }
fun getData(): Data {
var foo =fooAsync.await()
var bar = barAsync.await()
return compute(foo,bar)
}
}
Just want to know ,Is it right way to do it ?uhe
02/16/2018, 9:49 AMsuspendAtomicCancellableCoroutine
and suspendCancellableCoroutine
?sdeleuze
02/16/2018, 10:07 AMReceiveChannel
to a shared BroadcastChannel
? I try to convert my Reactor SSE example to Coroutines where I want to use a single stream from my Mongo database to a shared broadcasted stream to browsers. Reactor version: https://github.com/sdeleuze/spring-kotlin-deepdive/blob/step3/src/main/kotlin/io/spring/deepdive/web/ArticleController.kt#L51-L52. Coroutines version without the broadcast feature: https://github.com/sdeleuze/spring-kotlin-deepdive/blob/step3-coroutine/src/main/kotlin/io/spring/deepdive/web/ArticleController.kt#L52-L57.themishkun
02/19/2018, 6:15 PMdh44t
02/20/2018, 10:12 AMthemishkun
02/20/2018, 11:40 AMasad.awadia
02/20/2018, 4:09 PMlaunch { while(true) { //listen to keyboard input and do stuff } }
zak.taccardi
02/20/2018, 11:50 PMlouiscad
02/21/2018, 10:48 AMjkbbwr
02/21/2018, 4:20 PMasyncHandler(this::ping)
Will complain that KFunction<RoutingContext, Unit> doesnt match suspend (RoutingContext) -> Unitdsgryazin
02/21/2018, 7:06 PMMutex.kt
internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// State is: Empty | LockedQueue | OpDescriptor
// shared objects while we have no waiters
private val _state = atomic<Any?>(if (locked) EmptyLocked else EmptyUnlocked)
// resumeNext is: RESUME_QUIESCENT | RESUME_ACTIVE | ResumeReq
private val _resumeNext = atomic<Any>(RESUME_QUIESCENT)
public override val isLocked: Boolean get() {
_state.loop { state ->
when (state) {
is Empty -> return state.locked !== UNLOCKED
is LockedQueue -> return true
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
1. Please, explain, what is meant by each of Empty | LockedQueue | OpDescriptor
?
2. What is for resumeNext is: RESUME_QUIESCENT | RESUME_ACTIVE | ResumeReq
?
3. What means the `help`comment in is OpDescriptor -> state.perform(this) // help
? 😅bdawg.io
02/21/2018, 7:43 PM/ -- [ Job A ] -- \
---- ---- [ Job C ]
\ -- [ Job B ] -- /
jw
02/22/2018, 4:33 AMPaul Woitaschek
02/22/2018, 7:33 AMasad.awadia
02/23/2018, 12:40 AMjkbbwr
02/26/2018, 7:33 PMkrotki
02/27/2018, 1:17 PMlouiscad
02/27/2018, 1:36 PMwithoutclass
02/27/2018, 3:28 PMpablisco
02/27/2018, 4:03 PMbj0
02/27/2018, 4:15 PMwithContext
was different, is that difference documented?elizarov
02/27/2018, 7:03 PMmiha-x64
02/27/2018, 8:36 PMsuspend Function<R>
? Exactly Function
because I don't know yet how many arguments it will accept.
Use-case: hierarchical web routing.
interface Address<T, HANDLER : suspend Function1<Unit>>
class Address1<T> : Address<T, suspend (T) -> Unit>
handles /{a}/
addresses, fetches an object with a
path, supplies fetched object to handler
class Address2<T, U> : Address<U, suspend (T, U) -> Unit>
handles /{a}/{b}/
addresses, fetches T
bound to a
path, fetches its child U
bound to b
path, supplies them to handler
and so on...Jonathan
02/28/2018, 9:03 AMdiesieben07
02/28/2018, 12:51 PMSendChannel
which might be closed? Do I have to just catch the ClosedSendChannelException
?diesieben07
02/28/2018, 12:51 PMSendChannel
which might be closed? Do I have to just catch the ClosedSendChannelException
?Jonathan
02/28/2018, 12:52 PMdiesieben07
02/28/2018, 12:53 PMclose
the correct way?Jonathan
02/28/2018, 12:54 PMisClosedForSend
to know if it is currently close.BroadcastChannel
?diesieben07
02/28/2018, 12:55 PMisClosedForSend
, but that's a race condition, is it not (tocttou)?Jonathan
02/28/2018, 12:57 PMproduce
. this function will handle everything, like closing when done, when cancelled and when failed.diesieben07
02/28/2018, 12:59 PMsendIfPossibleOrReturnFalse
method.do {
val element = TODO() // compute next element
} while (channel.sendIfPossibleOrReturnFalse(element))
Jonathan
02/28/2018, 1:02 PMproduce {
while(isActive) {
val element = TODO()
send(element)
}
}
Then produce
will handle it for youdiesieben07
02/28/2018, 1:02 PMTODO
is? Then send
will throw...Jonathan
02/28/2018, 1:04 PMsend
will throw a CancelledException
.diesieben07
02/28/2018, 1:04 PMproduce
approach, how do I terminate the underlying thread (newSingleThreadContext
) when the channel is closed?Jonathan
02/28/2018, 1:07 PMCancelledException
will be consumed by produce
as it is a normal reason to be terminated. If you use resources you should anyway always ensure they are closed in case of exception. (same as if you wouldn't use coroutines)diesieben07
02/28/2018, 1:08 PMJonathan
02/28/2018, 1:08 PMnewSingleThreadContext
you have of course to ensure that is terminated even in case of exception.diesieben07
02/28/2018, 1:08 PMuse
will not work, will it?Jonathan
02/28/2018, 1:10 PMdiesieben07
02/28/2018, 1:11 PMnewSingleThreadContext().use { ctx ->
return produce(ctx) {
}
}
won't work, right?Jonathan
02/28/2018, 1:11 PMdiesieben07
02/28/2018, 1:12 PMproduce
?Jonathan
02/28/2018, 1:14 PMfun startProducer() {
val context = newSingleThreadContext("my dispatcher")
val result = produce(context) {
context.use {
send(1)
send(2)
send(3)
}
}
}
diesieben07
02/28/2018, 1:14 PMcoroutineContext
as a parameter.Jonathan
02/28/2018, 1:15 PMbj0
02/28/2018, 4:35 PMproduce {
newSingleThreadContet("dispatch").use {
while(isActive)
send(TODO())
}
}
Jonathan
02/28/2018, 4:36 PMproduce
use CommonPool
bj0
02/28/2018, 4:37 PMwithContext
in therewhile
loopJonathan
02/28/2018, 4:38 PMproduce
bj0
02/28/2018, 4:41 PMJonathan
02/28/2018, 4:46 PMbj0
02/28/2018, 4:50 PMJonathan
02/28/2018, 4:56 PMval MyThreadPool = newSingleThreadPool("my thread pool")
and use it for the whole life of the application. calling produce(MyThreadPool) {}
diesieben07
02/28/2018, 5:00 PMCoroutineContext
into my own method and then pass it on to produce
.bj0
02/28/2018, 5:07 PMdiesieben07
02/28/2018, 5:08 PMnewSingleThreadContext().use {}
. It is closed properly.bj0
02/28/2018, 5:18 PM