Dmitry Khalanskiy [JB]
06/20/2022, 12:18 PMKy
06/20/2022, 7:05 PMHttpClient
kick off it’s own coroutine to make requests? And do we have any control over that coroutineContext?
I’m having trouble getting ktors HttpClient to behave nicely in regards to synchronization. Unless I wrap the request in withContext(runTest.coroutineContext)
my tests do not wait for ktor request to return and the test will complete prematurelyNorbi
06/21/2022, 7:12 AMrunBlocking()
on each HTTP request?
Does it degrade performance a lot?
Thanks.Nino
06/21/2022, 12:02 PMonStart
and delay
but it's smelly.
Source code:
// I don't control the ConnectivityManager, it's from Android
class ConnectivityRepository(private val connectivityManager: ConnectivityManager) {
fun isInternetAvailableFlow(): Flow<Boolean> = callbackFlow {
val networkCallback = object : ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(true)
}
override fun onLost(network: Network) {
trySend(false)
}
}
connectivityManager.registerDefaultNetworkCallback(networkCallback)
awaitClose { connectivityManager.unregisterNetworkCallback(networkCallback) }
}
}
Unit test:
@Test
fun `happy path`() = runTest {
// Given
val connectivityRepository = ConnectivityRepository(connectivityManager)
val networkCallbackSlot = slot<ConnectivityManager.NetworkCallback>() // Mocking stuff : I can 'capture' something during the test execution with this
val connectivityManager = mockk<ConnectivityManager>() // Mocking the ConnectivityManager
justRun { connectivityManager.registerDefaultNetworkCallback(capture(networkCallbackSlot)) } // This function is mocked and "wired" with the slot
justRun { connectivityManager.unregisterNetworkCallback(any<ConnectivityManager.NetworkCallback>()) }
// When
val result = connectivityRepositoryImpl.isInternetAvailableFlow().onStart { // Ugly solution but nothing else works...
launch {
delay(1)
networkCallbackSlot.captured.onAvailable(mockk())
}
}.first()
// Then
assertThat(result).isTrue()
}
I need to capture the anonymous class extending ConnectivityManager.NetworkCallback
in order to call it with either onAvaible
or onLost
during my test, but at the same time, since this is a cold flow, it won't run until I collect it. But it won't emit something until I run onAvaible
or onLost
. onStart
is too early, and collect
is never called. I'd need a "afterStart" callback on my Flow or something like that ?Yuriy Dynnikov
06/21/2022, 3:36 PMGlobalScope.launch {...}
, stop working. Looks like an exception gets its way to CoroutineScheduler and breaks it. How can I protect myself from this kind of behavior?
Exception stacktrace:
java.util.concurrent.CancellationException: The task was rejected
at kotlinx.coroutines.ExceptionsKt.CancellationException(Exceptions.kt:22)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.cancelJobOnRejection(Executors.kt:169)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:131)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:159)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
at kotlinx.coroutines.AwaitAll$AwaitAllNode.invoke(Await.kt:115)
at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1519)
at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:323)
at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:240)
at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:906)
at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:863)
at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:828)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)
at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:39)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@47310ad2[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5f40bc28[Wrapped task = CancellableContinuation(DispatchedContinuation[java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20], Continuation at core.MLock$invoke$2.invokeSuspend(MLock.kt:35)@1a367720]){Completed}@6f01d5c3]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:128)
... 19 more
Lukas Lechner
06/21/2022, 5:30 PMhttps://youtu.be/tVDCpjqQ1Ro▾
Igor Kolomiets
06/21/2022, 5:45 PMpublic interface FibonacciService {
CompletableFuture<Long> nextFibonacci(Long number);
}
In my Kotlin code, I’d like to add extension function to this interface to expose nextFibonacci
method as suspended function:
suspend fun FibonacciService.nextFibonacci(number: Long): Long = nextFibonacci(number).asDeferred().await()
Unfortunately, the code above doesn’t work, because nextFibonacci
extension is shadowed by interface original method (I have to rename it to something different, e.g. coNextFibonacci
, suspndNextFibonacci
, deferredNextFibonacci
?). Is there a conventional prefix (or suffix) to distinguish suspendible variants?
Also, would it be nice if Kotlin language allowed for suspend
variants in cases like this (presence of CoroutineScope would allow to figure out which one to use)?liminal
06/22/2022, 4:35 AMazabost
06/22/2022, 7:40 PMsuspend fun
?
I’m asking about it only because I’m trying to optimise some modules’ dependencies (e.g. get rid of unnecessary implementation
/ api
dependencies)Chachako
06/23/2022, 9:39 AMchannelFlow.collect
will resume? If there is a task in the channelFlow
block that doesn’t know when it will complete, when will the coroutines channel close?
I’m a little curious what the criteria is for the channel to determine channelFlow
completion, is it to wait a while without any send
and then end the hang?Fleshgrinder
06/24/2022, 1:56 PMval result = awaitAll(
async { networkFlow.toSet() },
async { filesystemFlow.toList() },
)
val networkResult = result[0] as Set<NetworkResult>
val filesystemResult = result[1] as List<FilesystemResult>
This works exactly as intended, but the ceremony required to get it to work makes it seem even worse than threading and futures. I was hoping for at least …
val (networkResult, filesystemResult) = awaitAll(
async { networkFlow.toSet() },
async { filesystemFlow.toList() },
)
… but this does not exist. Any other way to make this nicer? 🤔Grigory Panko
06/24/2022, 8:37 PMjava.lang.IllegalAccessError: class kotlin.coroutines.jvm.internal.DebugProbesKt (in module kotlin.stdlib) cannot access class kotlinx.coroutines.debug.internal.DebugProbesImpl (in module kotlinx.coroutines.core.jvm) because module kotlin.stdlib does not read module kotlinx.coroutines.core.jvm
on any coroutine launch. I guess it's because Java 17 removed support for --illegal-access
option. I tried to use --add-reads kotlin.stdlib=kotlinx.coroutines.core.jvm
option, but it shows WARNING: Unknown module: kotlin.stdlib specified to --add-reads
on JVM init and still throws error on coroutines access. Any ideas how can I fix it?Slackbot
06/25/2022, 9:17 AMSergei Grishchenko
06/25/2022, 12:11 PMprivate fun generateWork() = channelFlow {
for (i in 1..10) {
val page = "page$i"
println("Generator sent $page")
send(page)
}
close()
println("Generator is closed")
}
private fun CoroutineScope.doWork(id: Int, flow: Flow<String>) = launch {
flow.collect {
println("Worker $id processed $it")
}
println("Worker $id finished")
}
suspend fun performWork() {
try {
coroutineScope {
val workFlow: Flow<String?> = generateWork()
val sharedWorkFlow = workFlow
.onCompletion { cause -> if (cause == null) emit(null) }
.shareIn(this, WhileSubscribed())
.takeWhile { it != null }
.filterNotNull()
val workersCount = 10
List(workersCount) { id ->
val workPartFlow = sharedWorkFlow
.withIndex()
.filter { (index, _) -> index % workersCount == id }
.map { (_, value) -> value }
doWork(id, workPartFlow)
}.joinAll()
cancel()
}
} catch (e: CancellationException) {
println("Work is performed")
}
}
So my questions are:
1. Is there possibility to implement it simpler?
2. Am I use Shared Flow correctly?
3. Are there ways to unsubscribe from Shared Flow aside from canceling of coroutineScope
?
4. Is there way to make Shared Flow finite aside from emitting some special value from onCompletion
(it is null in my case) and use takeWhile
to track it?
Thank youArjan van Wieringen
06/25/2022, 7:40 PM@Test
fun distributedTest2() = runTest {
repeat(1000) {
val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
// state is mutablestateflow, updates is mutablesharedflow that also updates state
val otherSource = uuid()
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
/* What do do here to make sure I can assert `state` */
assertEquals("Bat", state.value.value)
}
}
Basically I have a MutableStateFlow state
and a MutableSharedFlow updates
. By emitting into updates
I update, among other stuff, also the state
. However, for the life of me I can not see how to test this. In an application this works as expected.
Basically this is the distribute()
function:
data class DistributedMergeable<T : Mergeable<T>>(
val states: MutableStateFlow<T>,
val updates: MutableSharedFlow<Update<T>>
) {
data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}
fun <T : Mergeable<T>> T.distribute(
updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
val source = uuid()
val states = MutableStateFlow(this)
states.onEach { newState ->
println("$newState")
updates.emit(DistributedMergeable.Update(source, newState))
}.launchIn(scope)
updates.onEach { update ->
if (update.source == source) {
println("Same source: $update")
return@onEach
}
if (update.value == states.value) {
println("Same state: $update")
return@onEach
}
val merged = states.value.merge(update.value)
println("Merged: $update -> $merged")
states.value = merged
}.launchIn(scope)
return DistributedMergeable(states, updates)
}
CLOVIS
06/26/2022, 8:28 AMJob
, a CoroutineScope
, a CoroutineContext
or something else entirely?
The guide is fairly clear on what functions should accept, but I didn't find anywhere it mentions classes.oday
06/26/2022, 9:34 PMclass DetermineAuthStatus @Inject constructor(
private val firebaseAuth: FirebaseAuth,
private val populateUserPrefs: PopulateUserPrefs
) {
operator fun invoke() = firebaseAuth.currentUser?.let { currentUser ->
val subject = SingleSubject.create<Boolean>()
subject
.hide()
.doOnSubscribe {
currentUser.getIdToken(false)
.addOnSuccessListener { task ->
populateUserPrefs(currentUser, task.token)
subject.onSuccess(true)
}
.addOnFailureListener {
subject.onError(it)
}
}
} ?: Single.just(false)
}
Brian Estrada
06/26/2022, 9:39 PMSharedFlow
and StateFlow
but none of these seem to keep the items in a buffer once they’re read they’re gone. I would normally use a BehaviorSubject in RxJava for this but haven’t been able to find the equvilant in coroutines
Edit: Nevermind, the question MutableSharedFlow
has the ability to do this (you can set it in the constructor and I missed that somehow) thanksfranztesca
06/26/2022, 11:02 PMconflate
flow operator and throwables...
Basically, when throwing an exception from a flow
that passes through a conflated operator, the re-thrown exception is different from the original one.
It's reproducible with the following code:
@Test
fun myTest() = runBlocking {
val throwable = Throwable()
try {
flow<Unit> { throw throwable }.conflate().collect()
} catch (caught: Throwable) {
// We expect the same exception that we are throwing
assert(throwable === caught) { "Expected $throwable, found $caught" }
// Prints "Expected java.lang.Throwable, found java.lang.Throwable"
}
}
The same test without the .conflate
operator passes.
Can you confirm that what I expect is the actual intended behavior and therefore this is a bug?
I'm on :kotlin: 1.7.0
and coroutines 1.6.3
:thank-you:Berkeli Alashov
06/27/2022, 6:11 AMGopal S Akshintala
06/27/2022, 1:53 PMArjan van Wieringen
06/27/2022, 2:16 PMritesh
06/27/2022, 8:11 PMval channel = Channel<Boolen>
which emit a event when certain action happens and in the consumer application, i have this code sitting there in BaseActivity
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
if(AuthSDK.channel.receive){
// logout from app
}
}
}
It works great, i was wondering if it's the correct way of doing it or if there's a better solution in co-routine stream world.
• Reason why i didn't go with SharedFlow
- events can be lost when producer produces but consumer moves to STOPPED state.
• Current channel i am using is RENDEZVOUS
channels, in this scenario, there is a guarantee for my event to be received and emitted, irrespective of lifecycle changes.
• I realised instead of receive
, consumeAsFlow
or receiveAsFlow
can be used. As this is one-off event does it matter if don't.
• Both sender and receiver channel co-routines are on main thread, that also means, both send
and receive
will be in suspended state. Is it a concern, is there a trade-off in doing so.tylerwilson
06/28/2022, 3:04 PMCompilation failed: Internal compiler error: no implementation found for FUN DEFAULT_PROPERTY_ACCESSOR name:<get-thread> visibility:internal modality:ABSTRACT <> ($this:kotlinx.coroutines.CloseableCoroutineDispatcher) returnType:kotlinx.coroutines.Thread
when building vtable for CLASS CLASS name:MultiWorkerDispatcher modality:FINAL visibility:private superTypes:[kotlinx.coroutines.CloseableCoroutineDispatcher]
at /Users/administrator/Documents/agent/work/8d547b974a7be21f/ktor-utils/posix/src/io/ktor/util/CoroutineUtils.kt (0:0)
CLASS CLASS name:MultiWorkerDispatcher modality:FINAL visibility:private superTypes:[kotlinx.coroutines.CloseableCoroutineDispatcher]
* Source files:
* Compiler version info: Konan: 1.6.21 / Kotlin: 1.6.21
* Output kind: FRAMEWORK
The 1.6.1-native-mt works fine for me. Perhaps this version removed some platforms?Remy Benza
06/28/2022, 5:11 PMval singleDispatcher = Dispatchers.IO.limitedParallism(1)
suspend fun codeBlockA() = withContext(singleDispatcher) { .. }
suspend fun codeBlockB() = withContext(singleDispatcher) { .. }
Never in time will code inside functions codeBlockA
and codeBlockB
be running at the same time. Irrespective of when / how often they are invoked and/or what parent coroutine there are launched from. Correct?Zhanna Gorelova
06/29/2022, 11:11 AMclass CoroutineTest {
@Test
fun `test`() = withContext { context ->
// when
SomethingWithFlow(context)
//then
assertNotNull(context[Job])
}
private fun withContext(context: CoroutineContext = <http://Dispatchers.IO|Dispatchers.IO>, block: suspend (CoroutineContext) -> Unit) {
runBlocking(context) { block(context) }
}
class SomethingWithFlow(override val coroutineContext: CoroutineContext) : CoroutineScope {
private val flow = MutableStateFlow<String>("Initial")
init {
launch {
flow.debounce(30_000L).collectLatest {
ensureActive()
println(it)
}
}
}
}
}
Jan Skrasek
06/30/2022, 9:46 AMArjan van Wieringen
07/01/2022, 5:23 PMMutableSharedFlow
? What I mean is, that I have a routine that both subscribes and emits to the flow. Why would I want this? Well, for instance when I want to send/receive from a Browser BroadcastChannel
:
inline fun <reified T> MutableSharedFlow<T>.broadcast(
channel: BroadcastChannel,
scope: CoroutineScope,
serializersModule: SerializersModule = EmptySerializersModule
) {
val jsonSerializer = Json {
this.serializersModule = serializersModule
}
channel.onmessage = { event ->
val deserialized = jsonSerializer.decodeFromString<T>(event.data as String)
scope.launch {
this@broadcast.emit(deserialized.data)
}
}
this.onEach { data ->
val serialized = jsonSerializer.encodeToString(data)
channel.postMessage(serialized)
}.launchIn(scope)
}
Of course, this will not work and will generate an endless loop, since it receives its own updates. A few options came to my mind:
• remember the last send message - will not work because it can be the case that it receives something again from the broadcastchannel
• create a new wrapper class that overrides 'emit' so that I get a split between internal and external emits and can handle it according to this
• wrapping the sharedflow with some sort of ID and include this ID in the message in order to distinguish own messages, however this dirties the messaging classes, but IMHO I can't find another idea. It leaks the implementation of broadcasting out of the MutableSharedFlow.
Any ideas out there? I can't imagine I am the only one doing this, or I am barking up the completely wrong tree by using MutableSharedFlow 😉lesincs
07/02/2022, 5:02 AMasync {}
throws exception not until .await()
be called.Exerosis
07/02/2022, 11:24 AMclass MyWrapper(init: suspend () -> (Unit)) : ExampleFrameworkApp {
override fun onInit() = blocking { init() }
}
As well as other framework calls (like events off the event bus etc.)
I would like to be able to say something like:
class MyApplication : MyWrapper({
withContext(IO) {
http.get("..")
}
delay(1.second)
.. etc.
})
However the issue is that when withContext resumes it will call dispatch from an IO thread that schedules the resume, however, we are currently blocking the main thread so that never happens and we deadlock.
I think I need a local event loop or something along those lines, is there a builtin construct I could use or do I need to make my own system here?Exerosis
07/02/2022, 11:24 AMclass MyWrapper(init: suspend () -> (Unit)) : ExampleFrameworkApp {
override fun onInit() = blocking { init() }
}
As well as other framework calls (like events off the event bus etc.)
I would like to be able to say something like:
class MyApplication : MyWrapper({
withContext(IO) {
http.get("..")
}
delay(1.second)
.. etc.
})
However the issue is that when withContext resumes it will call dispatch from an IO thread that schedules the resume, however, we are currently blocking the main thread so that never happens and we deadlock.
I think I need a local event loop or something along those lines, is there a builtin construct I could use or do I need to make my own system here?Nick Allen
07/05/2022, 4:52 PMso with the above dispatcher I could doAvoid any solution that involves blocking on the main thread. Launch your coroutines instead.
withContext(IO) {Moving off the main thread only helps when you are not already blocking it. This is why you need to launch from the main thread callbacks.
I think I need a local event loop or something along those lines, is there a builtin constructYes, if your dispatcher returns false for
isDispatchNeeded
, then a shared thread local loop is used. That won't fix your dead-lock, though.
You can look at existing integrations for inspiration for wrapping your scheduler:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt
There's ones for swing, javafx, and android.