sboishtyan
05/12/2021, 4:48 PMFlow
Observable.create(
{ emitter ->
val process: Process = ProcessBuilder(commandAndArgs)
.redirectErrorStream(true)
.start()
emitter.setCancellation {
process.destroy()
}
)
So idea is that I want when someone starts collecting the flow I will create a process and when a client stops collecting I will destroy the processZach Klippenstein (he/him) [MOD]
05/12/2021, 5:08 PMursus
05/13/2021, 3:51 AMsuspend fun send(someId) {
val currentState = db.getState(someId)
if (currentState == null) error("Unknown id")
if (currentState == Sent) return // idempotent
if (currentState == Sending) .. mutex?
try {
db.setState(Sending, someId)
api.send(..)
db.setState(Sent, someId)
} catch (ex) {
db.setState(Idle, someId)
}
}
This is very common pattern in apps I make.
What would the idiomatic way to make this code thread/coroutine safe?
Obviously, if same id ist being sent concurrently multiple times, state might get smashed
Is it okay to use Mutex around the whole function? Granted, only same ids needs to be serialized -- should I then have a Map<SomeId, Mutex>? Is there maybe some conditional Mutex?
Or maybe not mutex at all, and use some sort of concurrent counter as not to revert to Idle when other coroutine is Sending etc?
Or some Channel magic I dont know?louiscad
05/13/2021, 4:06 AM1.5.0-RC-native-mt
is out!Karan Sharma
05/13/2021, 11:43 AMAbhishek Dewan
05/13/2021, 3:51 PMviewModelScope.launch {
flow1.collect {}
flow2.collect {}
}
dan.the.man
05/13/2021, 5:36 PMfun <T> LifecycleOwner.stateFlow(stateFlow: StateFlow<T>, funCollect: (T) -> Unit) {
lifecycleScope.launchWhenStarted {
stateFlow.collect() {
funCollect(it)
}
}
}
Is how I subscribe to a Stateflow. I then subscribe to that in my fragment. However, very infrequently, my Stateflow doesn't emit.
Timber.d(//This log is hit)
model.data.emit(result)
The log above is hit, in theory I would think it should have emit, but for some reason, my subscription is never hit in my fragment. The correct object is being pushed to/observed, not sure what's happening, any thoughts?Colton Idle
05/14/2021, 2:08 AM@Test
fun `test network call`() {
var response: Response<Projects>
GlobalScope.async {
response = service.getMostPopularProjects()
}
assertThat(response.body().popularProjectSize().toString()).isGreaterThan("1")
}
ms
05/14/2021, 10:07 AMVsevolod Tolstopyatov [JB]
05/14/2021, 3:34 PMtrySend
, tryReceive
, and receiveCatching
instead of error-prone poll
and offer
* Reactive integrations, callbackFlow
, and channelFlow
are promoted to stable API
* CoroutinesTimeout
JUnit5 rule
And a lot more! Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.5.0Yan Pujante
05/14/2021, 5:28 PMjava.lang.Process
and although this does NOT trigger an error, the call to waitFor
is flagged as inaprorriate blocking call. Is there a better way to do it? Code in thread to not pollute here...wakingrufus
05/14/2021, 7:12 PMClassCastException: kotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to ...
I am using suspend fun
as well as Flows, but I am not sure where to start with this.ursus
05/16/2021, 3:09 PMclass Syncer(private val somePartialSyncComponentNeverRelatedToUi) {
private val scope = CoroutineScope(<http://Dispatcher.IO|Dispatcher.IO>)
fun sync() {
scope.launch {
somePartialSyncComponent.doWhatever()
}
}
}
class SomePartialSyncComponentNeverRelatedToUi {
suspend func doWhatever {
withContext here ??? <--------
}
}
Is it not wasteful context switch?hho
05/17/2021, 12:48 PMsuspend fun
, and the "Inappropriate blocking method call" inspection flags my code:
val message = objectMapper.readTree(inputString)
What can I do about it? It's not doing I/O, it's operating on a String.efemoney
05/17/2021, 3:09 PMStanley Gomes
05/18/2021, 3:49 AMviewmodelScope.launch {
val deferredFetchAndPersist = async {
some suspend fun()
}
val deferredTimer = async {
delay(3000)
}
awaitAll(deferredFetchAndPersist, deferredTimer)
hideImageView()
}
This works but I feel it's a little too verbose. Is there a better way I can achieve this?
ThanksPablo
05/18/2021, 3:38 PMfun <T> debounce(
delayMillis: Long = 300L,
scope: CoroutineScope,
action: (T) -> Unit
): (T) -> Unit {
var debounceJob: Job? = null
return { param: T ->
if (debounceJob == null) {
debounceJob = scope.launch {
action(param)
delay(delayMillis)
debounceJob = null
}
}
}
}
Where I can do
fun setDebounceListener(view: Button, onClickListener: View.OnClickListener) {
val clickWithDebounce: (view: View) -> Unit =
debounce(scope = MainScope()) {
onClickListener.onClick(it)
}
view.setOnClickListener(clickWithDebounce)
}
Does it make sense to use debounce here? It's just to avoid double click on a Buttonspierce7
05/18/2021, 4:18 PMnew video on kotlin coroutines 1.5▾
GlobalScope
delicate api legitimate use cases.
It’s my understanding that any time I would create a generic CoroutineScope
, and then not cancel it, I can use GlobalScope
instead. Is that correct? The main hesitation with the GlobalScope
is simply that you can’t cancel it, and thus, it doesn’t lend itself well to hierarchical coroutines, right?.spierce7
05/18/2021, 10:24 PM.value
work? I’m seeing instances where values are `emit`ted from the Flow
, but then the .value
is never updated to reflect the result that was either set on .value
or emit
on the Flow
.Grantas33
05/18/2021, 10:27 PMMarius Metzger
05/19/2021, 3:13 PM@InternalCoroutinesApi
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
if (Bukkit.isPrimaryThread()) return false
// Check if the primary server thread is blocked during the execution of this coroutine.
// If it is, using the Bukkit scheduler would cause a deadlock, as scheduled tasks
// are executed at the end of the tick, which requires the server thread to be running.
// To detect whether the server thread is blocked, we traverse the hierarchy of Jobs upwards
// to check if there's a coroutine running in a [runBlocking] context.
// If so, we don't dispatch, causing the action to be executed on the current thread.
// As the primary thread is locked, this has no thread-safety implications with regards to
// the Bukkit API.
// However, Bukkit methods that check whether they're performed on the server thread,
// such as spawning an entity, will fail exceptionally - if this becomes an issue,
// we can move the [isDispatchNeeded] logic into [dispatch], toggling a global flag while
// directly executing the runnable, and patch the server software to respect this flag
// during those checks.
var job: Job? = context[Job]
while (job != null) {
if (job.javaClass == blockingCoroutineClass) {
// we found a coroutine spawned via runBlocking - check whether it blocks the
// primary server thread using the same logic as [CraftServer.isPrimaryThread]
val blockedThread = Reflect.on(job).field("blockedThread").get<Thread>()
if (blockedThread == MinecraftServer.getServer().serverThread ||
blockedThread == MinecraftServer.getServer().shutdownThread ||
blockedThread is TickThread) {
return false
}
}
if (AbstractCoroutine::class.java.isAssignableFrom(job.javaClass)) {
// traverse context hierarchy upwards
job = Reflect.on(job).field("parentContext").get<CoroutineContext>()[Job]
} else {
throw UnsupportedOperationException("Can't handle jobs of type ${job.javaClass}")
}
}
return true
}
it’s not beautiful, but it worked - until I just updated to kotlinx.coroutines 1.5.0, which removes the field AbstractCoroutine.parentContext
in this commit.
Now my question is: is there still a way to hackily derive the parent Job
from a coroutine?
Or even better - is there a proper solution to this issue that doesn’t require messing with coroutine internals at all?
any help is appreciated 😊frankelot
05/19/2021, 8:32 PMCoroutineScope
s
I have a class that allocates some resources on its constructor init { // }
and the client has to explicitly call .dispose()
when it's done with it (I'm on Android so this is done in onDestroy()
)
lateinit var foo : Foo
onCreate() {
foo = Foo() // resources get allocated (init openGL context, open a file, etc)
}
onDestroy() {
foo.dispose() // must not forget!
}
I was thinking that it would be really nice to have these resources cleaned up automatically (something like RAII in c++)
That got me thinking, if there was a way to get a callback when the parent coroutine scope is being cancelled... I could do the cleanup thenursus
05/20/2021, 2:41 AMDominaezzz
05/20/2021, 11:25 AMKirill Gribov
05/20/2021, 12:33 PMsunnat629
05/20/2021, 3:14 PMprivate val _program = MutableStateFlow(NwProgram())
override val program: StateFlow<NwProgram> = _program
private val stateFlowScope: CoroutineScope by lazy { CoroutineScope(Dispatchers.Default) }
override val entities: StateFlow<List<NwEntity>> = program.transform {
emit(it.entities ?: emptyList())
}.stateIn(
scope = stateFlowScope,
started = WhileSubscribed(5000),
initialValue = emptyList()
)
But during building, it got -
e: java.lang.AssertionError: Unbound symbols not allowed
Execution failed for task ':coresdk:compileDebugKotlin'.
> Internal compiler error. See log for more details
Kotlin v1.5.0
and coroutines v1.5.0
So, why it’s happening? Can anyone help
I was migrating livedata
to stateflow
in my Android project.Slackbot
05/21/2021, 9:06 AMKirill Gribov
05/21/2021, 5:14 PM@JvmInline
value class ValueClassId(private val value: UUID)
class SomeService {
suspend fun outerFails(id: UUID): ValueClassId {
return inner(id)
}
suspend fun outerWorks(id: UUID) = try {
somethingTrowing()
justValueId(id)
} catch (e: RuntimeException) {
fallbackValueId(id)
}
private suspend fun inner(id: UUID) = try {
somethingTrowing()
justValueId(id)
} catch (e: RuntimeException) {
fallbackValueId(id)
}
private suspend fun justValueId(id: UUID): ValueClassId {
delay(1)
return ValueClassId(id)
}
private suspend fun fallbackValueId(id: UUID): ValueClassId {
delay(1)
return ValueClassId(id)
}
private suspend fun somethingTrowing(): Unit {
delay(1)
throw IllegalArgumentException()
}
}
fun main(): Unit = runBlocking {
val s = SomeService()
val id = UUID.randomUUID()
// works fine
s.outerWorks(id)
// Fails on: ContinuationImpl.kt:33
// val outcome = invokeSuspend(param)
// if (outcome === COROUTINE_SUSPENDED) return
s.outerFails(id)
}
dniHze
05/22/2021, 4:58 PMFlow.flatMap
operator? I know that direct replacement for it is Flow.flatMapMerge
, but I'm still wandering why the team decided to avoid regular flatMap
naming for particular operator in a favor of, well, a longer and more self-describing name.
For me it's kinda weird trying to name basically the same thing differently, especially from the common to other libraries and languages perspective. Not trying to convince anybody to rename the operator, just searching for the reasoning.Jimmy Alvarez
05/22/2021, 11:04 PMJimmy Alvarez
05/22/2021, 11:04 PMstreetsofboston
05/22/2021, 11:39 PMJimmy Alvarez
05/22/2021, 11:48 PMstreetsofboston
05/23/2021, 12:12 AMJimmy Alvarez
05/23/2021, 12:42 AMscope.launch(onlyOneThreadDispatcher) {
suspendFunc1()
suspendFunc1()
}
This will be blocking, does not will?ephemient
05/23/2021, 2:17 AMlaunch { suspend1() }; launch { suspend2()
may run interleaved at suspend points even if the executor only has one thread. that's kinda the whole point?streetsofboston
05/23/2021, 3:49 AMscope.launch { suspendFunc1() }
scope.launch { suspendFunc2() }
More like the above.
The calls to launch return immediately and the two suspendFuncs run asynchronously.
In your example, Jimmy, where they are called inside the same one launch call, they run sequentially.
But, sequentially or asynchronously, they are never blocking. They are suspending. Unless you put actual blocking code inside of them (eg Thread.sleep() or waiting for a socket, etc)diesieben07
05/23/2021, 9:12 AMlaunch
starts a new coroutine. So technically the answer is: No, one coroutine only ever executes one suspending function at a time. However coroutines are cheap, so you can just launch as many as you need.Erik
05/23/2021, 3:55 PMJimmy Alvarez
05/24/2021, 2:32 PMsuspendFunc1()
function is a remote call, in my understanding suspendFunc2()
will not be executed until remote call completes, even if it is suspend
So, even if the function is suspend it is blocking the coroutine, isn’t it?Erik
05/24/2021, 3:55 PMsuspend
modifier, it indicates that the function might suspend while running (e.g. of it is awaiting results from a network call). A good implementation will not block, but suspend execution, which will later resume.Jimmy Alvarez
05/24/2021, 4:52 PM