Chris Kruczynski
11/19/2019, 7:41 PM
[…] `IO` to `MonoK` nicely, could anybody help?
simon.vergauwen
11/20/2019, 7:44 AM
[…] `MonoK` and `IO`. These should be added to Arrow Fx and its integration libs.
import arrow.core.Left
import arrow.core.Right
import arrow.fx.IO
import reactor.core.publisher.Mono
/**
 * Wraps this [IO] in a [MonoK] built with `Mono.create`.
 *
 * The [IO] is started with `unsafeRunAsyncCancellable` inside the `Mono.create`
 * callback. Its outcome is folded into the sink: the failure branch is passed to
 * `error`, the success branch to `success`.
 *
 * The sink's `onCancel` hook invokes the handle returned by
 * `unsafeRunAsyncCancellable`.
 */
fun <A> IO<A>.toMonoK(): MonoK<A> {
    val mono = Mono.create<A> { emitter ->
        // Start the IO and keep the handle it returns so the run can be cancelled later.
        val cancelRun = unsafeRunAsyncCancellable { outcome ->
            outcome.fold(
                { failure -> emitter.error(failure) },
                { value -> emitter.success(value) }
            )
        }
        // Registered after the IO has been started, same as the ordering of the two statements.
        emitter.onCancel { cancelRun() }
    }
    return MonoK(mono)
}
/**
 * Wraps this [MonoK] in a cancelable [IO].
 *
 * The underlying `Mono` (obtained through `value()`) is subscribed inside
 * `IO.cancelable`. An emitted item is passed to the callback as `Right`, an
 * error as `Left`. The cancel token returned to `IO.cancelable` is an [IO]
 * that disposes the subscription.
 *
 * NOTE(review): only the item and error callbacks are passed to `subscribe`;
 * presumably a `Mono` that completes without emitting would never invoke the
 * callback — confirm whether empty sources need handling.
 */
fun <A> MonoK<A>.toIO(): IO<A> =
    IO.cancelable { callback ->
        val subscription = value().subscribe(
            { item -> callback(Right(item)) },
            { failure -> callback(Left(failure)) }
        )
        // Cancel token: disposing the subscription is deferred inside an IO.
        IO { subscription.dispose() }
    }
Chris Kruczynski
11/20/2019, 9:38 AM
simon.vergauwen
11/20/2019, 10:15 AM