eygraber
07/15/2021, 6:23 AM
callbackFlow
what's the best way to determine if I should still emit? isActive or !isClosedForSend (or both)?
Timo Gruen
07/15/2021, 10:06 AM
runBlocking {
    collection.createIndexes(indexes)
}
I keep getting issues because the awaitSingle() (which is inside createIndexes(...)) is being stopped, and no value is returned.
Any help there?
Nikiizvorski
07/15/2021, 1:45 PM
override suspend fun waitForFix(timeoutMsec: Long, scope: CoroutineScope): Location? {
    return getLocation() ?: let {
        val channel = Channel<Location?>(1)
        val timeout = scope.launch {
            delay(timeoutMsec)
            channel.send(null)
        }
        getUpdates { fix ->
            scope.launch {
                timeout.cancel()
                channel.send(fix)
            }
        }
        channel.receive()
    }
}
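A possible alternative shape for a "wait for one callback value or give up" function like the one above, as a minimal sketch only: withTimeoutOrNull wraps suspendCancellableCoroutine, so no channel or extra scope is needed. The name awaitFirstOrNull and the register parameter are hypothetical stand-ins for getUpdates { fix -> ... }; the sketch does not unregister the listener, which a real version would do in invokeOnCancellation.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull

// `register` is a hypothetical stand-in for a callback API such as getUpdates { fix -> ... }.
suspend fun <T : Any> awaitFirstOrNull(
    timeoutMs: Long,
    register: (onValue: (T) -> Unit) -> Unit,
): T? = withTimeoutOrNull(timeoutMs) {
    suspendCancellableCoroutine<T> { cont ->
        register { value ->
            // Ignore callbacks that arrive after the timeout or after the first result.
            if (cont.isActive) cont.resume(value)
        }
    }
}

// Usage: the first call gets a value immediately, the second one times out.
fun timeoutDemo(): Pair<String?, String?> = runBlocking {
    val hit = awaitFirstOrNull<String>(1_000) { onValue -> onValue("fix") }
    val miss = awaitFirstOrNull<String>(50) { _ -> } // callback never fires
    hit to miss
}
```

The isActive check is not atomic with resume, so a source that fires concurrently from several threads would need extra care.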
Omkar Amberkar
07/15/2021, 4:42 PM
isClosedForSend flag is still false when I cancel the job, and there is no other way to close the channelFlow that I am aware of. Need some help 🙂
fun poll(gameId: String, dispatcher: CoroutineDispatcher) = channelFlow {
    while (!isClosedForSend) {
        try {
            send(repository.getDetails(id))
            delay(MIN_REFRESH_TIME_MS)
        } catch (throwable: Throwable) {
            Timber.e("Debug: error -> ${throwable.message}")
        }
        invokeOnClose { Timber.e("Debug: channel flow closed.") }
    }
}
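One thing that stands out in the snippet above is that catch (throwable: Throwable) also catches the CancellationException thrown by send and delay, so the loop can keep spinning after cancellation. A minimal sketch of a polling loop that rethrows cancellation, assuming a plain flow is enough here; the fetch parameter is a hypothetical stand-in for repository.getDetails(id):

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// `fetch` is a hypothetical stand-in for repository.getDetails(id).
fun <T : Any> poll(intervalMs: Long, fetch: suspend () -> T): Flow<T> = flow {
    while (true) {
        val value = try {
            fetch()
        } catch (e: CancellationException) {
            throw e // never swallow cancellation, or the loop keeps running
        } catch (e: Exception) {
            null // a real implementation would log here and retry on the next tick
        }
        if (value != null) emit(value)
        delay(intervalMs) // throws CancellationException once the collector is cancelled
    }
}

// Usage: collect three values, then stop; cancellation ends the loop.
fun pollDemo(): List<Int> = runBlocking {
    var counter = 0
    poll(10) { ++counter }.take(3).toList()
}
```

No isActive or isClosedForSend check is needed in this shape, because delay and emit are the cancellation points.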
marcinmoskala
07/16/2021, 2:51 PM
Job::join does not resume when the coroutine is just done? The documentation suggests that it should, but I cannot make it resume any other way than by crashing or cancelling a coroutine.
import kotlinx.coroutines.*
fun main(): Unit = runBlocking {
    val job = Job()
    launch(job) {
        delay(200)
    }
    job.join()
}
// Runs forever
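For context on the snippet above: a Job created with the Job() factory is a CompletableJob that stays active until complete() or cancel() is called on it, and launch(job) only creates a child of it. A minimal sketch of two ways to make the join return (the function names are illustrative):

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Option 1: explicitly complete the parent; it finishes once its children finish.
fun joinCompletedParent(): Boolean = runBlocking {
    val parent = Job()
    launch(parent) { delay(200) }
    parent.complete()
    parent.join()
    parent.isCompleted
}

// Option 2: join the Job returned by launch instead of the parent.
fun joinChild(): Boolean = runBlocking {
    val parent = Job()
    val child = launch(parent) { delay(200) }
    child.join()
    child.isCompleted && parent.isActive // the parent itself is still active
}
```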
florent
07/16/2021, 9:12 PM
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import org.junit.Test
class ExampleUnitTest {
    @Test
    fun `kotlin language error maybe`() {
        val channel = Channel<String>()
        channel.receiveAsFlow()
            .map { State.Content(it) as State }
            .catch { emit(State.Error) }
            .onStart { emit(State.Loading) }
    }
    sealed class State {
        object Loading : State()
        object Error : State()
        data class Content(val content: String) : State()
    }
}
If I keep the as State, the compiler marks it as unnecessary; if I remove it, the code doesn't compile. Is there an alternative way to write this so it compiles? I have tried using the <> syntax but it doesn't seem to work.
marcinmoskala
07/17/2021, 8:25 AM
SupervisorJob is designed in this way, that it needs to be a part of scope, and does not work as a simple context passed as an argument?
fun main(): Unit = runBlocking(SupervisorJob()) {
    launch {
        delay(100)
        throw Error()
    }
    launch {
        delay(200)
        print("Done")
    }
}
// Just Error
fun main(): Unit = runBlocking {
    with(CoroutineScope(coroutineContext + SupervisorJob())) {
        launch {
            delay(100)
            throw Error()
        }
        val job = launch {
            delay(200)
            print("Done")
        }
        job.join()
    }
}
// Error...
// Done
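Some background on the two snippets above: a Job passed as a context argument becomes the parent of the coroutine being started, and that coroutine's own (regular) Job is what its children attach to. So in runBlocking(SupervisorJob()) the two launch calls are children of runBlocking's job, not of the SupervisorJob. A minimal sketch using supervisorScope instead, with a CoroutineExceptionHandler added only so the failure can be recorded:

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

fun supervisedDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, e -> log += "Error: ${e.message}" }
    supervisorScope {
        // Direct children of supervisorScope: one failing does not cancel the other.
        launch(handler) {
            delay(100)
            throw IllegalStateException("boom")
        }
        launch {
            delay(200)
            log += "Done"
        }
    }
    log
}
```

supervisorScope suspends until both children have finished, so no explicit join is needed.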
xxfast
07/19/2021, 11:17 PM
suspend { //some suspending calls here }
and this compiles just fine. Is this the same as runBlocking{} 🤔
William Reed
07/21/2021, 12:51 PM
data class ApiResponse(
    val someId: Long,
    val success: Boolean,
)
suspend fun ApiResponse.poll(interval: Long) {
    var result = success
    while (!result) {
        result = sendSomeApiRequest(someId)
        delay(interval)
    }
}
A requirement I need to support is allowing users of this function to change the interval value while polling occurs, based on outside information. Would the most idiomatic way to do this be using a StateFlow? So my above function would become
suspend fun ApiResponse.poll(interval: StateFlow<Long>)
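A minimal sketch of what the StateFlow variant could look like, assuming it is enough for a changed interval to take effect on the next wait. The request parameter is a hypothetical stand-in for sendSomeApiRequest(someId), and pollUntilSuccess is an illustrative name:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.runBlocking

// `request` is a hypothetical stand-in for sendSomeApiRequest(someId).
suspend fun pollUntilSuccess(interval: StateFlow<Long>, request: suspend () -> Boolean) {
    while (!request()) {
        // Re-read the value on every iteration, so a change applies to the next wait.
        delay(interval.value)
    }
}

// Usage: the caller shortens the interval while polling is in progress.
fun intervalDemo(): Int = runBlocking {
    val interval = MutableStateFlow(50L)
    var attempts = 0
    pollUntilSuccess(interval) {
        attempts++
        interval.value = 10L
        attempts == 3
    }
    attempts
}
```

This version does not interrupt a delay that is already running when the interval changes.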
Pitel
07/22/2021, 6:50 AM
block: suspend () -> T and block: suspend CoroutineScope.() -> T ?
Pitel
07/22/2021, 9:07 AM
runBlocking for JS?
The project I'm working on has this: expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
I tried something like:
var result: T? = null
val job = GlobalScope.launch { result = block }
while (!job.isCompleted) {}
return result!!
Yeah, I know it has many mistakes, but the main problem is that the while loop blocks the execution of the coroutine.
Melih Aksoy
07/22/2021, 9:29 AM
private var job: Job? = null
    set(value) {
        println("Setting value")
        field = value
    }
job = lifecycleScope.launch() {
    println("Started")
    try {
        println("Try")
    } catch (e: CancellationException) {
        println("Catch")
    } finally {
        println("Finally")
    }
}
prints
Started
Try
Catch
Finally
Setting value
in order.
Which means that with the default coroutine start, launch returns the job reference after the contents are executed (if execution is fast enough). So there's no guarantee about when you'll be handed the job reference when you do the assignment. I didn't find any mention of this in any docs, but I was expecting it to at least wait for the reference to be handed over before starting execution 🤔
This is probably troublesome in scenarios where you do an if (job != null) return check or similar, and clear it in finally (finally { job = null }). You may end up with job being assigned and never getting cleared.
P.S. Using CoroutineStart.LAZY followed by job?.start() resolves this issue, I'm just asking whether this is expected from the default implementation 🤔?
Sudhir Singh Khanger
07/22/2021, 12:45 PM
val watcher = object : TextWatcher {
    private var searchFor = ""
    override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
        val searchText = s.toString().trim()
        if (searchText == searchFor)
            return
        searchFor = searchText
        launch {
            delay(300) // debounce timeout
            if (searchText != searchFor)
                return@launch
            // do our magic here
        }
    }
    override fun afterTextChanged(s: Editable?) = Unit
    override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit
}
I came across this piece of code as a sample of debouncing. But it seems to me that it basically launches a new coroutine every time onTextChanged is called and then executes the code after 300 milliseconds. That sounds like an even worse idea.
andylamax
07/22/2021, 6:43 PM
Moses Mugisha
07/23/2021, 7:13 PM
import io.ktor.util.DispatcherWithShutdown
import io.ktor.util.InternalAPI
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
class SQSPoller(private val numWorkers: Int = 10) : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + supervisorJob
    private val stopRequest: CompletableJob = Job()
    // this is limited to a pool size of 64 threads, or the number of cores (whichever is higher)
    @OptIn(InternalAPI::class)
    private val tasksDispatcher = DispatcherWithShutdown(Dispatchers.IO)
    private lateinit var rootJob: Job
    fun start() = launch(tasksDispatcher) {
        println("Starting SQS Queue Poller")
        println("starting")
        val sqsMessageProducer = launchSQSPoller()
        val workers = List(numWorkers) {
            worker(it, sqsMessageProducer)
        }
        // distribute work among numWorkers
        for (worker in workers) {
            worker.join()
        }
        stopRequest.join()
    }.apply {
        rootJob = this
    }
    /** [fetchMessages] long polls the SQS queue for new messages */
    fun fetchMessages(): List<Int> {
        return arrayListOf(1, 3)
    }
    /** [launchSQSPoller] fetches SQS messages. */
    private fun CoroutineScope.launchSQSPoller(): ReceiveChannel<Int> = produce {
        loopForever {
            val messages = fetchMessages()
            messages.forEach {
                send(it)
            }
        }
    }
    private fun processMessage(message: Int) {
        println("sqs: worker ${Thread.currentThread()} processing messages")
        println(message)
        Thread.sleep(400)
    }
    /** [loopForever] triggers a function as long as the coroutine scope is active and the unleash flag is set */
    private suspend fun loopForever(block: suspend () -> Unit) {
        while (true) {
            try {
                block()
                Thread.yield()
            } catch (ex: Exception) {
                println("coroutine on ${Thread.currentThread().name} cancelled")
            } catch (ex: Exception) {
                println("${Thread.currentThread().name} failed with {$ex}. Retrying...")
                ex.printStackTrace()
            }
        }
        println("coroutine on ${Thread.currentThread().name} exiting")
    }
    /** [worker] consumes the SQS messages */
    private fun CoroutineScope.worker(id: Int, channel: ReceiveChannel<Int>) = launch(tasksDispatcher) {
        println("sqs: worker $id processing messages")
        // fanout messages
        for (message in channel) {
            launch { processMessage(message) }
        }
    }
}
I am implementing an SQS polling job that polls the queue and fans out to process the messages, but I keep running into performance issues. What am I doing wrong?
Dean Djermanović
07/24/2021, 8:45 AM
UI, Presentation, Business and Data. I'm launching coroutines from the Presentation layer, where I have set up backgroundScope with an exception handler. I want to share a Flow in the data layer using the shareIn operator. Which scope should I pass to it? If I pass a different scope than the backgroundScope used in the presentation layer, the exceptions won't be propagated up to the exception handler in that backgroundScope. Also, I want to share that Flow globally, not in the scope of that ViewModel in the presentation layer. So the question is how to share a Flow on the data layer from some repository class which is a singleton and still propagate the exceptions to the exception handler that's set up in the presentation layer?
camdenorrb
07/25/2021, 8:53 PM
package me.camdenorrb.generaltesting.coroutines
import kotlinx.coroutines.*
object TestBreaking1 {
    @JvmStatic
    fun main(args: Array<String>) {
        runBlocking {
            launch(Dispatchers.IO) {
                delay(10000000)
            }
            println("Here1")
        }
        println("Here2")
    }
}
This doesn't print "Here2" until the delay is done, is this expected?
William Reed
07/26/2021, 9:11 PM
fun foo(): Flow<Foo> = flow {
    while (context.isActive) {
        // ... process and emit ...
        delay(someMutableDelay) // wait until doing this again
    }
}
someMutableDelay is a mutable var in the same class as fun foo() - when it is changed I want the currently running delay to be cancelled and the next iteration of the loop to start immediately. Any suggestions for how to achieve this?
Adrien Poupard
07/27/2021, 9:23 AM
class Lambda : (String) -> String {
    override fun invoke(p1: String): String {
        TODO("Not yet implemented")
    }
}
Would it be possible to add a feature to do this?
class SuspendLambda : suspend (String) -> String {
    override suspend fun invoke(p1: String): String {
        TODO("Not yet implemented")
    }
}
jeff
07/27/2021, 3:02 PM
kotlinx.coroutines.delay on Android uses Handler.postDelayed under the hood. That surprised me -- is it true? For Dispatchers.Main only, or all dispatchers?
Scott Kruse
07/27/2021, 9:36 PM
viewModelScope.launch {
    withContext(Dispatchers.IO) {
        doStuff()
    }
}
suspend fun doStuff() {
    doStuffWithRx()
}
fun doStuffWithRx() {
    Single.fromCallable {
        // hit network
    }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSuccess { result ->
        }
        .subscribe()
}
TwoClocks
07/28/2021, 1:45 AM
enleur
07/28/2021, 6:02 PM
withTimeout but for the whole scope?
azabost
07/28/2021, 6:38 PM
suspendCancellableCoroutine to bridge some 3rd party API in my app. It's working OK-ish, but I'm wondering how reliable it is, considering that the callback I need to implement requires returning some values, e.g.
suspend fun displayFoo() {
    val foo = getFoo()
    delay(10_000)
    println(foo)
}
suspend fun getFoo(): Foo = suspendCancellableCoroutine { continuation ->
    val callback = FooLoader
        .addListener(object : RequestListener<Foo> {
            override fun onLoadFailed(e: FooException): Boolean {
                continuation.resumeWithException(e)
                return false
            }
            override fun onLoadSuccessful(foo: Foo): Boolean {
                continuation.resume(foo)
                return false
            }
        })
        .load()
    continuation.invokeOnCancellation {
        callback.cancel()
    }
}
How is this actually working?
Does the thread calling onLoadSuccessful execute the full delay(10_000) before reaching return false, or is return false executed as soon as the calling thread is "free" (because delay is suspending, not blocking)?
I'm also wondering what would happen if the continuation failed, e.g.
suspend fun displayFoo() {
    val foo = getFoo()
    delay(10_000)
    throw NPE
}
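A small experiment that may help with the ordering question above, as a sketch under stated assumptions: it uses runBlocking's own dispatcher, where resume() only schedules the continuation, so the callback returns before the coroutine continues. With Dispatchers.Unconfined (or an immediate dispatcher on the same thread) the continuation may instead run inline up to its next suspension point. The callback variable is a hypothetical stand-in for the RequestListener.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield

// Records the order in which the callback and the resumed coroutine run.
fun resumeOrderDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    var callback: ((String) -> Unit)? = null // hypothetical stand-in for the listener

    val job = launch {
        val foo = suspendCancellableCoroutine<String> { cont ->
            callback = { value -> cont.resume(value) }
        }
        log += "coroutine resumed with $foo"
    }
    yield() // let the coroutine above reach its suspension point

    callback?.invoke("foo") // resume() only schedules the continuation here
    log += "callback returned"

    job.join()
    log
}
```

In either case delay suspends rather than blocks, so the callback thread is not held for the duration of the delay.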
tad
07/29/2021, 7:42 PM
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        try {
            session.load()
            awaitCancellation()
        } finally {
            runBlocking {
                // This is a suspending function that performs I/O and needs to complete.
                session.recordBackgroundTime()
            }
        }
    }
}
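For suspending cleanup inside a finally block of a cancelled coroutine, a commonly used alternative to runBlocking is withContext(NonCancellable). A minimal sketch, where the delay is only a placeholder for a suspending call such as session.recordBackgroundTime():

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

fun cleanupDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            log += "loaded"
            awaitCancellation()
        } finally {
            // Suspending calls here would normally throw at once, because the
            // coroutine is already cancelled; NonCancellable lets them finish.
            withContext(NonCancellable) {
                delay(50) // placeholder for the suspending I/O call
                log += "recorded"
            }
        }
    }
    yield() // let the job start and suspend in awaitCancellation()
    job.cancelAndJoin()
    log
}
```

Unlike runBlocking, this does not block the thread the coroutine is running on while the cleanup suspends.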
Tristan
07/30/2021, 4:56 PM
class IosDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatch_get_main_queue()) { block.run() }
    }
}
actual class PrimaryScope : CoroutineScope {
    private val dispatcher = IosDispatcher()
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = dispatcher + job
}
fun <T> observe(origin: Flow<T>, block: (T) -> Unit): PrimaryScope {
    val scope = PrimaryScope()
    origin.onEach {
        block(it)
    }.launchIn(scope)
    return scope
}
But when I use it, I get the error from the screenshot.
I use it like this (I tried with an empty block in case it was related to a lost reference, or similar):
[UnityKotlinCoroutineHelperKt observeOrigin: self.tokenStorage.events
    block:^(id result) {
}];
Do you have an example I could use? Or an article explaining how to achieve this? Thanks a lot for your insights.
Gleno
07/30/2021, 7:24 PM
CancellationException ?
Jan Skrasek
07/30/2021, 10:10 PM
MutableSharedFlow() and Channel() with runBlockingTest
I would expect both to behave the same; yet when not using extraBufferCapacity = 1, my following test fails for MutableSharedFlow and not for Channel. (See gist)
https://gist.github.com/hrach/94ba0dc427ad2f08a73467d8972516a2
• why is there a difference?
• are both behaviors correct?
• is it correct that there is a difference?
Tuan Kiet
07/31/2021, 4:35 AM
Hank
07/31/2021, 8:45 AM
Hank
07/31/2021, 8:45 AM
Jan Skrasek
07/31/2021, 9:40 AM
Hank
07/31/2021, 9:42 AM
Richard Gomez
08/01/2021, 12:34 AM
select { } function. Perhaps something like:
// Ugly code
val requests = List<Deferred<String>>(5) {
    delay(Random.nextInt(0, 1000).toLong())
    async { "Response #$it" }
}
val response = select<String> {
    requests.withIndex().forEach { (index, deferred) ->
        deferred.onAwait { answer ->
            requests.forEach { it.cancel() }
            "Deferred $index produced answer '$answer'"
        }
    }
}
Nick Allen
08/01/2021, 4:19 AM
Flow
withTimeout(limit) {
    merge(::func1.asFlow(), ::func2.asFlow(), ...).first()
}
(formatting not working on phone)
Jan Skrasek
08/01/2021, 11:45 AM
Richard Gomez
08/01/2021, 4:55 PM
"your example seems reasonable, what's not working?"
Nothing in particular, just confirming that I have the right idea. 🙂 Is there a more idiomatic way to cancel a list of jobs than requests.forEach(Job::cancel)?
I suppose you could run them in an ad-hoc context and cancel that once the first is completed.
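A minimal sketch of the "ad-hoc context" idea mentioned above: start the requests as children of a coroutineScope, pick the first result with select, then cancel the remaining children in one call with cancelChildren(). The helper name firstOf and the sample tasks are illustrative assumptions.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Runs all tasks concurrently and returns the first result; the losers are cancelled.
suspend fun <T> firstOf(tasks: List<suspend () -> T>): T = coroutineScope {
    val deferreds = tasks.map { task -> async { task() } }
    val winner = select<T> {
        deferreds.forEach { deferred -> deferred.onAwait { it } }
    }
    coroutineContext.cancelChildren() // cancels every still-running async at once
    winner
}

// Usage: the faster task wins and the slower one is cancelled.
fun raceDemo(): String = runBlocking {
    firstOf(
        listOf(
            suspend { delay(300); "slow" },
            suspend { delay(20); "fast" },
        )
    )
}
```

Because the requests are children of the scope, cancelling the caller also cancels all of them without extra bookkeeping.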