Gus
08/19/2020, 9:51 PM
retrieveMessages().map {
    File("the path").writeBytes(it.message)
    it.ack()
}
It's crucial that the ACK message only be sent to the server if and when the incoming message is safely stored. That's why the ACK is explicit.
I started to model this with a Flow<*>, but I'm finding it rather cumbersome: I'd have to track all the unacknowledged messages so that I don't close the connection too soon. I think that's a code smell, since flows are supposed to be cold streams and therefore the underlying resource(s) should only be active while the flow is active. For that reason I'm starting to question whether I should be using flows (or something like them) here, as opposed to just taking a callback to process each incoming message and sending the ACK to the server when the callback completes without errors; e.g.:
retrieveMessages { message -> File("the path").writeBytes(message) }
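To make the callback idea concrete, here is a minimal sketch of what such a function might look like. Everything besides the name retrieveMessages is an assumption made for illustration: IncomingMessage, Connection, and InMemoryConnection are hypothetical types, not part of any real library, and the connection is passed in explicitly here although the one-liner above omits it.

```kotlin
// Hypothetical stand-in for one message delivered by the server.
class IncomingMessage(val id: Int, val payload: ByteArray)

// Hypothetical connection abstraction (an assumption, not a real API).
interface Connection {
    fun receive(): IncomingMessage? // null once there is nothing more to deliver
    fun ack(id: Int)
    fun close()
}

// Calls the handler for each incoming message and ACKs a message only after
// the handler has returned normally. If the handler throws, the ACK for that
// message is skipped, the exception propagates, and the connection is closed.
fun retrieveMessages(connection: Connection, handler: (ByteArray) -> Unit) {
    try {
        while (true) {
            val message = connection.receive() ?: break
            handler(message.payload)   // e.g. write the payload to disk
            connection.ack(message.id) // reached only if the handler succeeded
        }
    } finally {
        connection.close() // the connection lives exactly as long as this call
    }
}

// In-memory fake used only to exercise the sketch.
class InMemoryConnection(messages: List<IncomingMessage>) : Connection {
    private val pending = ArrayDeque(messages)
    val acked = mutableListOf<Int>()
    var closed = false
        private set

    override fun receive(): IncomingMessage? = pending.removeFirstOrNull()
    override fun ack(id: Int) { acked.add(id) }
    override fun close() { closed = true }
}
```

With something like this, the call site would be retrieveMessages(connection) { message -> File("the path").writeBytes(message) }, and there are no unacknowledged messages to track outside the loop, because each ACK is sent before the next message is read.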
Thoughts?
travis
08/19/2020, 10:32 PM
broadcastChannel.asFlow()). You could send to the BroadcastChannel upon receipt of a message. Then, after the send call completes, reply with the ack.
Just be aware that if you don't have any subscribers of the BroadcastChannel then the message will be lost (the same would also be true of a listener/callback style if you don't have any listeners registered).
Gus
08/20/2020, 10:10 AM