Hi, How to we deal with coroutines' `Flow` and Arr...
# arrow
j
Hi, How to we deal with coroutines'
Flow
and Arrow's
IO
? Here is an example of what I'd like to achieve. But it doesn't compilbe because
IO.fx
restrict suspension:
Copy code
val flow: Flow<Int> =
    flowOf(1, 2, 3)

fun putStrLn(value: Any?): IO<Unit> =
    IO { println(value) }

val program = IO.fx {
  flow.collect {
    !putStrLn(it)
  }
}

fun main() {
  program.unsafeRunSync()
}
s
Hey @Jonathan, You an use
putStrLn(it).suspended()
instead of
!
if you’re in a
suspended
context
There is also a KotlinX integration module, currently in
0.10.5-SNAPSHOT
where we expose operations such as
suspendCancelable
and
unsafeRunScoped
to integrate with structured concurrency
j
Thanks @simon.vergauwen!
s
You’re welcome 👍
a
@Jonathan what simon said 😄 + also here is a sneak preview of that module and how it works. Please let us know if you have any questions or anything is not clear so we can improve 💪 https://next.arrow-kt.io/docs/integrations/kotlinxcoroutines/
🔝 1
j
Well, After playing around with
IO
, I have a hard time to understand its benefits. To me it looks like
IO<T>
is equivalent to
suspend () -> T
. So why wouldn't I declare
suspend
functions instead of using
IO
?
a
instead they are equivalent
but there are some differences on how you do certain stuff, basically IO is quite powerful and let’s you do error handling, racing, parallel ops and cancellation with almost no effort
IO and this module allows you to gradually introduce Arrow in your project, allowing you to handle errors or do more powerful things than with plain coroutines
for example, imagine you have a suspend function
Copy code
suspend fun openFile(): File =
    File("./some/path")
with coroutines, you would need to surround either that function or the calling site with a try-catch to avoid exceptions
while with arrow and IO you can simply
.handleError {...}
or for example, imagine you want to read the lines of the file and do some op on each of them… since that job is parallelizable you could use IO for that
Copy code
!lines.parTraverse { IO { it.transformLine) } }
that will create an IO of the transformation of each line and then execute them all in parallel, returning you a list with all lines transformed in the same order as in the beginning
j
Thanks very much @aballano for all theses explanations. It is however still not fully clear. For error handling, I do find
Either
helpful, but I don't see how
IO
helps. It anyway represents error with
Throwable
, forcing me to use
IO<Either<L, R>>
in order to get proper error handling. And I can do the same with suspending function that return
Either
.
Copy code
// Let's assume this function being in a third party lib.
// So I cannot change it's signature or implementation
fun boom(): Int = throw Exception("boom")

// how this IO oriented code:
val foo = IO { boom() }

// whould be supperior that:
suspend fun foo() = boom()

suspend fun main() {
  foo() // <-- I agree, it throws. Which is not good.
  foo.suspended() // <-- But this trows as well.
  foo.unsafeRunSync() // <-- And that throws too.

  // Yes, that one doesn't throw:
  foo.runAsync { either ->
    // But I only get an exception. So it is still not that helpful.
    // And I will have to have at least one unsafe call at my entry point.
    // So how `IO` is preventing it from crashing, more than what suspend function and Either?
    IO.unit
  }
}
And sorry, but I don't undersrand how
val openFile: IO<File>
would be superior to
suspend fun openFile(): File
Could you write a complete example?
Copy code
// how this IO oriented code:
val foo = IO {
  Either.catch { boom() }
}

// would be superior that:
suspend fun foo() = Either.catch { boom() }
a
for IO<Either<A, B>> we will have IO<E, A> directly, which will allow you to encode domain errors directly
👍 1
❤️ 1
j
That will be nice indeed.
But until then?
a
until then the api to work with IO<Either…> is not that nice, but it works if you want to explicitly encode domain errors
j
Actually.... That doesn't change much the question. Why would
IO<E, A>
be prefereable to
suspend () -> Either<E, A>
?
a
that always depends on what you’re trying to do, if it’s simple you can use suspend, but if you want cancellation, error handling, etc it will be easier with IO
j
But
suspend
function already support cancellation. And
Either
already suport typed error handling.
a
suspend supports cancellation if you take care of it manually or use supported operators, no?
with IO, that doesn’t matter because any IO can be made cancellable
j
Yes indeed. Isn't that the same for
IO
?
"can be made cancellable", so you have to make it cancellable....
how is that different?
a
when running, you need to be explicit
j
Can you show me an example?
a
sure
if you take a closer look at those, there are few differences • arrow code doesn’t need to call
yield()
or to check cancellation in any way • runBlocking == fx + unsafeRunXXX (because of lazyness) • launch inside runBlocking or others == IO.fork()
j
Ok. Indeed, that is fancy.
thanks very much for the example.
a
for error handling, consider this case: open a file read the lines transform some lines print result with coroutines:
Copy code
runBlocking {
  openFile()
      .read()
      .transformLines(Transformations.RemoveEveryXLetter(1))
      .forEach(::println)
}
IO + fx
Copy code
IO.fx {
    val file = !IO { openFile() }
    val lines = !IO { file.read() }
    val transformedLines = !IO { lines.map { it.transformLine(Transformations.RemoveEveryXLetter(1)) } }
    !IO { transformedLines.forEach(::println) }
}.unsafeRunXXX
so not big difference, actually the IO is more verbose
now there are 2 things that can be improved here: • transformLines can throw, which is not considered in any case • line transformation can actually be done in parallel because all ops/lines are independant
when you add error handling to the coroutines code you end up with something like
Copy code
val lines = openFile().read()
val transformedLines = try {
    lines.map { it.transformLine(Transformations.RemoveEveryXLetter(1)) }
} catch (t: Throwable) {
    emptyList<String>()
}
transformedLines.forEach(::println)
or something equivalent
but for IO, is as simple as adding this to the transformation part
Copy code
.handleError { emptyList() }
so
Copy code
IO.fx {
    val file = !IO { openFile() }
    val lines = !IO { file.read() }
    val transformedLines = !IO { lines.map { it.transformLine(Transformations.RemoveEveryXLetter(1)) } }.handleError { emptyList() }
    !IO { transformedLines.forEach(::println) }
}.unsafeRunXXX
now, for the second part, not sure how that parallelization would look like in coroutines, but for IO is reaaaaally as simple as:
Copy code
val transformedLines = !lines.parTraverse { IO { it.transformLine(Transformations.RemoveEveryXLetter(1)) } }
and done, each line in the file will be paralelized and treated independantly 🤷
that parTraverse = map to transform + run each element in parallel
sorry for the long post @Jonathan, I hope I solved some doubts 😄
j
No, at the contrary, thank you very much @aballano for clarifying. It is helpful.
🙌 1
I think the error handling example finally make sense too.
But I am looking forward
IO<E, A>
!
Does Arrow have special construct for
Flow
or equivalent ? I mean some kind equivalent of an
IO
but that can emit multiple results instead of one.
what is the type of
lines
in your error handling example?
a
lines is
List<String>
and we don’t have yet an equivalent for Flow in arrow, that would be a Stream
we will have it this year tho, after the release and focus for 0.11 we will be working on that one
j
Wouldn't it be better to work on integration of
Flow
with
IO
rather than creating an equivalent of
Flow
?
a
we will for sure investigate how Flow works before getting started and we’re likely to provide integrations between Flow and the future Arrow Stream… but Stream won’t be the same as Flow in the way IO is not the same as suspend
👍 1
there are some key differences like those explained above, lazyness, etc
j
Ok. thanks very much for all your explanations.
a
no problem! 🙂