Orhan Tozan
04/02/2020, 11:25 PM
channelFlow {} instead of flow {} when I want to launch a coroutine inside of the block?
octylFractal
04/02/2020, 11:27 PM
coroutineScope {} inside flow {}
Orhan Tozan
04/02/2020, 11:29 PM
launch {} directly inside a channelFlow, but have to wrap it with coroutineScope inside a flow?
Orhan Tozan
04/02/2020, 11:31 PM
octylFractal
04/02/2020, 11:34 PM
channelFlow is implicitly wrapping the whole thing in coroutineScope -- you can just do that in flow {} and skip the buffering effects of channelFlow
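A minimal sketch of the coroutineScope-inside-flow approach described above, assuming kotlinx.coroutines is on the classpath. The names numbers and sideEffects are hypothetical and only serve the illustration. The launched child does concurrent work but never calls emit; every emission stays in the coroutine that runs the flow block.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical record of side work, used only to make the effect visible.
val sideEffects = mutableListOf<String>()

fun numbers(): Flow<Int> = flow {
    coroutineScope {
        // Child coroutine: runs concurrently but does not call emit().
        launch {
            delay(10)
            sideEffects += "side work done"
        }
        // emit() is called from the coroutine that runs the flow block,
        // so the flow invariant is respected.
        for (i in 1..3) emit(i)
    }
    // coroutineScope returns only after the launched child has completed.
}

fun main(): Unit = runBlocking {
    println(numbers().toList())
    println(sideEffects)
}
```

Calling emit from inside the launch block instead would trigger the "Flow invariant is violated" exception.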
octylFractal
04/02/2020, 11:35 PM
channelFlow is of course proper if you want to emit from multiple concurrent sources without using Flow.merge
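As an illustration of emitting from multiple concurrent sources, here is a small sketch assuming kotlinx.coroutines. The function twoSources and its values are made up for the example. Inside channelFlow the block's receiver is a ProducerScope, which is a CoroutineScope, so launch can be called directly and each child can call send.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun twoSources(): Flow<String> = channelFlow {
    // First hypothetical source.
    launch {
        repeat(2) { i ->
            delay(10)
            send("a$i")
        }
    }
    // Second hypothetical source, running concurrently with the first.
    launch {
        repeat(2) { i ->
            delay(15)
            send("b$i")
        }
    }
    // The flow completes once both children have finished.
}

fun main(): Unit = runBlocking {
    // Sorted, because the interleaving of the two sources is timing-dependent.
    println(twoSources().toList().sorted())
}
```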
Orhan Tozan
04/02/2020, 11:36 PM
Flow.merge?
Orhan Tozan
04/02/2020, 11:37 PM
octylFractal
04/02/2020, 11:40 PM
Orhan Tozan
04/02/2020, 11:42 PM
streetsofboston
04/02/2020, 11:43 PM
channelFlow and callbackFlow offer an api/function called awaitClose { ... } which suspends until the flow is closed or cancelled and allows you to de-register listeners and such in its lambda
streetsofboston
04/02/2020, 11:45 PM
val myFlow = callbackFlow<Int> {
    val disposable = myApi.getDataFromSocket { data -> offer(data) }
    awaitClose { disposable.close() }
}
Orhan Tozan
04/02/2020, 11:46 PM
streetsofboston
04/03/2020, 1:14 AM
Orhan Tozan
04/03/2020, 11:19 AM
Orhan Tozan
04/03/2020, 11:19 AM
override fun invoke(
    opponentCommunitySupporterId: Int
): Flow<State<ImmutableList<UseCaseChatMessage<UseCaseChatMessage.Content>>>> = flow {
    emit(State.Loading)
    val conversationId = run {
        val participants = TwoSizedSet(
            activeCommunitySupporterSession.communitySupporterId,
            opponentCommunitySupporterId
        )
        val getConversationIdResult = getConversationIdByParticipants(oneOnOneChatCrudRepository, participants)
        when (getConversationIdResult) {
            is Result.Failure -> {
                emit(State.Error(getConversationIdResult.exception))
                return@flow
            }
            is Result.Success -> {
                getConversationIdResult.data
            }
        }
    }
    val repoChatMessagesState = oneOnOneChatCrudRepository
        .oneOnOneChatMessagesFromConversationDescending(conversationId)
    val repoChatMessagesDataState = repoChatMessagesState
        .filterIsInstance<State.Data<ImmutableList<RepoChatMessage<RepoChatMessageContent>>>>()
    val markUnreadReceivedRepoChatMessagesAsRead = repoChatMessagesDataState.onEach { repoDataState ->
        val unreadReceivedChatMessages = repoDataState.data.filter {
            it.senderCommunitySupporterId != activeCommunitySupporterSession.communitySupporterId
                && it.status != RepoChatMessage.Status.READ_BY_OPPONENT
        }
        unreadReceivedChatMessages.forEach { unreadReceivedChatMessage ->
            oneOnOneChatCrudRepository.updateOneOnOneChatMessageStatusToRead(
                conversationId, unreadReceivedChatMessage.id
            )
        }
    }
    val useCaseChatMessagesState = repoChatMessagesState.map { state ->
        when (state) {
            is State.Error -> state
            is State.Loading -> state
            is State.Data -> state.data
                .map { it.asUseCaseChatMessage(activeCommunitySupporterSession.communitySupporterId) }
                .let { State.Data(it.toImmutableList()) }
        }
    }
    coroutineScope {
        markUnreadReceivedRepoChatMessagesAsRead.launchIn(this)
        launch {
            useCaseChatMessagesState.collect { emit(it) }
        }
    }
}
Orhan Tozan
04/03/2020, 11:28 AM
Orhan Tozan
04/03/2020, 11:28 AM
W/System.err: java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of StandaloneCoroutine{Active}@f625a8a, expected child of StandaloneCoroutine{Active}@532c8fb.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
W/System.err: at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(SafeCollector.common.kt:67)
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt$checkContext$result$1.invoke(Unknown Source:8)
at kotlin.coroutines.CoroutineContext$Element$DefaultImpls.fold(CoroutineContext.kt:70)
at kotlinx.coroutines.Job$DefaultImpls.fold(Unknown Source:3)
W/System.err: at kotlinx.coroutines.JobSupport.fold(JobSupport.kt:28)
at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:131)
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:25)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:82)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:68)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:54)
at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$invokeSuspend$$inlined$map$1$2.emit(Collect.kt:150)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:59)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Unknown Source:11)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
W/System.err: at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69)
at kotlinx.coroutines.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:337)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:158)
W/System.err: at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:54)
at kotlinx.coroutines.BuildersKt.launch(Unknown Source:1)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source:1)
at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:78)
at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1$1.invoke(Unknown Source:10)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:91)
at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:177)
W/System.err: at com.hello247.app.usecases.communitysupporter.usingrepository.chat.ReadChatConversationOfCommunitySupporterUseCaseUsingRepo$invoke$1.invokeSuspend(ReadChatConversationOfCommunitySupporterUseCaseUsingRepo.kt:74)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
at android.os.Handler.handleCallback(Handler.java:883)
at android.os.Handler.dispatchMessage(Handler.java:100)
at android.os.Looper.loop(Looper.java:214)
at android.app.ActivityThread.main(ActivityThread.java:7356)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492)
W/System.err: at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)
streetsofboston
04/03/2020, 12:13 PM
emit(it) inside the coroutineScope { ... }. This call to emit is therefore made from a different coroutine (and CoroutineScope) than the one that started this Flow. This is not allowed, hence the exception.
However, that is why channelFlow was created: to allow emissions onto the flow to happen from a CoroutineScope other than the one that started the flow (the one from the collector). Inside a channelFlow, call send(it) instead of emit(it).
Orhan Tozan
04/03/2020, 12:17 PM
streetsofboston
04/03/2020, 12:21 PM
coroutineScope call with a launch inside of it? Why not just call markUnread..... followed by useCase.....collect { emit(it) } without a coroutineScope around it?
Orhan Tozan
04/03/2020, 12:22 PM
markUnread...().launchIn(scope)
Orhan Tozan
04/03/2020, 12:22 PM
Orhan Tozan
04/03/2020, 12:22 PM
streetsofboston
04/03/2020, 12:23 PM
launchIn(this) where this is the scope of the flow?
Orhan Tozan
04/03/2020, 12:23 PM
Orhan Tozan
04/03/2020, 12:24 PM
Required: CoroutineScope
Found: FlowCollector
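The type mismatch above arises because the receiver of a flow {} block is a FlowCollector, not a CoroutineScope. A rough sketch of the channelFlow alternative suggested earlier in the thread follows, assuming kotlinx.coroutines. The names messages, markedAsRead and conversation are hypothetical stand-ins, not the code from this conversation. In channelFlow the receiver is a ProducerScope, so launchIn(this) compiles, and values are forwarded with send instead of emit.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the repository flow.
fun messages(): Flow<String> = flowOf("hello", "world")

// Hypothetical record of the "mark as read" side effect.
val markedAsRead = mutableListOf<String>()

fun conversation(): Flow<String> = channelFlow {
    val source = messages()
    // `this` is a ProducerScope (a CoroutineScope), so launchIn(this) is accepted.
    source.onEach { markedAsRead += it }.launchIn(this)
    // Forward the mapped values with send(); emit() is not available here.
    source.map { "[$it]" }.collect { send(it) }
}

fun main(): Unit = runBlocking {
    println(conversation().toList())
    println(markedAsRead)
}
```

Note that in this sketch the source flow is collected twice, once for the side effect and once for the mapped values, which mirrors the structure of the code posted above.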
streetsofboston
04/03/2020, 12:25 PM
Orhan Tozan
04/03/2020, 12:26 PM
streetsofboston
04/03/2020, 12:27 PM
offer: if the buffer of the channel is full, offer returns false and the item is dropped. offer is non-suspending.
streetsofboston
04/03/2020, 12:28 PM
send is suspending; if the buffer is full, it suspends until the buffer has room again.
Orhan Tozan
04/03/2020, 12:29 PM
Orhan Tozan
04/03/2020, 12:30 PM
streetsofboston
04/03/2020, 12:30 PM
streetsofboston
04/03/2020, 12:31 PM
Orhan Tozan
04/03/2020, 1:01 PM
Orhan Tozan
04/03/2020, 1:02 PM
Orhan Tozan
04/03/2020, 1:02 PM
streetsofboston
04/03/2020, 1:02 PM
offer(it) or launch { send(it) }
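The difference between the non-suspending and the suspending call can be sketched on a plain Channel, assuming kotlinx.coroutines 1.5 or later. Since offer has been deprecated in newer releases in favour of trySend, this sketch uses trySend(element).isSuccess as the non-suspending counterpart. The names Outcome and offerVersusSend are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Outcome(
    val firstAccepted: Boolean,
    val secondAccepted: Boolean,
    val received: List<Int>
)

suspend fun offerVersusSend(): Outcome = coroutineScope {
    val channel = Channel<Int>(capacity = 1)

    // Non-suspending: succeeds while the buffer has room...
    val first = channel.trySend(1).isSuccess
    // ...and fails once the buffer is full, so element 2 is dropped.
    val second = channel.trySend(2).isSuccess

    // Suspending: send() waits until a receiver makes room.
    val sender = launch { channel.send(3) }
    val received = listOf(channel.receive(), channel.receive())
    sender.join()
    channel.close()

    Outcome(first, second, received)
}

fun main(): Unit = runBlocking {
    println(offerVersusSend())
}
```

Here the second non-suspending attempt is rejected because the single buffer slot is taken, while the element passed to send is delivered once the receiver has drained the buffer.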
Orhan Tozan
04/03/2020, 1:03 PM
launch { send(it) } over offer(it)?
Orhan Tozan
04/03/2020, 1:03 PM
Orhan Tozan
04/03/2020, 1:05 PM
streetsofboston
04/03/2020, 1:06 PM
offer (if you don’t want to drop emissions). Otherwise, use send
Orhan Tozan
04/03/2020, 1:07 PM
streetsofboston
04/03/2020, 1:07 PM
streetsofboston
04/03/2020, 1:08 PM
Orhan Tozan
04/03/2020, 1:08 PM
streetsofboston
04/03/2020, 1:09 PM
streetsofboston
04/03/2020, 1:10 PM
channelFlow { ….}.buffer(UNLIMITED) does have an unlimited buffer (BUFFERED only requests the default capacity).
louiscad
04/03/2020, 1:42 PM
streetsofboston
04/03/2020, 1:44 PM
louiscad
04/03/2020, 5:30 PM
Dispatchers.IO.
Orhan Tozan
04/03/2020, 6:08 PM
offer() or send()?
louiscad
04/03/2020, 6:10 PM