What’s the best way to mix a long running loop wit...
# coroutines
s
What’s the best way to mix a long running loop with coroutines. For example, I want to do something like
while (true) { consumer.take() }
, where consumer.take is a blocking call on a JMS client. Normally, I’d launch a new thread for this, and then shut down the thread once the program exits.
o
You'll still have to have some thread doing the same thing with coroutines. Typically this style of coroutine gets put on the
<http://Dispatchers.IO|Dispatchers.IO>
thread pool, which has a lot of extra threads that can be blocked, or you might want to have your own thread pool with a similarly high number of threads. In any case, I would
yield()
after every
take()
to participate in cooperative cancellation, but otherwise, the loop is still how you handle this in coroutines. If you wanted to also run some code inside the loop, you might send the data over
Channel
-- do
yourIOScope.produce { while (true) channel.send(consumer.take()) }
, and then use the resulting channel to retrieve results when you need them, in a suspending manner. But it all depends on your use case.
s
I would only need one thread for the loop, so I could make a thread for it, and wrap that in a dispatcher. Can I use a dispatcher as the basis to launch a coroutine?
l
You can also just use
while (isActive)
if the semantics suit your use case well.
s
while (isActive) { } will block the calling thread, so I need to wrap it in something.
o
you can use a dispatcher to launch, you just need to wrap it in a Scope:
CoroutineScope(your-dispatcher)
s
So just working out loud,
Copy code
val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    CoroutineScope(dispatcher).launch {
      while (true) {
        val messages = consumer.poll(Duration.ofSeconds(5))
      }
    }
l
Checking if
isActive
will not block it but will not let other coroutines run cooperatively on the same dispatcher as yield() or other suspending and dispatching function would.
s
@louiscad sorry, I thought by isActive you just meant some kind of volatile or atomic boolean
so
isActive
here will check that the coroutine is still running, how does that help with cooperation, I’m not clear on that
l
You can stop looping when it's no longer active (i.e. cancelled).
s
Right, that’s what I thought, but while it’s active it’s the same as while true.
@octylFractal if i use a decided thread for this coroutine then I shouldn’t need to use yield right. In fact if I use a dedicated thread, am I gaining anything with the coroutine at all.
o
not too much, but if you do a hand-off to Channel like I suggested, you gain the ability to suspend in other contexts for the same call
s
Yeah I want to use a channel (or a flow?) for this
you gain the ability to suspend in other contexts for the same call
can you explain that ?
o
if you use your single-thread like so:
Copy code
val messages = CoroutineScope(dispatcher).produce {
      while (true) {
        channel.send(consumer.take())
      }
    }
in another coroutine, you can do
messages.receive()
(or similar call) to suspend for the result of
consumer.take
. it will still block the single-thread-dispatcher, but it won't block other threads that use
receive()
👍 1
s
I get you, because without the channel, the consumer(s) would themselves also have to block
o
yes
s
That’s only an advantage if I have more than one consumer right (I will be doing that)
o
maybe if you want your code to read in a certain way, it can also be useful. if you have other suspend calls, you probably don't want to have a lot of
withContext
calls in your code, switching from your blocking thread to e.g. a UI thread (not sure if that's relevant to you)
if you don't have other suspend calls, then you probably don't need it
s
Yeah no UI thread here.
basically, reading from a consumer, and then doing something with that message. Then blocking until another message comes in. I could pass that message over a channel to be consumed, but actually I would only have 1 consumer per channel anyway, so I could just pick up the message and process it inside the thread blocked on the consumer. I guess one advantage is that if what I need to do with the message is CPU intensive, I can have the consumer run back on the CPU thread pool
o
yes, and have the next message already fetched for you as well if you use
produce(capacity = 1)
s
Would you recommend using produce over manually creating a channel with a buffer ?
o
yes, because it will handle closing the channel for you on exceptions
s
ok
very very helpful thank you @octylFractal