# coroutines
a
Hello everyone! I am trying to build a wrapper around the events queue that dumps all the events to a file:
```kotlin
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {

    override val events: List<MockEvent> = delegate.events

    private val scope = CoroutineScope(SupervisorJob())
    private var fileDumpJob: Job? = null

    override fun emitEvent(event: MockEvent) {
        println("New event - ${event.event}")
        delegate.emitEvent(event)

        val newAnalytics = Analytics(events.subList(0, events.size).sortedBy { it.timestamp })
        val prevJob = fileDumpJob
        prevJob?.cancel()
        println("cancel - ${event.event}")
        fileDumpJob = null
        fileDumpJob = scope.launch {
            println("before cancelAndJoin - ${event.event}")
            prevJob?.cancelAndJoin()
            println("after cancelAndJoin - ${event.event}")
            dumpAnalyticsToFile(event, newAnalytics)
            fileDumpJob = null
        }
    }

    override fun clear() {
        delegate.clear()
    }

    private suspend fun dumpAnalyticsToFile(event: MockEvent, analytics: Analytics) = withContext(Dispatchers.IO) {
        ensureActive()
        val json = json.encodeToString<Analytics>(analytics)
        ensureActive()
        saveDataToPublicFile(json, "analytics.json")
        println("File dumped - ${event.event}")
    }
}
```
Unfortunately, I am randomly missing the last 1 or 2 emitted events in the file compared to the in-memory implementation. It feels like I am messing something up with the coroutines. Could anyone help spot the issue?
y
You should have your dump job always dump the latest analytics, I feel. Otherwise, I think right now, `emitEvent` can run in parallel with events e1, e2 where e1 happens first, reaches the `newAnalytics` line and completes it, but then e2 happens, and its `emitEvent` runs fully, hence `fileDumpJob` is actually the one with knowledge of e2. Then, the first thread continues and cancels `prevJob` (which by now is e2's job), thus e2 is lost forever. I think you can get by if you read `fileDumpJob` at the very, very start.
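In code, that reordering could look roughly like this (a sketch only; the rest of `emitEvent` stays as in the snippet above):
```kotlin
override fun emitEvent(event: MockEvent) {
    // Capture the previous job reference first, before delegate.emitEvent() and
    // the sorting, so this thread cancels the job it saw when it started rather
    // than one installed in the meantime by a parallel emitEvent() call.
    val prevJob = fileDumpJob
    println("New event - ${event.event}")
    delegate.emitEvent(event)
    // ...then build newAnalytics, cancel prevJob, and launch the new dump as before.
}
```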
Maybe having the job instead be a long-running coroutine that you communicate with using a `Channel<Unit>` would be better. Then, you simply `send(Unit)` to remind it to dump the file, but it reads `events` internally, so it always has the latest data (the bad alternative being that you use a `Channel<List<MockEvent>>`, and then due to parallelism, the coroutine receives `listOf(e1, e2)` first, then `listOf(e1)`).
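A minimal sketch of what that channel-driven variant could look like, assuming the same `AnalyticsQueue`, `MockEvent`, `Analytics`, `json`, and `saveDataToPublicFile` as in the snippet above:
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString

// Sketch: one long-running consumer owns the file, and emitEvent() only signals it.
// The consumer reads `events` itself, so it always dumps the latest data.
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {

    // Live view of the delegate's current list.
    override val events: List<MockEvent> get() = delegate.events

    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // CONFLATED so trySend() from the non-suspending emitEvent() never fails or blocks:
    // while a dump is in progress, any number of new signals collapse into one pending signal.
    private val dumpSignal = Channel<Unit>(Channel.CONFLATED)

    init {
        scope.launch {
            for (unused in dumpSignal) {
                // Snapshot *now*, so even a signal sent for e1 ends up writing
                // a file that also contains any later e2, e3, ...
                val analytics = Analytics(events.sortedBy { it.timestamp })
                saveDataToPublicFile(json.encodeToString(analytics), "analytics.json")
            }
        }
    }

    override fun emitEvent(event: MockEvent) {
        delegate.emitEvent(event)
        dumpSignal.trySend(Unit) // fire-and-forget reminder to dump
    }

    override fun clear() {
        delegate.clear()
    }
}
```
The conflated buffer also means a burst of events collapses into a single extra dump after the current one finishes, rather than one file write per event.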
a
Thanks. I think the sorting is a time-consuming CPU operation and there was an issue with placing it there. I reshuffled it to always dump the latest version of the analytics, and so far so good:
```kotlin
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {

    override val events: List<MockEvent> = delegate.events

    private val scope = CoroutineScope(SupervisorJob())
    private var fileDumpJob: Job? = null

    override fun emitEvent(event: MockEvent) {
        println("New event - ${event.event}")
        delegate.emitEvent(event)

        val prevJob = fileDumpJob
        prevJob?.cancel()
        println("cancel - ${event.event}")

        fileDumpJob = scope.launch {
            println("Cancelling - ${event.event}")
            prevJob?.cancelAndJoin()
            println("Cancelled - ${event.event}")
            dumpAnalyticsToFile(event)
            fileDumpJob = null
        }
    }

    override fun clear() {
        delegate.clear()
    }

    private suspend fun dumpAnalyticsToFile(event: MockEvent) = withContext(Dispatchers.IO) {
        println("Sorting - ${event.event}")
        ensureActive()
        val analytics = Analytics(events.subList(0, events.size).sortedBy { it.timestamp })
        println("Sorted - ${event.event}")
        println("Encoding - ${event.event}")
        ensureActive()
        val json = json.encodeToString<Analytics>(analytics)
        println("Encoded - ${event.event}")
        println("Dumping - ${event.event}")
        ensureActive()
        saveDataToPublicFile(json, "analytics.json")
        println("Dumped - ${event.event}")
    }
}
```
y
Nice! I think that should be good now! I don't think there's a chance for the job to disappear here, since everyone who cancels it immediately puts a new one back in.
a
But it feels like I don't understand how cooperative cancellation and `ensureActive` work. I am seeing these logs:
```
New event - network_request_completed
cancel - network_request_completed
New event - workflow_completed
cancel - workflow_completed
Cancelling - workflow_completed
New event - flow_completed
cancel - flow_completed
Cancelling - flow_completed
Cancelled - flow_completed
Sorting - flow_completed
Sorted - flow_completed
Encoding - flow_completed
Cancelling - network_request_completed
Encoded - screen_forward
Dumping - screen_forward
Cancelled - network_request_completed
Sorting - network_request_completed
Sorted - network_request_completed
Encoding - network_request_completed
Encoded - flow_completed
Dumping - flow_completed
Dumped - flow_completed
Encoded - network_request_completed
Dumping - network_request_completed
Dumped - network_request_completed
```
How could it be that the very first event, `network_request_completed`, printed that the job is `Cancelled`, but then I still see all the stages of dumping like `Sorting`, `Encoding` and `Dumping`? I would expect that `ensureActive` throws a `CancellationException`. So it feels that the solution is still not ideal. And the question is: how do I cancel the ongoing job?
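For reference, a minimal standalone demo (not tied to the class above) of how this cooperative cancellation behaves: `ensureActive()` only throws once the coroutine's own `Job` has actually been cancelled by whoever holds that reference, and blocking work between two checks runs to completion on its own:
```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch(Dispatchers.IO) {
        repeat(5) { step ->
            ensureActive()      // throws CancellationException only once *this* job is cancelled
            Thread.sleep(100)   // blocking work between two checks is never interrupted on its own
            println("finished step $step")
        }
    }
    delay(250)
    job.cancel()                // if nobody holding this reference calls cancel(),
                                // ensureActive() above never throws and all 5 steps run
    job.join()
}
```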
y
Maybe the job shouldn't be cancelled? A long-running coroutine doesn't actually consume much of anything; it just stays suspended until it receives a signal. A `Channel<Unit>` solution would probably work great here then! Or a `Channel<MockEvent>` if you want to print the same diagnostic printlns as you have now. I think the issue may be, again, threads running in parallel that both assume no running job exists: t1 goes in, sees no job running, so it doesn't cancel anything and prepares its own job. Meanwhile, t2 also doesn't see a job running, so it prepares its own, sets it, and returns. Then t1 continues and sets its job to the field, overriding t2's job. Thus, t2's job will never get cancelled.
In fact, the same situation can happen but with t1's job getting lost, which is likely what you're seeing here. All that needs to happen is that both threads see no active job at the same time, so both launch a new coroutine and set it; it's then a race as to which job is stored, and one will always be lost in this circumstance.
a
I don't want to write to the same file unnecessarily. If I get 3 events at the same time, I want only 1 job (for the last one) to sort-encode-dump and all the others to be cancelled, so there are no issues with I/O.
y
You just need to synchronize it somehow so that only one thread manages that. The solution with channels would do that by having one coroutine manage it. Then, that coroutine can store a "latest event stored" marker or something, and when it receives a signal, it only writes the file if the latest event has changed. Because only this coroutine can access it, there are no races.
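For illustration, that check could sit inside the single consumer coroutine from the channel sketch above (`lastDumped` is just a hypothetical name for the "latest event stored" marker, and this assumes `MockEvent` has value equality):
```kotlin
scope.launch {
    // Only this coroutine touches the file and lastDumped, so no locks are needed.
    var lastDumped: MockEvent? = null
    for (unused in dumpSignal) {
        val snapshot = events.sortedBy { it.timestamp }
        val newest = snapshot.lastOrNull()
        if (newest == null || newest == lastDumped) continue  // nothing new: skip the I/O
        saveDataToPublicFile(json.encodeToString(Analytics(snapshot)), "analytics.json")
        lastDumped = newest
    }
}
```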
a
Probably I will try this option. I am reading this doc and I think I am doing everything according to it, but somehow the execution still continues for the old jobs: https://kotlinlang.org/docs/cancellation-and-timeouts.html#making-computation-code-cancellable