#coroutines

KotlinLeaner

02/15/2023, 11:31 AM
Hi guys, I am using `MutableSharedFlow` and its `onSubscription` method to detect each new `emit`. After the first emit my `onSubscription` block is called, but after the second emit it is not called again. Is this normal behaviour?
My main project is too big to post here, so I made a very small sample that reproduces the problem. I know this example is contrived, but my main project has the same scenario. I am using `MutableSharedFlow` as a `Queue` implementation with single-threaded execution with the help of a `Mutex`.
ExampleViewModel
class ExampleViewModel : ViewModel() {
    val serviceNumber = ServiceNumber()
    val serviceNumberEventFlow = serviceNumber.eventFlow
    val mutex = Mutex()
    var delayCounter = 0

    suspend fun addItem(itemOne: Int = 2, itemTwo: Int = 2): Add {
        return mutex.queueWithTimeout("add") {
            serviceNumberEventFlow.onSubscription {
                serviceNumber.add(itemOne, itemTwo)
                delayCounter++
                if (delayCounter == 1) {
                    delay(1000)
                    Log.w("Delay ", "Delay Started")
                    serviceNumber.add(8, 8)
                }
            }.firstOrNull {
                it is Add
            } as Add? ?: Add("No value")
        }
    }

    suspend fun subItem(itemOne: Int = 2, itemTwo: Int = 2): Sub {
        return mutex.queueWithTimeout("sub") {
            serviceNumberEventFlow.onSubscription {
                serviceNumber.sub(itemOne, itemTwo)
            }.firstOrNull {
                it is Sub
            } as Sub? ?: Sub("No value")
        }
    }

    private suspend fun <T> Mutex.queueWithTimeout(
        action: String, timeout: Long = 5000L, block: suspend CoroutineScope.() -> T
    ): T {
        return try {
            withLock {
                return@withLock withTimeout<T>(timeMillis = timeout, block = block)
            }
        } catch (e: Exception) {
            Log.e("Wrong", " $e Timeout on BLE call: $action")
            throw e
        }
    }
}

class ServiceNumber : Number {
    val eventFlow = MutableSharedFlow<Event>(extraBufferCapacity = 50)
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    override fun add(itemOne: Int, itemTwo: Int) {
        Log.i("ServiceNumber", " Add event trigger with $itemOne -- $itemTwo")
        eventFlow.emitEvent(Add("Item added ${itemOne + itemTwo}"))
    }

    override fun sub(itemOne: Int, itemTwo: Int) {
        eventFlow.emitEvent(Sub("Item subtract ${itemOne - itemTwo}"))
    }

    private fun <T> MutableSharedFlow<T>.emitEvent(event: T) {
        scope.launch { emit(event) }
    }
}

interface Number {
    fun add(itemOne: Int, itemTwo: Int)
    fun sub(itemOne: Int, itemTwo: Int)
}

sealed class Event
data class Add(val item: String) : Event()
data class Sub(val item: String) : Event()
MainActivity.kt
class MainActivity : AppCompatActivity() {

    private val viewModel: ExampleViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            Theme {
                Column {
                    Button(onClick = {
                        lifecycleScope.launchWhenCreated {
                            withContext(Dispatchers.IO) {
                                val result = viewModel.addItem()
                                Log.e("Result", "$result")
                            }
                        }
                    }) {
                        Text("Add")
                    }
                    Button(onClick = {
                        lifecycleScope.launchWhenCreated {
                            withContext(Dispatchers.IO) {
                                val result = viewModel.subItem()
                                Log.e("Result", "$result")
                            }
                        }
                    }) {
                        Text("Sub")
                    }
                }
            }
        }
    }
}

@Composable
fun Theme(content: @Composable () -> Unit) {
    MaterialTheme(content = content)
}
Problem: When I click the Add button the first time, `viewModel.addItem(...)` -> ... -> `ServiceNumber.add()` is triggered and emits the value, and we can see the log in the console. Inside the Add button's function I also added a `delay` and then call `ServiceNumber.add()` again, to see whether `onSubscription` is triggered again. `MutableSharedFlow` emits the value, as I can see in the log, but `onSubscription` is not called. I don't understand what the problem is here.

rkeazor

02/15/2023, 1:04 PM
onSubscription only happens once I believe
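To illustrate the point above: the `onSubscription` action runs once, when a collector subscribes, while `onEach` runs for every value that collector receives. The following is a minimal sketch, assuming `kotlinx.coroutines` is on the classpath; the function name `subscriptionVsEach` and the `log` list are hypothetical and not part of the original sample.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens while one collector is active.
fun subscriptionVsEach(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 50)

    events
        .onSubscription {
            // Runs once, after this collector has been registered.
            log += "subscribed"
            // Values emitted to the shared flow from here are buffered
            // for this collector and delivered after the action returns.
            events.emit(1)
            events.emit(2)
        }
        .onEach { log += "value $it" } // Runs for every value received.
        .take(2)                       // Stop after two values so the call returns.
        .toList()

    log // [subscribed, value 1, value 2]
}
```

The second `emit` does not trigger the `onSubscription` action again, because no new collector has subscribed; only `onEach` sees it.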

KotlinLeaner

02/15/2023, 1:12 PM
Is there no better way to get onSubscription called every time?

rkeazor

02/15/2023, 1:14 PM
Not sure what you're trying to do. You said you wanted to detect new emissions. That sounds like a new use case.

KotlinLeaner

02/15/2023, 1:17 PM
Yes, I want to detect every new `emitEvent` call. So how can we do that?

gildor

02/15/2023, 1:25 PM
onEach is called on every emit.

KotlinLeaner

02/15/2023, 1:26 PM
Sorry to ask, but is `onNext` just like `emit`?
Where should I put this `onNext`?

gildor

02/15/2023, 1:27 PM
onEach is an extension function of Flow.
onNext is called on every item emitted by the Flow.
Oh, really sorry, I meant onEach.
Old habit from RxJava, where it is called onNext for Observable.

KotlinLeaner

02/15/2023, 1:30 PM
`onEach` in my addItem function, instead of using emit?

gildor

02/15/2023, 1:31 PM
Sorry, maybe I misunderstood your case, but don't you need a callback that is invoked on every emit of a new value?
emit sends a value to the Flow; onEach is a callback for every value emitted.

KotlinLeaner

02/15/2023, 1:33 PM
Yes, I need a callback that is invoked for every new value emitted.
OK, I'll try to use `onEach` to check values.
I tried to use `serviceNumberEventFlow.onEach`, but it does not show the new emit.
suspend fun addItem(itemOne: Int = 2, itemTwo: Int = 2): Add {
    return mutex.queueWithTimeout("add") {

        serviceNumberEventFlow.onEach {
            Log.e("TAG", "New Event $it " )
        }

        serviceNumberEventFlow.onSubscription {
            serviceNumber.add(itemOne, itemTwo)
            delayCounter++
            if (delayCounter == 1) {
                delay(1000)
                Log.w("Delay ", "Delay Started")
                serviceNumber.add(8, 8)
            }
        }.firstOrNull {
            it is Add
        } as Add? ?: Add("No value")
    }
}
What am I doing wrong here?

gildor

02/15/2023, 2:33 PM
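You cannot just add onEach as a separate statement. It returns a new flow that you have to use; it doesn't change the original flow.
That's how all flow operators work.
So in your case, chain onEach onto serviceNumber.eventFlow (serviceNumber.eventFlow.onEach { ... }) and keep using the flow it returns.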
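As a rough sketch of the chaining described above, assuming `kotlinx.coroutines` is available: `onEach` is placed in the same chain that ends in `firstOrNull`, after `onSubscription` (which is only defined on `SharedFlow`). The function name `chainedOnEach`, the `seen` list, and the emitted values are hypothetical stand-ins for the original `addItem` logic. Note that `firstOrNull` stops collecting at the first match, so later events are not logged by this chain.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking

sealed class Event
data class Add(val item: String) : Event()
data class Sub(val item: String) : Event()

// Hypothetical helper: returns the first Add plus everything onEach saw.
fun chainedOnEach(): Pair<Add?, List<String>> = runBlocking {
    val seen = mutableListOf<String>()
    val events = MutableSharedFlow<Event>(extraBufferCapacity = 50)

    val result = events
        .onSubscription {
            // Stand-in for the serviceNumber.add(...) / sub(...) calls.
            events.emit(Sub("Item subtract 0"))
            events.emit(Add("Item added 4"))
            events.emit(Add("Item added 16"))
        }
        .onEach { seen += "New Event $it" }  // Part of the chain, so it fires.
        .firstOrNull { it is Add } as Add?   // Ends collection at the first Add.

    result to seen
}
```

Here `seen` ends up with the `Sub` event and the first `Add` event only; the second `Add` is emitted but never reaches `onEach`, because the collector has already finished.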

KotlinLeaner

02/15/2023, 2:45 PM
So should I call `onEach` two times inside `addItem`?

muliyul

02/15/2023, 6:01 PM
In order for `onEach` to fire you need to chain it to the rest of the flow.
// won't work: the flow returned by onEach is discarded
flow.onEach(::println)

flow.collect()

// works
flow.onEach(::println).collect()

uli

02/15/2023, 9:24 PM
Wouldn’t `firstOrNull` stop collecting after the first matching emission? Then `onEach` will also stop firing.
Make it:
serviceNumberEventFlow.onEach {
    Log.e("TAG", "New Event $it ")
}.collect()
Or go straight with
serviceNumberEventFlow.collect {
    Log.e("TAG", "New Event $it ")
}
But be aware that collect will suspend until the flow terminates, so you might want to wrap all of that in a `launch`.
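A minimal sketch of that suggestion, assuming `kotlinx.coroutines` is available: a shared flow never completes on its own, so the logging collector runs in its own coroutine and is cancelled when it is no longer needed. The name `launchLogger`, the `log` list, and the use of `runBlocking` as the scope are hypothetical; in the original sample the natural scope would presumably be something like `viewModelScope`.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: collects every event in a separate coroutine.
fun launchLogger(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val events = MutableSharedFlow<String>(extraBufferCapacity = 50)

    // UNDISPATCHED starts the coroutine immediately, so the collector is
    // subscribed before the first emit below. With no replay, values
    // emitted before a collector subscribes would be lost.
    val logger = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { log += "New Event $it" } // Suspends indefinitely.
    }

    events.emit("Add 4")
    events.emit("Add 16")

    yield()          // Give the logger coroutine a chance to process both values.
    logger.cancel()  // collect() never returns by itself, so cancel explicitly.
    log
}
```

Because the collector lives in its own coroutine, it keeps receiving events regardless of whether another chain on the same flow has already finished with `firstOrNull`.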

KotlinLeaner

02/16/2023, 11:30 AM
Perfect, it works. Thanks, guys.