Orhan Tozan
04/04/2020, 12:07 PM
fun messagesFromConversation(conversationId: String): Flow<List<Message>> = channelFlow {
    val messageHistory = getMessageHistory()
    offer(messageHistory)
    val accumulatedMessages = messageHistory.toMutableList()
    val sentMessages: Flow<Message> = sentMessagesFromConversation(conversationId)
    sentMessages.collect { sentMessage ->
        accumulatedMessages.add(0, sentMessage)
        offer(accumulatedMessages)
    }
}
but I'm getting a weird issue: on every collect of sentMessages, accumulatedMessages does not represent the full list. I think it's some sort of mutable-state-sharing concurrency issue, but I have no idea how to approach this better. Do I need some sort of ConcurrentMutableList?
Marc Knaup
04/04/2020, 12:09 PM
offer may return false if an element wasn’t accepted due to the channel being busy.
Did you mean to use send?
send(accumulatedMessages.toList())
offer -> send
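A minimal sketch of why the toList() copy suggested above matters, independent of coroutines. The function names and the String element type are made up for illustration (the thread's Message type is not shown). Handing out the same MutableList on every emission means an earlier emission changes when the list is mutated later, while toList() hands out a snapshot.

```kotlin
// Hypothetical stand-in for emitting the mutable list itself on every update.
fun emitSharedList(): List<List<String>> {
    val accumulated = mutableListOf("history")
    val emissions = mutableListOf<List<String>>()
    emissions.add(accumulated)          // first "emission": a reference, not a copy
    accumulated.add(0, "sent")          // later mutation...
    emissions.add(accumulated)          // ...is also visible through the first emission
    return emissions
}

// Same steps, but each emission is a snapshot taken with toList().
fun emitSnapshots(): List<List<String>> {
    val accumulated = mutableListOf("history")
    val emissions = mutableListOf<List<String>>()
    emissions.add(accumulated.toList()) // copy taken before the mutation
    accumulated.add(0, "sent")
    emissions.add(accumulated.toList())
    return emissions
}
```

With the shared list, both recorded emissions are the same object, so a consumer that inspects an earlier emission after a later update sees contents it was never sent.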
Orhan Tozan
04/04/2020, 12:19 PM
Marc Knaup
04/04/2020, 12:20 PM
this changed between breakpoints?
Orhan Tozan
04/04/2020, 12:21 PM
var accumulatedMessages
Marc Knaup
04/04/2020, 12:21 PM
Orhan Tozan
04/04/2020, 12:21 PM
Marc Knaup
04/04/2020, 12:22 PM
println may also help here.
Maybe multiple flows are created instead of one.
There should be no need for special immutable lists if you only send copies around 🙂
listOf(message) + snapshot.queue to create a new immutable list instead of mutating any existing one.
Orhan Tozan
04/04/2020, 12:25 PM
var accumulatedMessages: List<Message> = messageHistory
sentMessages.collect { sentMessage ->
    accumulatedMessages = accumulatedMessages + sentMessage
    send(accumulatedMessages)
}
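The loop above can be tried in isolation with something like the following sketch. It assumes a recent kotlinx-coroutines-core on the classpath; String stands in for Message, and accumulate and accumulateDemo are hypothetical names, not code from this thread. Each send gets a fresh immutable list, so no emitted value is ever mutated afterwards.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits the history first, then one new immutable list per sent message.
fun accumulate(history: List<String>, sent: Flow<String>): Flow<List<String>> = channelFlow {
    var accumulated: List<String> = history
    send(accumulated)
    sent.collect { message ->
        accumulated = accumulated + message // builds a new list, leaves the old one untouched
        send(accumulated)                   // suspends instead of dropping when the buffer is full
    }
}

// Collects every emission of a small finite example into a list.
fun accumulateDemo(): List<List<String>> = runBlocking {
    accumulate(listOf("h1"), flowOf("a", "b")).toList()
}
```

In this finite example the flow completes once the sent flow is exhausted; with a never-ending source, as in the chat, it would stay open until the collector is cancelled.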
Marc Knaup
04/04/2020, 12:27 PM
collect in your flow and both are never-ending streams then the second collect should never be called because the first will run forever?
Orhan Tozan
04/04/2020, 12:28 PM
Marc Knaup
04/04/2020, 12:28 PM
Orhan Tozan
04/04/2020, 12:30 PM
Marc Knaup
04/04/2020, 12:33 PM
offer should be send.
Any reason you’ve used offer?
Orhan Tozan
04/04/2020, 12:34 PM
Marc Knaup
04/04/2020, 12:35 PM
offer is okay if you are okay with some elements never being sent in certain situations. But I guess that isn’t okay here 🙂
Orhan Tozan
04/04/2020, 12:35 PM
Marc Knaup
04/04/2020, 12:36 PM
conflate at the end.
That may make it irrelevant whether to use offer or send because with conflate the buffer is never full 🤔
Orhan Tozan
04/04/2020, 12:37 PM
Marc Knaup
04/04/2020, 12:38 PM
Orhan Tozan
04/04/2020, 12:39 PM
accumulatedMessages = accumulatedMessages + sentMessage
send(accumulatedMessages)
Marc Knaup
04/04/2020, 12:39 PM
Orhan Tozan
04/04/2020, 12:46 PM
Marc Knaup
04/04/2020, 12:48 PM
awaitClose you may as well just use flow { … }?
It should stay open automatically though.
println at the start of the flow?
Just to make sure that we’re only observing a single flow.
Orhan Tozan
04/04/2020, 12:49 PM
Marc Knaup
04/04/2020, 12:49 PM
Orhan Tozan
04/04/2020, 12:50 PM
Marc Knaup
04/04/2020, 12:52 PM
coroutineScope { … }
Orhan Tozan
04/04/2020, 12:52 PM
Marc Knaup
04/04/2020, 12:53 PM
flow {} and coroutineScope {} work just fine together.
Orhan Tozan
04/04/2020, 12:55 PM
Marc Knaup
04/04/2020, 12:56 PM
flow {} and launch {} don’t seem to work together.
Orhan Tozan
04/04/2020, 12:56 PM
W/System.err: java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of StandaloneCoroutine{Active}@f625a8a, expected child of StandaloneCoroutine{Active}@532c8fb.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
Marc Knaup
04/04/2020, 12:57 PM
val flowContext = coroutineContext
and then use it inside:
withContext(flowContext) {
    emit(…)
}
Orhan Tozan
04/04/2020, 12:59 PM
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
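The error quoted above can be reproduced with a small sketch like this one. It assumes kotlinx-coroutines-core is available; the function names are hypothetical and Int stands in for the real message type. Calling emit from a coroutine started with launch inside flow {} fails the invariant check, while channelFlow allows send from launched children, which is what the error text recommends.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// emit is called from a child coroutine, not from the coroutine that collects the flow.
fun emitFromChild(): Flow<Int> = flow {
    coroutineScope {
        launch { emit(1) }
    }
}

// channelFlow hands values over through a channel, so children may call send.
fun sendFromChild(): Flow<Int> = channelFlow {
    launch { send(1) }
}

// Returns true if collecting emitFromChild() fails with an IllegalStateException.
fun violatesInvariant(): Boolean = runBlocking {
    try {
        emitFromChild().toList()
        false
    } catch (e: IllegalStateException) {
        true
    }
}

// Collects the channelFlow variant; the builder waits for its launched children.
fun collectFromChannelFlow(): List<Int> = runBlocking { sendFromChild().toList() }
```

Switching to channelFlow only removes the emission restriction. Any mutable state touched from several coroutines inside the block still has to be made safe separately.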
Marc Knaup
04/04/2020, 1:01 PM
channelFlow you’ve silenced the warning but your code must also be thread-safe then. But it isn’t due to mutable state.
Orhan Tozan
04/04/2020, 1:01 PM
sentChatMessages flow is coming from here, outside of the class as a private field
private val _sentOneOnOneChatMessages: Channel<OneOnOneChatMessage<ChatMessageContent>> = Channel()
private val sentChatMessages: Flow<OneOnOneChatMessage<ChatMessageContent>> =
    _sentOneOnOneChatMessages.receiveAsFlow()
also {}
Marc Knaup
04/04/2020, 1:15 PM
withContext(flowContext) {
    // on each code (including mutable state access)
}
Orhan Tozan
04/04/2020, 1:19 PM
Marc Knaup
04/04/2020, 1:19 PM
Orhan Tozan
04/04/2020, 1:20 PM
Marc Knaup
04/04/2020, 1:21 PM
Job.
Orhan Tozan
04/04/2020, 1:23 PM
Creates an instance of the cold Flow with elements that are sent to a SendChannel provided to the builder's block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently. The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.
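A small sketch of the "cold" part of that description, assuming kotlinx-coroutines-core is available; countBlockRuns is a hypothetical name. The builder block runs once per terminal operator, so two collectors of the same flow object each get their own run of the block and their own local state.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how often the channelFlow block is executed when the flow is collected twice.
fun countBlockRuns(): Int = runBlocking {
    var blockRuns = 0
    val numbers: Flow<Int> = channelFlow {
        blockRuns++ // executed on every collection, not when the flow object is created
        send(1)
        send(2)
    }
    numbers.toList() // first terminal operator: block runs
    numbers.toList() // second terminal operator: block runs again
    blockRuns
}
```

A println at the top of the block, as suggested earlier in the thread, is a quick way to check whether more than one collection is happening.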
Marc Knaup
04/04/2020, 1:23 PM
Orhan Tozan
04/04/2020, 1:24 PM
Marc Knaup
04/04/2020, 1:25 PM
Orhan Tozan
04/04/2020, 1:25 PM
Marc Knaup
04/04/2020, 1:30 PM
Orhan Tozan
04/04/2020, 1:50 PM
Marc Knaup
04/04/2020, 1:50 PM
Orhan Tozan
04/04/2020, 1:51 PM
Marc Knaup
04/04/2020, 1:52 PM
Orhan Tozan
04/04/2020, 1:53 PM
Marc Knaup
04/04/2020, 1:53 PM
Orhan Tozan
04/04/2020, 1:53 PM
Marc Knaup
04/04/2020, 1:54 PM
Orhan Tozan
04/04/2020, 1:54 PM
Marc Knaup
04/04/2020, 2:02 PM
Orhan Tozan
04/04/2020, 2:02 PM
asLiveData() on Flow
Marc Knaup
04/04/2020, 2:04 PM
LiveData if possible. Not sure though if Kotlin covers all use cases yet 🤔
Orhan Tozan
04/04/2020, 2:05 PM
// RELEVANT
Marc Knaup
04/04/2020, 2:10 PM
ConversationContentSnapshot.
It includes messages, send queue, loading states, errors, etc.
But my UI is still imperative so I just collect all snapshots and save the latest one in a property in the presenter.
Orhan Tozan
04/04/2020, 2:14 PM
Marc Knaup
04/04/2020, 2:16 PM
Orhan Tozan
04/04/2020, 2:18 PM