Arjan van Wieringen
07/03/2022, 2:38 PMFlow
and platform-specific API's, e.g. the web broadcastchannel API. However I choose to pass CoroutineScope
when needed, but this leads to code like this:
states.broadcast(channel.deserialize(scope), scope)
Is it a code smell to have these kind of scopes everywhere? Or is it not a problem. My intuition says that it is not a problem, because this way I can easily control the scopes and no magic happens. So I can keep the structured concurrency.oday
07/03/2022, 3:44 PMlifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { uiState ->
if (uiState.authenticated) {
setContent {
Home()
}
} else {
setContent {
Authentication()
}
}
}
}
}
and the issue is that it’s collecting the stateFlow immediately before my viewModel gets the chance return the result, so it shows Authentication, then when the value comes back ti shows Home again
I want it to…well observe like LiveData doesPhilipp Kleber
07/03/2022, 4:37 PMFlow#shareIn
mention, that using buffer(0).shareIn(scope, started, 0)
makes upstream suspend until all subscribers have processed the value. My question is whether that still holds true for replay > 0
(for all currently active subscribers).reactormonk
07/04/2022, 8:35 AMChannel
?Ruben Quadros
07/04/2022, 9:35 AMreceiveAsFlow
vs consumeAsFlow
?
receiveAsFlow
can have multiple collectors and consumeAsFlow
only 1?reactormonk
07/04/2022, 12:43 PMsuspendCoroutine
- but the code has some rather strict timing constraints, aka one code block should fully exit before another of the same kind should enter - how would I enforce that?
Code looks like this:
suspend fun BluetoothReader.transmitEscapeCommandS(input: ByteArray): CommandAPDU {
return suspendCoroutine { block ->
if (this.transmitEscapeCommand(input)) {
this.setOnEscapeResponseAvailableListener { _, response, errorCode ->
if (errorCode == 0) {
block.resume(CommandAPDU(response))
} else {
block.resumeWithException(ReaderCommandException("Error code from reader: $errorCode"))
}
this.setOnEscapeResponseAvailableListener(null)
}
} else {
block.resumeWithException(ReaderCommandException("Couldn't send command. Bonded?"))
}
}
}
The listener can only really be set once, so there's a bit of a race condition there.janvladimirmostert
07/04/2022, 10:05 PMsupervisorScope {
launch(Dispatchers.Default) {
supervisorScope {
Pablo
07/05/2022, 9:17 AMinterface Foo<T> {
fun onSuccess(lis: (T?) -> Unit)
}
I'm trying to do something like this :
suspend fun getFoo(): Bar {
suspendCoroutine<Bar?> { continuation ->
val listener = object : Foo<Bar?> {
override fun onSuccess(lis: (Bar?) -> Unit) { }
}
}
But I don't know how to return a BarAhmed Ibrahim
07/05/2022, 12:18 PMrunCurrent
.
So the problem, is that I have a ViewModel, that exposes a StateFlow<MyState
, that's backed by combine(flow1, flow2...
, I'm trying to assert the intermediate states that should be emitted, yet only the initial and the last state got emitted. Any idea what I'm doing wrong? (code in 🧵)reactormonk
07/07/2022, 1:05 PMtrySendBlocking
to send exceptions down the channel?reactormonk
07/07/2022, 5:43 PMmapWithState
or similar for Channels, where I can keep a consistent state around?liminal
07/07/2022, 5:49 PMKayCee
07/08/2022, 3:48 AMExerosis
07/09/2022, 1:42 AMfun main() = runBlocking {
val component = Component {
var a = 0; var b = 0
every(50.milliseconds) {
if (it % 20 == 0) println("A: ${a++}")
}
every(1.seconds) {
println("B: ${b++}")
}
}
component.enable()
delay(50.seconds)
println("Done!")
}
context(Toggled) @Base
suspend fun every(period: Duration, block: suspend (Int) -> (Unit)) {
simultaneously {
var i = 0
while (isActive) {
block(i++)
delay(period)
}
}
}
The problem with this is that over time the two tasks get way out of sync:
A: 47
B: 49
After 50 secondsMartin Gaens
07/09/2022, 5:56 PMfun main() = runBlocking {
val bot = bot {
token = "YOUR_API_KEY"
dispatch {
text {
launch { println("launched") }
bot.sendMessage(ChatId.fromId(message.chat.id), text = text)
}
}
}
bot.startPolling()
}
This code should set a callback for when the bot receives any text message from a user. However, the launch { }
never gets launched. And I don't know why! I know all this Telegram bot stuff might be off-topic but it's related to coroutines and I can't comprehend why it wouldn't work. Can somebody tell me what I'm doing wrong?zak.taccardi
07/10/2022, 6:49 PMStateFlow<T>
instance, I want to log every emission of T
from the source standpoint. Is this possible without duplicating logs for each subscriber?reactormonk
07/12/2022, 10:00 AMLukas Lechner
07/12/2022, 10:59 AM.buffer(capacity = 0, onBufferOverflow = BufferOverflow.DROP_LATEST)
in my flow, but this doesn't work, since a capacity of 1
is used when using BufferOverflow.DROP_LATEST
.
What's the use case:
The upstream flow items are produced whenever the user clicks a button to perform a network request, which happens downstream after the buffer. When the user presses the button while the network request is on-going, this event should be dropped (that's why I am using a buffer with capacity of 0). However I am not able to achieve this. A new network request is initiated shortly after the previous one is complete when the user clicks the button while the first network request is currently running. Any idea on how to implement this in a nice way?Kulwinder Singh
07/12/2022, 11:10 AMdelay
for 500ms but due to this i’m missing all the other data and only last TempData is recieved
GlobalScope.launch() {
flowMain
.collect { data ->
Log.i("AppLogs", data.toString())
delay(1)
}
}
viewInputLvlArray.forEachIndexed { index, view ->
flowMain.value = TempData(index, someotherValue)
}
Kulwinder Singh
07/12/2022, 11:11 AMursus
07/13/2022, 12:31 AMdefault
or io
dispatcher? Or rather, when would I want to use io over default? I get that its in the name, but what exactly is io work?
Networking? Retrofit has coroutines support, so the thread wont be blocked there.
Database? Room has it as well
Only thing that comes to mind is file writing/reading?
Or would it be bad to just use the io
? I mean it contracts eventually, right?
Can I see how many actual threads are in the io
at some point?Jakub Gwóźdź
07/13/2022, 8:28 AMsuspend fun main() = withContext(MDCContext(mapOf("host" to "local"))) {
val ktorClient = HttpClient(OkHttp)
log("entering context")
(1..10).forEach { i ->
val newContextMap = MDC.getCopyOfContextMap()
newContextMap["i"] = i.toString()
withContext(MDCContext(newContextMap)) {
log("before calling ktor client $i")
ktorClient.get { url("<https://example.com/$i>") }.bodyAsText()
log("after calling ktor client $i")
}
}
log("exiting context")
ktorClient.close()
}
when run, the MDC is shuffled totally indeterministic:
10:21:45.238 [main] - entering context {host=local}
10:21:45.240 [main] - before calling ktor client 1 {host=local, i=1}
10:21:45.824 [DefaultDispatcher-worker-1] - after calling ktor client 1 {}
10:21:45.824 [DefaultDispatcher-worker-1] - before calling ktor client 2 {host=local, i=2}
10:21:45.998 [DefaultDispatcher-worker-1] - after calling ktor client 2 {}
10:21:45.998 [DefaultDispatcher-worker-1] - before calling ktor client 3 {host=local, i=3}
10:21:46.102 [DefaultDispatcher-worker-1] - after calling ktor client 3 {host=local}
10:21:46.103 [DefaultDispatcher-worker-1] - before calling ktor client 4 {host=local, i=4}
10:21:46.208 [DefaultDispatcher-worker-3] - after calling ktor client 4 {}
...
But as soon as I change first line to
suspend fun main() = withContext(Dispatchers.Default + MDCContext(mapOf("host" to "local"))) {
it magically starts working as intended. (same on any other dispatcher, IO or Unconfined)
Why is it so? Is it a problem with suspend fun main
, ktor-client, or - most probably - me messing up something here?Dmitry Khalanskiy [JB]
07/13/2022, 11:14 AMTestScope.backgroundScope
for launching coroutines that perform work in the background and need to be cancelled at the end of the test.
• Fixed the issue with 1.6.2 and 1.6.3 where the updated POM of kotlinx-coroutines-debug
broke some builds.reactormonk
07/13/2022, 4:57 PMsuspend fun
which produces ByteArray
in 16 byte chunks. Reading them is kinda time-intensive, so I was wondering if there's a concept of a lazy channel of bytes, so I say "ready me X bytes" without it reading everything.Marc Plano-Lesay
07/14/2022, 1:53 AMreactormonk
07/14/2022, 8:54 AMfirst()
etc. doesn't advance though 😞
I'd also take a parser-combinator libraries if there's one around 😄
return channelFlow<TLV> {
when(input.first()) {
(0x00.toByte()) -> send(TLVNULL)
(0x03.toByte()) -> {
val length = parseLength(input)
val bytes = input.take(length).toList()
send(TLVNDEF(NdefMessage(bytes.toByteArray())))
}
(0xFD.toByte()) -> {
val length = parseLength(input)
val bytes = input.take(length)
send(TLVProp(bytes.toList().toByteArray()))
}
(0xFD.toByte()) -> {
send(TLVTerminator)
close()
}
}
}
Marek Kubiczek
07/14/2022, 9:07 AM@Before
fun setUp() {
Dispatchers.setMain(UnconfinedTestDispatcher())
}
@Test
fun `Some exemplary test`() = runTest {
val dependency1 = mockk<Dependency1>()
val viewModel = TestedViewModel(dependency1)
viewModel.testedMethod()
coVerify(exactly = 1) { dependency1.methodToBeCaled() }
}
The ViewModel itself is calling that method in viewModelScope
fun testedMethod() {
viewModelScope.launch {
methodToBeCalled()
}
}
However I forgot to mock the called method. I am seeing in log
Exception in thread "Test worker @coroutine#7" io.mockk.MockKException: no answer found for: Dependency1(#1).methodToBeCalled(continuation {})
But the test passes.
Is it known issue?Peter Kievits
07/14/2022, 10:02 AMreactormonk
07/14/2022, 1:26 PMreactormonk
07/15/2022, 9:04 AMFlow
(think Flow<Card>
) - the callback should fire after the card has been disconnected. I'm currently on interface Card { var afterCardRemoval: Function<Unit>? }
and then firing from the scope producing the Flow<Card>
, but that feels kinda dirty.reactormonk
07/15/2022, 9:04 AMFlow
(think Flow<Card>
) - the callback should fire after the card has been disconnected. I'm currently on interface Card { var afterCardRemoval: Function<Unit>? }
and then firing from the scope producing the Flow<Card>
, but that feels kinda dirty.Joffrey
07/15/2022, 9:07 AM.onEach { afterCardRemoval = { ... } }
work for you?Robert Williams
07/15/2022, 9:08 AMreactormonk
07/15/2022, 9:09 AMFor instance, why doesn't .onEach { afterCardRemoval = { ... } } work for you?there's basically the
Flow<Event>
which I'm processing, and I'm wondering if I can change the API a bit.Robert Williams
07/15/2022, 9:14 AMreactormonk
07/15/2022, 9:16 AMRobert Williams
07/15/2022, 9:26 AMreactormonk
07/15/2022, 10:51 AMMutableStateFlow
, but there's a) how do I clean it up, aka how do I set the Flow
from the state to over? Make it a take()
is my current approach, but I'm not sure I like it. Also, how do I fire off an async
into the sunset? the coroutineScope { async { flow.collect { ... } } }
doesn't yield as I'd expect it to.gildor
07/16/2022, 3:10 PM