
Lilly

04/04/2022, 9:54 PM
Is it possible to launch a coroutine in a flow operator?
suspend fun producer() = flow {}

private suspend fun packetFlow(...): Flow<ResponsePacket> {
  return producer()
     .transformWhile {
        // how to launch coroutine here that does not block?
        // coroutineScope {} blocks so it's not an option
        launch {
          delay(2000L)
          state = 1 // set state
          return@transformWhile false // terminate flow
        }
     }
}
I would like to start a timer and terminate the flow after the time has expired.

ephemient

04/05/2022, 5:05 AM
flow {
    withTimeoutOrNull(2000L) {
        emitAll(producer())
    }
}
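A runnable sketch of the snippet above, for illustration only. It assumes a hypothetical never-ending `producer()` that emits integers and a hypothetical top-level `state` variable. Since `withTimeoutOrNull` returns `null` when the time limit is hit, follow-up work that should happen only after the timeout can be placed right after the call:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for the Bluetooth producer: it never completes on its own.
fun producer(): Flow<Int> = flow {
    var n = 0
    while (true) {
        emit(n++)
        delay(500L)
    }
}

// Hypothetical state flag, mirroring `state = 1` from the question.
var state = 0

fun packetFlow(): Flow<Int> = flow {
    // emitAll only returns if producer() completes. Here it never does,
    // so withTimeoutOrNull cancels it after 2 s and returns null.
    val finished = withTimeoutOrNull(2000L) { emitAll(producer()) }
    if (finished == null) {
        state = 1 // runs only after the timeout has expired
    }
    // Returning from the builder completes the flow normally.
}

fun main(): Unit = runBlocking {
    val packets = packetFlow().toList()
    println("collected $packets, state = $state")
}
```

The timer here starts when the flow is collected, not when `packetFlow()` is called.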

Lilly

04/05/2022, 9:10 AM
@ephemient The code inside the coroutine should run after the timeout has expired, not while the time is running out. Also, the coroutine needs to run within the caller function, not in producer. Are there any other options?

ephemient

04/05/2022, 9:39 AM
no, once you return the flow, you don't know when and where it will be subscribed to - it could happen any number of times at any point in the future
(unless you then wrap it with a sharedflow, which has different constraints)
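To illustrate the point about cold flows, here is a minimal sketch. The `starts` counter and the integer-emitting `producer()` are hypothetical and exist only to make the number of collections visible. The body of the `flow` builder does not run when the function returns; it runs once for each collector:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: incremented every time the flow body starts.
var starts = 0

fun producer(): Flow<Int> = flow {
    starts++ // executed per collection, not when producer() is called
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val packets = producer() // nothing has run yet
    check(starts == 0)

    packets.toList() // first collection runs the body
    packets.toList() // second collection runs it again
    check(starts == 2)
}
```

This is why a timer launched at the time the flow is built cannot be tied to any particular collection.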

Lilly

04/05/2022, 5:41 PM
@ephemient Not sure what you mean. The producer flow never terminates naturally because of:
suspend fun producer() = flow {
  while(true) {
     //Bluetooth input stream
  }
}
That's why I use `transformWhile`: I can terminate the flow manually without cancelling it. Also, I can wrap this flow in `channelFlow` and get access to `ProducerScope`:
private suspend fun packetFlow(...): Flow<ResponsePacket> = channelFlow {
  launch {
    producer()
      .transformWhile {
        while (condition) {
          launch {
            delay(2000L)
            isLastPacket = 1 // set state
            // When the coroutine finishes, nothing happens. It does not recover
          }

          // I would expect to recover from the coroutine here
        }

        // Leave the flow if false
        return@transformWhile !isLastPacket
      }.collect()
  }
}
This works so far, but after the coroutine finishes I would expect execution to continue with the return statement and leave the flow. Instead, I don't know what the program is doing at that point. With the coroutine I want to check whether the producer emits another packet within the specified time. If it does not, I can be sure that the current packet is the last one (isLastPacket = 1) and leave the flow (return@transformWhile !isLastPacket).
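One possible way to express "stop once no packet arrives within the time limit" is sketched below. This is not a solution given in the thread. `takeUntilIdle` is a hypothetical helper name, and the sketch assumes the upstream is cancellable and that the timeout should also apply while waiting for the first packet. The upstream is collected into a channel inside the collector's own scope, and each receive is bounded with `withTimeoutOrNull`:

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: completes the flow when upstream stays silent for timeoutMillis.
fun <T> Flow<T>.takeUntilIdle(timeoutMillis: Long): Flow<T> {
    val upstream = this
    return flow {
        coroutineScope {
            val channel = Channel<T>()
            // Collect upstream in a child coroutine and forward values to the channel.
            val job = launch {
                upstream.collect { channel.send(it) }
                channel.close() // upstream completed on its own
            }
            while (true) {
                // null means no value arrived and the channel was not closed in time.
                val result = withTimeoutOrNull(timeoutMillis) { channel.receiveCatching() }
                if (result == null || result.isClosed) break
                emit(result.getOrThrow())
            }
            job.cancel() // stop the upstream; coroutineScope waits for it to finish
        }
    }
}

fun main(): Unit = runBlocking {
    // Hypothetical producer: three quick packets, then silence forever.
    val producer = flow<Int> {
        repeat(3) {
            emit(it)
            delay(100L)
        }
        awaitCancellation()
    }
    val packets = producer.takeUntilIdle(500L).toList()
    check(packets == listOf(0, 1, 2))
}
```

Because the timer and the upstream collection both live inside the `flow` builder, they are started per collection, which fits the constraint that the caller does not know when the returned flow will be collected.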