# coroutines
l
Is it possible to cancel a `Flow` manually but also wait until all coroutines launched in this flow have finished? I have a channel that I consume with `receiveAsFlow()`, which in turn keeps the flow running until the channel is closed (by the user or by an exception). I would like to escape from this flow but also wait until all coroutines have finished...
z
How are you launching coroutines in the flow? What do you mean by escape from the flow: do you want the collector to stop collecting (execute the next line after the `collect {}` call)?
l
This is what I'm doing, but the flow never completes. `subscribeToPacketChannel()` is just a call to `channel.receiveAsFlow()`. As I wrote in the comments, the flow collects multiple packets, but collecting happens faster than parsing each packet. So when the last packet has been collected, there might be 1, 4 or even 6 packets still being parsed. After all packets are parsed I would expect the flow to complete, but since it's collecting from a channel, it never completes.
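Since `receiveAsFlow()` only completes once its channel is closed, the other way to let `collect {}` return is for the producer to close the channel after sending the last packet. A minimal sketch, assuming the producing side knows which packet is the last one; `collectUntilClosed` and the plain `Int` packets are hypothetical stand-ins for the real data source:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real data source: packets are plain Ints here.
fun collectUntilClosed(packetCount: Int): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 1..packetCount) channel.send(i)
        // Closing the channel is what lets the flow below complete;
        // without it, toList() would suspend forever.
        channel.close()
    }
    // receiveAsFlow() completes normally once the channel is closed and drained
    channel.receiveAsFlow().toList()
}

fun main() {
    println(collectUntilClosed(3)) // prints [1, 2, 3]
}
```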
> What do you mean by escape from the flow: do you want the collector to stop collecting (execute the next line after the `collect {}` call)?
By "escape" I mean that I would expect the flow to complete after all packets have been parsed, so that the next line is executed.
z
Do you mean to launch those parsing coroutines as children of that outer coroutine scope (`packetCollectorJob`, which isn’t actually a job but a `CoroutineScope`)?
It’s unclear from this code what is actually consuming the deserialized packets, so I’m a bit confused
l
Sorry, the comments make it hard to read. In contrast to this, I have made a sample of how I would expect it to run: Kotlin playground
Is it still unclear?
> Do you mean to launch those parsing coroutines as children of that outer coroutine scope (`packetCollectorJob`, which isn’t actually a job but a `CoroutineScope`)?
I forgot to remove this line with `packetCollectorJob`. It is part of my current workaround:
```kotlin
coroutineScope {
    launch {
        val packetCollectorJob = this
        dataSource.subscribeToPacketChannel().collect { packet ->
            try {
                coroutineScope {
                    // Launch parsing in its own coroutine because
                    // it might happen that parsing hasn't finished
                    // when the next packet arrives
                    launch(Dispatchers.IO) {
                        packet.deserialize(destination) // non-suspending function, destination is a Map
                    }
                }
            } catch (e: LastPacketException) {
                // Within deserialize: when the last packet is detected,
                // this exception is thrown. But detecting the last packet
                // does not mean all packets are parsed / all child coroutines
                // have finished. So I delay execution before cancelling the scope,
                // which in turn cancels the flow, which is the goal.
                launch {
                    delay(6000L) // TODO dirty workaround

                    /*
                     * Cancel coroutine which in turn cancels the flow.
                     * This is necessary because [subscribeToPacketChannel] function
                     * relies on a channel, i.e. the flow collector is only cancelled when
                     * the channel is closed by the user or by an exception of the flow collector itself.
                     */
                    packetCollectorJob.cancel()
                }
            }
        }
    }

    launch { /* another coroutine */ }
}
```
z
In `packet.deserialize(destination, deviceProfile)`, is `destination` a callback of some sort?
l
Nope, just a `Map`.
z
ah
l
Ah OK, sorry, now I understand why you were confused.
z
Does `deserialize` actually do IO, or just process data that’s already in memory?
l
It does IO; inside it I'm working with `Buffer` from the okio library.
z
Using `Buffer` doesn’t mean you’re doing IO; `Buffer` is just an in-memory data structure.
The reason I’m asking: if all your work is CPU-bound, then you should be doing it on the `Default` dispatcher, not `IO`. And if that’s the case, are you sure that deserializing is actually so expensive that each packet has to be done in parallel?
If not, then it would be a lot simpler to just decode serially, and then you don’t have to solve this problem at all. You could still run the processing as a whole on a different dispatcher if you want, e.g. to get off the main thread.
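A minimal sketch of that serial alternative, assuming a hypothetical `rawPackets()` source in place of `subscribeToPacketChannel()` and a hypothetical CPU-bound `decode` function. `flowOn(Dispatchers.Default)` moves the source and the `map` step off the collector's thread, while packets are still decoded one at a time and in order:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical packet source standing in for subscribeToPacketChannel().
fun rawPackets(): Flow<ByteArray> = flow {
    for (i in 1..3) emit(byteArrayOf(i.toByte(), (i * 2).toByte()))
}

// Hypothetical CPU-bound decoder: works only on bytes already in memory.
fun decode(bytes: ByteArray): Int = bytes.fold(0) { acc, b -> acc + b }

fun decodeAll(): List<Int> = runBlocking {
    rawPackets()
        .map { decode(it) }          // one packet at a time, in arrival order
        .flowOn(Dispatchers.Default) // source and map run on Default, not on the caller's thread
        .toList()
}

fun main() {
    println(decodeAll()) // prints [3, 6, 9]
}
```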
l
OK, then it doesn't do IO. To be honest, I don't know the exact differences between `Default` and `IO`. I think parsing is not expensive, but I'm doing it to run "receiving packets" and "parsing packets" in parallel, to reduce the overall duration and get the output (here: `destination`) faster.
z
E.g.:

```kotlin
coroutineScope {
  launch(Dispatchers.Default) {
    dataSource.subscribeToPacketChannel()
      .map { packet ->
        try {
          packet.deserialize(destination, …)
          return@map true
        } catch (e: LastPacketException) {
          return@map false
        }
      }
      .takeWhile { it } // stop collecting once the last packet has been parsed
      .collect()
  }

  // launch other coroutines
}
```
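To see the `map` + `takeWhile` idea end to end, here is a self-contained sketch with hypothetical stand-ins for `Packet`, `LastPacketException` and the channel behind `subscribeToPacketChannel()`; the `deserialize` signature is simplified to a single `MutableMap` argument. The channel is deliberately never closed, yet collection finishes once the last packet has been parsed:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the types used in the thread.
class LastPacketException : Exception()

class Packet(val id: Int, private val isLast: Boolean) {
    // Simplified: one argument instead of (destination, …)
    fun deserialize(destination: MutableMap<Int, String>) {
        destination[id] = "packet $id"
        if (isLast) throw LastPacketException() // last packet is detected while parsing
    }
}

fun parseUntilLastPacket(): Map<Int, String> = runBlocking {
    // Never closed, just like the channel behind subscribeToPacketChannel().
    val channel = Channel<Packet>(Channel.UNLIMITED)
    for (i in 1..5) channel.send(Packet(i, isLast = i == 3))

    val destination = mutableMapOf<Int, String>()
    coroutineScope {
        launch(Dispatchers.Default) {
            channel.receiveAsFlow()
                .map { packet ->
                    try {
                        packet.deserialize(destination)
                        true
                    } catch (e: LastPacketException) {
                        false
                    }
                }
                .takeWhile { it } // first false ends collection; packets 4 and 5 are never read
                .collect()
        }
    }
    destination
}

fun main() {
    println(parseUntilLastPacket().keys) // prints [1, 2, 3]
}
```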
l
Let me try this
z
“IO” in this case means asking the JVM to do something that will cause the current thread to block and wait for the OS to do something and get back to it. I.e., the thread is blocking the caller, but it’s not actually doing any work: the thread is just sitting around in the OS, waiting to be scheduled again when the OS has a response to give it. The only resource such a thread consumes while blocked is memory, not processing.
`IO` is good for this sort of thing because it uses a thread pool with a large thread count. It assumes that, at any given time, all of its threads will be blocked on OS requests, i.e. not doing any actual work, just waiting for the OS to reschedule them. Because these threads aren’t actually doing any processing themselves, there’s no reason to limit them to the number of available processors/cores. It’s bad for CPU-bound work because the system can only run as many tasks in parallel as there are physical processors/cores; any more than that, and the OS has to start switching between threads, giving each of them a chance to run. Switching threads is relatively expensive, since the OS has to swap out all the registers and stuff. So while the OS can handle more threads actively doing work than there are processors to run them, it’s not as efficient.
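To make the "threads that only wait" point concrete, a small sketch: `Thread.sleep` stands in for a blocking OS call (a hypothetical `blockingRead`), and ten such calls dispatched on `Dispatchers.IO` can all wait at the same time, so the total is close to one call's duration rather than ten. Exact timings vary by machine:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical blocking call: Thread.sleep stands in for a socket read.
// The thread is parked by the OS and does no work while it waits.
fun blockingRead(): Int {
    Thread.sleep(200)
    return 1
}

// Runs `count` blocking reads on Dispatchers.IO and returns the elapsed time in ms.
fun timeBlockingReadsOnIo(count: Int): Long = measureTimeMillis {
    runBlocking {
        val results = List(count) { async(Dispatchers.IO) { blockingRead() } }.awaitAll()
        check(results.sum() == count)
    }
}

fun main() {
    // Serially this would take about 10 * 200 ms; on IO the waits overlap.
    println("elapsed: ${timeBlockingReadsOnIo(10)} ms")
}
```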
`Default` is good for CPU-bound work because it uses a thread pool that basically has only as many threads as there are physical cores/processors available. This means that, if the `Default` dispatcher is saturated with work, the available processing hardware will be saturated with work, but no more coroutines will be resumed until there’s another physical processor available. It’s bad for IO because if one of the threads is blocked on IO, there’s effectively one physical processor that can’t be used to do actual work while the JVM is waiting for the OS to return a response.
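Putting both dispatchers to their intended use might look like the sketch below, assuming a hypothetical blocking `readPacketBlocking` (standing in for a socket read) and a hypothetical CPU-bound `parse`; each step is wrapped in `withContext` with the dispatcher that matches its kind of work:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking read, standing in for a Bluetooth socket read.
fun readPacketBlocking(): ByteArray {
    Thread.sleep(50) // the thread waits on the "OS" and does no work
    return ByteArray(1_000) { 1.toByte() }
}

// Hypothetical CPU-bound parse of bytes that are already in memory.
fun parse(bytes: ByteArray): Int = bytes.fold(0) { acc, b -> acc + b }

suspend fun readAndParse(): Int {
    val bytes = withContext(Dispatchers.IO) { readPacketBlocking() } // blocking wait: IO
    return withContext(Dispatchers.Default) { parse(bytes) }         // CPU work: Default
}

fun main() = runBlocking {
    println(readAndParse()) // prints 1000
}
```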
l
Wow, thanks for the detailed explanation! The main reason for choosing `IO` for the coroutines was that my data source is a Bluetooth Classic device, and I thought Bluetooth operations were some sort of IO. So, just to be sure: are Bluetooth Classic operations (reading from / writing to a socket) not IO operations? Btw, your snippet works like a charm 🙂
z
Socket operations are IO in this sense (and any sense), yes. But if you’ve got an okio `Buffer` in memory that’s been filled already, then working with that `Buffer` directly is not IO in this sense; it’s effectively the same thing as doing operations on a `MutableList`.
l
OK, so in general one can say that all operations that happen in memory are not IO, and operations on peripherals like the file system or Bluetooth are IO, right?
z
correct
l
One last question, for the learning effect. What I did previously:

```kotlin
coroutineScope {
    launch(Dispatchers.IO) {
        packet.deserialize(destination)
    }
}
```

Isn't it the same as just calling `packet.deserialize(destination)`? So in the end it doesn't really make sense, does it?
z
It’s more like

```kotlin
withContext(Dispatchers.IO) {
    packet.deserialize(destination)
}
```

but yeah, if you actually did want that behavior, I would use `withContext` instead of `coroutineScope` + `launch`.
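A sketch comparing the two forms, with a hypothetical non-suspending `deserialize` that only records when it starts and ends. In both cases the suspend function returns only after `deserialize` has finished, so calls made one after another never overlap:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical non-suspending deserializer that records when it starts and ends.
fun deserialize(id: Int, log: MutableList<String>) {
    log += "start $id"
    Thread.sleep(20) // stands in for parsing work
    log += "end $id"
}

// Form 1: coroutineScope + launch. coroutineScope returns only after its
// single child has completed, so the caller waits for deserialize anyway.
suspend fun viaScopeAndLaunch(id: Int, log: MutableList<String>) {
    coroutineScope {
        launch(Dispatchers.IO) { deserialize(id, log) }
    }
}

// Form 2: withContext. Essentially the same behavior, without the extra child coroutine.
suspend fun viaWithContext(id: Int, log: MutableList<String>) {
    withContext(Dispatchers.IO) { deserialize(id, log) }
}

fun runBoth(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    viaScopeAndLaunch(1, log)
    viaScopeAndLaunch(2, log) // does not start until packet 1 is done
    viaWithContext(3, log)
    log
}

fun main() {
    println(runBoth()) // prints [start 1, end 1, start 2, end 2, start 3, end 3]
}
```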
l
That's not what I was thinking about. Correct me if I'm wrong. Given that my first plan was to parse the packets in parallel, doing this:

```kotlin
coroutineScope {
    launch(Dispatchers.IO) {
        packet.deserialize(destination)
    }
}
```

does not make sense: I run each packet in its own coroutine, but since it's wrapped in `coroutineScope`, the parallelism is neutralized (`coroutineScope` waits for its children to complete, so in the end the packets are parsed sequentially). So it's the same as just calling `packet.deserialize(destination)`. Am I right with this assumption?
z
Correct, they would not happen in parallel. That’s why I asked if you meant to launch those in the `packetCollectorJob` scope, since that seemed to be your intent.
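If per-packet parallelism is really wanted, the parsers have to be launched into a scope that outlives each `collect` iteration. A minimal sketch, assuming (unlike the thread, where the last packet is only detected inside `deserialize`) that the last packet can be recognized before it is parsed, e.g. from a header flag; `Packet`, `isLast` and `parseInParallel` are hypothetical. `transformWhile` ends collection right after the last packet, and the surrounding `coroutineScope` then waits for every launched parser, so under that assumption no fixed `delay` is needed:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Hypothetical packet whose "last" flag is known before parsing (e.g. from a header).
class Packet(val id: Int, val isLast: Boolean) {
    fun deserialize(destination: MutableMap<Int, String>) {
        Thread.sleep(50) // stands in for slow parsing
        destination[id] = "packet $id"
    }
}

fun parseInParallel(): Map<Int, String> = runBlocking {
    val channel = Channel<Packet>(Channel.UNLIMITED) // never closed
    for (i in 1..6) channel.send(Packet(i, isLast = i == 4))

    // Several parsers write at the same time, so a thread-safe map is used.
    val destination = ConcurrentHashMap<Int, String>()
    coroutineScope {
        channel.receiveAsFlow()
            .transformWhile { packet ->
                emit(packet)
                !packet.isLast // stop collecting right after the last packet
            }
            .collect { packet ->
                // Launched in the enclosing coroutineScope (not a fresh one per packet),
                // so collect does not wait for this parse before taking the next packet.
                launch(Dispatchers.Default) { packet.deserialize(destination) }
            }
        // coroutineScope returns only after every launched parser has completed.
    }
    destination.toMap()
}

fun main() {
    println(parseInParallel().keys.sorted()) // prints [1, 2, 3, 4]
}
```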
l
Ah OK, now I understand why you were asking this. I will go with your sample, it was good. Thanks for your time, really appreciated!
a
Perfect explanation of the `Default` and `IO` dispatchers. Funny thing is I already knew this stuff, but damn... that explanation made me learn something I already knew all over again 😂