diesieben07

02/28/2018, 12:51 PM
How do I properly send to a `SendChannel` which might be closed? Do I have to just catch the `ClosedSendChannelException`?

Jonathan

02/28/2018, 12:52 PM
If it is closed, you cannot send elements to it by definition. So yes, you just have to catch the exception and deal with it.

diesieben07

02/28/2018, 12:53 PM
Ok. Then maybe I am going about this wrong. I have a (potentially infinite) source and I want the receiver to be able to tell the producer "hey, I don't need any more". Is `close` the correct way?

Jonathan

02/28/2018, 12:54 PM
If you don't want any more elements, it is okay to close it. But of course it won't be infinite anymore. Note that you can also test `isClosedForSend` to know whether it is currently closed.
Or, depending on your use case, maybe you are looking for a `BroadcastChannel`?
If using a broadcast, you would never close the broadcast itself, but the receiver could close its opened subscription.

diesieben07

02/28/2018, 12:55 PM
Yes, I can test `isClosedForSend`, but that's a race condition, is it not (TOCTTOU)?
Well, I need to close the channel, because there are resources associated with it (network stuff...)

Jonathan

02/28/2018, 12:57 PM
Then, if possible, I would recommend creating the channel with `produce`. This function will handle everything, like closing the channel when the coroutine is done, cancelled, or failed.

diesieben07

02/28/2018, 12:59 PM
That only moves the problem, does it not? The channel still has no `sendIfPossibleOrReturnFalse` method.
If it did, I could just do
do {
    val element = TODO() // compute next element
} while (channel.sendIfPossibleOrReturnFalse(element))
That's what I am trying to achieve.
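
A minimal sketch of the kind of helper asked for here, assuming it is acceptable to catch the exception inside an extension function. `sendOrFalse` is a hypothetical name, not part of kotlinx.coroutines. It only covers a channel closed with a plain `close()`; a channel closed with a cause or cancelled by the receiver makes `send` throw a different exception, which this sketch deliberately lets propagate.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library function): suspends like send(),
// but reports a closed channel as `false` instead of throwing.
suspend fun <E> SendChannel<E>.sendOrFalse(element: E): Boolean =
    try {
        send(element)
        true
    } catch (e: ClosedSendChannelException) {
        false
    }

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 1)
    println(channel.sendOrFalse(1)) // true: the channel is open and has buffer space
    channel.close()
    println(channel.sendOrFalse(2)) // false: the channel is closed for sending
}
```

Because the check and the send happen in one call, there is no window between testing `isClosedForSend` and sending.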

Jonathan

02/28/2018, 1:02 PM
Do this instead:
produce {
	while(isActive) {
		val element = TODO()
		send(element)
	}
}
Then `produce` will handle it for you.

diesieben07

02/28/2018, 1:02 PM
Will it though? What if the receiver closes the channel during whatever `TODO` is? Then `send` will throw...

Jonathan

02/28/2018, 1:04 PM
If the consumer cancels the channel, `send` will throw a `CancellationException`.

diesieben07

02/28/2018, 1:04 PM
Yes...
Which I do not want. I want the receiver to be able to tell the producer to stop.
I do not want the receiver to be able to tell the producer to blow up with an exception.
Also, with this `produce` approach, how do I terminate the underlying thread (`newSingleThreadContext`) when the channel is closed?

Jonathan

02/28/2018, 1:07 PM
`CancellationException` will be consumed by `produce`, as it is a normal reason to be terminated. If you use resources, you should always ensure they are closed in case of an exception anyway (the same as if you weren't using coroutines).
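
A small sketch of that behaviour, assuming a current kotlinx.coroutines version where `produce` is a `CoroutineScope` extension. The `released` flag and the function name `consumeTwoThenCancel` are made up for illustration; the flag stands in for a real resource such as a network connection.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Takes two elements from an endless producer, then cancels the channel.
// Returns whether the producer's cleanup ran.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun consumeTwoThenCancel(): Boolean {
    var released = false
    coroutineScope {
        val numbers = produce<Int> {
            try {
                var n = 0
                while (true) send(n++) // suspends until the consumer receives
            } finally {
                released = true // runs when the suspended send() is cancelled
            }
        }
        println(numbers.receive())
        println(numbers.receive())
        numbers.cancel() // the receiver says "I don't need any more"
    } // coroutineScope returns only after the producer coroutine has finished
    return released
}

fun main() = runBlocking {
    println(consumeTwoThenCancel())
}
```

The cancellation ends the producer without surfacing an exception to the consumer, and the `finally` block is the place to release whatever the producer holds.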

diesieben07

02/28/2018, 1:08 PM
Okay, so then that part is sorted. How do I close the thread context though?

Jonathan

02/28/2018, 1:08 PM
If you use `newSingleThreadContext`, you of course have to ensure that it is terminated even in case of an exception.

diesieben07

02/28/2018, 1:08 PM
Yes, but... how can I terminate it from inside the produce? Can I terminate it from "inside itself"?
Just `use` will not work, will it?

Jonathan

02/28/2018, 1:10 PM
Mmh. That's a good question, I've never done that.

diesieben07

02/28/2018, 1:11 PM
I.e. this:
newSingleThreadContext().use { ctx ->
    return produce(ctx) {
    }
}
won't work, right?

Jonathan

02/28/2018, 1:11 PM
no it won't work
It may terminate the dispatcher too early

diesieben07

02/28/2018, 1:12 PM
Yeah that's what I was thinking.
I guess I will have to pass the context into the method that returns `produce`?

Jonathan

02/28/2018, 1:14 PM
fun startProducer() {
  val context = newSingleThreadContext("my dispatcher")

  val result = produce(context) {
    context.use {
      send(1)
      send(2)
      send(3)
    }
  }
}
this should work. But I wonder if there is a better way of doing it.

diesieben07

02/28/2018, 1:14 PM
I am doing it outside now, and passing `coroutineContext` as a parameter.
thanks for your help!

Jonathan

02/28/2018, 1:15 PM
By the way, it may not be necessary to terminate the dispatcher in the future. See: https://github.com/Kotlin/kotlinx.coroutines/issues/261

bj0

02/28/2018, 4:35 PM
why does it need to be outside produce?
produce {
    newSingleThreadContext("dispatch").use {
        while(isActive)
            send(TODO())
    }
}

Jonathan

02/28/2018, 4:36 PM
@bj0, in your example `produce` uses `CommonPool`.

bj0

02/28/2018, 4:37 PM
ah right
you can toss a `withContext` in there, outside the `while` loop
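
A sketch of what that suggestion could look like, assuming a current kotlinx.coroutines version. The function name `threadNames` and the idea of sending the thread name are illustrative only; they just make it visible which thread the loop runs on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// The dispatcher is created, used, and closed entirely inside produce.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun CoroutineScope.threadNames(): ReceiveChannel<String> = produce<String> {
    newSingleThreadContext("dispatch").use { dispatcher ->
        withContext(dispatcher) {
            // Runs on the dedicated thread until the consumer cancels.
            while (isActive) send(Thread.currentThread().name)
        }
    } // use {} closes the dispatcher once withContext has returned or thrown
}

fun main() = runBlocking {
    val names = threadNames()
    println(names.receive()) // name of the dedicated thread
    names.cancel()
}
```

The dispatcher stays a local detail of the producer here, at the cost of creating one thread per producer.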

Jonathan

02/28/2018, 4:38 PM
Yes indeed. But it is simpler to just pass the context as an argument to `produce`.

bj0

02/28/2018, 4:41 PM
I don't know, I think it's less simple to have to create an essentially local variable outside the scope of where it's used.
seems leaky

Jonathan

02/28/2018, 4:46 PM
Yes, but if it is really local, what is the point of creating a single-thread context? Why not simply use the common pool?
I don't see the point of creating a thread pool if it is not to be shared by many coroutines.

bj0

02/28/2018, 4:50 PM
that's an implementation detail, and I don't know what his program's context is.

Jonathan

02/28/2018, 4:56 PM
I'm not sure. I feel there is something not correct with this approach. Basically, it could lead to a lot of thread creation, which is very expensive and bad practice. Good practice is to use a thread pool in order to reuse the threads. I would rather create a global thread pool like
val MyThreadPool = newSingleThreadContext("my thread pool")
and use it for the whole life of the application, calling
produce(MyThreadPool) {}

diesieben07

02/28/2018, 5:00 PM
Yes, I agree with @Jonathan. That's why I ended up passing the `CoroutineContext` into my own method and then passing it on to `produce`.

bj0

02/28/2018, 5:07 PM
if you're doing that you're not going to clean it up when one producer closes though

diesieben07

02/28/2018, 5:08 PM
I have the whole thing (produce and consume) in a `newSingleThreadContext().use {}`. It is closed properly.
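
One way such an arrangement might look, as a sketch only and not necessarily the code used here. `firstThree` and the thread name "producer" are assumptions for illustration. In this sketch the producer runs on the dedicated thread while the consumer runs in `runBlocking` on the calling thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce

// Owns the dispatcher for the whole produce/consume exchange and closes it afterwards.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun firstThree(): List<Int> =
    newSingleThreadContext("producer").use { dispatcher ->
        runBlocking {
            // The producer is a child of runBlocking but runs on the dedicated thread.
            val numbers = produce<Int>(dispatcher) {
                var n = 0
                while (true) send(n++)
            }
            val taken = mutableListOf<Int>()
            repeat(3) { taken += numbers.receive() }
            numbers.cancel() // stop the endless producer
            taken
        } // runBlocking waits for the producer to finish before use {} closes the dispatcher
    }

fun main() {
    println(firstThree()) // [0, 1, 2]
}
```

Since `runBlocking` does not return until its child coroutines have completed, the dispatcher is not closed while the producer is still running on it.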

bj0

02/28/2018, 5:18 PM
i mean if you're going to "create a global ... and use it for the whole life of the application", obviously use case matters