# announcements
o
Hey, I am trying to create a polling mechanism using `SharedFlow`, which will eventually be replaced by network-call polling every few seconds. This is what I have so far, but it is still not what I expect. My expectations:
1. Start polling once at least one terminal operator (`collect`) is present.
2. Stop polling when no terminal operator is present.
3. Share the same value across all terminal operators (`collect`).
As you can see from the output, the values are different. Also, when I `cancel` a `pollingScope`, the `send` function is still called since the flow is still active, and it keeps throwing an exception due to job cancellation. Help much appreciated!
@ExperimentalCoroutinesApi
    fun poll(scope: CoroutineScope) = callbackFlow {
        invokeOnClose { Timber.e("Debug: channel closed") }
        while (true) {
            try {
                val nextInt = Random.nextInt(0, 100)
                Timber.e("Debug: nextInt -> $nextInt")
                send(nextInt)
                delay(MIN_REFRESH_TIME_MS)
            } catch (throwable: Throwable) {
                Timber.e("Debug: error -> ${throwable.message}")
            }
        }
    }.shareIn(scope = scope, replay = 1, started = SharingStarted.WhileSubscribed())
pollingScope.launch {
    engine.poll(applicationScope).collect {
        Timber.e("Debug: collect 1 -> $it")
    }
}
pollingScope.launch {
    engine.poll(applicationScope).collect {
        Timber.e("Debug: collect 2 -> $it")
    }
}
pollingScope.launch {
    engine.poll(randomScope).collect {
        Timber.e("Debug: collect 3 -> $it")
    }
}
$poll: Debug: nextInt -> 59
$poll: Debug: nextInt -> 68
$poll: Debug: nextInt -> 73
$poll$3$invokeSuspend$$inlined$collect: Debug: collect 2 -> 73
$poll$2$invokeSuspend$$inlined$collect: Debug: collect 1 -> 59
$poll$4$invokeSuspend$$inlined$collect: Debug: collect 3 -> 68
s
#C1CFAFJSK would be a better place to ask
d
You're going to have to store the result of `shareIn`. Right now you're creating multiple shared flows, one per call to `poll`.
o
More info, @Dominaezzz? Or rather, what are you trying to convey?
d
I'm saying you have to do this.
val flow = engine.poll(applicationScope)
pollingScope.launch {
    flow.collect {
        Timber.e("Debug: collect 1 -> $it")
    }
}
pollingScope.launch {
    flow.collect {
        Timber.e("Debug: collect 2 -> $it")
    }
}
pollingScope.launch {
    flow.collect {
        Timber.e("Debug: collect 3 -> $it")
    }
}
o
Well, these `collect` operators can be in different parts of the application, so I suppose I need to convert the poll function to a property and share that property from a function.
👍🏼 1
will try that
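The property idea could look roughly like the sketch below. It assumes a holder class (the name `PollingEngine`, the `polled` property, and the one-second default interval are all made up for illustration) and uses a random number as a stand-in for the network call. The point is only that `shareIn` runs once, so every collector sees the same upstream.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlin.random.Random

// Hypothetical holder class; names and the default interval are assumptions.
class PollingEngine(scope: CoroutineScope, intervalMs: Long = 1_000L) {

    // shareIn is called exactly once, when the engine is constructed,
    // so all collectors of `polled` share a single upstream loop.
    val polled: SharedFlow<Int> = flow {
        while (true) {
            emit(Random.nextInt(0, 100)) // stand-in for the real network call
            delay(intervalMs)
        }
    }.shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(), // run only while someone collects
        replay = 1                                  // late collectors get the latest value
    )
}
```

Different parts of the app would then collect `engine.polled` instead of calling `poll(...)` again, provided they all hold the same `PollingEngine` instance.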
How do I stop one of the `collect` calls without affecting the others? I see that if I cancel one of the scopes, an error is thrown by the `send` function.
d
Odd, you're sure it's the same shared flow right? Might be a bug if so
o
Oh, is it? If you get the job and cancel it, then it works fine, but if you cancel the scope itself, you end up with `send` throwing a job cancellation error.
d
Ah yeah, you can't cancel the scope until you're done.
`send` will throw a `CancellationException`, which is not an error.
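One way to keep the loop from logging cancellation as a failure is to rethrow `CancellationException` and only swallow real errors. This is a minimal sketch, not the original code: `pollEvery` and its `fetch` parameter are hypothetical names, and it uses the plain `flow { }` builder instead of `callbackFlow`, which seems sufficient when there is no callback API involved.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// `fetch` is a hypothetical stand-in for the network call.
fun pollEvery(intervalMs: Long, fetch: suspend () -> Int): Flow<Int> = flow {
    while (true) {
        // Only the fetch is wrapped, so exceptions from downstream collectors
        // are not caught here.
        val value: Int? = try {
            fetch()
        } catch (e: CancellationException) {
            throw e // let cancellation propagate so the loop actually stops
        } catch (e: Exception) {
            println("poll failed: ${e.message}") // a real failure; skip this round
            null
        }
        if (value != null) emit(value)
        delay(intervalMs)
    }
}
```

With a catch-all `catch (throwable: Throwable)` inside `while (true)`, the cancellation exception is swallowed and the loop keeps spinning, which matches the repeated error logs described above.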
o
I don't understand this error. Why does Kotlin need to throw it? I wanted those jobs to be cancelled, so I cancelled them. The flow should have stopped whatever it was doing and resumed when I add a new operator. Is there a correct way of doing this?
d
The only way to make it stop is by throwing an exception. It's the only cooperative way to interrupt a thread.
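To make that concrete, here is a small sketch of cancelling one collector's job while another keeps collecting, with the upstream stopping once the last collector is gone. Everything named here (`cancelOneCollectorDemo`, `DemoResult`, the 10 ms and 50 ms delays) is invented for the demo, and it assumes the function is called from a single-threaded context such as `runBlocking`, since the counters are plain variables.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

// Hypothetical result holder for this demo.
data class DemoResult(val bKeptReceiving: Boolean, val upstreamStops: Int)

suspend fun cancelOneCollectorDemo(): DemoResult = coroutineScope {
    // Separate scope for sharing, so cancelling collectors never cancels it.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    var upstreamStops = 0

    val shared = flow {
        try {
            var n = 0
            while (true) {
                emit(n++)
                delay(10) // suspension point where cancellation is delivered
            }
        } finally {
            upstreamStops++ // runs when cancellation unwinds the loop
        }
    }.shareIn(sharingScope, SharingStarted.WhileSubscribed(), replay = 1)

    var seenByB = 0
    val a = launch { shared.collect { } }
    val b = launch { shared.collect { seenByB++ } }

    delay(50)
    a.cancel()            // stop collector A only, via its Job
    val before = seenByB
    delay(50)
    val bKeptReceiving = seenByB > before

    b.cancelAndJoin()     // last subscriber is gone
    delay(50)             // give WhileSubscribed a moment to stop the upstream
    val result = DemoResult(bKeptReceiving, upstreamStops)
    sharingScope.cancel() // release the sharing coroutine
    result
}
```

The `finally` block only runs because cancellation surfaces as an exception at `delay`, which is the cooperative stop being described. Cancelling individual jobs, and keeping the sharing scope separate from the collectors' scope, avoids tearing down the shared upstream by accident.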