ursus
03/14/2021, 2:13 PMSimon Lin
03/15/2021, 6:47 AMScott Kruse
03/15/2021, 4:03 PMReceiveChannel
. MainScope
is a basic scope tied to Dispatchers.Main
-- For some reason, despite calling cancel()
from the calling viewmodel's onCleared
function, This job continues to run after back pressing and reentering the app. If i change the scope to viewmodelScope
the coroutine is cancelled appropriately. Can anyone shed some light on this?
fun run(params: Params, onEachResult: (any: Any) -> Unit) {
mainScope.job = mainScope.launch {
run(params).consumeEach(onEachResult)
}
}
fun cancel() {
mainScope.job.cancel()
}
internal class MainScope(private val mainContext: CoroutineContext) : CoroutineScope {
var job: Job = Job()
private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
showError(throwable)
}
override val coroutineContext: CoroutineContext = mainContext + job + exceptionHandler
fun showError(t: Throwable) {
Timber.e(t)
throw t
}
}
melatonina
03/15/2021, 4:13 PMimport java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlin.concurrent.thread
fun highPriorityContext() = Executors.newSingleThreadExecutor { runnable ->
thread(priority = Thread.MAX_PRIORITY) { runnable.run() }
}.asCoroutineDispatcher()
Simon Lin
03/16/2021, 6:40 AMSharedFlow
(replay is 1) like Channel::receive
? for example:
suspend fun generateSomething() : String {
val result = _channel.receive() // Change Channel to SharedFlow
return "Result is $result"
}
Doru N.
03/16/2021, 4:38 PMtimeout
operator on Kotlin Flow? Namely, for my use case, I want to emit values from a Flow downstream, until a timeout happened (timeout duration should reset after every new emission), then stop emitting (no error) ?
The closest impl I see to what I need is the debounce
operator, with the diff that I need to close / complete the flow when timeout happens.knthmn
03/17/2021, 8:34 AMDispatchers
to a test dispatcher when testing?
https://craigrussell.io/2019/11/unit-testing-coroutine-suspend-functions-using-testcoroutinedispatcher/adambl4
03/17/2021, 10:48 AM@Test
fun test() {
val job = GlobalScope.launch(CoroutineName("my_name")) {
delay(1000)
}
assertEquals("my_name", job[CoroutineName]?.name)
}
Shabinder Singh
03/17/2021, 3:53 PMAndrey B.
03/17/2021, 4:16 PMviewModelScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) { someFunction() }
....
fun someFunction(){}
It's compiling!!! But whyyyy? Shouldn't someFunction() be suspendable??Astronaut4449
03/17/2021, 5:28 PMsuspend fun <http://java.io|java.io>.File.readTextAsync(charset: Charset)
or suspend fun java.nio.file.Path.readTextAsync(charset: Charset)
that is cancellable?Denis Sazonov
03/18/2021, 9:37 AMigor.wojda
03/18/2021, 11:05 AMfal
03/19/2021, 12:45 AMprivate val refreshFlow =
MutableSharedFlow<Unit>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST).apply {
tryEmit(Unit)
}
init {
fetchNews()
}
fun refreshData() {
refreshFlow.tryEmit(Unit)
}
private fun fetchNews() {
viewModelScope.launch {
refreshFlow
.flatMapLatest { fetchLatestNewsFlow() }
...
.collect()
}
Is this ok? Are there any better solutions?Lilly
03/19/2021, 1:37 AMprivate val packetChannel: Channel<ResponsePacket> by lazy { Channel(Channel.UNLIMITED) }
that receives ResponsePacket
which is just a data class which holds a ByteArray
. After making a request to the API that produces the packets, I offer it via the channel (packets are only offered on request and communication is sequential, so request waits until packet is offered). I would like to consume these packets via flow but some packets are different which require different actions on collection, see screenshot. I'm clueless how to achieve this. For case 1 I would have to request 3 packets and while packet A is received I would parse it but then would have to wait until packet B + C is received and parsed and then merge the results of these and emit the result or otherwise case 2. That's the part I can't get my head around. Some help is much appreciated 🙏Sudhir Singh Khanger
03/19/2021, 9:21 AMrmyhal
03/19/2021, 9:32 AMwithTimeout(time) { operation }
but without canceling the operation?
What I want to achieve: If an operation takes longer than N
millis I want to have a callback about this and I want the operation to continue working. Something like:
withTime(
time = 500L,
block = { operation },
onTimeOver = { }
)
Marco Pierucci
03/19/2021, 9:56 PMprivate val _event = MutableSharedFlow<S>(extraBufferCapacity = 1)
val event: SharedFlow<S> get() = _event.asSharedFlow()
be a correct alternative to Channels
for single shots events? Or would events would still be dropped if there are no subscribersFredrik Rødland
03/20/2021, 4:37 PMpackage samples
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicLong
fun main() {
val c = AtomicLong()
for (i in 1..1_000_000L)
GlobalScope.launch {
c.addAndGet(i)
}
println(c.get())
}
PS! if this is better suited as question in #getting-started please tell me and I’ll move it there.natario1
03/20/2021, 6:37 PMcombine(flow1, flow2, ..., flowN, transform)
that does not wait for all flows to emit the first value? I'd happily receive a smaller array if some flow does not have a value yet.Ch8n
03/21/2021, 9:12 AMthis▾
Natsuki(开元米粉实力代购)
03/22/2021, 7:56 AMRemy Benza
03/22/2021, 9:28 AMSupervisorJob
?Se7eN
03/22/2021, 4:43 PM<http://Dispatchers.IO|Dispatchers.IO>
or Dispatchers.Default
for drawing with canvas on a bunch of bitmaps (GIF frames)?Daniele B
03/22/2021, 5:12 PMval commonTest by getting {
dependencies {
implementation(kotlin("test-common"))
implementation(kotlin("test-annotations-common"))
implementation ("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.3")
}
}
and I am getting this:
org.gradle.internal.resolve.ArtifactNotFoundException: Could not find kotlinx-coroutines-test-1.4.3-samplessources.jar (org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.3).
therealbluepandabear
03/23/2021, 4:49 AMnatario1
03/23/2021, 10:16 AMcoroutineScope { }
vs. just executing them in the outer scope? Assuming the outer context has a regular Job
.Lilly
03/23/2021, 4:54 PMwhile (!packetChannel.isEmpty) {
packetChannel.receive()
}
in flows when I convert the channel to a flow:
packetChannel.receiveAsFlow()
Edit: I would like to consume the channel as a flow, but instead of collect all values then parse I would like to collect one value, parse it, wait for next one then emit final resultBrian Dilley
03/23/2021, 5:49 PMtrue
result that it finds):
override fun emailExists(email: String): Boolean {
val results = shards.asyncAll { userDao.emailExists(email) }
return runBlocking {
results.map { it.await() }
.firstOrNull { it }
} ?: false
}
the shards.asyncAll method is:
fun <T> async(
shardId: Long,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T): Deferred<T> {
return scope.async(context, start) {
selectShard(shardId)
block()
}
}
fun <T> asyncAll(
shardIds: Collection<Long> = this.shardIds,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T): List<Deferred<T>> {
return shardIds.map { async(it, context, start, block) }
}
So first question is if that code above (emailExists
) will work, and the second question is: how can i change it so that the first Deferred<Boolean>
result that returns that is true
returns from the method (rather than it testing them in order as they come back - I’d like it so that the very first one that comes back with a result of true
triggers a return)therealbluepandabear
03/23/2021, 7:41 PMtherealbluepandabear
03/23/2021, 7:41 PMjw
03/23/2021, 7:52 PMtherealbluepandabear
03/23/2021, 7:54 PMephemient
03/23/2021, 7:54 PMjw
03/23/2021, 7:54 PMhttps://www.youtube.com/watch?v=YrrUCSi72E8▾
ephemient
03/23/2021, 7:56 PMtherealbluepandabear
03/23/2021, 8:19 PMursus
03/23/2021, 8:55 PMNatsuki(开元米粉实力代购)
03/24/2021, 6:17 AM