https://kotlinlang.org logo
#coroutines
Title
# coroutines
s

Stanislav Kral

10/11/2023, 9:13 PM
Hello guys, I am kinda new to coroutines (but not to the concept of streams/observables) and I'm not quite sure whether the following way of creating a flow is correct: • suppose I have a sealed interface of events
Copy code
sealed interface Event {
    object TransactionStarted: Event
    object TransactionPrintingStarted: Event
    class TransactionPrintingError(val retry: () -> Unit): Event
}
• and a method that initiates a transaction and prints the result
Copy code
suspend startTransaction() = flow {
    emit(Event.TransactionStarted)
    // ...
    emit(Event.TransactionPrintingStarted)

    // start printing, may return error

    // print error occurs
    val error = "Out of paper" // received from a print method

    // how to give the flow's collector a chance to retry printing?

    emit(Event.TransactionPrintingError({
         // retry callback invoked
         // ... print again
    }))
    
    // suspend here to not cancel the flow?
    // retry callback may be invoked
}
How do I wait for a callback from an object sent via
emit
? I've tried using
suspendCancellableCoroutine
but that didn't work, since I can't call
emit
inside the body. Thanks!
f

Francesc

10/11/2023, 9:22 PM
You should separate the flow of state, from the printing logic. What I would suggest to do is the following: • have a flow of events that indicates the printing stage (transaction started, printing started, and so on) • have a function to start the print process, and in this function push updates to your flow as you move along the different states • optionally have a retry as part of your print process, or let the caller decide if they want to retry, as they'd be notified of the error when they observe the stream Also, I'll point out that a function that returns a
flow
should not be `suspend`ing
s

Stanislav Kral

10/11/2023, 9:52 PM
I designed both
startTransaction
and
printTransactionResult
as high-level methods. In this flow I'd like to combine the general requirement of a transaction flow (perform a TX and print its result). This flow contains the following logic: • perform tx • check result • if unsucessful abort (don't print at all) • if successful, print the result • if print fails, give the consumer an option to retry printing Having it coupled together in a flow is a thing I desire, as it may be reused multiple times without the user having to implement the logic each time. Thanks for pointing out that functions returning a
flow
should not be `suspend`ing
f

Francesc

10/11/2023, 9:56 PM
the caller doesn't have to implement any logic, what I'm advocating for is to have a flow that callers can subscribe to, in order to observe the printing process, and a separate method to start that process. If you want to have a single flow you can add a
retry
to the flow and have it restart on failure.
s

Stanislav Kral

10/11/2023, 10:07 PM
Apologize if I misunderstood your advice, but wouldn't adding a retry operator cause the TX to be performed again in case of a print error? The requirement is to perform the tx only once, however, there can be multiple print requests when printing fails.
I would like the caller to just subscribe to a single flow that emits events as described in the first message.
f

Francesc

10/11/2023, 10:18 PM
yes, a retry operator restarts the whole flow. To be honest, I don't understand why you'd want a single method to do everything, it's counter to the single responsibility principle, but if that's what you want to do, you can but it's not what I would recommend. If you have a callback method that you want to convert into a suspend method, you ahve to use
suspendCoroutine
or
suspendCancelableCoroutine
and then loop inside your flow.
you wouldn't call
emit
inside the
suspendCoroutine
method, you'd return a value and then you'd emit that value in the flow
Copy code
fun myFlow(): Flow<Result> = flow {
    // emit other stuff
    val result = foo()
    if (result) emit(Result.Success)
    // other stuff
}

suspend fun foo() = suspendCancellableCoroutine<Boolean> { cont ->
    val callback = object : SomeInterface {
        override fun onSuccess() {
            cont.resume(true)
        }

        override fun onFailure() {
            cont.resume(false)
        }
    }
    api.register(callback)
    cont.invokeOnCancellation { api.unregister(callback }
}
p

Patrick Steiger

10/12/2023, 5:16 AM
It is possible to achieve something like you want but you’re thinking of bidirectional communication with the flow. That is doable but I feel like you will be better served if choosing a different approach altogether
s

Stanislav Kral

10/12/2023, 6:12 AM
I understand that this breaks the single responaibility, however, I am not sure how else would I be able to make this business logic reusable: • start a tx • based on tx result either ◦ print ▪︎ give ability to retry printing ◦ finish
Copy code
// store locally
val txResult = performTx()

fun print() {
    if (txResult.success) {
        val printResult = printTxResult(txResult)
        if (!printResult) {
             displayPrintError = true
        }
    }
}

print()

// somewhere in UI layer

if (displayPrintError) {
    showPrintErrorDialog(
        onDismiss = {
            print() 
        }
    )
}
A bidirectional communication with the flow, as Patrick suggested, would make it work, as I would be able to reuse that piece of logic elsewhere. Note that printing a receipt after a TX is finished is in most cases mandatory (except for a critical printer failure)- that's why I'd like to couple it with starting a transaction and inspecting its result.
In reality, the flow may involve more steps/pieces of logic, that's why I'd like to couple it so I am able to reuse it.
p

Patrick Steiger

10/12/2023, 6:07 PM
You can use flatmapped flows to implement this retry logic specific for a subsection of the logic. Define a flow for the logic you want to be retry able.
Copy code
val retryCmnds = MutableSharedFlow<Unit>()
val txflow = flow {
   something()
}.flatMapLatest {
  retryCmnds.onStart { emit(Unit) }.map {
    print()
  }
}

fun retry() {
  launch { retryCmnds.emit(Unit)
}
s

Stanislav Kral

10/13/2023, 7:17 AM
Thanks for the suggestion of usiing
MutableSharedFlow
, I've tried doing something like this:
Copy code
flow {       
    // perform tx, emit multiple events
    // ...
    emit(Event.TransactionFinished)
    
    // notify that receipt is being printed
    emit(Event.PrintingReceipt)

    val retryFlow = MutableSharedFlow<Boolean>()

    while (true) {
        val error = printResult()
        if (error == null) {
            emit(Event.PrintFinished)
            return@flow
        }

        emit(Event.PrintFailed(
            error = error,
            retryCallback = {
                retryFlow.tryEmit(true)
            }
        ))

        val retry = retryFlow.first() && currentCoroutineContext().isActive

        if (!retry) {
            return@flow
        }
    }
}
Are there any obvious mistakes?
2 Views