Mark Fisher
11/26/2019, 11:46 AM
unsafeRunAsync. I have a test with 3 IO objects that I've created and called attempt() on, and I want them to run concurrently. I've tried using unsafeRunAsync, expecting them to run ... asynchronously, but they run sequentially. How do I make the async IOs run concurrently?
@Test
fun `io module test`() {
    val ioModule = Module(IO.async())
    ioModule.run {
        val ioS1 = repository.byId(u1).fix().attempt()
        val ioS2 = repository.byId(u2).fix().attempt()
        val ioS3 = repository.byId(u3).fix().attempt()
        logger.info { "ioS1..." }
        ioS1.unsafeRunAsync { it.fold(ioError, ioInfo) }
        logger.info { "ioS2..." }
        ioS2.unsafeRunAsync { it.fold(ioError, ioInfo) }
        logger.info { "ioS3..." }
        ioS3.unsafeRunAsync { it.fold(ioError, ioInfo) }
    }
}
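A possible reading of why the three calls above run one after another: as far as I understand it, unsafeRunAsync only delivers the result through a callback and does not move the work to another thread by itself, so without an explicit context shift each IO runs on the calling thread. The sketch below models that idea with plain Kotlin and java.util.concurrent. LazyTask, ShiftedTask and shiftedTo are hypothetical names made up for illustration; they are not Arrow APIs.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors

// Hypothetical, heavily simplified stand-in for a lazy IO-like value (not Arrow's IO).
class LazyTask<A>(private val body: () -> A) {
    // Runs the body on whatever thread calls runAsync, then invokes the callback there.
    fun runAsync(callback: (Result<A>) -> Unit) = callback(runCatching(body))

    // Returns a task whose body is handed to the given executor when it is run.
    fun shiftedTo(executor: Executor): ShiftedTask<A> = ShiftedTask(executor, body)
}

// Hypothetical counterpart that runs the body and the callback on the executor's thread.
class ShiftedTask<A>(private val executor: Executor, private val body: () -> A) {
    fun runAsync(callback: (Result<A>) -> Unit) =
        executor.execute { callback(runCatching(body)) }
}

fun main() {
    val pool = Executors.newSingleThreadExecutor()
    val done = CountDownLatch(1)

    // Unshifted: the body runs on the caller's thread before runAsync returns.
    LazyTask { Thread.currentThread().name }
        .runAsync { println("unshifted ran on: ${it.getOrNull()}") }

    // Shifted: the body runs on the pool thread, so the caller is free to continue.
    LazyTask { Thread.currentThread().name }
        .shiftedTo(pool)
        .runAsync { println("shifted ran on: ${it.getOrNull()}"); done.countDown() }

    done.await()
    pool.shutdown()
}
```

In this model, three unshifted tasks started back to back would still run in sequence, which matches what the test shows.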
My equivalent for using Observable was to use subscribeOn(...) as follows:
repository.byId(u1).fix().observable.subscribeOn(Schedulers.io()).subscribe(idLogger, errorLogger)
repository.byId(u2).fix().observable.subscribeOn(Schedulers.computation()).subscribe(idLogger, errorLogger)
repository.byId(u3).fix().observable.subscribeOn(Schedulers.newThread()).subscribe(idLogger, errorLogger)
How can I fix the io module test to run the 3 IO jobs at the same time?
Thanks!
Mark Fisher
11/26/2019, 12:28 PM
@Test
fun `io module test`() {
    val ioModule = Module(IO.async())
    ioModule.run {
        val ioS1 = repository.byId(u1).fix().attempt()
        val ioS2 = repository.byId(u2).fix().attempt()
        val ioS3 = repository.byId(u3).fix().attempt()
        val contextIO = IO.dispatchers().io()
        IO.parMapN(contextIO, ioS1, ioS2, ioS3) { a, b, c ->
            ioInfo(a)
            ioInfo(b)
            ioInfo(c)
        }.unsafeRunSync()
    }
}
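For comparison, here is a rough plain-Kotlin sketch of the same shape as the parMapN call above: start every job on a pool, capture each outcome as a success or failure (similar in spirit to attempt()), and block until all of them are done. fetchStock and runAllConcurrently are hypothetical names used only for illustration, and the sketch uses java.util.concurrent rather than Arrow.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for repository.byId(...): sleeps, then returns a value or throws.
fun fetchStock(id: Int): Int {
    Thread.sleep(200)
    if (id < 0) throw IllegalArgumentException("unknown id: $id")
    return id * 10
}

// Submits every task to its own pool thread and captures each outcome as a Result,
// so one failing task does not prevent the others from reporting their values.
fun <A> runAllConcurrently(tasks: List<() -> A>): List<Result<A>> {
    // newFixedThreadPool rejects a size of 0, hence the lower bound of 1.
    val pool = Executors.newFixedThreadPool(tasks.size.coerceAtLeast(1))
    try {
        val futures = tasks.map { task -> pool.submit(Callable { runCatching(task) }) }
        // Blocks until every task has finished; results keep the order of the input list.
        return futures.map { it.get() }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    val results = runAllConcurrently(
        listOf({ fetchStock(1) }, { fetchStock(2) }, { fetchStock(-1) })
    )
    results.forEach { result ->
        result.fold(
            onSuccess = { println("got stock: $it") },
            onFailure = { println("error: $it") }
        )
    }
}
```

Because each task gets its own thread, the three 200 ms sleeps overlap instead of adding up.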
Mark Fisher
11/26/2019, 12:29 PM
private val ioInfo: (Either<Throwable, Int>) -> Unit = { result ->
    when (result) {
        is Either.Right -> logger.info { "got stock: ${result.b}" }
        is Either.Left -> logger.error { "error: ${result.a}" }
    }
}