How do I properly send to a `SendChannel` which mi...
# coroutines
d
How do I properly send to a
SendChannel
which might be closed? Do I have to just catch the
ClosedSendChannelException
?
j
If it is closed, you cannot send elements to it by definition. So yes, you just have to catch the exception and deal with it.
d
Ok. Then maybe I am going about this wrong. I have a (potentially infinite) source and I want the receiver to be able to say "hey, I don't need any more". Is
close
the correct way?
j
If you don't want any more elements, it is okay to close it. But of course it won't be infinite anymore. Note that you can also test
isClosedForSend
to know whether it is currently closed.
Or, depending on your use case, maybe you are looking for a
BroadcastChannel
?
With a broadcast channel you would never close the broadcast itself, but the receiver could close its own subscription.
d
Yes, I can test
isClosedForSend
, but that's a race condition, is it not (TOCTTOU)?
Well, I need to close the channel, because there are resources associated with it (network stuff...)
j
Then, if possible, I would recommend creating the channel with
produce
. This function will handle everything: closing the channel when done, when cancelled and when failed.
d
That only moves the problem, does it not? The channel still has no
sendIfPossibleOrReturnFalse
method.
If so I could just do
do {
    val element = TODO() // compute next element
} while (channel.sendIfPossibleOrReturnFalse(element))
That's what I am trying to achieve.
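A minimal sketch of such a helper, written as an extension function. The name `sendOrFalse` is hypothetical and not part of kotlinx.coroutines; it assumes the receiver signals "no more" by calling `close()` on the channel, and it is written against the current (non-experimental) package names, which are newer than the ones in use in this conversation.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library function): suspends like send(),
// but reports a closed channel as `false` instead of throwing.
suspend fun <T> SendChannel<T>.sendOrFalse(element: T): Boolean =
    try {
        send(element)
        true
    } catch (e: ClosedSendChannelException) {
        false
    }

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 4)
    var next = 0
    var sent = 0
    // Keep producing until the channel reports that it is closed.
    while (channel.sendOrFalse(next++)) {
        sent++
        if (sent == 3) channel.close() // stands in for "I don't need any more"
    }
    println("sent $sent elements before the channel was closed")
}
```

Because the check and the send happen in a single call, there is no window between testing `isClosedForSend` and sending. The helper deliberately does not catch `CancellationException`, so cancelling the channel or the coroutine still propagates as usual.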
j
Do this instead:
produce {
	while(isActive) {
		val element = TODO()
		send(element)
	}
}
Then
produce
will handle it for you
d
Will it though? What if the receiver closes the channel during whatever
TODO
is? Then
send
will throw...
j
If the consumer cancels the channel,
send
will throw a
CancellationException
.
d
Yes...
Which I do not want. I want the receiver to be able to tell the producer to stop.
I do not want the receiver to be able to tell the producer to blow up with an exception.
Also, with this
produce
approach, how do I terminate the underlying thread (
newSingleThreadContext
) when the channel is closed?
j
CancellationException
will be consumed by
produce
as it is a normal reason to be terminated. If you use resources, you should always ensure they are closed in case of an exception anyway (the same as if you weren't using coroutines).
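A small sketch of that advice, assuming the current `CoroutineScope.produce` API (newer than the one used in this conversation). The names `numbers`, `onRelease` and `runDemo` are made up for illustration; `onRelease` stands in for closing whatever network resources the producer holds.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(onRelease: () -> Unit): ReceiveChannel<Int> = produce {
    try {
        var i = 0
        while (true) send(i++) // throws CancellationException once the consumer cancels
    } finally {
        onRelease()            // stand-in for closing sockets, files, ...
    }
}

fun runDemo(): Pair<List<Int>, Boolean> {
    var released = false
    val received = runBlocking {
        val channel = numbers { released = true }
        val firstTwo = listOf(channel.receive(), channel.receive())
        channel.cancel()       // the receiver says "I don't need any more"
        firstTwo
    }
    // runBlocking only returns after the producer coroutine has finished,
    // so the finally block has run by the time we get here.
    return received to released
}

fun main() {
    println(runDemo())
}
```

The `CancellationException` never reaches the caller here: it ends the producer coroutine, the `finally` block runs, and `runBlocking` returns normally.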
d
Okay, so then that part is sorted. How do I close the thread context though?
j
If you use
newSingleThreadContext
you of course have to ensure that it is terminated, even in case of an exception.
d
Yes, but... how can I terminate it from inside the produce? Can I terminate it from "inside itself"?
Just
use
will not work, will it?
j
Mmh. That's a good question, I've never done that.
d
I.e. this:
newSingleThreadContext().use { ctx ->
    return produce(ctx) {
    }
}
won't work, right?
j
No, it won't work.
It may terminate the dispatcher too early.
d
Yeah that's what I was thinking.
I guess I will have to pass the context into the method that returns
produce
?
j
fun startProducer(): ReceiveChannel<Int> {
  val context = newSingleThreadContext("my dispatcher")

  // Close the dispatcher from inside the producer, once it is done or cancelled.
  return produce(context) {
    context.use {
      send(1)
      send(2)
      send(3)
    }
  }
}
this should work. But I wonder if there is a better way of doing it.
d
I am doing it outside now, and passing
coroutineContext
as a parameter.
thanks for your help!
j
By the way, it may not be necessary to terminate the dispatcher in the future. See: https://github.com/Kotlin/kotlinx.coroutines/issues/261
b
why does it need to be outside produce?
produce {
    newSingleThreadContext("dispatch").use {
        while(isActive)
            send(TODO())
    }
}
j
@bj0, in your example
produce
uses
CommonPool
b
ah right
you can toss a
withContext
in there
outside the
while
loop
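A rough sketch of what that could look like, assuming the current kotlinx.coroutines API. The function names are invented for the example, and the producer sends its thread name only to make visible where the loop runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun CoroutineScope.threadNames(): ReceiveChannel<String> = produce {
    // The dispatcher is created and closed entirely inside the producer.
    newSingleThreadContext("dispatch").use { dispatcher ->
        withContext(dispatcher) {
            // The loop runs on the dedicated thread; send() is the producer's own send.
            while (isActive) send(Thread.currentThread().name)
        }
    }
}

fun firstThreadName(): String = runBlocking {
    val channel = threadNames()
    val name = channel.receive()
    channel.cancel() // stops the loop; `use` then closes the dispatcher
    name
}

fun main() {
    println(firstThreadName())
}
```

The dispatcher is only closed after `withContext` has returned (normally or through cancellation), so it is not shut down while the loop is still running on it.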
j
Yes indeed. But it is simpler to just pass the context as an argument to
produce
b
I don't know, I think it's less simple to have to create an essentially local variable outside the scope of where it's used.
seems leaky
j
Yes, but if it is really local, what is the point of creating a single thread context? Why not simply use the common pool?
I don't see the point of creating a thread pool if it is not going to be shared by many coroutines.
b
That's an implementation detail, and I don't know what his program's context is.
j
I'm not sure. I feel there is something not right with this approach. Basically, it could lead to a lot of thread creation, which is very expensive and bad practice. Good practice is to use a thread pool in order to reuse the threads. I would rather create a global thread pool like
val MyThreadPool = newSingleThreadContext("my thread pool")
and use it for the whole life of the application, calling
produce(MyThreadPool) {}
d
Yes, I agree with @Jonathan. That's why I ended up passing the
CoroutineContext
into my own method and then pass it on to
produce
.
b
if you're doing that you're not going to clean it up when one producer closes though
d
I have the whole thing (produce and consume) in a
newSingleThreadContext().use {}
. It is closed properly.
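One way this arrangement might look, sketched against the current kotlinx.coroutines API. `startProducer` and `consumeThree` are illustrative names, not the code actually used here: the producer only receives a context, and the caller owns the dispatcher and closes it after both sides are done.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

// The producer does not know or care who owns the context it runs in.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.startProducer(context: CoroutineContext): ReceiveChannel<Int> =
    produce(context) {
        var i = 0
        while (true) send(i++)
    }

@OptIn(DelicateCoroutinesApi::class)
fun consumeThree(): List<Int> =
    newSingleThreadContext("producer").use { dispatcher ->
        // Producing and consuming both happen inside `use`, so the thread
        // is only shut down after runBlocking (and the producer) has finished.
        runBlocking {
            val channel = startProducer(dispatcher)
            val received = List(3) { channel.receive() }
            channel.cancel() // tell the producer to stop
            received
        }
    }

fun main() {
    println(consumeThree())
}
```

Since `runBlocking` waits for the producer coroutine it started, the dispatcher cannot be closed too early, which was the problem with wrapping only the `produce` call in `use`.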
b
i mean if you're going to "create a global ... and use it for the whole life of the application", obviously use case matters