https://kotlinlang.org logo
#flow
Title
# flow
g

galex

12/13/2023, 3:36 PM
Hello! I need a timeout mechanism for the first item emitted only but
.timeout()
is called after each emit, so I made the following function:
Copy code
/**
 * If the first element takes too long to emit, the [onTimeout] callback will be called without disturbing the current flow
 */
fun <T> Flow<T>.timeoutFirst(duration: Long, scope: CoroutineScope, onTimeout: () -> Unit): Flow<T> = flow {
    
    var emitted = false

    scope.launch {
        delay(duration)
        if (!emitted) {
            onTimeout()
        }
    }

    collect {
        emitted = true
        emit(it)
    }
}
First question: Is there a way to get the coroutineScope where the flow will be collected so that I don't need to pass a scope? I was using
coroutineScope {}
before instead of
scope.launch
but then I couldn't get it to work in unit tests under
runTest
Second question: Instead of a callback, could I use the throwing mechanism as
timeout
does? Thanks in advance! 😊
f

franztesca

12/13/2023, 8:43 PM
Copy code
fun <T> Flow<T>.timeoutFirst(duration: Duration) = flow {
    coroutineScope {
        val values = buffer(Channel.RENDEZVOUS).produceIn(this)
        withTimeout(duration) {
            emit(values.receiveCatching().onClosed { return@withTimeout }.getOrThrow())
        }
        emitAll(values.consumeAsFlow())
    }
}
a

aarkling

01/02/2024, 10:40 PM
OMG I literally came here for this exact same problem 😀
What is the buffer for?
I think this works (modification of alex’s example):
Copy code
class TimeoutFirstCancellationException : CancellationException()

fun <T> Flow<T>.timeoutFirst(duration: Duration): Flow<T> = flow {
  coroutineScope {
    var emitted = false

    launch {
      delay(duration)
      if (!emitted) {
        throw TimeoutFirstCancellationException()
      }
    }

    collect {
      emitted = true
      emit(it)
    }
  }
}
Nvm. Above doesn’t work.
This does:
Copy code
private fun <T> Flow<T>.timeoutFirst(duration: Duration): Flow<T> = channelFlow {
    val emitted = Job()

    launch {
      collect {
        emitted.cancelAndJoin()
        send(it)
      }
    }

    try {
      withTimeout(duration) { emitted.join() }
    } catch (e: TimeoutCancellationException) {
      if(emitted.isActive) throw e
    }
  }
g

galex

01/03/2024, 4:42 AM
@aarkling I ended up with this:
Copy code
fun <T : Any?> Flow<T>.timeoutFirstNotNull(duration: Long, onTimeout: () -> Unit): Flow<T> = flow {
    coroutineScope {

        var emitted = false

        launch {
            delay(duration)
            if (!emitted) {
                onTimeout()
            }
        }

        collect {
            if (it != null && !emitted) {
                emitted = true
            }
            emit(it)
        }
    }
}
That's without throw mechanism, yours looks actually better I think