https://kotlinlang.org logo
#coroutines
Title
# coroutines
j

jeff

01/06/2021, 4:52 PM
I understand that flows are sequential. So even if there's a delay it doesn't disrupt the ordering. But what if I want to disrupt the ordering? Details in 🧵
Here's the "vanilla" version, that preserves ordering despite a delay:
Copy code
@Test
  fun testFlowOrder() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    val dispatcher = TestCoroutineDispatcher().apply { pauseDispatcher() }
    val scope = CoroutineScope(dispatcher)

    val results = mutableListOf<String>()

    val flow2 = sharedFlow.transform {
      emit(it)
      delay(200)
      emit("after $it")
    }

    sharedFlow.tryEmit("One") shouldBe true
    sharedFlow.tryEmit("Two") shouldBe true
    sharedFlow.tryEmit("Three") shouldBe true

    flow2
      .onEach { s -> results += s }
      .launchIn(scope)

    dispatcher.advanceUntilIdle()
    
    // We get this result:
    results shouldBe listOf(
      "One",
      "after One",
      "Two",
      "after Two",
      "Three",
      "after Three",
    )
  }
But, I'd like the output ordering to be
Copy code
results shouldBe listOf(
      "One",
      "Two",
      "Three",
      "after One",
      "after Two",
      "after Three",
    )
This is how I've figured out to do it, but I suspect there must be a more elegant way to achieve this. Basically I create a second SharedFlow that allows me to decouple the ordering:
Copy code
@Test
  fun testFlowOrder2() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    val sharedFlow2 = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    val dispatcher = TestCoroutineDispatcher().apply { pauseDispatcher() }
    val scope = CoroutineScope(dispatcher)

    val results = mutableListOf<String>()

    sharedFlow
      .onEach {
        scope.launch {
          sharedFlow2.tryEmit(it)
          delay(200)
          sharedFlow2.tryEmit("after $it")
        }
      }
      .launchIn(scope)

    sharedFlow.tryEmit("One") shouldBe true
    sharedFlow.tryEmit("Two") shouldBe true
    sharedFlow.tryEmit("Three") shouldBe true

    sharedFlow2
      .onEach { s -> results += s }
      .launchIn(scope)

    dispatcher.advanceUntilIdle()

    // Now this passes
    results shouldBe listOf(
      "One",
      "Two",
      "Three",
      "after One",
      "after Two",
      "after Three",
    )
  }
d

Dominaezzz

01/06/2021, 7:27 PM
You need a shared flow, with two transformations and a merge. I'll post an example when I get to my laptop.
Copy code
@Test
  fun testFlowOrder() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    val dispatcher = TestCoroutineDispatcher().apply { pauseDispatcher() }
    val scope = CoroutineScope(dispatcher)
    val results = mutableListOf<String>()
    val flow2 = merge(sharedFlow, sharedFlow.transform {
      delay(200)
      emit("after $it")
    })
    sharedFlow.tryEmit("One") shouldBe true
    sharedFlow.tryEmit("Two") shouldBe true
    sharedFlow.tryEmit("Three") shouldBe true
    flow2
      .onEach { s -> results += s }
      .launchIn(scope)
    dispatcher.advanceUntilIdle()
    // We get this result:
    results shouldBe listOf(
      "One",
      "after One",
      "Two",
      "after Two",
      "Three",
      "after Three",
    )
  }
Ignore the assertion, the important bit is in the
merge
.
j

jeff

01/06/2021, 7:52 PM
Neat, thanks! Appreciate it
2 Views