#coroutines

Orhan Tozan

04/02/2020, 11:25 PM
Should I use channelFlow {} instead of flow {} when I want to launch a coroutine inside of the block?

octylFractal

04/02/2020, 11:27 PM
No, you can use coroutineScope {} inside flow {}.

Orhan Tozan

04/02/2020, 11:29 PM
Why can I call launch {} directly inside a channelFlow, but have to wrap it with coroutineScope inside a flow?
The difference is that coroutineScope {} blocks the flow until the job is completed, but I want to launch a coroutine that collects another flow.

octylFractal

04/02/2020, 11:34 PM
Because channelFlow is implicitly wrapping the whole thing in coroutineScope; you can just do that in flow {} and skip the buffering effects of channelFlow.
channelFlow is of course proper if you want to emit from multiple concurrent sources without using Flow.merge.
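For comparison, here is a minimal sketch of both ways to emit from two concurrent sources. The helper names (mergedWithOperator, mergedWithChannelFlow, collectSorted) are made up for illustration, and the snippet assumes a kotlinx.coroutines version that ships the merge function.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Option 1: the built-in operator.
fun <T> mergedWithOperator(a: Flow<T>, b: Flow<T>): Flow<T> = merge(a, b)

// Option 2: hand-rolled with channelFlow. Each source is collected in its
// own child coroutine, and send() is safe to call from any of them.
fun <T> mergedWithChannelFlow(a: Flow<T>, b: Flow<T>): Flow<T> = channelFlow {
    launch { a.collect { send(it) } }
    launch { b.collect { send(it) } }
}

// Interleaving between the sources is not guaranteed, so sort before comparing.
fun collectSorted(flow: Flow<Int>): List<Int> = runBlocking { flow.toList().sorted() }
```

Both variants should deliver the same set of elements; the channelFlow version mostly pays off when the sources need extra per-source logic that merge cannot express.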

Orhan Tozan

04/02/2020, 11:36 PM
Why would one use channelFlow for emitting from multiple concurrent sources over using Flow.merge?
So besides wiring a flow to a listener-based API, there is no other common use case for using channelFlow over flow?

octylFractal

04/02/2020, 11:40 PM
I'm not sure, personally I've never used it, but maybe others can speak for it

Orhan Tozan

04/02/2020, 11:42 PM
I currently have to do the following in my function: it receives a flow and does the following two things in parallel:
1. map the flow to another data structure, collect it and emit that as output
2. based on the input flow, collect it and perform some operations on each collected item

streetsofboston

04/02/2020, 11:43 PM
The channelFlow and callbackFlow offer an api/function called awaitClose { ... } which suspends forever and allows you to de-register listeners and such in its lambda:
val myFlow = callbackFlow<Int> {
    val disposable  = myApi.getDataFromSocket { data -> offer(data) }
   
    awaitClose { disposable.close() }
}
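A self-contained sketch of the same pattern, for trying it out without a real socket. FakeSocket, register, push, asFlow and demo are hypothetical names invented for this example. It uses trySend, which replaced offer when offer was later deprecated, so it assumes kotlinx.coroutines 1.5 or newer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical listener-based API standing in for myApi.getDataFromSocket.
class FakeSocket {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    // Registers a listener and returns a function that unregisters it again.
    fun register(listener: (Int) -> Unit): () -> Unit {
        listeners += listener
        return { listeners.remove(listener) }
    }

    fun push(value: Int) = listeners.toList().forEach { it(value) }
}

fun FakeSocket.asFlow(): Flow<Int> = callbackFlow {
    // trySend never suspends, so it can be called from a plain callback.
    val unregister = register { data -> trySend(data) }
    // Suspends until the collector goes away, then runs the cleanup lambda.
    awaitClose { unregister() }
}

fun demo(): Pair<List<Int>, Int> = runBlocking {
    val socket = FakeSocket()
    val received = mutableListOf<Int>()
    val job = launch { socket.asFlow().collect { received += it } }
    while (socket.listenerCount == 0) yield() // wait until the listener is registered
    socket.push(1)
    socket.push(2)
    while (received.size < 2) yield()         // wait until both values arrived
    job.cancelAndJoin()                       // cancelling the collector triggers awaitClose
    received to socket.listenerCount
}
```

After the collector is cancelled, the listener count should be back to zero, which is the point of putting the cleanup in awaitClose.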

Orhan Tozan

04/02/2020, 11:46 PM
Thank you Anton, so does that mean I can use flow {} for the use case I listed in my previous message?

streetsofboston

04/03/2020, 1:14 AM
Still hard to tell. Could you post (a sketch of) your code and indicate where your code does not do what you want it to do?

Orhan Tozan

04/03/2020, 11:19 AM
I'm currently doing this:
override fun invoke(
        opponentCommunitySupporterId: Int
    ): Flow<State<ImmutableList<UseCaseChatMessage<UseCaseChatMessage.Content>>>> = flow {
        emit(State.Loading)
        val conversationId = run {
            val participants = TwoSizedSet(
                activeCommunitySupporterSession.communitySupporterId,
                opponentCommunitySupporterId
            )
            val getConversationIdResult = getConversationIdByParticipants(oneOnOneChatCrudRepository, participants)
            when (getConversationIdResult) {
                is Result.Failure -> {
                    emit(State.Error(getConversationIdResult.exception))
                    return@flow
                }
                is Result.Success -> {
                    getConversationIdResult.data
                }
            }
        }

        val repoChatMessagesState = oneOnOneChatCrudRepository
            .oneOnOneChatMessagesFromConversationDescending(conversationId)

        val repoChatMessagesDataState = repoChatMessagesState
            .filterIsInstance<State.Data<ImmutableList<RepoChatMessage<RepoChatMessageContent>>>>()

        val markUnreadReceivedRepoChatMessagesAsRead = repoChatMessagesDataState.onEach { repoDataState ->
            val unreadReceivedChatMessages = repoDataState.data.filter {
                it.senderCommunitySupporterId != activeCommunitySupporterSession.communitySupporterId
                        && it.status != RepoChatMessage.Status.READ_BY_OPPONENT
            }
            unreadReceivedChatMessages.forEach { unreadReceivedChatMessage ->
                oneOnOneChatCrudRepository.updateOneOnOneChatMessageStatusToRead(
                    conversationId, unreadReceivedChatMessage.id
                )
            }
        }

        val useCaseChatMessagesState = repoChatMessagesState.map { state ->
            when (state) {
                is State.Error -> state
                is State.Loading -> state
                is State.Data -> state.data
                    .map { it.asUseCaseChatMessage(activeCommunitySupporterSession.communitySupporterId) }
                    .let { State.Data(it.toImmutableList()) }
            }
        }
        coroutineScope {

            markUnreadReceivedRepoChatMessagesAsRead.launchIn(this)

            launch {
                useCaseChatMessagesState.collect { emit(it) }
            }
        }

    }
Getting this error:
W/System.err: java.lang.IllegalStateException: Flow invariant is violated:
    		Emission from another coroutine is detected.
    		Child of StandaloneCoroutine{Active}@f625a8a, expected child of StandaloneCoroutine{Active}@532c8fb.
    		FlowCollector is not thread-safe and concurrent emissions are prohibited.
    		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
W/System.err:     at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(SafeCollector.common.kt:67)
        at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(Unknown Source:8)
        at kotlin.coroutines.CoroutineContext$Element$DefaultImpls.fold(CoroutineContext.kt:70)
        at kotlinx.coroutines.Job$DefaultImpls.fold(Unknown Source:3)
W/System.err:     at kotlinx.coroutines.JobSupport.fold(JobSupport.kt:28)
        at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:131)
        at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:25)
        at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:82)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:68)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:54)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$invokeSuspend$$inlined$map$1$2.emit(Collect.kt:150)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:59)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Unknown Source:11)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
W/System.err:     at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69)
        at kotlinx.coroutines.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:337)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
        at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
        at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:158)
W/System.err:     at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:54)
        at kotlinx.coroutines.BuildersKt.launch(Unknown Source:1)
        at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
        at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source:1)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:78)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invoke(Unknown Source:10)
        at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:91)
        at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:177)
W/System.err:     at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:74)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
        at android.os.Handler.handleCallback(Handler.java:883)
        at android.os.Handler.dispatchMessage(Handler.java:100)
        at android.os.Looper.loop(Looper.java:214)
        at android.app.ActivityThread.main(ActivityThread.java:7356)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492)
W/System.err:     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)

streetsofboston

04/03/2020, 12:13 PM
At first sight, I see that you call emit(it) inside the coroutineScope { ... }. This call to emit is therefore inside another CoroutineScope than the one that started this Flow. This is not allowed, hence the exception. However, that is why channelFlow was created: to allow emissions onto the flow to happen from another CoroutineScope than the one that started the flow (the one from the collector). Inside a channelFlow, call send(it) instead of emit(it).
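A reduced sketch of that suggestion, loosely modelled on the use case above. The function process and its parameters are hypothetical. As the exception message says, the offending emission comes from another coroutine (the one started by launch); with channelFlow that is allowed as long as send is used.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the use case: run a side effect on every input
// element while, in parallel, emitting a mapped version of the input.
// Note that a cold input flow is collected twice here, once per branch.
fun process(input: Flow<Int>, sideEffect: suspend (Int) -> Unit): Flow<String> = channelFlow {
    // The receiver is a ProducerScope, which is a CoroutineScope,
    // so launchIn(this) and launch work without coroutineScope { }.
    input.onEach { sideEffect(it) }.launchIn(this)

    launch {
        // send instead of emit: sending from a child coroutine is permitted.
        input.map { "item $it" }.collect { send(it) }
    }
}
```

The resulting flow completes once the block and both child coroutines have finished.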

Orhan Tozan

04/03/2020, 12:17 PM
Thanks, that helps. What is the difference between send() and offer()? I know that one is suspending and the other is not, but which one is recommended? Because I see a lot of examples using offer() instead of send().

streetsofboston

04/03/2020, 12:21 PM
Still, why do you use a coroutineScope call with a launch inside of it? Why not just call markUnread..... followed by useCase.....collect{ emit(it)} without a coroutineScope around it?

Orhan Tozan

04/03/2020, 12:22 PM
Because it is markUnread...().launchIn(scope), and giving this as the scope doesn't work.
It does with channelFlow though, so yes, coroutineScope {} is not needed if I use channelFlow.

streetsofboston

04/03/2020, 12:23 PM
Why not launchIn(this) where this is the scope of the flow?

Orhan Tozan

04/03/2020, 12:23 PM
Doesn't work, gives a compiler error:
Required: CoroutineScope
Found: FlowCollector

streetsofboston

04/03/2020, 12:25 PM
Right, a FlowCollector doesn't have a scope, you'd need the coroutineScope call....

Orhan Tozan

04/03/2020, 12:26 PM
Right, channelFlow does have a scope, so coroutineScope is not needed there.

streetsofboston

04/03/2020, 12:27 PM
About offer: if the buffer of the channel is full, offer returns false and the item is dropped. offer is non-suspending.
And send is suspending; if the buffer is full, it suspends until the buffer has room again.
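The difference can be observed on a bare channel with a one-element buffer. This is only a sketch: the function name demo is made up, and it uses trySend, the later replacement for offer, so it assumes kotlinx.coroutines 1.5 or newer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun demo(): Triple<Boolean, Boolean, List<Int>> = runBlocking {
    val channel = Channel<Int>(1)              // buffer holds a single element

    val first = channel.trySend(1).isSuccess   // buffer has room
    val second = channel.trySend(2).isSuccess  // buffer is full: fails, 2 is dropped

    // send suspends inside its own coroutine until the buffer has room again.
    val sender = launch { channel.send(3) }

    val a = channel.receive()                  // takes 1 out of the buffer
    val b = channel.receive()                  // waits for the sender, then gets 3
    sender.join()
    channel.close()

    Triple(first, second, listOf(a, b))
}
```

The dropped element never reaches the receiver, while the element passed to send arrives once there is room.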

Orhan Tozan

04/03/2020, 12:29 PM
Ah I see, send is safer then, I think.
Thanks for the simple explanation. I tried to read it from the docs many times, but it is explained in a more complex way there.

streetsofboston

04/03/2020, 12:30 PM
Yup, but it depends on your channel and use-case.
My explanation is simpler but glosses over some details, though 😀

Orhan Tozan

04/03/2020, 1:01 PM
message has been deleted
So I am using callbackFlow here, and want to make sure the received message is sent. However, I can't use send here because the callback isn't a suspending environment.
Do you perhaps know a good way to solve this?

streetsofboston

04/03/2020, 1:02 PM
Then you’d need to do either offer(it) or launch { send(it) }

Orhan Tozan

04/03/2020, 1:03 PM
Is there a reason to use launch { send(it) } over offer(it)?
Because they will do the same thing, won't they?
Ah I see, there is a difference. When the channel is full and offer() is called, offer won't send the item and that's it, but with the launch/send approach you still launch a coroutine that makes sure the item is sent, waiting if necessary.
👌 1

streetsofboston

04/03/2020, 1:06 PM
If your channel has an unlimited buffer (or you know that it is large enough to never get full), then use offer. Otherwise, if you don’t want to drop emissions, use send.

Orhan Tozan

04/03/2020, 1:07 PM
How do I know if my channel has an unlimited buffer?

streetsofboston

04/03/2020, 1:07 PM
I think a Flow<T> has a buffer method, making it unlimited (one step downstream)

Orhan Tozan

04/03/2020, 1:08 PM
So that means that a channelFlow has an unlimited buffer? And thus I'm better off using offer instead of send?

streetsofboston

04/03/2020, 1:09 PM
Yup, simpler code, one less suspension point (although not expensive, still extra code 🙂 )
No, a channelFlow does not have an unlimited buffer itself, but channelFlow { … }.buffer(BUFFERED) does have an unlimited buffer.

louiscad

04/03/2020, 1:42 PM
@streetsofboston The snippet you posted doesn't have an unlimited buffer. Only setting it explicitly to UNLIMITED will make it unlimited.
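A small sketch of what setting it explicitly looks like. The helper successfulSends is hypothetical; it counts how many non-suspending sends succeed for a given capacity, and it assumes kotlinx.coroutines 1.5 or newer for trySend.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Counts how many trySend calls succeed when the channelFlow is followed by
// buffer(capacity). The buffer operator is fused with channelFlow, so it
// configures the capacity of the channel that trySend writes to.
fun successfulSends(capacity: Int, attempts: Int): Int = runBlocking {
    var succeeded = 0
    channelFlow<Int> {
        repeat(attempts) { if (trySend(it).isSuccess) succeeded++ }
    }.buffer(capacity).toList()
    succeeded
}
```

Called with Channel.UNLIMITED, every attempt should succeed; with a small capacity, a burst of non-suspending sends is expected to lose elements once the buffer fills up.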

streetsofboston

04/03/2020, 1:44 PM
Yup….. typed this all from my phone 🙂 BUFFERED has a limited default capacity (64 if I’m not mistaken…).

louiscad

04/03/2020, 5:30 PM
It's 16. 64 is the default upper bound of the threads count in Dispatchers.IO.
👍 1

Orhan Tozan

04/03/2020, 6:08 PM
So @louiscad, would you generally recommend offer() or send()?

louiscad

04/03/2020, 6:10 PM
@Orhan Tozan Depends on where you call it from and how you want to deal with the receiver being there or not.