I have this method. It sends a message out over a ...
# coroutines
c
I have this method. It sends a message out over a websocket and waits for a reply.
suspend fun sendAndWait(message: Message): Message = withTimeout(5000) {
  launch {
    send(message)
  }
  // 'incoming' is a Flow<Message>
  incoming.first { /* logic here which matches outbound messages with inbound ones by some matching id */ }
}
The timeout never works: it never times out and I don’t get an exception. There are two suspension points, `send(message)` and `incoming.first { }`. Is there something I’m misunderstanding about timeouts? `withTimeout` has otherwise worked for me; I’m not sure what’s different in this case. Any ideas?
j
I don't think `send(message)` counts as a suspension point when it's being `launch`ed
c
Yeah I wasn’t sure about that. But calling `Flow.first()` should work, I thought
☝️ 1
s
Where does `incoming` come from? If the `Flow<Message>` is not cancellable, then `withTimeout` will not return before it's finished.
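To illustrate the point about non-cancellable flows, here is a minimal sketch. It assumes a hypothetical `blockingFlow` that blocks its thread with `Thread.sleep` instead of suspending; the function names and timings are made up for the example. Cancellation is cooperative, so the timeout has no way to interrupt the blocking call and the whole thing takes at least as long as the sleep.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.system.measureTimeMillis

// Hypothetical flow that blocks the thread instead of suspending.
fun blockingFlow(blockMillis: Long) = flow {
    Thread.sleep(blockMillis) // not a suspension point, so a timeout cannot interrupt it
    emit(1)
}

// Measures how long the timeout block really takes, in milliseconds.
fun elapsedWithBlockingFlow(timeoutMillis: Long, blockMillis: Long): Long = runBlocking {
    measureTimeMillis {
        // The block cannot finish before Thread.sleep returns, whatever the timeout is.
        withTimeoutOrNull(timeoutMillis) { blockingFlow(blockMillis).first() }
    }
}

fun main() {
    // Expect roughly the sleep duration here, not the 100 ms timeout.
    println("Elapsed: ${elapsedWithBlockingFlow(timeoutMillis = 100, blockMillis = 1000)} ms")
}
```

If the real `incoming` flow (or something upstream of it) does blocking work like this, that could be one explanation worth ruling out.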
c
`incoming` is a SharedFlow with incoming websocket frames. I am using Ktor/OkHttp websockets. I understand that cancellation can occur at any suspension point. But I guess I am confused by the statement in the docs:
> All the suspending functions in `kotlinx.coroutines` are cancellable.
I am expecting `Flow.first()` to be cancellable. But perhaps I need a workaround like
suspend fun sendAndWait(message: Message): Message = withTimeout(5000) {
  val job = async {
    launch {
      send(message)
    }
    
    incoming.first()
  }
  job.await()
}
Btw thank you for responding, I appreciate the help
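One caveat on the `async` workaround, sketched below under assumptions: `withTimeout` waits for coroutines started inside its block, so wrapping the body in `async` and calling `await()` would not help if the work inside does not cooperate with cancellation. The example uses a hypothetical `Thread.sleep` as the non-cooperative work and `Dispatchers.Default` so that the calling thread stays free.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.system.measureTimeMillis

// Measures how long the timeout block takes when the body is wrapped in async.
fun elapsedWithAsyncWrapper(timeoutMillis: Long, blockMillis: Long): Long = runBlocking {
    measureTimeMillis {
        withTimeoutOrNull(timeoutMillis) {
            val job = async(Dispatchers.Default) {
                Thread.sleep(blockMillis) // hypothetical non-cooperative work
            }
            // await() itself is cancellable, but the timeout scope still
            // waits for the async child to finish before it returns.
            job.await()
        }
    }
}

fun main() {
    // Expect roughly the sleep duration here, not the 100 ms timeout.
    println("Elapsed: ${elapsedWithAsyncWrapper(timeoutMillis = 100, blockMillis = 1000)} ms")
}
```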
The following simple program fails as I expect
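import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

fun main(args: Array<String>) = runBlocking {
    // Nothing is ever emitted, so first() suspends until the timeout fires.
    val incoming = MutableSharedFlow<Unit>()
    val result = withTimeout(3000) {
        incoming.first()
    }
    println("Got result: $result")
}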
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 3000 ms
s
> I am expecting `Flow.first()` to be cancellable. But perhaps I need a workaround like
That might depend. For example, I think the following is prone to hanging:
flow<Int> {
   withContext(NonCancellable) {
      delay(Long.MAX_VALUE)
      emit(1)
   }
}.first()
j
@Chris Fillmore Are you able to reproduce this behavior in some standalone code, i.e. outside of the project you're working on?
c
No. The simple example I wrote above fails as expected
If I spent more time on it I could perhaps repro, but I don’t have the time right now
j
Got it. It's puzzling behavior. Seems there's something going on "outside the frame" of what we're looking at that we're not seeing, and that's causing this. I tried some variations on what you're trying to do and couldn't repro either.
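For reference, a standalone variation along these lines might look like the sketch below. It is not the exact code anyone in the thread ran: `Message`, the no-op `send`, and the empty `incoming` flow are hypothetical stand-ins for the real websocket pieces. With these stand-ins the call does time out, which is consistent with not being able to reproduce the problem outside the project.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical stand-in for the real message type.
data class Message(val id: Int, val body: String)

// Hypothetical stand-in for the websocket frames: nothing is ever emitted.
val incoming = MutableSharedFlow<Message>()

// Hypothetical no-op send; the real one would write a websocket frame.
suspend fun send(message: Message) { }

// Same shape as the method from the question, with the timeout as a parameter.
suspend fun sendAndWait(message: Message, timeoutMillis: Long): Message =
    withTimeout(timeoutMillis) {
        launch { send(message) }
        // Match the reply to the outbound message by id.
        incoming.first { it.id == message.id }
    }

// Returns true if sendAndWait gave up with a timeout.
fun timesOut(timeoutMillis: Long): Boolean = runBlocking {
    try {
        sendAndWait(Message(1, "ping"), timeoutMillis)
        false
    } catch (e: TimeoutCancellationException) {
        true
    }
}

fun main() {
    println("Timed out: ${timesOut(200)}") // prints "Timed out: true"
}
```

Swapping the stand-ins for the real `send` and `incoming` one at a time may help narrow down which piece keeps the timeout from being observed.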