KotlinLeaner
02/15/2023, 11:31 AM
I am using `MutableSharedFlow` and the `onSubscription` method to detect each new emit. After the first emit I can detect it in my `onSubscription`, but after the second emit `onSubscription` is not called. Is this normal behaviour?
I am using `MutableSharedFlow` as a queue implementation with single-threaded execution with the help of `Mutex`.
class ExampleViewModel : ViewModel() {
    val serviceNumber = ServiceNumber()
    val serviceNumberEventFlow = serviceNumber.eventFlow
    val mutex = Mutex()
    var delayCounter = 0
    suspend fun addItem(itemOne: Int = 2, itemTwo: Int = 2): Add {
        return mutex.queueWithTimeout("add") {
            serviceNumberEventFlow.onSubscription {
                serviceNumber.add(itemOne, itemTwo)
                delayCounter++
                if (delayCounter == 1) {
                    delay(1000)
                    Log.w("Delay ", "Delay Started")
                    serviceNumber.add(8, 8)
                }
            }.firstOrNull {
                it is Add
            } as Add? ?: Add("No value")
        }
    }
    suspend fun subItem(itemOne: Int = 2, itemTwo: Int = 2): Sub {
        return mutex.queueWithTimeout("sub") {
            serviceNumberEventFlow.onSubscription {
                serviceNumber.sub(itemOne, itemTwo)
            }.firstOrNull {
                it is Sub
            } as Sub? ?: Sub("No value")
        }
    }
    private suspend fun <T> Mutex.queueWithTimeout(
        action: String, timeout: Long = 5000L, block: suspend CoroutineScope.() -> T
    ): T {
        return try {
            withLock {
                return@withLock withTimeout<T>(timeMillis = timeout, block = block)
            }
        } catch (e: Exception) {
            Log.e("Wrong", " $e Timeout on BLE call: $action")
            throw e
        }
    }
}
class ServiceNumber : Number {
    val eventFlow = MutableSharedFlow<Event>(extraBufferCapacity = 50)
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    override fun add(itemOne: Int, itemTwo: Int) {
        Log.i("ServiceNumber", " Add event trigger with $itemOne -- $itemTwo")
        eventFlow.emitEvent(Add("Item added ${itemOne + itemTwo}"))
    }
    override fun sub(itemOne: Int, itemTwo: Int) {
        eventFlow.emitEvent(Sub("Item subtract ${itemOne - itemTwo}"))
    }
    private fun <T> MutableSharedFlow<T>.emitEvent(event: T) {
        scope.launch { emit(event) }
    }
}
interface Number {
    fun add(itemOne: Int, itemTwo: Int)
    fun sub(itemOne: Int, itemTwo: Int)
}
sealed class Event
data class Add(val item: String) : Event()
data class Sub(val item: String) : Event()
class MainActivity : AppCompatActivity() {
    private val viewModel: ExampleViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            Theme {
                Column {
                    Button(onClick = {
                        lifecycleScope.launchWhenCreated {
                            withContext(Dispatchers.IO) {
                                val result = viewModel.addItem()
                                Log.e("Result", "$result")
                            }
                        }
                    }) {
                        Text("Add")
                    }
                    Button(onClick = {
                        lifecycleScope.launchWhenCreated {
                            withContext(Dispatchers.IO) {
                                val result = viewModel.subItem()
                                Log.e("Result", "$result")
                            }
                        }
                    }) {
                        Text("Sub")
                    }
                }
            }
        }
    }
}
@Composable
fun Theme(content: @Composable () -> Unit) {
    MaterialTheme(content = content)
}
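For reference, `onSubscription` is documented to run its block once per collector, after the subscription is registered and before that collector receives any value. It is not invoked again for each emission. A minimal sketch of that behaviour, assuming kotlinx.coroutines is available (the helper name `countSubscriptionCalls` is hypothetical and not part of the code above):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns how many times the onSubscription block ran
// and which values the single collector received.
fun countSubscriptionCalls(): Pair<Int, List<Int>> = runBlocking {
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 50)
    var subscriptionCalls = 0

    val received = events
        .onSubscription {
            // Runs once, after this collector is registered and before it
            // receives any value, so values emitted here are not lost.
            subscriptionCalls++
            events.emit(1)
            events.emit(2)
        }
        .take(2)   // stop after two values so the collector completes
        .toList()

    subscriptionCalls to received
}
```

Calling `countSubscriptionCalls()` should return `(1, [1, 2])`: two values were emitted and collected, but the `onSubscription` block ran only once.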
`viewmodel.addItem(...)` -> ... -> `ServiceNumber.add()` will trigger and emit the value, and we can see the log in the console. Inside the Add button function, I also added a delay to trigger `ServiceNumber.add()` again, to see whether `onSubscription` is retriggered or not. `MutableSharedFlow` emits the value, as I can see in the log, but the `onSubscription` method is not called. I don't understand what the problem is here.
rkeazor
02/15/2023, 1:04 PM
KotlinLeaner
02/15/2023, 1:12 PM
rkeazor
02/15/2023, 1:14 PM
KotlinLeaner
02/15/2023, 1:17 PM
`emitEvent` every new change. So how can we do that?
gildor
02/15/2023, 1:25 PM
KotlinLeaner
02/15/2023, 1:26 PM
`onNext` is just like `emit`?
`onNext`?
gildor
02/15/2023, 1:27 PM
KotlinLeaner
02/15/2023, 1:30 PM
`onEach` in my addItem function, instead of using `emit`?
gildor
02/15/2023, 1:31 PM
KotlinLeaner
02/15/2023, 1:33 PM
`onEach` to check values.
`serviceNumberEventFlow.onEach` but it does not display the new emit
suspend fun addItem(itemOne: Int = 2, itemTwo: Int = 2): Add {
    return mutex.queueWithTimeout("add") {
        serviceNumberEventFlow.onEach {
            Log.e("TAG", "New Event $it " )
        }
        serviceNumberEventFlow.onSubscription {
            serviceNumber.add(itemOne, itemTwo)
            delayCounter++
            if (delayCounter == 1) {
                delay(1000)
                Log.w("Delay ", "Delay Started")
                serviceNumber.add(8, 8)
            }
        }.firstOrNull {
            it is Add
        } as Add? ?: Add("No value")
    }
}
What am I doing wrong here?
gildor
02/15/2023, 2:33 PM
KotlinLeaner
02/15/2023, 2:45 PM
`onEach` inside `addItem`?
muliyul
02/15/2023, 6:01 PM
For `onEach` to fire you need to chain it to the rest of the flow.
// won't work: the flow returned by onEach is discarded
flow.onEach(::println)
flow.collect()
// works: onEach is part of the chain that gets collected
flow.onEach(::println).collect()
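The difference can be checked with a small sketch: `onEach` returns a new flow and does nothing until that returned flow is collected. The function name `onEachDemo` and the list names are illustrative only, and kotlinx.coroutines is assumed to be available.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns what each onEach variant observed.
fun onEachDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = flowOf(1, 2, 3)
    val seenDetached = mutableListOf<Int>()
    val seenChained = mutableListOf<Int>()

    // Detached: the flow returned by onEach is thrown away, so its action
    // is never part of any collection.
    source.onEach { seenDetached.add(it) }
    source.collect()

    // Chained: the flow returned by onEach is the one being collected.
    source.onEach { seenChained.add(it) }.collect()

    seenDetached to seenChained
}
```

Calling `onEachDemo()` should return `([], [1, 2, 3])`: only the chained variant ever ran its action.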
uli
02/15/2023, 9:24 PM
Doesn't `firstOrNull` terminate the whole flow after the first emission? Then onEmit will also stop firing.
serviceNumberEventFlow.onEach {
    Log.e("TAG", "New Event $it " )
}.collect()
Or go straight with
serviceNumberEventFlow.collect {
    Log.e("TAG", "New Event $it " )
}
But be aware that collect will suspend until the flow terminates. You might want to wrap all of that into a launch.
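A sketch of this suggestion, under stated assumptions: `firstOrNull` ends only its own collection, while a shared flow itself never completes, so a collector launched in its own coroutine keeps receiving events. `Event` and `Add` mirror the types from the question; `observeAll` is a hypothetical name, and `take(2)` is there only so that the demo terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Mirrors the Event types from the question.
sealed class Event
data class Add(val item: String) : Event()

// Hypothetical demo: returns what a launched observer saw and what
// firstOrNull returned.
fun observeAll(): Pair<List<Event>, Event?> = runBlocking {
    val events = MutableSharedFlow<Event>(extraBufferCapacity = 50)
    val seen = mutableListOf<Event>()

    // Observer in its own coroutine, so collecting does not block the caller.
    // UNDISPATCHED makes it subscribe before the code below emits anything.
    // take(2) exists only so this demo terminates; a real observer would
    // collect until its scope is cancelled.
    val observer = launch(start = CoroutineStart.UNDISPATCHED) {
        events.take(2).collect { seen.add(it) }
    }

    // Request/response style: emit after subscribing, return the first Add.
    val first = events
        .onSubscription {
            events.emit(Add("first"))
            events.emit(Add("second"))
        }
        .firstOrNull { it is Add }

    observer.join() // wait until the observer has received both events
    seen to first
}
```

Here `firstOrNull` should return `Add("first")` and stop, while the launched observer still receives both `Add("first")` and `Add("second")`.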
KotlinLeaner
02/16/2023, 11:30 AM