Tuan Kiet
06/02/2019, 12:23 PM
haroldadmin
06/03/2019, 6:21 AM
`RendezvousChannel` in the place of `PublishSubject` from RxJava in a project, but I need a way to access the current element in the channel without removing it. Something like a 'peek' operation on a `LinkedBlockingQueue`.
Now a `RendezvousChannel` has a method called `poll()` which does give me the current element (or null) but it also removes it from the channel. How can I perform a peek operation on a channel then?
louiscad
06/03/2019, 9:50 AM
`collect` variant for `Flow` that allows access to the previous value, or null for the initial value?
voddan
06/03/2019, 12:39 PM
`withContext` inside a `flow{}`, basically. Can there be a realistic situation when such use is permitted? Maybe if `emit` is outside of `withContext`, or there are several nested `withContext` that work in a way that protects the caller's context?
Eric Martori
06/03/2019, 3:25 PM
`emit` values into a `flow` outside its building block. For example sending values when the user clicks a button or if this use-case would be better covered with something else instead of a `Flow`.
jw
06/03/2019, 6:49 PM
Tolriq
06/04/2019, 7:11 AM
ribesg
06/04/2019, 1:28 PM
Davide Giuseppe Farella
06/04/2019, 8:17 PM
Melih Aksoy
06/05/2019, 8:44 AM
`flow` in interactors, such as
flow {
    emit(Result.State.Loading())
    emit(
        dataSourceFactory.getGrid(
            params.path,
            params.page,
            params.displaySize
        )
    )
    emit(Result.State.Loaded())
}
An interactor is producing its own “state”, but I also want to support multiple interactors with a single state. I came up with an idea to merge them asynchronously in this way:
fun CoroutineScope.merge(vararg results: Flow<SimpleResult<Any>>) = flow {
    emit(Result.State.Loading())
    val tasks = mutableListOf<Deferred<Unit>>()
    for (result in results) {
        tasks.add(
            async(Dispatchers.IO) {
                result
                    .filterNot { it is Result.State }
                    .collect {
                        emit(it)
                    }
            }
        )
    }
    tasks.awaitAll()
    emit(Result.State.Loaded())
}
I’m wondering if this is a robust solution. This `CoroutineScope` is handled properly for closing etc., but what do you think? Any suggestions to merge flows together while retaining the ability to ignore some emits?
Tolriq
06/05/2019, 9:10 AM
class XXX : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    internal val tasks = Channel<Task>(UNLIMITED)
    fun startWorkers() {
        repeat(150) {
            launch {
                for (task in tasks) executeTask(task)
            }
        }
    }
}
Paul Woitaschek
06/05/2019, 11:33 AM
import kotlinx.coroutines.*
val job: Job = Job()
val scope = CoroutineScope(Dispatchers.Default + job)
fun throwsAsync(): Deferred<Unit> = scope.async {
    throw Exception()
}
fun loadData() = scope.launch {
    throwsAsync().await()
}
suspend fun main() {
    loadData().join()
    scope.launch { println("worked") }.join()
    joinAll()
    println("done")
}
Why doesn't this crash?
projectmoon
06/05/2019, 1:29 PM
`runBlocking` ... use case here is that i need to, once a day, run a bunch of operations as coroutines. currently this is a java scheduled executor, but i want to switch to coroutines for the actual running of the code.
vishna
06/05/2019, 3:33 PM
projectmoon
06/05/2019, 5:03 PM
`launch { val result = async { doStuff() }.await() }`, assuming `doStuff()` is a suspend fun? or would it make more sense to simply call `doStuff()` inside the launch block without `async`?
mdapp
06/05/2019, 5:51 PM
runBlockingTest {
    whenever(model.loadData(false)).thenThrow(DeprecatedApiException())
Tolriq
06/06/2019, 10:22 AM
class XXX : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    val aDispatcher = ThreadPoolExecutor(...).asCoroutineDispatcher()
    internal val tasks = Channel<Task>(UNLIMITED)
    fun startWorkers() {
        repeat(150) {
            launch(aDispatcher) {
                val initialTask = try {
                    tasks.receive()
                } catch (e: Exception) {
                    return@launch
                }
                withContext(Dispatchers.IO) {
                    // Correctly run on Dispatchers.IO
                }
            }
        }
    }
}
class XXX : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    val aDispatcher = ThreadPoolExecutor(...).asCoroutineDispatcher()
    internal val tasks = Channel<Task>(UNLIMITED)
    fun startWorkers() {
        repeat(150) {
            launch(aDispatcher) {
                val initialTask = try {
                    tasks.receive()
                } catch (e: Exception) {
                    return@launch
                }
                withContext(coroutineContext) {
                    // Incorrectly runs on aDispatcher instead of Dispatchers.IO that is used for coroutineContext
                }
            }
        }
    }
}
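A possible explanation for the second snippet, offered as a sketch rather than a confirmed diagnosis: inside `launch(aDispatcher) { … }` the lambda's receiver is the new coroutine's own `CoroutineScope`, so an unqualified `coroutineContext` most likely resolves to that coroutine's context (which already contains `aDispatcher`) and not to the class property. The names `Owner`, `workerDispatcher`, `unqualified` and `qualified` below are hypothetical, and the block assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical stand-in for the XXX class in the message above.
class Owner : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    val workerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

    // Unqualified name: picks up the context of the coroutine started by async,
    // so withContext keeps running on workerDispatcher.
    fun unqualified(): Deferred<ContinuationInterceptor?> = async(workerDispatcher) {
        withContext(coroutineContext) { coroutineContext[ContinuationInterceptor] }
    }

    // Qualified name: picks up the class property. The Job is removed so that
    // withContext only switches the dispatcher and keeps its parent coroutine.
    fun qualified(): Deferred<ContinuationInterceptor?> = async(workerDispatcher) {
        withContext(this@Owner.coroutineContext.minusKey(Job)) {
            coroutineContext[ContinuationInterceptor]
        }
    }
}

fun main() = runBlocking {
    val owner = Owner()
    println(owner.unqualified().await() === owner.workerDispatcher)
    println(owner.qualified().await() === Dispatchers.IO)
    owner.workerDispatcher.close() // lets the JVM exit
}
```

Under that reading, passing `Dispatchers.IO` directly (as the first snippet does) or qualifying the property with `this@XXX` would both avoid the shadowing.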
aaverin
06/06/2019, 11:07 AM
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
    this@asSingle.await()
}
calling `myDeferred.asSingle(coroutineContext)` will immediately call `await()` on the deferred, which effectively starts coroutine execution.
Shouldn’t this only happen when the `Single` is subscribed to?
louiscad
06/06/2019, 2:03 PM
`CompletableDeferred` down a `Flow` or a `Channel` (exposed as a `ReceiveChannel`) is a good idea?
The use case is to allow observing a state (a sealed class instance), and to allow an action depending on that state.
Dico
06/06/2019, 4:24 PM
`AtomicInt` from kotlinx:atomicfu doesn't declare these:
operator fun plusAssign (+=)
operator fun minusAssign (-=)
operator fun inc (++)
operator fun dec (--)
In Java, these couldn't be atomic, but since you can implement them yourself, they would be.
For `inc` and `dec` I guess it might be because they would have to return AtomicInt, instead of Int.
Asking here because I didn't see an atomicfu channel.
Vsevolod Tolstopyatov [JB]
06/06/2019, 9:14 PM
`kotlinx.coroutines` version 1.3.0-M1 is here!
A lot of changes in Flow:
* Core `Flow` interfaces and operators are graduated from preview status to experimental
* Serious performance improvements of `Flow`
* More accurate context preservation invariant that should prevent most of the concurrency and contract violation bugs
* `Flow` interface cannot be implemented directly, only via `AbstractFlow` to preserve Flow’s invariant
* Separate `buffer` operator composable with buffered operators (`flowOn`, `flowChannel` etc.). No more `bufferSize` constant in operators!
* New operators and a lot more!
General changes:
* Scalable state-of-the-art `Semaphore` implementation
* Channels API improved: performance, experimental API and consistent exception handling
* `withContext` checks cancellation status on exit to make reasoning about sequential concurrent code easier
* JS dispatcher performance
* Various API and performance improvements and bug fixes
Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.3.0-M1
louiscad
06/07/2019, 9:04 AM
`flowViaChannel(Channel.CONFLATED) { … }` is `channelFlow<T> { … }.buffer(Channel.CONFLATED)`.
Isn't that less efficient as values pass through 2 operators?
ribesg
06/07/2019, 9:11 AM
`kotlinx-coroutines-test` planned for multiplatform?
Mark
06/07/2019, 10:24 AM
open class SingletonHolder<out T: Any, in A>(creator: (A) -> T) {
    private var creator: ((A) -> T)? = creator
    @Volatile private var instance: T? = null
    fun getInstance(arg: A): T {
        val i = instance
        if (i != null) {
            return i
        }
        return synchronized(this) {
            val i2 = instance
            if (i2 != null) {
                i2
            } else {
                val created = creator!!(arg)
                instance = created
                creator = null
                created
            }
        }
    }
}
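For context, one way a holder like this is commonly wired up, sketched under assumptions: a class with a private constructor exposes the holder through its companion object. `Registry` and its `name` argument are hypothetical and not from the message above; the holder class is repeated only so that the block runs on its own.

```kotlin
// Copy of the class from the message above, so this sketch compiles standalone.
open class SingletonHolder<out T : Any, in A>(creator: (A) -> T) {
    private var creator: ((A) -> T)? = creator
    @Volatile private var instance: T? = null

    fun getInstance(arg: A): T {
        // Fast path: already created, no locking needed.
        val i = instance
        if (i != null) {
            return i
        }
        // Slow path: re-check under the lock, then create exactly once.
        return synchronized(this) {
            val i2 = instance
            if (i2 != null) {
                i2
            } else {
                val created = creator!!(arg)
                instance = created
                creator = null
                created
            }
        }
    }
}

// Hypothetical class: instances can only be obtained through the holder.
class Registry private constructor(val name: String) {
    companion object : SingletonHolder<Registry, String>(::Registry)
}

fun main() {
    val first = Registry.getInstance("primary")
    // The argument of any later call is ignored: the creator already ran.
    val second = Registry.getInstance("ignored")
    println(first === second) // true
    println(second.name)      // primary
}
```

The point worth noticing is that only the first call's argument is ever used, which is easy to overlook when different call sites pass different values.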
gildor
06/07/2019, 10:31 AM
Thomas
06/07/2019, 12:01 PM
`collect` and `cancel` functions are both called on the main thread. I tried this with both version 1.2.1 and 1.3.0-M1
var cancelled = false
val job = GlobalScope.launch(Dispatchers.Main) {
    val flow = flow {
        while (true) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect {
            if (cancelled) { // main thread
                throw IllegalStateException()
            }
        }
}
GlobalScope.launch(Dispatchers.Main) {
    delay(1000)
    job.cancel() // main thread
    cancelled = true
}
tseisel
06/07/2019, 12:35 PM
`Flow` make sense, or is `Flow` dedicated to only cold asynchronous streams while `Channel`s are the best choice for hot streams?
Eric Martori
06/07/2019, 3:00 PM
internal class InvocableFlow<T>(private val scope: CoroutineScope) {
    val flow = flow {
        for (value in channel.openSubscription()) {
            emit(value)
        }
    }
    private val channel = BroadcastChannel<T>(BUFFERED)
    fun invoke(data: T) {
        scope.launch {
            channel.send(data)
        }
    }
}
Allan Wang
06/07/2019, 5:58 PM
blakelee
06/07/2019, 7:58 PM
blakelee
06/07/2019, 7:58 PM
bdawg.io
06/08/2019, 4:17 AM
`Deferred` is isolated from how the value is being obtained, so it's not resettable unfortunately. The Flow/generic suspend function should fill your needs though
louiscad
06/08/2019, 6:23 AM