alex
08/26/2021, 6:21 PM
Patrick Ramsey
08/27/2021, 3:31 AM
/**
 * Returns the context of the current coroutine.
 */
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }
That is a wild bit of code. How does a suspend inline val exist in the language, and how is it that all it takes to make it work is to suppress the compiler warning that that’s not allowed?
frankelot
08/27/2021, 7:50 AM
excellent video
suspending function is transformed into a regular function that takes an additional parameter Continuation<OriginalReturnType> ... (emphasis on OriginalReturnType)
But then in the second screenshot, the same Continuation seems to be used to call multiple suspending functions with different return types (requestToken and createPost).
I would think suspend fun requestToken is transformed to requestToken(Continuation<Token>) and that suspend fun createPost should be createPost(Continuation<Post>), right? So how come we can pass sm into both?
Greg Hibberd
08/27/2021, 2:20 PM
fun getDefault(): Any {
    check(false) { "failed" }
    TODO()
}
fun method(param: Any = getDefault()) = flow<Any> {
    TODO()
}
i.e. avoiding loads of these when you have 2+ default values:
fun method() = flow {
    method(getDefault())
        .collect {
            emit(it)
        }
}
Scott Whitman
08/28/2021, 1:26 PM
Ayfri
08/29/2021, 9:00 AM
runBlocking { //code } every time or is there a way to intercept the upper suspend?
Hans Ellegard
08/29/2021, 5:00 PM
// You are not allowed to change this class.
class Foo : AutoCloseable {
    private val outstream = PipedOutputStream()
    private val instream = PipedInputStream(outstream)
    override fun close() {
        instream.close()
    }
    fun blockingCall() {
        instream.read() // blocks the thread until e.g. the stream is closed
    }
}
/**
 * Wraps the AutoCloseable usage so that the AutoCloseable is closed when the coroutine is cancelled.
 */
suspend fun <T : AutoCloseable> T.closeOnCancel(block: (T) -> Unit) {
    // How do I implement this function?
    // I have tried using suspendCancellableCoroutine, but I think I misunderstood that function.
}
fun cancelBlockingCall() {
    val foo = Foo()
    runBlocking(Dispatchers.IO) {
        withTimeout(100) {
            foo.closeOnCancel { it.blockingCall() }
        }
        println("Yay, arriving here means problem solved")
    }
}
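One possible shape for such a wrapper, as a rough sketch rather than a definitive answer: run the blocking call in a child coroutine on Dispatchers.IO, wait for it with join(), and close the resource if the waiting coroutine is cancelled. BlockingResource and timesOutAndCloses are hypothetical names standing in for Foo and the test; the sketch assumes that close() really does unblock the blocking call, and it declares closeOnCancel as an extension so that the foo.closeOnCancel { ... } call style works.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for Foo: blockingCall() blocks its thread until close() is called.
class BlockingResource : AutoCloseable {
    private val closed = CountDownLatch(1)
    override fun close() = closed.countDown()
    fun blockingCall() = closed.await()
}

// Runs the blocking block on Dispatchers.IO and closes the receiver if the caller is cancelled.
suspend fun <T : AutoCloseable> T.closeOnCancel(block: (T) -> Unit) {
    val resource = this
    coroutineScope {
        val worker = launch(Dispatchers.IO) { block(resource) }
        try {
            worker.join() // cancellable: throws CancellationException if the caller is cancelled
        } catch (e: CancellationException) {
            resource.close() // unblocks the worker thread so the scope can complete
            throw e
        }
    }
}

// Returns true when the timeout fired and the blocked call was released by close().
fun timesOutAndCloses(): Boolean = runBlocking {
    val resource = BlockingResource()
    withTimeoutOrNull(100) { resource.closeOnCancel { it.blockingCall() } } == null
}

fun main() {
    println(timesOutAndCloses())
}
```

The coroutineScope does not return until the worker has finished, which is why closing the resource before rethrowing matters: without it the scope would wait on the blocked thread indefinitely.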
Could I get some help or pointers to documentation regarding how to implement closeOnCancel in general?
frankelot
08/29/2021, 9:11 PM
CoroutineScope interface. I get that coroutine builders like launch are extension functions on that interface, but why can’t they just be extension functions on CoroutineContext instead?
Jan Groen
08/29/2021, 9:19 PM
Dispatchers.Main dispatcher (other than using the ServiceLoader). This would be useful, because a lot of the data needs to be updated from the main game loop. I'm aware that I could create my own dispatcher and I have done that. I just think that it would be more idiomatic if I could use the Dispatchers.Main dispatcher.
Lilly
08/30/2021, 5:08 PM
// In ViewModel
fun toggleScan() {
    when {
        isScanning.value -> stopScan()
        else -> {
            startScan()
            presenterScope.launch(Dispatchers.Default) {
                delay(DISCOVERY_TIMEOUT)
            }
        }
    }
}
What is an elegant way to avoid creating a new coroutine on every button click?
j0bro
09/01/2021, 9:52 AM
LinuxArm32Hfp version of coroutines?
Андрей Коровин
09/01/2021, 6:29 PM
java.lang.IllegalStateException: This job has not completed yet
@Test
fun testHotFlow() = coroutinesTestRule.testDispatcher.runBlockingTest {
    val eventChannel = Channel<String>()
    val events = eventChannel.receiveAsFlow()
    val event = events.first()
    assertEquals(event, null)
}
2. This test doesn’t fail
@Test
fun testHotFlow() = coroutinesTestRule.testDispatcher.runBlockingTest {
    val eventChannel = Channel<String>()
    val events = eventChannel.receiveAsFlow()
    launch {
        eventChannel.send("First element")
    }
    val event = events.first()
    assertNotNull(event)
}
Colton Idle
09/02/2021, 4:32 AM
Patrick Ramsey
09/02/2021, 6:40 AM
suspendCancellableCoroutine {} and withTimeout {}, and I’m running that test using a TestCoroutineDispatcher. In particular, I’m testing that the timeout fires if cont.resume() is never called within suspendCancellableCoroutine {}.
My test runs the code in question inside a <test coroutine dispatcher>.runBlockingTest {} block.
What I’m finding is that sometimes the test passes as expected. But other times (it really feels nondeterministic and timing-related; it changes depending on which tests are called before it, and whether or not it’s called in a debugger), instead of hitting the timeout, the internal call to dispatcher.advanceUntilIdle() leaves the test dispatcher idle but the coroutine still running, resulting in an IllegalStateException.
Florian
09/02/2021, 10:04 AM
getLongestCompletedStreak can take a while if the List is large. Will this setup do the list calculation on a background thread?
private val taskStreaksMax =
    allTasksWithTaskStatisticsFlow.map { allTasksWithTaskStatistics ->
        allTasksWithTaskStatistics.map { taskWithTaskStatistics ->
            TaskStreak(
                taskWithTaskStatistics.task.id,
                taskWithTaskStatistics.taskStatistics.getLongestCompletedStreak()
            )
        }
    }.flowOn(defaultDispatcher)
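A small sketch of how flowOn behaves here, assuming defaultDispatcher is something like Dispatchers.Default: flowOn changes the context of the operators upstream of it, so the lambda passed to the Flow's map (and the List.map nested inside it) runs on that dispatcher, while collection stays in the collector's context. upstreamThreadNames is a hypothetical helper written only for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records the name of the thread on which the upstream map lambda runs.
fun upstreamThreadNames(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .map { Thread.currentThread().name } // upstream of flowOn: runs on Dispatchers.Default
        .flowOn(Dispatchers.Default)
        .toList()                            // collected on the runBlocking thread
}

fun main() {
    println("collector thread: ${Thread.currentThread().name}")
    println("upstream threads: ${upstreamThreadNames()}")
}
```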
Orhan Tozan
09/02/2021, 10:40 AM
val jvmTest by getting {
    dependencies {
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-test:1.5.2")
    }
}
Vsevolod Tolstopyatov [JB]
09/02/2021, 11:31 AM
kotlinx.coroutines 1.5.2 is here!
• Kotlin 1.5.30 with new K/N Apple Silicon targets
• JS bug fixes: proper Dispatchers.Default on React Native, proper onUndeliveredElement for JS Channels
• Various Mutex optimizations
Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.5.2
Florian
09/02/2021, 12:42 PM
suspend fun List<TaskStatistic>.getCurrentCompletedStreak(): Int =
    withContext(Dispatchers.Default) {
        var currentStreak = 0
        for (statistic in this@getCurrentCompletedStreak.reversed()) {
            if (!statistic.taskCompleted) break
            currentStreak++
        }
        currentStreak
    }
nilTheDev
09/03/2021, 12:22 PM
await extension function is wrapping the inherently blocking call inside it to make it work with coroutines. As far as my understanding goes, the enqueue method submits the call into some kind of thread pool, where it waits in the queue until executed.
Question 1: If I throw 100_000 requests asynchronously, what would be the mechanisms that prevent the program from crashing? Would that only be the queue that the enqueue method submits to? Or are there some other mechanisms that suspendCoroutine provides?
Question 2: What exactly is the role that suspendCoroutine plays here? Since the extension function Call<T>.await() is a suspending function, why can't it just use the continuation that this function would receive to write the callback? There must be some extra functionality that suspendCoroutine provides other than access to the continuation. What exactly is that?
maxmello
09/03/2021, 12:59 PM
Application class and always reference them via the Application instance, does it make any difference if I use GlobalScope.launch to launch a coroutine, or let my Application extend CoroutineScope and pass in the Application instance to the class to do application.launch instead? I ask because of the (new?) warnings about DelicateCoroutinesApi when using GlobalScope. When I use the second approach, the warning goes away, but I would assume they are functionally the same, as both the Application’s coroutineScope and GlobalScope live the whole time the app is running? (Note this is mostly about doing things once asynchronously on start of the application.)
Didier Villevalois
09/03/2021, 4:30 PM
private class JobMap<K> {
    private val jobs = atomic(mapOf<K, Job>())
    fun add(key: K, factory: () -> Job) {
        jobs.update {
            val job = factory()
            job.invokeOnCompletion { cause ->
                if (cause == null || cause !is CancellationException) jobs.update { it - key }
            }
            it + (key to job)
        }
    }
    fun remove(key: K) {
        jobs.getAndUpdate { it - key }[key]?.cancel()
    }
}
Krzysiek Zgondek
09/03/2021, 8:25 PM
scope: CoroutineScope and I have launched a job in that scope:
val context = Dispatchers.Default + CoroutineActionId(id)
val newJob = scope.launch(context) { ... }
Given that CoroutineActionId(id) is an AbstractCoroutineContextElement, how can I reach it having only scope?
I've tried like this:
val children = scope.coroutineContext[Job]?.children
val activeAction = children?.firstOrNull { it[CoroutineActionId]?.id == id }
if (activeAction != null) {
    activeAction.cancelChildren()
    activeAction.join()
}
Job are different than jobs refs returned from launch but the amount is correct
val children = scope.coroutineContext[Job]?.children list but when I try to get CoroutineActionId like this
children?.forEach {
    println("#state: caid = " + it[CoroutineActionId]?.id)
}
it doesn't exist
Jorge Castillo
09/04/2021, 10:52 AM
Channel does this, but I'm trying to manually write a Queue that suspends when enqueuing an element and the queue is full, and also suspends when dequeuing and the queue is empty
class Queue<T>(private val limit: Int = 5) {
    private val elements: MutableList<T> = mutableListOf()
    suspend fun enqueue(t: T): Unit = suspendCancellableCoroutine { cont ->
        var enqueued = false
        while (!enqueued) {
            if (elements.size < limit) {
                elements.add(t)
                enqueued = true
                cont.resume(Unit)
            }
        }
    }
    suspend fun dequeue(): T = suspendCancellableCoroutine { cont ->
        var dequeued = false
        while (!dequeued) {
            if (elements.isNotEmpty()) {
                val element = elements.first()
                elements.remove(element)
                dequeued = true
                cont.resume(element)
            }
        }
    }
}
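The while loops above spin inside the suspendCancellableCoroutine block, so the calling thread stays busy instead of being released. As a hedged alternative sketch (not the only way, and not how Channel is implemented), a bounded queue can be built from two kotlinx.coroutines.sync.Semaphore instances that count free and filled slots. SuspendingQueue and produceAndConsume are hypothetical names for this illustration.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

class SuspendingQueue<T>(limit: Int = 5) {
    private val elements = ArrayDeque<T>()
    private val freeSlots = Semaphore(limit)                            // all permits available
    private val filledSlots = Semaphore(limit, acquiredPermits = limit) // no permits available yet

    suspend fun enqueue(t: T) {
        freeSlots.acquire()                              // suspends while the queue is full
        synchronized(elements) { elements.addLast(t) }   // no suspension point inside the lock
        filledSlots.release()
    }

    suspend fun dequeue(): T {
        filledSlots.acquire()                            // suspends while the queue is empty
        val element = synchronized(elements) { elements.removeFirst() }
        freeSlots.release()
        return element
    }
}

// One producer pushes 0..9 through a queue of capacity 2 while the caller drains it.
fun produceAndConsume(): List<Int> = runBlocking {
    val queue = SuspendingQueue<Int>(limit = 2)
    val producer = launch { repeat(10) { queue.enqueue(it) } }
    val received = List(10) { queue.dequeue() }
    producer.join()
    received
}

fun main() {
    println(produceAndConsume())
}
```

Because acquire() is the only suspension point in each method and it happens before the list is touched, a cancelled caller leaves the queue and the permit counts unchanged.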
Adding a question about how to test this in a thread.
Erik Dreyer
09/04/2021, 8:42 PM
João Rodrigues
09/04/2021, 11:31 PM
KafkaProducer, and its send method (non-suspending, native Java function). If I understood correctly, the coroutines for this purpose should be in the Dispatchers.IO context, correct? Can the number of emitting coroutines exceed the number of threads of my machine, given that the send function is non-suspending?
Colton Idle
09/05/2021, 8:01 PM
val loggedIn: Flow<Boolean> = prefs.data
    .map { preferences ->
        preferences[authenticated] ?: false
    }
This is my first time working with Flow, so I might be misunderstanding its usage, but my use case is that when my app starts, I'm reading from prefs and I need to know (at the moment the app starts) whether I'm logged in or logged out, but then every time after the first time, I want it to be async. Is there some typical pattern for when you want a flow... BUT you want to have the actual first item in the Flow before you go any further?
uli
09/05/2021, 8:37 PM
prefs.data.onStart { emit(initialPrefs) } do the job?
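A minimal sketch of the two options, with flowOf(true, false) as a hypothetical stand-in for the preferences-backed flow: onStart emits a value before anything from upstream arrives, while first() suspends until the first real value is available. withInitial is a helper name invented for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Prepends an initial value to whatever the source flow emits.
fun <T> withInitial(source: Flow<T>, initial: T): Flow<T> =
    source.onStart { emit(initial) }

fun main() = runBlocking {
    val loggedIn: Flow<Boolean> = flowOf(true, false) // stand-in for prefs.data.map { ... }

    // Option 1: a known initial value is emitted first, then the upstream values.
    println(withInitial(loggedIn, false).toList())

    // Option 2: suspend until the first real value arrives, then continue.
    println(loggedIn.first())
}
```

Option 1 only helps if a sensible initial value is known up front; option 2 returns the stored value itself but has to be called from a coroutine.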
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-start.html
Lilly
09/06/2021, 11:32 AM
val response = readPacketFlow().first { response -> response is Acknowledgement }
when (response) {
    Acknowledgement -> true
    else -> false
}
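Note that first { predicate } skips items until one matches and throws NoSuchElementException if none does, so it never yields false. If the goal is a Boolean that says whether the very first item matches, one possible sketch is to take the first item and test it. Packet, Rejection, and firstMatches are hypothetical names; only Acknowledgement comes from the snippet.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical packet types standing in for the real ones.
sealed class Packet
object Acknowledgement : Packet()
object Rejection : Packet()

// True if the first emitted item satisfies the predicate; false otherwise or if the flow is empty.
suspend fun <T : Any> Flow<T>.firstMatches(predicate: (T) -> Boolean): Boolean {
    val first = firstOrNull() ?: return false
    return predicate(first)
}

fun main() = runBlocking {
    println(flowOf<Packet>(Acknowledgement, Rejection).firstMatches { it is Acknowledgement })
    println(flowOf<Packet>(Rejection, Acknowledgement).firstMatches { it is Acknowledgement })
    println(emptyFlow<Packet>().firstMatches { it is Acknowledgement })
}
```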
Edit: I would like to return true or false from flow if the first item matches a given predicate.
eirikb
09/06/2021, 7:46 PM
Tower Guidev2
09/07/2021, 7:34 AM
Tower Guidev2
09/07/2021, 7:34 AM
Flow once, then "disable" it.
I have a Room database table where I have status values per "Action".
The status can be SUCCESS, EMPTY, or FAILURE; each status row has an ARCHIVED column that is either true or false.
The DB table resembles this
data class OutcomeDO(
    @ColumnInfo(name = "name") val name: String,
    @ColumnInfo(name = "outcome") val outcome: Outcome,
    @ColumnInfo(name = "archived") val archived: Boolean = false,
) {
    @PrimaryKey(autoGenerate = true)
    @ColumnInfo(name = "result_local_id")
    var resultLocalId: Long = 0L
}
My DAO resembles this
@Query("SELECT * from result_table WHERE name = :name AND archived = :archive")
fun fetch(name: String, archive: Boolean = false): Flow<OutcomeDO>
I query this table like this:-
return database.outcomeDAO().fetch(name = Fred::class.java.name).filterNotNull().flatMapLatest { flowOf(it.outcome) }
and consume the Flow like this:-
outcome.take(1).collect { outcome ->
    when (outcome) {
        Success -> // display data
        Empty -> {
            showError(anchorView, R.string.no_data_found)
        }
        Failure -> {
            showError(anchorView, R.string.data_search_failed)
        }
    }
}
Once the flow has been consumed in the above snippet I wish to set that particular row on the underlying database to ARCHIVED = true.
How can I achieve this?
Is there a Flow operator such as OnCompletion/OnConsumed that I can employ to detect when the flow has been consumed and allow me to update a specific row on my DB table?
Joffrey
09/08/2021, 6:46 AM
onCompletion that you can add when you create the flow to detect the end of the collection (normally or exceptionally)
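A small sketch of that idea, with flowOf(...) as a hypothetical stand-in for the Room-backed flow and a list append marking the place where the DAO update would go: onCompletion placed after take(1) runs once the single item has been handled, and its cause parameter is null when the flow finished normally.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Collects one outcome, then records where the "archive" update would happen.
fun collectThenArchive(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf("SUCCESS", "EMPTY")            // stand-in for the database flow
        .take(1)
        .onCompletion { cause ->
            // cause == null means the flow completed without an exception
            if (cause == null) events += "archive row" // hypothetical spot for the DAO update
        }
        .collect { outcome -> events += "handled $outcome" }
    events
}

fun main() {
    println(collectThenArchive())
}
```

If the update should also run when collection fails, the cause check can be dropped or replaced with error-specific handling.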