adjorno
05/15/2025, 8:19 AM
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {
    override val events: List<MockEvent> = delegate.events
    private val scope = CoroutineScope(SupervisorJob())
    private var fileDumpJob: Job? = null

    override fun emitEvent(event: MockEvent) {
        println("New event - ${event.event}")
        delegate.emitEvent(event)
        val newAnalytics = Analytics(events.subList(0, events.size).sortedBy { it.timestamp })
        val prevJob = fileDumpJob
        prevJob?.cancel()
        println("cancel - ${event.event}")
        fileDumpJob = null
        fileDumpJob = scope.launch {
            println("before cancelAndJoin - ${event.event}")
            prevJob?.cancelAndJoin()
            println("after cancelAndJoin - ${event.event}")
            dumpAnalyticsToFile(event, newAnalytics)
            fileDumpJob = null
        }
    }

    override fun clear() {
        delegate.clear()
    }

    private suspend fun dumpAnalyticsToFile(event: MockEvent, analytics: Analytics) = withContext(Dispatchers.IO) {
        ensureActive()
        val json = json.encodeToString<Analytics>(analytics)
        ensureActive()
        saveDataToPublicFile(json, "analytics.json")
        println("File dumped - ${event.event}")
    }
}
Unfortunately, I am randomly missing 1 or 2 of the last emitted events in the file compared to the in-memory implementation.
It feels like I am messing something up with the coroutines. Could anyone help to spot the issue?
Youssef Shoaib [MOD]
05/15/2025, 8:46 AM
emitEvent can run in parallel for events e1, e2, where e1 happens first, reaches the newAnalytics line and completes it, but then e2 happens and its emitEvent runs fully, hence fileDumpJob is actually the one with knowledge of e2. Then the e1 thread continues and cancels prevJob, thus e2 is lost forever.
I think you can get by if you read fileDumpJob at the very very start.
Youssef Shoaib [MOD]
05/15/2025, 8:49 AM
Channel<Unit> would be better. Then you simply send(Unit) to remind it to dump the file, but it reads events internally, so it always has the latest data (the bad alternative being that you use a Channel<List<MockEvent>>, and then due to parallelism the coroutine receives listOf(e1, e2) first, then listOf(e1)).
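A minimal sketch of the Channel<Unit> idea, not the exact code from this thread. It assumes a conflated channel (so a burst of signals collapses into one pending dump) and a single consumer coroutine that takes a fresh snapshot of the events each time it wakes up. MockEvent is a hypothetical stand-in for the type in the thread, and FileCachingQueue, dump and demoLatestDump are made-up names; in the real class, dump would do the JSON encoding and call saveDataToPublicFile.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for the MockEvent type used in the thread.
data class MockEvent(val event: String, val timestamp: Long)

class FileCachingQueue(
    scope: CoroutineScope,
    // Hypothetical sink: the real code would serialize and write the file here.
    private val dump: suspend (List<MockEvent>) -> Unit,
) {
    // Thread-safe list, because emitEvent may be called from several threads.
    private val events = CopyOnWriteArrayList<MockEvent>()

    // CONFLATED keeps at most one pending signal, so bursts collapse into one dump.
    private val dumpRequests = Channel<Unit>(Channel.CONFLATED)

    init {
        // Single consumer: dumps never overlap, and each one reads the
        // events only after its signal was received.
        scope.launch {
            for (request in dumpRequests) {
                dump(events.sortedBy { it.timestamp })
            }
        }
    }

    fun emitEvent(event: MockEvent) {
        events.add(event)          // record the event first
        dumpRequests.trySend(Unit) // then signal; the signal carries no data
    }
}

// Emits three events out of order and returns the names seen by the last dump.
fun demoLatestDump(): List<String> = runBlocking {
    var lastDump = emptyList<MockEvent>()
    val workerJob = Job()
    val queue = FileCachingQueue(this + workerJob) { snapshot -> lastDump = snapshot }
    queue.emitEvent(MockEvent("b", 2))
    queue.emitEvent(MockEvent("a", 1))
    queue.emitEvent(MockEvent("c", 3))
    // Let the consumer run until it has seen all three events.
    withTimeout(1_000) { while (lastDump.size < 3) yield() }
    workerJob.cancel()
    lastDump.map { it.event }
}

fun main() {
    println(demoLatestDump())
}
```

Because the signal carries no payload, there is no stale list that could arrive late and overwrite a newer one. Any event added before a signal is sent is visible to the dump that signal triggers, and no job cancellation is needed.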
adjorno
05/15/2025, 9:22 AM
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {
    override val events: List<MockEvent> = delegate.events
    private val scope = CoroutineScope(SupervisorJob())
    private var fileDumpJob: Job? = null

    override fun emitEvent(event: MockEvent) {
        println("New event - ${event.event}")
        delegate.emitEvent(event)
        val prevJob = fileDumpJob
        prevJob?.cancel()
        println("cancel - ${event.event}")
        fileDumpJob = scope.launch {
            println("Cancelling - ${event.event}")
            prevJob?.cancelAndJoin()
            println("Cancelled - ${event.event}")
            dumpAnalyticsToFile(event)
            fileDumpJob = null
        }
    }

    override fun clear() {
        delegate.clear()
    }

    private suspend fun dumpAnalyticsToFile(event: MockEvent) = withContext(Dispatchers.IO) {
        println("Sorting - ${event.event}")
        ensureActive()
        val analytics = Analytics(events.subList(0, events.size).sortedBy { it.timestamp })
        println("Sorted - ${event.event}")
        println("Encoding - ${event.event}")
        ensureActive()
        val json = json.encodeToString<Analytics>(analytics)
        println("Encoded - ${event.event}")
        println("Dumping - ${event.event}")
        ensureActive()
        saveDataToPublicFile(json, "analytics.json")
        println("Dumped - ${event.event}")
    }
}
Youssef Shoaib [MOD]
05/15/2025, 9:24 AM
adjorno
05/15/2025, 9:24 AM
ensureActive works. I am seeing these logs:
New event - network_request_completed
cancel - network_request_completed
New event - workflow_completed
cancel - workflow_completed
Cancelling - workflow_completed
New event - flow_completed
cancel - flow_completed
Cancelling - flow_completed
Cancelled - flow_completed
Sorting - flow_completed
Sorted - flow_completed
Encoding - flow_completed
Cancelling - network_request_completed
Encoded - screen_forward
Dumping - screen_forward
Cancelled - network_request_completed
Sorting - network_request_completed
Sorted - network_request_completed
Encoding - network_request_completed
Encoded - flow_completed
Dumping - flow_completed
Dumped - flow_completed
Encoded - network_request_completed
Dumping - network_request_completed
Dumped - network_request_completed
How could it be that the very first event network_request_completed printed that the Job is Cancelled, but then I still see all the stages of dumping like Sorting, Encoding and Dumping?
I would expect that ensureActive throws a CancellationException.
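One thing to check when reading these logs: in the code posted above, "Cancelled - X" is printed by the job launched for X right after prevJob?.cancelAndJoin() returns. It reports that X's predecessor has finished, not that X itself was cancelled. For reference, here is a small self-contained sketch of how ensureActive behaves on a job that really has been cancelled. The function name ranPastEnsureActive is made up for illustration, and the blocking loop only simulates non-suspending work such as sorting or encoding.

```kotlin
import kotlinx.coroutines.*

// Returns true only if the code after ensureActive() ran on a cancelled job.
fun ranPastEnsureActive(): Boolean = runBlocking {
    val started = CompletableDeferred<Unit>()
    var ranPast = false
    val job = launch(Dispatchers.Default) {
        withContext(Dispatchers.IO) {
            started.complete(Unit)
            // Blocking work has no suspension point, so cancellation is only
            // noticed when the code checks for it explicitly.
            while (isActive) Thread.sleep(1)
            // The loop exits only after cancellation, so this call throws
            // CancellationException and the next line is skipped.
            ensureActive()
            ranPast = true
        }
    }
    started.await()      // wait until the worker is inside withContext
    job.cancelAndJoin()  // cancel it and wait for it to finish
    ranPast
}

fun main() {
    println("ran past ensureActive: ${ranPastEnsureActive()}")
}
```

If a job gets past ensureActive and prints the following stage, the job was still active at that moment. In the class above, that would mean no later emitEvent call had cancelled that particular Job yet, for example because fileDumpJob did not hold it when the call read the field.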
adjorno
05/15/2025, 9:33 AM
Youssef Shoaib [MOD]
05/15/2025, 9:37 AM
Youssef Shoaib [MOD]
05/15/2025, 9:40 AM
adjorno
05/15/2025, 9:42 AM
Youssef Shoaib [MOD]
05/15/2025, 9:47 AM
adjorno
05/15/2025, 9:49 AM