nitrog42
10/20/2020, 10:07 AM
MutableSharedFlow(replay = 0) doesn't emit values? 😕 The same code works if replay = 1 (but I don't want to replay the first value each time 😢)
Pablo
10/20/2020, 10:13 AM
CoroutineScope and a CoroutineDispatcher, is it mandatory to cancel them in onDestroy()? What happens if I don't do this?
fjoglar
10/20/2020, 11:31 AM
Activity, I'm getting:
java.lang.IllegalStateException: ReceiveChannel.consumeAsFlow can be collected just once
The code I'm executing is this:
private fun getCustomer() = lifecycleScope.launch {
    isLoading = true
    // Get the user's customer code.
    GetUserDetailsUseCase().getCustomer().collect { result ->
        when (result) {
            is Result.Success -> uploadDocument(result.data)
            is Result.Error -> showPopup(result.errorMessageResId)
        }
    }
}

private fun uploadDocument(customer: Customer) = lifecycleScope.launch {
    this@SignUpDocumentValidationActivity.customer = customer
    // Upload document and get URL.
    val filename = "${System.currentTimeMillis()}${customer.code}"
    when (val uploadResult = UploadImageUseCase().uploadImage(
        scaleImage(),
        filename
    )) {
        is Result.Success -> {
            Logger.i(TAG, "Image $filename.jpg uploaded successfully.")
            sendDocumentAndContinue(uploadResult.data)
        }
        is Result.Error -> showPopup(uploadResult.errorMessageResId)
    }
}

private fun sendDocumentAndContinue(documentUrl: String) = lifecycleScope.launch {
    // Send document to backend
    UpdateDocumentUseCase().updateDocument(
        customer!!,
        DocumentType.values()[currentStep],
        documentUrl
    ).collect { result ->
        when (result) {
            is Result.Success -> {
                deleteImage()
                isLoading = false
                nextStep()
            }
            is Result.Error -> showPopup(result.errorMessageResId)
        }
    }
}
How can I get rid of this issue and call the two collect() methods sequentially?
Thanks!
Patrick
10/20/2020, 12:43 PM
Shawn Witte
10/20/2020, 9:10 PM
suspend function that launches several operations (syncing individual pieces of local data), and I don't want this function to run more than once simultaneously (if we try to POST the same data twice, we get two new entries on the server side).
What is the preferred practice for preventing a second simultaneous execution of this function? I have an idea, but I wanted the community's opinion since there was some concern about my answer in a code review.
dan.the.man
10/20/2020, 9:34 PM
BroadcastChannel(BUFFERED) still the best replacement for PublishSubject?
william
10/20/2020, 11:53 PM
Sachin Maharana
10/21/2020, 7:23 AM
KV
10/21/2020, 12:43 PM
buildRequestUrl, which builds the URL including headers and query parameters.
Now I am working on tests for the API call's response, and I am debugging the code where I build the URL, which is this:
val uriBuilder = Uri.parse(baseUrl).buildUpon()
But the issue is that it throws a NullPointerException and the result is null (see attached screenshot).
How do I solve this issue?
I am using the libraries below:
testImplementation 'junit:junit:4.13'
testImplementation "org.mockito:mockito-core:3.4.6"
testImplementation 'androidx.arch.core:core-testing:2.1.0'
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.9'
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
I am using the class below:
@ExperimentalCoroutinesApi
class TestCoroutineRule : TestRule {
    private val testCoroutineDispatcher = TestCoroutineDispatcher()
    private val testCoroutineScope = TestCoroutineScope(testCoroutineDispatcher)

    override fun apply(base: Statement, description: Description?) = object : Statement() {
        @Throws(Throwable::class)
        override fun evaluate() {
            Dispatchers.setMain(testCoroutineDispatcher)
            base.evaluate()
            Dispatchers.resetMain()
            testCoroutineScope.cleanupTestCoroutines()
        }
    }

    fun runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) =
        testCoroutineScope.runBlockingTest { block() }
}
I am using the below rule in the test class:
@get:Rule
val testInstantTaskExecutorRule: TestRule = InstantTaskExecutorRule()
@get:Rule
val testCoroutineRule = TestCoroutineRule()
Big Chungus
10/21/2020, 12:48 PM
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 15000 ms
(Not setting timeouts anywhere explicitly.) Does anyone know how I could increase this?
Mark
10/21/2020, 1:08 PM
Flow.combine(Flow)
However, the second flow is not known until some point in the progression of the original Flow (at which point the second flow can be derived). At the moment, I achieve this (and it feels like a very nasty hack) by having a MutableStateFlow and updating that flow from a coroutine launched from within onEach (just before combine). So then we use combine(MutableStateFlow). But how to do this properly?
william
10/21/2020, 11:27 PM
Channel(Channel.UNLIMITED) in use to act as a queue between coroutines, but I am having trouble with my receiving coroutine not seeing anything the producer sends. Could there be an issue from performing both of these in the scope of Dispatchers.IO (which I believe is multithreaded in Java)? Should I be using a single-threaded coroutine scope / dispatcher for this to work?
Sachin Maharana
10/22/2020, 12:10 PM
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay

fun main() = runBlocking {
    println("${Thread.activeCount()} threads active at the start")
    val time = measureTimeMillis {
        createCoroutines(10_000)
    }
    println("${Thread.activeCount()} threads active at the end")
    println("Took $time ms")
}

suspend fun createCoroutines(amount: Int) {
    val jobs = ArrayList<Job>()
    for (i in 1..amount) {
        jobs += launch {
            delay(1000)
        }
    }
    jobs.forEach {
        it.join()
    }
}
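In current kotlinx.coroutines releases, launch is an extension on CoroutineScope, so it cannot be called directly from a plain suspend function like createCoroutines above. A minimal sketch of one possible adaptation, assuming it is fine for the function to wait for all of its children before returning: wrap the body in coroutineScope, which supplies the scope and makes the explicit join calls unnecessary. The Int return value is added here only for illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// coroutineScope provides the CoroutineScope receiver that launch needs and
// suspends until every child coroutine has completed.
suspend fun createCoroutines(amount: Int): Int = coroutineScope {
    val jobs = List(amount) {
        launch { delay(1000) }
    }
    jobs.size // number of coroutines started (returned for illustration only)
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        createCoroutines(10_000)
    }
    println("Took $time ms")
}
```

Another option would be to declare the function as an extension on CoroutineScope and keep the joins; coroutineScope is used here because it keeps the function a regular suspend function.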
Filip de Waard
10/22/2020, 12:26 PM
db.execute { ... }.asType<Foo>().fetch().flow()
Now we want to return a Map. Two ways of doing that are:
db.execute { ... }.asType<Foo>().fetch().flow().fold(mapOf()) { acc, it -> acc + (transform(it.bar) to it.baz) }
or:
db.execute { ... }.asType<Foo>().fetch().flow().map { transform(it.bar) to it.baz }.toList().toMap()
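For comparison, both variants can be run against a small in-memory flow. This is only a sketch: Foo, transform, viaFold, and viaToMap are hypothetical stand-ins (the real row type and key transform are not shown in the message), and flowOf replaces the database query.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical row type and key transform, standing in for the real ones.
data class Foo(val bar: String, val baz: Int)

fun transform(bar: String): String = "key-$bar"

// Variant 1: fold. `acc + pair` builds a new map for every element.
suspend fun viaFold(rows: Flow<Foo>): Map<String, Int> =
    rows.fold(emptyMap<String, Int>()) { acc, row -> acc + (transform(row.bar) to row.baz) }

// Variant 2: map each row to a pair, collect into a list, then build the map once.
suspend fun viaToMap(rows: Flow<Foo>): Map<String, Int> =
    rows.map { transform(it.bar) to it.baz }.toList().toMap()

fun main() = runBlocking {
    val rows = flowOf(Foo("a", 1), Foo("b", 2))
    println(viaFold(rows))
    println(viaToMap(rows))
}
```

Both produce the same map. The difference is in allocation: the fold variant copies the accumulated map on every element, while the second variant holds one intermediate list of pairs.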
Which is better and why?
Rechee Jozil
10/22/2020, 4:10 PM
Adam Miskiewicz
10/22/2020, 4:56 PM
CoroutineScheduler doesn't expose any public APIs to get its current "state", meaning that there's no way to really get much insight into the state of the default dispatcher.
TL;DR: I want to figure out how to structure my application such that, when we're saturating the CPU (and thus all the default dispatcher threads are super busy doing "stuff"), I can start to apply some back pressure and reject some requests, rather than continuing to accept hundreds/thousands of requests and causing the server to become completely unresponsive. And notably, I want to be able to cause this behavior with a mechanism that doesn't require a ton of tuning per service; I don't want to do this strictly with a "request rate limit" or something like that. I'd like it to be much more adaptive, ideally.
Florian
10/22/2020, 5:18 PM
Luis Munoz
10/22/2020, 6:55 PM
william
10/22/2020, 10:21 PM
MJegorovas
10/23/2020, 8:49 AM
offset variable in this method safe to use in multiple threads?
suspend fun InputStream.readByChunks(
    fileSize: Long,
    chunkSize: Int,
    block: suspend (bytes: ByteArray, offset: Long) -> Unit
) {
    var offset = 0L
    this.buffered().use { input ->
        while (offset != fileSize) {
            val buffer = if (offset + chunkSize < fileSize) {
                ByteArray(chunkSize)
            } else {
                ByteArray((fileSize - offset).toInt())
            }
            val read = input.read(buffer)
            block(buffer, offset)
            offset += read
        }
    }
}
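On the question itself: offset is a local variable that only this one coroutine reads and writes, one step after another. The coroutine may resume on a different thread after block suspends, but the coroutine machinery establishes happens-before between those steps, so no extra synchronization should be needed for it. A separate concern is that InputStream.read may return fewer bytes than the buffer holds, or -1 at end of stream. A minimal sketch that handles both cases, under the assumption that the file size does not have to be known up front (readByChunksSafely is a hypothetical name, not the original function):

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream
import kotlinx.coroutines.runBlocking

suspend fun InputStream.readByChunksSafely(
    chunkSize: Int,
    block: suspend (bytes: ByteArray, offset: Long) -> Unit
) {
    require(chunkSize > 0) { "chunkSize must be positive" }
    var offset = 0L // local state, touched only by the calling coroutine
    buffered().use { input ->
        val buffer = ByteArray(chunkSize)
        while (true) {
            val read = input.read(buffer)
            if (read == -1) break // end of stream
            // Hand out a copy holding only the bytes actually read,
            // so the reusable buffer never leaks stale data.
            block(buffer.copyOf(read), offset)
            offset += read
        }
    }
}

fun main() = runBlocking {
    val data = ByteArray(10) { it.toByte() }
    ByteArrayInputStream(data).readByChunksSafely(chunkSize = 4) { bytes, offset ->
        println("offset=$offset size=${bytes.size}")
    }
}
```

Note that a chunk can be shorter than chunkSize even in the middle of a stream, because a single read is allowed to return early.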
william
10/23/2020, 12:11 PM
class PersistentWsClient() {
    private val scope = CoroutineScope(Dispatchers.Default)
    private val sendingQueue = Channel<Frame>(UNLIMITED)

    fun open(host: String, path: String, port: Int = 8080) {
        scope.launch {
            this@PersistentWsClient.e("waiting for messages")
            for (msg in sendingQueue) {
                this@PersistentWsClient.e(msg.toString())
            }
        }
    }

    fun send(payload: Frame) {
        scope.launch {
            for (i in 0..10) {
                delay(500)
                sendingQueue.offer(payload)
                this.e("offering $payload")
            }
        }
        sendingQueue.offer(payload)
    }
}
(I have stripped away some extra code.)
In this example I only ever get:
waiting for messages
offering <payload>
offering <payload>
...
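Channels are safe to use from several threads, so a multithreaded dispatcher on its own should not make messages disappear. A minimal sketch of an UNLIMITED channel used as a queue between a producer and a consumer on different dispatchers; relay is a hypothetical helper written for this illustration, and String stands in for Frame:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun relay(messages: List<String>): List<String> = coroutineScope {
    val queue = Channel<String>(Channel.UNLIMITED)
    val received = mutableListOf<String>()

    // Consumer: the for loop suspends while the queue is empty
    // and finishes once the channel is closed and drained.
    val consumer = launch(Dispatchers.Default) {
        for (msg in queue) received += msg
    }

    // Producer on a different, multithreaded dispatcher.
    launch(Dispatchers.IO) {
        messages.forEach { queue.send(it) }
        queue.close()
    }

    consumer.join() // after join, the consumer's writes are visible here
    received
}

fun main() = runBlocking {
    println(relay(listOf("a", "b", "c")))
}
```

If a setup like this works but the original does not, it may be worth checking that the consuming loop (started only inside open() in the snippet above) is actually running and reads from the same channel instance. That is a guess, since part of the code was stripped.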
Albert
10/23/2020, 12:31 PM
StackOverflowError error
ursus
10/24/2020, 3:40 AM
ursus
10/24/2020, 1:54 PM
Observable.just(1).startWith(0)
emits 0,1
flow
flowOf(1).onStart { emit(0) }
emits 1,0
Is this by design? An Rx fix? Pretty foot-gunny when migrating.
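A quick way to check the ordering is to collect the flow into a list. According to the kotlinx.coroutines documentation, onStart invokes its action before the upstream flow starts to be collected, so a standalone sketch like the following would be expected to see 0 before 1 (collectWithOnStart is a hypothetical helper name):

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

suspend fun collectWithOnStart(): List<Int> =
    flowOf(1)
        .onStart { emit(0) } // runs before the upstream flowOf(1) is collected
        .toList()

fun main() = runBlocking {
    println(collectWithOnStart()) // [0, 1]
}
```

If a 1,0 ordering shows up in a real pipeline, something other than onStart itself (for example a hot or replaying source further downstream) is the more likely cause.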
Also, what if the actual flow is replayed? If it's "below" the onStart, I'd assume it emits first, which is not good.
janvladimirmostert
10/24/2020, 2:49 PM
suspend fun lookupListings(): List<QueryListings.SelectListingsResult> {
If I make use of runBlocking, I'm assuming it's physically blocking a thread until that suspend function completes. What is the better route to take here to get rid of the `runBlocking`s?
class HomePage : PageResponse(
    title = "Page Title",
    description = "Page Description",
) {
    val listings = runBlocking {
        lookupListings()
    }
    val somethingElse = runBlocking {
        suspendLookupSomethingElse()
    }
Edit: this is on the JVM
william
10/24/2020, 4:16 PM
StateFlow appears to compare values before updating them (unlike LiveData), so if I have a mutable list which I update and send to my MutableStateFlow, it doesn't get published from the looks of it, unless I create a new list / copy it. Is that the right way to work around this?
ursus
10/24/2020, 7:16 PM
ursus
10/24/2020, 7:25 PM
startWith and flow onStart again
CoroutineScope(SupervisorJob() + Dispatchers.IO).launch {
    flowOf(1)
        .onStart { emit(0) }
        .collect {
            stateFlow.value = it
        }
}
-- is equivalent to:
Observable.just(1)
    .startWith(0) // <---
    .subscribeOn(Schedulers.io()) // <---
    .subscribe {
        behaviorRelay.accept(it)
    }
-- In Rx I use:
Observable.just(1)
    .subscribeOn(Schedulers.io()) // <---
    .startWith(0) // <--- swapped
    .subscribe {
        behaviorRelay.accept(it)
    }
Is this doable somehow in Flow? (To not have the onStart emit coming from the IO thread.)
Jeremy
10/24/2020, 8:30 PM
ursus
10/24/2020, 9:18 PM
replay operators to migrate to Flow.
I see shareIn(), but I'm unsure what the scope parameter should be.
gildor
10/25/2020, 3:31 AM