# coroutines
r
Hi everyone, I'm thinking about how I can accomplish some UDP socket programming using coroutines. Currently, in my (non-coroutine) code, I have a class that maintains a `DatagramSocket` as internal state and implements `Runnable` so it can be scheduled to run in the background on a `ThreadPoolExecutor`. Right now this class polls every so often to check a concurrent queue for messages to send, and other application code uses some `send` methods on it to add messages to the queue for sending. Conceptually, I think I need the following ingredients to port this to coroutines:
1. `newSingleThreadContext`
2. `runBlocking { }` on that single thread context to get it started
3. A `SendChannel` for other code to interact with to actually send
4. `delay(POLL_INTERVAL)` for polling (although I'm not sure I actually need this if I use `RENDEZVOUS`)
Is this ingredients list sound? Is there anything that jumps out at you as unnecessary or even missing? From what I've read, I could probably lump most of this together with `actor`, but that has been marked `ObsoleteCoroutineApi`, so I'll probably stay away from it for now. Whatever I end up doing for Tx, I hope to repurpose as much as possible for Rx, where I'll have some additional complexity around pushing messages out to other application code (probably using a `SharedFlow`). I'd appreciate any thoughts, and if you know of any examples of this sort of thing, I'd appreciate that as well. `ktor` comes to mind as having a lot of socket-related code plus coroutine support, but it would be a pretty big dependency to include just for the socket APIs.
f
You don't need runBlocking and a separate thread. You can run the coroutine that consumes the channel on an existing dispatcher (most likely IO), or on a specific thread if you need one, but without runBlocking. Your current behaviour uses a queue (hence a buffer), so be careful to choose an appropriate buffer size for your component; you may want to buffer a bunch (or all) of the runnables, depending on the desired behavior. Why do you need to poll every interval? Shouldn't the runnable be processed ASAP? Channel.receive will suspend until there is something available in the channel, so why would you use delay?
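For anyone following along, here is a minimal sketch of what that suggestion could look like for the Tx side: a coroutine launched on `Dispatchers.IO` that drains a buffered `Channel`, with no `runBlocking` and no `delay`. The class name `UdpSender`, its constructor parameters, and the `close()` behaviour are assumptions for illustration, not anything from the original code.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetSocketAddress

// Hypothetical class: the original Runnable-based class is not shown in the thread.
class UdpSender(
    private val socket: DatagramSocket,
    private val target: InetSocketAddress,
) {
    // Owns the background coroutine; Dispatchers.IO is meant for blocking calls like socket.send.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Channel.BUFFERED uses the library's default capacity; pick an explicit size if that matters.
    private val outgoing = Channel<ByteArray>(Channel.BUFFERED)

    init {
        scope.launch {
            // Iterating a channel suspends until a message is available,
            // so no polling interval is needed. The loop ends when the channel is closed.
            for (payload in outgoing) {
                socket.send(DatagramPacket(payload, payload.size, target))
            }
        }
    }

    // Suspends the caller only while the buffer is full.
    suspend fun send(payload: ByteArray) = outgoing.send(payload)

    // Assumption: messages still sitting in the buffer may be dropped on close.
    fun close() {
        outgoing.close()
        scope.cancel()
        socket.close()
    }
}
```

Callers invoke `sender.send(bytes)` from their own coroutines. Error handling is left out here: an `IOException` from `socket.send` would terminate the sending coroutine, so a real implementation would probably want to catch and handle it.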
r
Good point w.r.t. using `delay`, I'm still learning my way around. It looks like `BUFFERED`, with `onBufferOverflow = BufferOverfl`
Small update on this: I recently updated the non-coroutines implementation to stop using a polling interval as well, because it was causing performance issues under load. It now uses a `BlockingQueue` for Tx and relies on `Socket::receive`'s inherent "block until something is available" behavior to do what you described here for a coroutines implementation. That should make for a very straightforward translation to coroutines.
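A rough sketch of how the Rx side might translate, assuming the blocking `DatagramSocket.receive` call runs in a coroutine on `Dispatchers.IO` and each datagram is pushed into a `SharedFlow`. The names `UdpReceiver` and `maxPacketSize`, along with the buffer sizes, are made up for illustration. Note that a blocked `receive` does not react to coroutine cancellation, so this sketch closes the socket to unblock it.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketException

// Hypothetical class; maxPacketSize and the buffer capacity are illustrative guesses.
class UdpReceiver(private val socket: DatagramSocket, maxPacketSize: Int = 1500) {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // With replay = 0, datagrams that arrive while nobody is collecting are dropped.
    private val _messages = MutableSharedFlow<ByteArray>(extraBufferCapacity = 64)
    val messages: SharedFlow<ByteArray> = _messages.asSharedFlow()

    init {
        scope.launch {
            val buffer = ByteArray(maxPacketSize)
            val packet = DatagramPacket(buffer, buffer.size)
            try {
                while (isActive) {
                    // receive() shrinks the packet length to the datagram size, so reset it first.
                    packet.length = buffer.size
                    socket.receive(packet) // blocks the IO thread until a datagram arrives
                    // Copy the bytes out because the buffer is reused on the next iteration.
                    val payload = packet.data.copyOfRange(packet.offset, packet.offset + packet.length)
                    _messages.emit(payload)
                }
            } catch (e: SocketException) {
                // Expected when close() closes the socket while receive() is blocked.
            }
        }
    }

    fun close() {
        socket.close() // unblocks a pending receive()
        scope.cancel()
    }
}
```

Application code would then collect `receiver.messages` from its own coroutines. With these settings, `emit` suspends the receive loop when a subscriber falls behind and the extra buffer fills up; whether that or dropping messages is preferable depends on the application.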