Florian
12/02/2020, 9:32 AM
A with 2 subclasses B and C. Can I combine a Flow<B> and a Flow<C> to a Flow<A>?
pablisco
12/02/2020, 10:18 AM
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T>
The idea is that the initial value is emitted once and then f is applied recursively. The flow should finish when it is cancelled or when an empty flow is produced (duh). I’ll post in the thread what I’ve tried so far without much luck…
Lilly
12/02/2020, 2:19 PM
private suspend fun sendPacketWithAcknowledgement(packet: RequestPacket) {
    try {
        withTimeout(PACKET_CONFIRMATION_TIMEOUT) {
            sendPacket(packet.bytes)
        }
        // Wait here until ack packet received
        // mutex.lock() // I thought this will suspend the code here and on mutex.unlock() it will resume here
    } catch (e: TimeoutCancellationException) {
        // do sth.
    }
}
On the caller side:
sendPacketWithAcknowledgement(StartCommunication()) // should be sequential
isCommunicationEstablished = true
The caller code should behave sequentially, so that isCommunicationEstablished = true is only processed once the ack packet has been received and the suspension is resumed manually. I thought a mutex would fit here, but I guess I didn't use it the right way. It's a Bluetooth Classic scenario, btw. Does anyone know how I can achieve this behavior?
Saul Wiggin
12/02/2020, 4:07 PM
bbaldino
12/02/2020, 8:34 PM
Nikky
12/03/2020, 1:28 PM
Erik
12/03/2020, 5:47 PM
MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
equals 1 (in reality it's variable). I can't figure out how to get this working in a unit test: every time I emit something, it is immediately collected (I use Turbine to test the flow). I basically want to emit 2 items and verify that only the last one is collected and the oldest is dropped.
Nikky
12/04/2020, 11:44 AM
Florian
12/04/2020, 1:12 PM
doSomeOtherWork, we could also switch to the `applicationScope`/`externalScope` in the ViewModel directly, right?
Tolriq
12/05/2020, 10:28 AM
launchIn to allow simple cancellation without cancelling the scope / job that is used in the launchIn call? Currently saving each job separately and cancelling them one by one.
Florian
12/05/2020, 2:16 PM
suspend fun but also returns a Flow. And the first line can't be caught like this because it's not inside the Flow.
suspend fun getChatUserFlow(uid: String): Flow<ChatUser?> {
    val results = userCollection.whereEqualTo(FieldPath.documentId(), uid).get().await()
    return if (!results.isEmpty) {
        results.documents[0].reference.asFlow<ChatUser>()
    } else {
        flowOf(null)
    }
}
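One way to make the lookup on the first line visible to a downstream catch operator is to move it inside a flow { } builder, so that it runs at collection time rather than before the Flow exists. This is a minimal sketch, not the author's code: lookUpUser and chatUserFlow are hypothetical names, and a plain String stands in for ChatUser and the Firestore query.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Firestore lookup; throws for a blank uid.
suspend fun lookUpUser(uid: String): String? {
    require(uid.isNotBlank()) { "uid must not be blank" }
    return if (uid == "known") "Alice" else null
}

// Not a suspend function: the lookup runs inside the flow, when it is collected.
fun chatUserFlow(uid: String): Flow<String?> = flow {
    val user = lookUpUser(uid)
    if (user != null) {
        emitAll(flowOf(user)) // stands in for reference.asFlow<ChatUser>()
    } else {
        emit(null)
    }
}

fun main() = runBlocking {
    // The exception thrown by lookUpUser now reaches the catch operator.
    val values = chatUserFlow("").catch { emit("fallback") }.toList()
    println(values) // prints [fallback]
}
```

Because the function no longer suspends before returning the Flow, callers can also build the chain outside a coroutine and only need a coroutine to collect it.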
v0ldem0rt
12/06/2020, 3:02 AM
suspend fun <T> PreparedStatement.withCancellation(callback: suspend () -> T): T {
    val stmt = this
    suspendCancellableCoroutine { cont ->
        cont.invokeOnCancellation {
            stmt.cancel()
        }
    }
    return callback()
}
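In the snippet above, the continuation passed to suspendCancellableCoroutine is never resumed, so the function stays suspended there and callback() is not reached. A different, simpler technique is to run the callback directly and cancel the statement when a CancellationException passes through. The sketch below assumes a hypothetical FakeStatement class in place of java.sql.PreparedStatement, and it only helps when the callback itself suspends cooperatively.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for java.sql.PreparedStatement; only cancel() matters here.
class FakeStatement {
    val cancelled = AtomicBoolean(false)
    fun cancel() = cancelled.set(true)
}

// Runs block and cancels the statement only if the coroutine is cancelled.
suspend fun <T> FakeStatement.withCancellation(block: suspend () -> T): T =
    try {
        block()
    } catch (e: CancellationException) {
        cancel()
        throw e // rethrow so that cancellation keeps propagating
    }

fun main() = runBlocking {
    val stmt = FakeStatement()
    val job = launch {
        stmt.withCancellation { delay(10_000) } // stands in for a long query
    }
    yield()             // let the child start and suspend in delay
    job.cancelAndJoin()
    println(stmt.cancelled.get()) // prints true
}
```

A thread that is blocked inside a JDBC call does not observe coroutine cancellation by itself, so a real implementation would probably still need to run the query somewhere interruptible, for example with runInterruptible or on a separate thread.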
Florian
12/06/2020, 8:35 PM
callbackFlow cannot be caught with the catch operator?
v0ldem0rt
12/07/2020, 2:43 AM
suspendCancellableCoroutine so that if the scope gets cancelled I can close my DB statements
Florian
12/07/2020, 10:03 AM
catch operator I add on the resulting (combined) Flow?
Ray
12/07/2020, 3:46 PM
Query()
suspend fun getUser(): User
Repository
suspend fun getUserFromDb(): User {
    return withContext(dispatcher.IO) { data.getUser() }
}
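The snippet above appears without its question, so the following is only a sketch of the pattern it uses: a repository function that switches to an injected I/O dispatcher with withContext before calling the data source. BlockingUserSource, UserRepository, and the ioDispatcher parameter are hypothetical names, and the data source is assumed to be a blocking (non-suspend) call.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

data class User(val name: String)

// Hypothetical blocking data source (not a suspend function).
class BlockingUserSource {
    fun getUser(): User = User("Ray")
}

class UserRepository(
    private val data: BlockingUserSource,
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    // withContext returns the value of its lambda's last expression,
    // so no return statement is needed inside the lambda.
    suspend fun getUserFromDb(): User = withContext(ioDispatcher) { data.getUser() }
}

fun main() = runBlocking {
    println(UserRepository(BlockingUserSource()).getUserFromDb()) // prints User(name=Ray)
}
```

Passing the dispatcher in through the constructor keeps the repository testable, since a test can supply its own dispatcher.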
zak.taccardi
12/07/2020, 5:50 PM
suspend function using something similar to .scan(..) with a SharedFlow?
.scan(loadInitialValueAsynchronously()) { .. }
I’m effectively trying to replace an actor with a SharedFlow.
Slackbot
12/08/2020, 9:08 AM
Tolriq
12/08/2020, 10:02 AM
fun <T> StateFlow<T>.withPrevious(): Flow<Pair<T?, T>> {
    return flow {
        var previous: T? = null
        this@withPrevious.onEach {
            emit(Pair(previous, it))
            previous = it
        }.collect()
    }
}
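For comparison, the same idea can be written against any Flow by collecting the upstream directly inside the flow { } builder, without the onEach { }.collect() pair. This is a sketch of an equivalent shape, not a statement about what the original message was asking.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Pairs each value with the one emitted before it; the first pair has a null "previous".
fun <T> Flow<T>.withPrevious(): Flow<Pair<T?, T>> = flow {
    var previous: T? = null
    this@withPrevious.collect { value ->
        emit(previous to value)
        previous = value
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2, 3).withPrevious().toList()) // prints [(null, 1), (1, 2), (2, 3)]
}
```

Since StateFlow is a Flow, this extension also applies to a StateFlow receiver, although the resulting flow then never completes on its own because a StateFlow never completes.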
Sourabh Rawat
12/08/2020, 12:37 PM
fun main() = runBlocking {
    val httpClient = HttpClient {
        install(Logging) {
            level = LogLevel.ALL
        }
        @OptIn(KtorExperimentalAPI::class)
        install(WebSockets)
        expectSuccess = false
        install(JsonFeature) {
            @OptIn(KtorExperimentalAPI::class)
            acceptContentTypes = listOf(ContentType("application", "json-rpc"))
            serializer = KotlinxSerializer()
        }
    }
    val wss = httpClient.webSocketSession(method = HttpMethod.Get, host = "127.0.0.1", port = 6800, path = "/jsonrpc")
    wss.close()
}
This does not stop the program. What am I doing wrong?
Tash
12/08/2020, 6:52 PM
Flow don’t work as desired, so what I need is more of a throttleFirst. Tried using async and returning null when the request should be throttled:
class ApiClient {
    private var deferredLoad: Deferred<Result>? = null
    suspend fun load(throttleMillis: Long = 300): Result? {
        return coroutineScope {
            if (deferredLoad?.isCompleted != false) {
                async {
                    val result = /** make GET API call **/
                    delay(throttleMillis)
                    result
                }
                    .apply { deferredLoad = this }
                    .await()
            } else {
                null
            }
        }
    }
}
Is this approach recommended? Finding it a little weird to hold on to a var of Deferred, thinking it might need a Mutex. Wondering if there’s a more elegant approach that others have found?
Shan
12/09/2020, 2:06 AM
.asSingle() on a Deferred<T> from the kotlinx-coroutines-rx2 library, why the coroutineContext I pass into .asSingle() can't contain a Job?
kevinherron
12/09/2020, 12:44 PM
suspendCancellableCoroutine instead of suspendCoroutine when there is no way to propagate cancellation to the adapted API and no action to run via CancellableContinuation::invokeOnCancellation?
Nikola Milovic
12/09/2020, 3:23 PM
override suspend fun saveConfession(text: String): SaveConfessionResponse = suspendCoroutine { cont ->
    val confessionToSave = ConfessionDataModel(text)
    val call = newConfessionService.saveConfession(confessionToSave)
    call.enqueue(object : Callback<SaveConfessionResponse> {
        override fun onResponse(call: Call<SaveConfessionResponse>, response: Response<SaveConfessionResponse>) {
            cont.resume(response.body()!!)
        }
        override fun onFailure(call: Call<SaveConfessionResponse>, t: Throwable) {
            cont.resumeWithException(t)
        }
    })
}
And I am trying to call it from my useCase object:
suspend fun execute(text: String): Result = suspendCoroutine { cont ->
    try {
        val response = confessionRepository.saveConfession(text)
        when (response.status) {
            200 -> Result.Success(response.id, 200)
            400 -> Result.Error(null)
            500 -> Result.Error(null)
            else -> Result.Error(null)
        }
    } catch (e: Exception) {
        return Result.Error(e)
    }
}
But I cannot call saveConfession outside of a coroutine body. Can I somehow get the scope that the execute function was called from?
Colton Idle
12/09/2020, 8:11 PM
Guillermo Alcantara
12/09/2020, 10:47 PM
ExperimentalCoroutinesApi functions in prod for a library? Should I surface that to my users? If I understand correctly, it means it will be binary compatible (unlike FlowPreview). So the only issue is that at some point I might need to change code. But the AAR will be fine using a future version of the library?
asad.awadia
12/10/2020, 12:09 AM
Animesh Sahu
12/10/2020, 1:47 PM
liminal
12/10/2020, 5:05 PM
Gilles Barbier
12/10/2020, 6:13 PM