# spring
How to make the main thread wait until the Flux has emitted all its elements?
@Test
fun testFnCall_partyAddrByCleanCd() {
    println("\n\n\n\n ---------------- SEE MEE START  ----------------------")
    val start = System.currentTimeMillis()
    val res = ecmDbSvc.callFnGetPartyAddrByExtCleanCd("ECM-123")
    assert(res != null)
    assertTrue(res.count() == 100)
    val fluxOfPartyAddr = Flux.fromIterable(res)
        .delayElements(Duration.ofMillis(100))
        .parallel(4).runOn(Schedulers.parallel())
        .doOnNext { e -> println(e.addrLine1) }

    // fluxOfPartyAddr.blockLast()  --> Invalid when using parallel
    fluxOfPartyAddr.doOnComplete {
        val end = System.currentTimeMillis()
        println("time taken: " + (end - start) + " milliseconds")
        println("---------------- SEE MEE START COMPLETE ----------------------")
    }

    fluxOfPartyAddr.subscribe()
    // ??? anything here to make the thread wait ???
}
Assuming that the method `callFnGetPartyAddrByExtCleanCd` returns `listOf(1, 2, 3)`, `fluxOfPartyAddr` can be tested by changing `fluxOfPartyAddr.subscribe()` to something like:
StepVerifier.create(fluxOfPartyAddr)
        .consumeNextWith { assertEquals(1, it) }
        .consumeNextWith { assertEquals(2, it) }
        .consumeNextWith { assertEquals(3, it) }
        .verifyComplete()
Thank you, I tried that, but `fluxOfPartyAddr` is a result set from a DB query, so there is no telling the count for sure. Plus, I just wanted to log the values to System.out so I can verify a few things visually.
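If the element count is not known in advance, `StepVerifier` does not need one expectation per element. The sketch below logs each element with `doOnNext` and uses `thenConsumeWhile { true }` to accept however many elements arrive before completion. `sampleRows` and `logAllAndVerify` are hypothetical names standing in for the DB call and the test body, and the `reactor-test` dependency is assumed to be on the classpath.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the rows returned by the DB query.
fun sampleRows(): List<String> = (1..12).map { "row-$it" }

// Logs every element and returns how many were seen.
// verifyComplete() blocks the calling thread until the sequence completes.
fun logAllAndVerify(rows: List<String>): Int {
    val count = AtomicInteger()
    val flux = Flux.fromIterable(rows)
        .delayElements(Duration.ofMillis(10))
        .parallel(4)
        .runOn(Schedulers.parallel())
        .doOnNext { row ->
            count.incrementAndGet()
            println(row)              // visual check on System.out
        }
        .sequential()                 // merge the rails back into one Flux

    StepVerifier.create(flux)
        .thenConsumeWhile { true }    // accept any number of elements
        .verifyComplete()

    return count.get()
}

fun main() {
    println("elements seen: " + logAllAndVerify(sampleRows()))
}
```

Because the elements are processed on parallel rails, the order of the printed lines is not guaranteed to match the order of the source list.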
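I would also suggest using `StepVerifier` for that use case, but be aware that you can also use the `blockLast()` operator. It is defined on `Flux`, so after `parallel()` you first need `sequential()` to get back a `Flux`.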
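A minimal sketch of the `blockLast()` route, assuming the goal is only to log the values and measure elapsed time. `fetchAddressLines` and `processAndWait` are hypothetical names; in the real test the list would come from `ecmDbSvc.callFnGetPartyAddrByExtCleanCd("ECM-123")`. Note that Reactor operators return a new instance, so `doOnComplete` has to be part of the chain that is actually subscribed to; calling it on its own and discarding the result has no effect.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the DB result.
fun fetchAddressLines(): List<String> = (1..20).map { "Address line $it" }

// Processes every element on parallel rails and blocks the caller until done.
// Returns the elements seen by doOnNext (in no guaranteed order).
fun processAndWait(lines: List<String>): List<String> {
    val seen = ConcurrentLinkedQueue<String>()
    val start = System.currentTimeMillis()

    Flux.fromIterable(lines)
        .delayElements(Duration.ofMillis(10))
        .parallel(4)
        .runOn(Schedulers.parallel())
        .doOnNext { line ->
            seen.add(line)
            println(Thread.currentThread().name + ": " + line)
        }
        .sequential()                        // ParallelFlux -> Flux
        .doOnComplete {
            println("time taken: " + (System.currentTimeMillis() - start) + " ms")
        }
        .blockLast(Duration.ofSeconds(30))   // the calling thread waits here

    return seen.toList()
}

fun main() {
    val seen = processAndWait(fetchAddressLines())
    println("processed " + seen.size + " elements")
}
```

The timeout passed to `blockLast` is a precaution so that a stalled pipeline fails the test instead of hanging it; the 30-second value is arbitrary.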