christophsturm
04/29/2020, 3:55 PMorg.jetbrains.kotlinx:kotlinx-coroutines-reactive
? or even coroutines code that uses the r2dbc spi api?elizarov
04/29/2020, 5:36 PMStateFlow
is documented in a new issue. Your feedback is welcome: https://github.com/Kotlin/kotlinx.coroutines/issues/1973
igor.wojda
04/29/2020, 7:24 PMsortedList
like this? https://twitter.com/igorwojda/status/1255416485293502465
svenjacobs
04/30/2020, 9:49 AMFlow
. In my concept everything is a Flow
, beginning with UI events. Let's say I have a view interface like this
interface MyView {
val buttonClicks: Flow<Action.ButtonClick>
val button2Clicks: Flow<Action.Button2Click>
fun setMessage(message: String)
}
I have actions
sealed class Action {
object ButtonClick : Action()
object Button2Click : Action()
}
and a state that holds UI state
data class State(
val message: String
)
The idea is that a Flow<Action>
is transformed into a Flow<State>
based on the current Action and that state changes are then delegated to the view.
The - for this example extremely simplified - flow process would look like this
val flow =
merge(
view.buttonClicks,
view.button2Clicks
).flatMapMerge { action ->
when (action) {
is Action.ButtonClick -> flow {
// Simulate Flow that constantly produces values
var increment = 0
while (true) {
delay(1000)
increment++
emit(State(message = "Message $increment"))
}
}
is Action.Button2Click -> flowOf(State(message = "Hello world"))
}
}.onEach { state ->
view.setMessage(state.message)
}
// Flow is collected when view is ready
GlobalScope.launch {
flow.collect()
}
However I have a problem with this concept. Let's say Action.ButtonClick
should subscribe to some API that constantly produces values which update the UI (simulated in the example above). Unfortunately when multiple button clicks occur, multiple (inner) flows are running in parallel, each unnecessarily updating the UI. Do you have an idea how I can solve this problem, somehow stopping the previous (inner) flow? Thanks for your help!kzotin
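One way to stop the previous inner flow is to swap flatMapMerge for flatMapLatest, which cancels the flow started for the previous action as soon as a new action arrives. The following is a minimal sketch under that assumption, not a definitive design: the `states` helper is a made-up name, and `flatMapLatest` is marked experimental. Note that on a merged action stream a Button2Click also cancels the ticker started by ButtonClick, which may or may not be the desired behavior.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

sealed class Action {
    object ButtonClick : Action()
    object Button2Click : Action()
}

data class State(val message: String)

// Hypothetical helper: maps each action to a flow of states and keeps only
// the inner flow that belongs to the most recent action.
@OptIn(ExperimentalCoroutinesApi::class)
fun states(actions: Flow<Action>): Flow<State> =
    actions.flatMapLatest { action ->
        when (action) {
            is Action.ButtonClick -> flow {
                // Simulated API that keeps producing values until cancelled
                var increment = 0
                while (true) {
                    delay(1000)
                    increment++
                    emit(State(message = "Message $increment"))
                }
            }
            is Action.Button2Click -> flowOf(State(message = "Hello world"))
        }
    }
```

If only repeated ButtonClick actions should replace each other, applying flatMapLatest to view.buttonClicks alone before merging with the other flows would be an alternative.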
04/30/2020, 12:18 PMcoroutines
, but also android-ktx
libs
https://github.com/Kotlin/kotlinx.coroutines/issues/1979
Wondering, if any of you have heard about it?Paul Martin
04/30/2020, 7:17 PMfun main() = runBlocking {
launch {
println("block 1: start")
delay(2000)
println("block 1: end")
}
launch {
println("block 2: start")
delay(1000)
println("block 2: end")
}
Unit
}
as expected, the output is:
block 1: start
block 2: start
block 2: end
block 1: end
so the two launch
blocks are running concurrently.
next i try and replace the delay
calls with some functions of my own:
fun functionThatTakesALongTime() {
for (j in 1..5) {
for (i in 1..1_000_000_000) {
i * i * i
}
}
}
fun functionThatTakesAShortTime() {
1 + 1
}
fun main() = runBlocking {
launch {
println("block 1: start")
functionThatTakesALongTime()
println("block 1: end")
}
launch {
println("block 2: start")
functionThatTakesAShortTime()
println("block 2: end")
}
Unit
}
this time the output is this:
block 1: start
block 1: end
block 2: start
block 2: end
in other words it ran the whole of the first launch
block before starting on the second one. what i want is for them to run concurrently as in the first example.
i guess there's something fundamental that i'm missing! what is it about delay
which enables the launch
blocks to run concurrently?
thanks!Colton Idle
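For context: runBlocking runs both launch blocks on a single thread, so a block only gives way when it reaches a suspension point such as delay, and plain CPU-bound code never suspends. Below is a minimal sketch, assuming it is acceptable to add yield() calls inside the long-running function; the names `longWorkWithYields` and `runBlocks` are illustrative only.

```kotlin
import kotlinx.coroutines.*

// CPU-bound work split into chunks, with a suspension point after each chunk
suspend fun longWorkWithYields(chunks: Int = 5) {
    repeat(chunks) {
        var acc = 0L
        for (i in 1..1_000_000) acc += i.toLong() * i
        yield() // lets other coroutines on the same thread run
    }
}

// Records the order in which the two blocks start and end
fun runBlocks(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch {
            events.add("block 1: start")
            longWorkWithYields()
            events.add("block 1: end")
        }
        launch {
            events.add("block 2: start")
            events.add("block 2: end")
        }
    }
    return events
}
```

Another option is to keep the functions as they are and use launch(Dispatchers.Default) { ... }, which moves the blocks onto a thread pool so they can run in parallel.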
04/30/2020, 8:29 PMviewModelScope.launch {
val nameResponse = retrofitService.getName()
val phoneResponse = retrofitService.getPhone()
print("${nameResponse.isSuccessful.toString()} ${phoneResponse.isSuccessful.toString()}")
}
Is making it parallel as easy as doing this?
viewModelScope.launch {
val nameResponse = async { retrofitService.getName() }
val phoneResponse = async { retrofitService.getPhone() }
print("${nameResponse.await().isSuccessful.toString()} ${phoneResponse.await().isSuccessful.toString()}")
}
Is that all I need, or am I missing anything from your point of view, besides error handling (which is a whole other topic)? Any other ways to do this?
ubu
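Wrapping each call in async does start both requests before either is awaited. Here is a minimal sketch of the same idea outside Android, assuming `getName` and `getPhone` are stand-ins for the Retrofit suspend functions; `coroutineScope` is used so that a failure in one request cancels the other.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for retrofitService.getName() / getPhone()
suspend fun getName(): String {
    delay(100)
    return "name"
}

suspend fun getPhone(): String {
    delay(100)
    return "phone"
}

// Starts both requests, then waits for both results
suspend fun loadBoth(): Pair<String, String> = coroutineScope {
    val name = async { getName() }
    val phone = async { getPhone() }
    name.await() to phone.await()
}
```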
05/01/2020, 9:30 AMviewModelScope
from Android Architecture Components’ ViewModel
that i’ve been thinking over lately. I run a lot of background operations inside some ViewModel
, because I use its ViewModelScope
. These operations are use-cases
injected into constructor of this ViewModel
. For a better separation of concerns I would like to extract some of these operations in some other class that I would then inject in this ViewModel
, but in order to run these operations, I need that ViewModelScope
. Is there a way to provide it to injected components without passing the scope every time in some function signature?Lukas Lechner
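One common way around passing the scope is to have the extracted class expose suspend functions and leave the launching to the ViewModel. This is only a sketch under that assumption: `LoadGreeting` and `GreetingPresenter` are made-up names, and the `scope` parameter stands in for viewModelScope so the example runs without Android.

```kotlin
import kotlinx.coroutines.*

// Hypothetical use case: owns no scope, only exposes a suspend function
class LoadGreeting(private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
    suspend operator fun invoke(name: String): String = withContext(dispatcher) {
        "Hello, $name"
    }
}

// Stand-in for a ViewModel; `scope` plays the role of viewModelScope
class GreetingPresenter(
    private val scope: CoroutineScope,
    private val loadGreeting: LoadGreeting
) {
    var greeting: String? = null
        private set

    fun onNameEntered(name: String): Job = scope.launch {
        greeting = loadGreeting(name)
    }
}
```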
05/01/2020, 11:13 AMSean Keane
05/01/2020, 4:11 PMLogin
but it calls the function and kicks it off on a new Coroutine. This causes the test to continue and hit the verify BEFORE the request is made. I'm working in Multiplatform and using Coroutine testing tools in Android as a target for the tests. This is my test:
class KAuthenticationServiceTest {
var loginRequest = service.auth.login.build("fakeEmail@mail.com", "fakePassword")
var mockStorage = MockStorage()
var mockCredentialsManager = MockCredentialsManager()
var mockNetworkManager = MockNetworkManager()
lateinit var authService: AuthService
@BeforeTest
fun setup() {
authService = KAuthenticationService(
storage = mockStorage,
credentialsManager = mockCredentialsManager,
networkManager = mockNetworkManager
)
}
/**
* Given: A user requests to login
* When: The call is made to the login interactor
* Then: The request is executed successfully
*/
@Test
fun callIsMadeToLoginInteractorToExecuteRequest() = runTest {
authService.login(loginRequest, {}, {})
assertTrue {
mockNetworkManager.requestValue == loginRequest
}
}
}
The run test function in Android is the following:
val mainThreadSurrogate = newSingleThreadContext("UI thread")
actual fun <T> runTest(block: suspend () -> T) {
Dispatchers.setMain(mainThreadSurrogate)
runBlockingTest { block() }
}
I'm a bit lost as to how I can get the runBlocking to run the authService.login on the same thread so it blocks until executed and then continues to the assert. Has anyone got any suggestions?
SrSouza
05/01/2020, 9:50 PMnewSingleThreadExecutor
as a Dispatcher but I have a problem with it, when I use delay(x)
, other functions that use the same dispatcher do not execute until the delay finishes. Why is this happening if delay
does not block the Thread?
Mark
05/03/2020, 9:53 AMkev1n11
05/03/2020, 9:22 PMCoroutineScope
.
We are POCing the following into our Spring Boot server
• as the server starts up, use SqsAsyncClient
to read in messages,
• these are sent into a buffered Channel
,
• also start a pool of worker coroutines to process these messages, the processing functions are all suspend fun
The above are encapsulated in a SQSConsumer
which implements CoroutineScope
, and we can't seem to figure out a clean way to test that.
The code looks something like
class SQSConsumer(
val sqs: SqsAsyncClient,
val props: SQSProperties
) : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + supervisorJob
// called from another class using @PostConstruct
fun start() = launch {
val launchQueueReceiveChannel = launchReceiver(
props.queueUrl,
props.waitTimeSeconds,
props.maxNumberOfMessages
)
repeat(props.workers) {
launchWorker(launchQueueReceiveChannel, props.queueUrl, props.timeout)
}
}
}
What are the common patterns for overriding Dispatcher
in tests?
If you have a better suggestion for encapsulating the above (Spring Boot or not), we’d love to hear it too.
Viktor Vostrikov
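A frequently used pattern for overriding the dispatcher in tests is to take it as a constructor parameter with Dispatchers.IO as the default. The sketch below assumes a simplified consumer: `QueueConsumer`, `processed`, and the list-based `start` are illustrative and not part of the SQS code above.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Hypothetical consumer whose dispatcher can be replaced in tests
class QueueConsumer(
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = dispatcher + supervisorJob

    // Thread-safe record of handled messages, used here for assertions
    val processed = CopyOnWriteArrayList<String>()

    fun start(messages: List<String>): Job = launch {
        messages.forEach { processed.add(it) }
    }

    fun stop() = supervisorJob.cancel()
}
```

In a test, the consumer can be created with a dispatcher the test controls, for example QueueConsumer(Dispatchers.Unconfined), and the Job returned by start can be joined before asserting.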
05/05/2020, 5:57 AMSlackbot
05/05/2020, 9:09 AMchristophsturm
05/05/2020, 2:32 PMSoundlicious
05/05/2020, 3:04 PMChilli
05/05/2020, 4:32 PMkotlinx-coroutines-core-native
takes a suspiciously long time
zak.taccardi
05/05/2020, 4:51 PMactor
of one type with another, but struggling with the SendChannel#selectClause
. How can I implement the following?
private val _onSend: SelectClause2<StateActorIntention<S, I>, SendChannel<StateActorIntention<S, I>>> = actor.onSend
// how do I forward onSend to `_onSend` without using `@InternalCoroutinesApi`?
override val onSend: SelectClause2<I, SendChannel<I>> get() = TODO("??")
ar-g
05/05/2020, 4:55 PMflow
and show on UI
• when the coroutine
is canceled partiallySolve
function expected to stop working
typealias OnSudokuBoardChanged = suspend (board: List<List<Int>>) -> Unit
class SudokuSolver {
fun solve(board: List<List<Int>>) = flow {
partiallySolve(0, 0, board) {
emit(it)
}
}
private suspend fun partiallySolve(
curI: Int,
curJ: Int,
board: MutableList<MutableList<Int>>,
callback: OnSudokuBoardChanged
): Boolean {
if (curI > board.lastIndex) {
return true
}
val curNum = board[curI][curJ]
val nextI = if (curJ == board.lastIndex) curI + 1 else curI
val nextJ = if (curJ == board.lastIndex) 0 else curJ + 1
if (curNum != 0) {
return partiallySolve(nextI, nextJ, board, callback)
}
if (curNum == 0) {
for (guess in 1..9) {
if (validate(guess, curI, curJ, board)) {
board[curI][curJ] = guess
callback.invoke(board)
if (partiallySolve(nextI, nextJ, board, callback)) {
return true
}
}
}
}
board[curI][curJ] = 0
return false
}
Mark
05/06/2020, 2:27 AMLuis Munoz
05/06/2020, 2:53 AMar-g
05/06/2020, 1:37 PMfun getAll(): Flow<...> { //works in parallel, doesn't respect structured concurrency
GlobalScope.launch {
//start request...
}
return db.flow()//starts emitting immediately...
}
suspend fun getAll(): Flow<...> { //works sequentially
withContext(Dispatchers.IO) {
launch {
//start request...
}
}
return db.flow()//start emitting after request is finished...
}
SrSouza
05/06/2020, 4:24 PMreceiveAsFlow().filter {}.collect()
but I think this is not the right approach because it does not work. How can I await the first appearance of a value in a Channel?
Joffrey
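One possible approach is the terminal operator first with a predicate, which suspends until a matching element arrives and then stops collecting. This is a minimal sketch; `awaitFirstMatching` is a made-up extension name, and elements received before the match are consumed and dropped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: suspends until the channel delivers a matching element.
// Throws NoSuchElementException if the channel closes without a match.
suspend fun <T> ReceiveChannel<T>.awaitFirstMatching(predicate: (T) -> Boolean): T =
    receiveAsFlow().first { predicate(it) }
```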
05/06/2020, 4:51 PMtry {
doTheThingThatUsuallyGetsStuck()
} catch(e: TimeoutCancellationException) {
fail("some debug information about why the test may have timed out")
}
The problem with this is that sometimes the code under test uses timeouts as well, so this catch block may actually fail the test even in some valid timeout situations. Is there a way for me to transparently add information about the timeout somehow?
If I rethrow a CancellationException
with a custom error message, then how can I catch it upstream and access the message?Jeff Johnson
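One way to tell the test's own timeout apart from timeouts inside the code under test is withTimeoutOrNull, which returns null only when its own timeout fires and rethrows a TimeoutCancellationException coming from a nested timeout. The following is a sketch under that assumption; `withDiagnosticTimeout` and its parameters are made-up names.

```kotlin
import kotlinx.coroutines.*

// Hypothetical test helper: fails with debug information only when this
// outer timeout expires; timeouts raised inside `block` pass through as-is.
suspend fun <T : Any> withDiagnosticTimeout(
    timeoutMillis: Long,
    debugInfo: () -> String,
    block: suspend CoroutineScope.() -> T
): T = withTimeoutOrNull(timeoutMillis, block)
    ?: throw AssertionError("Timed out after $timeoutMillis ms: ${debugInfo()}")
```

The block has to return a non-null value so that null can unambiguously mean that the outer timeout expired.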
05/06/2020, 5:43 PMView
that does not exist anymore.
open class CoroutineLauncher : CoroutineScope {
private val dispatcher: CoroutineDispatcher = Dispatchers.Main
private val supervisorJob = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = dispatcher + supervisorJob
fun launch(action: suspend CoroutineScope.() -> Unit) = launch(block = action)
fun cancelCoroutines() {
supervisorJob.cancelChildren() //coroutineContext.cancelChildren() has same results
}
}
here is the usage
class MyFragment : Fragment {
val launcher = CoroutineLauncher()
fun onSomeEvent() {
launcher.launch {
val result = someSuspendFunction() // made suspend function by using withContext(Dispatchers.IO)
if (!isActive) return@launch
// CAUSES CRASH
myTextView.text = result.userText
}
}
override fun onDestroyView() {
super.onDestroyView()
launcher.cancelCoroutines()
}
}
I added log lines to ensure onDestroyView
and cancelCoroutines
are both being called before the crash. It is my understanding that withContext
is cancellable and thus I shouldn’t even need to check for isActive
. I feel like I’m missing something obvious, but I can’t figure out what it is
zak.taccardi
05/06/2020, 5:55 PMAtomicRef
on numberOfTimesUserHasChanged
because it’s guaranteed to be executed in the same coroutine?
My concern is that currentUserIdFlow
could switch threads/coroutines internally, but I think that would not be able to affect the coroutine in which numberOfTimesUserHasChanged
is read and written from?Erik
05/07/2020, 10:31 AMCoroutineWorker
that does work on a coroutine. This work can be cancelled by the system, it then throws a CancellationException
.
In my apps I want to log any exceptions that are thrown by the crucial part of my work, e.g.: runCatching { crucialWork() }.onFailure { log(it) }
. However, I see a lot of JobCancellationException
instances being logged with message "Job was cancelled"
. I don't want to log regular cancellations without an exceptional cause. So, if I read the documentation of Job
(https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html), am I right to conclude that I'm only interested in exceptions that are not of type CancellationException
, or if I catch a CancellationException
, then I'm only interested if its cause
is not a CancellationException
? In other words: I'm interested in 'failed' coroutines, not in 'normally cancelled' coroutines.
So, what exceptions do I log, what do I ignore?ar-g
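A simple starting point is to rethrow every CancellationException untouched and log everything else. This is a minimal sketch of that rule, with `loggingFailures` and the `log` callback as made-up names; it deliberately does not inspect the cause of a CancellationException.

```kotlin
import kotlinx.coroutines.*

// Hypothetical wrapper: logs real failures, lets cancellation pass silently
suspend fun <T> loggingFailures(log: (Throwable) -> Unit, block: suspend () -> T): T =
    try {
        block()
    } catch (e: CancellationException) {
        throw e // normal cancellation: rethrow so the coroutine still cancels
    } catch (e: Throwable) {
        log(e)
        throw e
    }
```

Compared with runCatching { ... }.onFailure { log(it) }, this keeps cancellation propagating instead of turning it into a Result.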
05/07/2020, 1:03 PMFlow invariant is violated: Emission from another coroutine detected...
It makes sense, the question is how to not violate it and ensure that:
• Request and observing DB happens in parallel
• Exception wrapped and propagated to UI
• UI continues to observe DB
fun getAll(): Flow<Wrap<...>> = flow { //request and observing db happens in parallel
coroutineScope {
launch(Dispatchers.IO) {
try {
//start request...
} catch (e: Exception) {
emit(Wrap(e))//!!! this will throw IllegalStateException: Flow invariant is violated: Emission from another coroutine is detected.
}
}
}
emitAll(db.flow()) //starts emitting immediately...
}
}
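One way to emit from a launched coroutine without violating the flow invariant is channelFlow, where values are passed with send instead of emit. This is a sketch only: the `Wrap` hierarchy and the `db` and `request` parameters are assumptions made to keep the example self-contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper type for data and errors
sealed class Wrap<out T> {
    data class Data<T>(val value: T) : Wrap<T>()
    data class Failure(val error: Throwable) : Wrap<Nothing>()
}

// The request runs in parallel with DB observation; a request failure is
// sent downstream as a value, so the DB flow keeps being observed.
fun <T> getAll(db: Flow<T>, request: suspend () -> Unit): Flow<Wrap<T>> = channelFlow {
    launch(Dispatchers.IO) {
        try {
            request()
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            send(Wrap.Failure(e)) // send is safe to call from another coroutine
        }
    }
    db.collect { send(Wrap.Data(it)) }
}
```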
christophsturm
05/07/2020, 5:52 PMfun createCaptureServer(port: Int): CompletableFuture<String> {
val future = CompletableFuture<String>()
val undertow = Undertow.builder().addHttpListener(port, "127.0.0.1").setHandler {
future.complete(it.queryString)
}.build()
undertow.start()
return future.thenApplyAsync {
undertow.stop()
it
}
}
possibly using ktor instead of undertow.octylFractal
05/07/2020, 5:55 PMCompletableDeferred
instead of Future
Luis Munoz
05/07/2020, 5:58 PMoctylFractal
05/07/2020, 5:59 PMsuspendCancellableCoroutine
christophsturm
05/07/2020, 6:03 PMthenApplyAsync
part? (stop the http server)octylFractal
05/07/2020, 6:10 PMtry-finally
(assuming you go the suspendCancellableCoroutine
route)christophsturm
05/07/2020, 6:15 PMsuspendCancellableCoroutine
better? when would it cancel?octylFractal
05/07/2020, 6:16 PMsuspendCancellableCoroutine
-- I think that using the future model for actually retrieving the data is better
suspend fun createCaptureServer(port: Int): String {
val deferred = CompletableDeferred<String>()
val server = <create server, with handler that completes the deferred>
server.start()
try {
return deferred.await()
} finally {
server.close()
}
}
is my takechristophsturm
05/07/2020, 6:20 PMsuspend fun createCaptureServer(port: Int): String {
val result = suspendCoroutine<String>{cont->
val undertow = Undertow.builder().addHttpListener(port, "127.0.0.1").setHandler {
cont.resume(it.queryString)
}.build()
undertow.start()
}
undertow.close()
return result
}
this is what i came up with, but it does not work that way because undertow is inside the callback, so i would have to make it nullable and a var, which is not so great
val deferred = CompletableDeferred<String>()
val undertow = Undertow.builder().addHttpListener(port, "127.0.0.1").setHandler {
deferred.complete(it.queryString)
}.build()
undertow.start()
val result = deferred.await()
undertow.stop()
return result
}
thanks for your help!octylFractal
05/07/2020, 6:26 PM
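The CompletableDeferred plus try/finally idea from this thread can be tried out without a real HTTP server. The sketch below assumes a made-up `FakeServer` class in place of Undertow, and `captureFirstQuery` and its `onStarted` hook are illustrative names; the finally block stops the server on success, failure, and cancellation alike.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a callback-based HTTP server
class FakeServer(private val handler: (String) -> Unit) {
    var running = false
        private set

    fun start() { running = true }
    fun stop() { running = false }
    fun simulateRequest(query: String) = handler(query)
}

// Suspends until the first request arrives, then always stops the server
suspend fun captureFirstQuery(onStarted: (FakeServer) -> Unit = {}): String {
    val deferred = CompletableDeferred<String>()
    val server = FakeServer { query -> deferred.complete(query) }
    server.start()
    try {
        onStarted(server)
        return deferred.await()
    } finally {
        server.stop()
    }
}
```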