Should I use `channelFlow {}` instead of `flow {}`...
# coroutines
o
Should I use
channelFlow {}
instead of
flow {}
when I want to launch a coroutine inside of the block?
o
no, you can use
coroutineScope {}
inside
flow {}
o
Why can I call
launch {}
directly inside a channelFlow, but have to wrap it with coroutineScope inside a flow?
Because the difference is that coroutineScope {} is blocking the flow until a job is completed, but I want it to launch a coroutine for collecting another flow
o
because
channelFlow
is implicitly wrapping the whole thing in
coroutineScope
-- you can just do that in
flow {}
and skip the buffering effects of
channelFlow
channelFlow
is of course proper if you want to emit from multiple concurrent sources without using
Flow.merge
o
Why would one use channelFlow for emitting from multiple concurrent sources over using
Flow.merge
?
So besides for wiring a flow with a listener based API, there is no other common usecase for using channelFlow over flow?
o
I'm not sure, personally I've never used it, but maybe others can speak for it
o
I currently have to do the following from my function: my function receives a flow, which does the following two things in parallel: 1. map the flow to another data strcuture, collect and emit that flow as output 2. based on the input flow, collect it and do perform some operations on each collect
s
The
channelFlow
and
callbackFlow
offer an api/function called
awaitClose { ... }
which suspends forever and allows you to de-register listeners and such in its lambda
Copy code
val myFlow = callbackFlow<Int> {
    val disposable  = myApi.getDataFromSocket { data -> offer(data) }
   
    awaitClose { disposable.close() }
}
o
Thank you Anton, so does that mean I can use flow {} for my usecase I listed in my previous message?
s
Still hard to tell. Could you post (a sketch of) your code and indicate where your code does not do what you want it to do?
o
I'm now currently doing this:
Copy code
override fun invoke(
        opponentCommunitySupporterId: Int
    ): Flow<State<ImmutableList<UseCaseChatMessage<UseCaseChatMessage.Content>>>> = flow {
        emit(State.Loading)
        val conversationId = run {
            val participants = TwoSizedSet(
                activeCommunitySupporterSession.communitySupporterId,
                opponentCommunitySupporterId
            )
            val getConversationIdResult = getConversationIdByParticipants(oneOnOneChatCrudRepository, participants)
            when (getConversationIdResult) {
                is Result.Failure -> {
                    emit(State.Error(getConversationIdResult.exception))
                    return@flow
                }
                is Result.Success -> {
                    getConversationIdResult.data
                }
            }
        }

        val repoChatMessagesState = oneOnOneChatCrudRepository
            .oneOnOneChatMessagesFromConversationDescending(conversationId)

        val repoChatMessagesDataState = repoChatMessagesState
            .filterIsInstance<State.Data<ImmutableList<RepoChatMessage<RepoChatMessageContent>>>>()

        val markUnreadReceivedRepoChatMessagesAsRead = repoChatMessagesDataState.onEach { repoDataState ->
            val unreadReceivedChatMessages = repoDataState.data.filter {
                it.senderCommunitySupporterId != activeCommunitySupporterSession.communitySupporterId
                        && it.status != RepoChatMessage.Status.READ_BY_OPPONENT
            }
            unreadReceivedChatMessages.forEach { unreadReceivedChatMessage ->
                oneOnOneChatCrudRepository.updateOneOnOneChatMessageStatusToRead(
                    conversationId, unreadReceivedChatMessage.id
                )
            }
        }

        val useCaseChatMessagesState = repoChatMessagesState.map { state ->
            when (state) {
                is State.Error -> state
                is State.Loading -> state
                is State.Data -> state.data
                    .map { it.asUseCaseChatMessage(activeCommunitySupporterSession.communitySupporterId) }
                    .let { State.Data(it.toImmutableList()) }
            }
        }
        coroutineScope {

            markUnreadReceivedRepoChatMessagesAsRead.launchIn(this)

            launch {
                useCaseChatMessagesState.collect { emit(it) }
            }
        }

    }
Getting this error:
Copy code
W/System.err: java.lang.IllegalStateException: Flow invariant is violated:
    		Emission from another coroutine is detected.
    		Child of StandaloneCoroutine{Active}@f625a8a, expected child of StandaloneCoroutine{Active}@532c8fb.
    		FlowCollector is not thread-safe and concurrent emissions are prohibited.
    		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
W/System.err:     at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(SafeCollector.common.kt:67)
        at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(Unknown Source:8)
        at kotlin.coroutines.CoroutineContext$Element$DefaultImpls.fold(CoroutineContext.kt:70)
        at kotlinx.coroutines.Job$DefaultImpls.fold(Unknown Source:3)
