rkeazor
10/01/2019, 12:11 PM
Dennis Schröder
10/01/2019, 9:52 PM
suspend fun someFunctionCreatesAFlow(context: CoroutineContext = Dispatchers.Default): Flow<String> = withContext(context) {
    flow {
        emit(somethingAsAString)
    }
}
I guess it is because Flows kind of manage their own context with Flow::flowOn(). Can anybody confirm this?
Ran Magen
10/01/2019, 10:42 PM
coroutineScope that ends up doing nothing. For example, should I bother writing:
if (collection.isNotEmpty()) {
    coroutineScope {
        collection.forEach { item -> launch { doSomeComplexSuspendingWork(item) } }
    }
}
Or does it not really matter much, and would this be just as efficient (and cleaner to read)?
coroutineScope {
    collection.forEach { item -> launch { doSomeComplexSuspendingWork(item) } }
}
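A minimal sketch of the unguarded variant, assuming a hypothetical `doWork` in place of `doSomeComplexSuspendingWork`. As far as I know, `coroutineScope` with no children simply returns once its block finishes, so the `isNotEmpty()` check should only save the small cost of creating the scope:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for doSomeComplexSuspendingWork from the question.
suspend fun doWork(item: Int): Int {
    delay(10)
    return item * 2
}

// Launches one child per item and returns how many children were started.
// With an empty collection no child is launched and coroutineScope returns right away.
suspend fun processAll(items: Collection<Int>): Int {
    var launched = 0
    coroutineScope {
        items.forEach { item ->
            launched++
            launch { doWork(item) }
        }
    }
    return launched
}

fun main() = runBlocking {
    println(processAll(emptyList()))      // prints 0
    println(processAll(listOf(1, 2, 3)))  // prints 3
}
```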
Robert Menke
10/02/2019, 12:54 AM
ylemoigne
10/02/2019, 10:11 AM
Yoav Gross
10/02/2019, 2:10 PM
rxjava events and consume them as flow, it seems I’m not subscribing correctly with flow, or I’m missing the action that corresponds to an Rx subscribe.
This is the flow usage; I’m expecting to collect each change to the data from the Flowable I’m listening to:
runBlocking {
    getSubtitleFromProjectDataManager()
}
private suspend fun getSubtitleFromProjectDataManager() {
    scope.launch(Dispatchers.IO) {
        val flow = projectDataManager.getProjectNameFlowable().asFlow()
        flow.collect {
            subtitle = it
        }
        launch(Dispatchers.Main) {
            view?.rebindToolbar()
        }
    }
}
This is the RxJava code I’m trying to listen to. The event indeed isn’t firing again when the data changes, so, as I said before, I suspect a subscription issue:
public Flowable<String> getProjectNameFlowable() {
    return RxJavaInterop.toV2Flowable(
        getSelectedProject()
            .switchMap(projectEntity -> {
                Timber.e("fire projectEntity " + projectEntity.title());
                return Observable.just(projectEntity.title());
            }
    ));
}
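One thing that may be relevant to the snippet above: `collect` suspends until the upstream completes, so statements placed after it only run once the flow has finished. The sketch below uses a finite `flowOf` with made-up titles instead of the real Flowable, purely to show the ordering; per-emission work such as refreshing a view would go inside the `collect` lambda.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order of events for a flow of hypothetical titles.
suspend fun collectTitles(titles: Flow<String>): List<String> {
    val log = mutableListOf<String>()
    titles.collect { title ->
        log += "update $title" // per-emission work goes here
    }
    log += "after collect" // reached only once the flow has completed
    return log
}

fun main() = runBlocking {
    println(collectTitles(flowOf("a", "b"))) // prints [update a, update b, after collect]
}
```

With a source that never completes, the "after collect" step would never be reached.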
bnn
10/02/2019, 10:19 PM
class Foo(ctx: CoroutineContext) {
    val scope = CoroutineScope(ctx)
    fun launchSome() = scope.launch { ... }
}
Are there better ways or conventions to do that?
I'm wondering about possibilities such as:
- Implementing CoroutineScope ( class Foo(ctx: CoroutineContext = ...) : CoroutineScope { ... } )
- Defining the method as a CoroutineScope extension function ( fun CoroutineScope.launchSome() = launch(ctx) {...} )
Thanks.
Brad Murray
10/02/2019, 11:31 PM
spierce7
10/03/2019, 3:27 AM
GlobalScope.launch {
    flow.collect {
    }
}
I’ve found other examples where a lambda works as a flow collector.
bitkid
10/03/2019, 1:51 PM
https://www.youtube.com/watch?v=a3agLJQ6vt8
taer
10/03/2019, 10:01 PM
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of StandaloneCoroutine{Active}@2b552920, expected child of BlockingCoroutine{Completing}@2758fe70.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
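A minimal sketch of the mitigation the message suggests, assuming the emissions come from coroutines launched inside the builder. The function name and values are made up for illustration. Inside `channelFlow`, `send` may be called from child coroutines, whereas `emit` inside `flow { launch { ... } }` triggers the exception above:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical producer: values are sent from two child coroutines.
fun concurrentNumbers(): Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
    // The flow completes once this block and all launched children have finished.
}

fun main() = runBlocking {
    println(concurrentNumbers().toList().sorted()) // prints [1, 2]
}
```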
dharrigan
10/04/2019, 8:36 AM
fun updateDetails() = GlobalScope.launch(Dispatchers.IO) {
    // loop forever
    updateDetailsAsync()
    // wait until updateDetailsAsync is done
}
fun CoroutineScope.updateDetailsAsync() {
    // does a REST API lookup
    // writes into a database
}
Dominaezzz
10/04/2019, 10:38 AM
val scope = SupervisorScope() over val scope = CoroutineScope(CoroutineExceptionHandler { ctx, e -> })?
wakingrufus
10/04/2019, 2:31 PM
tseisel
10/04/2019, 2:59 PM
Flowable.never())
Is something like this correct?
object NeverFlow : Flow<Nothing> {
    override suspend fun collect(collector: FlowCollector<Nothing>) {
        delay(Long.MAX_VALUE)
    }
}
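An alternative sketch that avoids implementing the `Flow` interface directly, assuming kotlinx.coroutines 1.4 or later for `awaitCancellation()`. The helper `staysSuspended` is hypothetical and only demonstrates that collection never completes on its own:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A flow that never emits and never completes; collecting it suspends until cancelled.
// awaitCancellation() requires kotlinx.coroutines 1.4+ (an assumption about your version).
fun neverFlow(): Flow<Nothing> = flow<Nothing> { awaitCancellation() }

// Returns true if collection was still suspended when the timeout fired.
suspend fun staysSuspended(timeoutMs: Long): Boolean =
    withTimeoutOrNull(timeoutMs) {
        neverFlow().collect()
        "completed"
    } == null

fun main() = runBlocking {
    println(staysSuspended(100)) // prints true
}
```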
bnn
10/05/2019, 2:54 AM
fun main() = runBlocking {
    val f = channelFlow {
        launch {
            while (true) {
                offer(1)
                delay(500)
            }
        }
        awaitClose {
            println("f1 closed")
        }
    }
    val b = f.broadcastIn(this).apply {
        /**
         * If you uncomment the following code, the exception below will be raised:
         *
         * java.lang.IllegalStateException: Another handler was already registered
         *
         * You cannot register an onClose callback twice for the same SendChannel.
         *
         * There is already an onClose callback in this code: the awaitClose callback inside the channelFlow builder.
         */
        // invokeOnClose { println("b1 closed") }
    }
    launch {
        b.openSubscription().consumeAsFlow().take(5).collect {
            println(it)
        }
        b.cancel()
    }.join()
}
As noted in the inline comment, calling invokeOnClose on the BroadcastChannel returned by the broadcastIn terminal function, on a Flow instance created by the channelFlow builder, throws java.lang.IllegalStateException: Another handler was already registered.
Although this is easy to see in this short sample, it is sometimes hard to know whether a Flow instance was created by channelFlow when you call broadcastIn on it.
ursus
10/07/2019, 12:09 AM
fun cancel(id: Id), i.e. I need my jobs keyed for that. Should I have some separate map for this? Feels like scope functionality, but in order for that I'd need the scope to be backed by a map, or iterate its jobs?
ylemoigne
10/07/2019, 6:59 AM
println("Start receiving... is not executed? (or point me to the relevant part of the documentation)
neworldlt
10/07/2019, 10:04 AM
await() still cancels the parent job? Let's assume this example: https://pl.kotl.in/DlU1vaR2T. If you comment out the await() part, launch's job will stay active.
ubu
10/07/2019, 2:03 PM
class Handler : MessageHandler {
    private val channel = Channel<Message>(1)
    override fun handle(message : Message) {
        // TODO send event.
    }
    fun send(event : Message) {
    }
    /*
    fun observe() : Channel<Message>
    fun observe() : Flow<Message>
    */
}
With `RxJava`’s subjects, I could call onNext() and let observers consume it by exposing some Observable. Is there a way to do the same with Coroutines? I need to expose some methods that allow observing the stream of events received in the handle(message : Message) method. Thanks a lot in advance!
Joe
10/07/2019, 3:46 PM
java.util.Stream to a Flow? Would the first generally be the preferred way (or is there another, more idiomatic way?):
result.iterator().asFlow().onCompletion { result.close() }
vs
flow {
    result.use {
        for (edge in it) {
            emit(edge)
        }
    }
}
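A self-contained sketch of the second variant, assuming a hypothetical helper named `streamAsFlow`. It iterates the stream's iterator explicitly and relies on `use` to close the stream when collection ends. This is not necessarily the preferred idiom, just one way to keep opening and closing in one place:

```kotlin
import java.util.stream.Stream
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits every element of the stream, then closes it via use().
// A Stream can only be consumed once, so the resulting flow is single-use as well.
fun <T> streamAsFlow(stream: Stream<T>): Flow<T> = flow {
    stream.use { s ->
        for (element in s.iterator()) {
            emit(element)
        }
    }
}

fun main() = runBlocking {
    var closed = false
    val stream = Stream.of(1, 2, 3).onClose { closed = true }
    println(streamAsFlow(stream).toList()) // prints [1, 2, 3]
    println(closed)                        // prints true
}
```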
zak.taccardi
10/07/2019, 6:10 PM
1 for property and 2 for function
groostav
10/07/2019, 9:39 PM
Flow<T> instances into a single Flow<T>
eygraber
10/08/2019, 1:24 AM
Joe
10/08/2019, 4:52 AM
val job = launch {
    val flow = slowBlockingCallThatReturnsIterator().asFlow()
    flow.collect {
        yield() // allows for cancel() to work
        channel.send(it)
    }
}
If we call job.cancel() during the slowBlockingCall (or while the flow is collecting) without the yield() call, all entries in the iterator still get sent to the channel. If we add back the yield call, then the collection aborts as we initially expected to happen without the yield(). Since channel.send is a suspend function, shouldn't calling it check the coroutine active state?
darkmoon_uk
10/08/2019, 6:45 AM
GlobalScope any different in this regard? Will a failed Job in GlobalScope cancel everything in GlobalScope?
darkmoon_uk
10/08/2019, 6:48 AM
supervisorScope mandatory wherever a coroutine might fail, to avoid the entire concurrency tree being taken down?
Paul Woitaschek
10/08/2019, 8:15 AM
PublishSubject alternative as flow. The closest representation I found is a Channel(RENDEZVOUS). However, the consumeAsFlow function states:
If the flow consumer fails with an exception, channel is cancelled.
I don't want that. Is there an alternative for that?
Kulwinder Singh
10/08/2019, 11:16 AM
callbackFlow to listen to updates of a Firestore Collection using the extension function below
@ExperimentalCoroutinesApi
fun Query.getQuerySnapshotFlow(): Flow<QuerySnapshot?> = callbackFlow {
    val listenerRegistration = addSnapshotListener { querySnapshot, e ->
        if (e != null) {
            cancel(message = "", cause = e)
            return@addSnapshotListener
        }
        offer(querySnapshot)
    }
    awaitClose {
        listenerRegistration.remove()
    }
}
@ExperimentalCoroutinesApi
fun <T> Query.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
    return getQuerySnapshotFlow().map { mapper(it) }
}
I'm using the extension functions above inside my ViewModel as shown below, but how can I cancel the previous Flow, which is started in the else case below? When switchMap is called again, it keeps listening to the old Collection and starts a new one. How can I fix this?
val myAppointments = _appointmentDate.switchMap { date ->
    //HERE I WANT TO CANCEL PREVIOUS FLOW STARTED IN ELSE CASE BELOW
    liveData {
        if (date == null)
            emit(AppointmentState.NoAppointment)
        else {
            myCollection.whereEqualTo(Appointment.DATE, date)
                .getDataFlow { it!!.toObjects<List<Appointment>>() }
                .collect { emit(AppointmentState.Loaded(it)) }
        }
    }
}
kupris
10/08/2019, 3:01 PM
kupris
10/08/2019, 3:01 PM
Evan R.
10/08/2019, 3:03 PM
kupris
10/08/2019, 3:09 PM