W/System.err:     at kotlinx.coroutines.JobSupport.fold(JobSupport.kt:28)
        at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:131)
        at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:25)
        at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:82)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:68)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:54)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$invokeSuspend$$inlined$map$1$2.emit(Collect.kt:150)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:59)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Unknown Source:11)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
W/System.err:     at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69)
        at kotlinx.coroutines.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:337)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
        at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
        at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:158)
W/System.err:     at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:54)
        at kotlinx.coroutines.BuildersKt.launch(Unknown Source:1)
        at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
        at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source:1)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:78)
        at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invoke(Unknown Source:10)
        at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:91)
        at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:177)
W/System.err:     at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:74)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
        at android.os.Handler.handleCallback(Handler.java:883)
        at android.os.Handler.dispatchMessage(Handler.java:100)
        at android.os.Looper.loop(Looper.java:214)
        at android.app.ActivityThread.main(ActivityThread.java:7356)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492)
W/System.err:     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)
s
At first sight, I see that you call
emit(it)
inside the
coroutineScope { ... }
. This call to emit is therefore inside an other CoroutineScope than the one that started this Flow. This is not allowed. Hence the exception. However, that is why a
channelFlow
was created, to allow emissions onto the flow to happen from an other CoroutineScope than the one that started the flow (the one from the collector). Inside a channelFlow, call
send(it)
instead of
emit(it)
.
o
Thanks, that helps. What is the difference between send() and offer()? I know the one is suspending and one not, but which one is recommended? Because I see alot of examples using offer() instead of send
s
Still, why do you use a
coroutineScope
call with a
launch
inside of it? Why not just call
markUnread.....
followed by
useCase.....collect{ emit(it)}
without a
coroutineScope
around it?
o
Because it is
markUnread...().launchIn(scope)
And giving this as scope doesn't work
It does with channelflow tho, so yeas coroutineScope {} is not needed if I use channelFlow
s
Why not
launchIn(this)
where
this
is the scope of the flow?
o
Doesn't work, gives a compiler error
Copy code
Required: CoroutineScope
Found: FlowCollector
s
Right, a Flow Collector doesn't have a scope, you'd need the coroutineScope call....
o
Right, channelFlow does have a scope, so coroutineScope not needed there
s
About
offer
: if the buffer of the channel is full,
offer
returns false and the item is dropped.
offer
is non-suspending
And
send
is suspending; if the buffer is full, it suspends until the buffer has room again.
o
Ah I see, send is more safe then I think
Thanks for the simple explanation. I tried to read it from the docs many times but it is explained more complex there.
s
Yup, but it depends on your channel and use-case.
My explanation is simpler but glances over some details, though 😀
o
message has been deleted
So I am using callbackFlow here, and want to make sure the received message is sent, However I cant use send here because the callback isn't a suspending environment.
Do you perhaps know a good way to solve this
s
Then you’d need to do either
offer(it)
or
launch { send(it) }
o
Is there a reason to use
launch { send(it) }
over
offer(it
) ?
Because they will do the same thing, wont they?
Ah I see, there is a difference. When the channel is full, and offer() calls it, then offer wouldnt send it and thats it, but with the launch send approach, you still launch a coroutine to make sure to send the item, awaiting if necesarry
👌 1
s
If your channel has an unlimited buffer (or you know that is is large enough to never get full), then user
offer
(if you don’t want to drop emissions). Otherwise, use
send
o
How do I know if my channel has an unlimited buffer?
s
I think a Flow<T> has a buffer method, making it unlimited (one step downstream)
o
So that means that a channelFlow has an unlimited buffer? Thus better off using offer instead of send
s
Yup, simpler code, one less suspension point (although not expensive, still extra code 🙂 )
No, a channelFlow does not have an unlimited buffer itself, but
channelFlow { ….}.buffer(BUFFERED)
does have an unlimited buffer.
l
@streetsofboston The snippet you posted doesn't have an unlimited buffer. Only setting it explicitly to UNLIMITED will make it unlimited.
s
Yup….. typed this all from my phone 🙂 BUFFERED has a limited default capacity (64 if i’m not mistaken…).
l
It's 16. 64 is the default upper bound of the threads count in
<http://Dispatchers.IO|Dispatchers.IO>
.
👍 1
o
So @louiscad, would you generally recommend
offer()
or
send()
?
l
@Orhan Tozan Depends on where you call it from and how you want to deal with the receiver being there or not.
139 Views