Is there an easy way to convert `List<Either<...
# arrow
k
Is there an easy way to convert `List<Either<A, B>>` into `Either<List<A>, List<B>>`?
a
sounds like traverse/sequence!
you can take a look at this answer here, let me know if that doesn’t solve your issue 🙂 https://kotlinlang.slack.com/archives/C5UPMM0A0/p1584452235351000
not sure how it's defined for `Either` though, in the sense of getting a list on the left side as well
k
What will it do by default then? Provide the first `A` (if any) and return?
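To make the two possible behaviours concrete, here is a minimal sketch that does not use Arrow at all. The `Either` type below is a hand-rolled stand-in, and `sequenceFirstLeft` and `accumulate` are hypothetical names chosen for illustration, not Arrow functions. The first stops at the first `Left`, which is the usual short-circuiting behaviour asked about above; the second collects every `Left` into a list, which matches the `Either<List<A>, List<B>>` shape from the original question.

```kotlin
// Minimal stand-in for Either, for illustration only. This is NOT Arrow's implementation.
sealed class Either<out A, out B> {
    data class Left<out A>(val value: A) : Either<A, Nothing>()
    data class Right<out B>(val value: B) : Either<Nothing, B>()
}

// Short-circuiting variant (hypothetical name): returns the first Left it meets,
// otherwise a Right holding all Right values in order.
fun <A, B> List<Either<A, B>>.sequenceFirstLeft(): Either<A, List<B>> {
    val rights = mutableListOf<B>()
    for (e in this) {
        when (e) {
            is Either.Left -> return Either.Left(e.value)
            is Either.Right -> rights.add(e.value)
        }
    }
    return Either.Right(rights)
}

// Accumulating variant (hypothetical name): if there is at least one Left,
// returns all Left values; otherwise returns all Right values.
fun <A, B> List<Either<A, B>>.accumulate(): Either<List<A>, List<B>> {
    val lefts = mutableListOf<A>()
    val rights = mutableListOf<B>()
    for (e in this) {
        when (e) {
            is Either.Left -> lefts.add(e.value)
            is Either.Right -> rights.add(e.value)
        }
    }
    return if (lefts.isNotEmpty()) Either.Left(lefts) else Either.Right(rights)
}
```

Note that the accumulating variant discards the successful values as soon as any element is a `Left`; whether that is what you want depends on the use case.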
I'm pretty sure I'm doing something wrong. That leads to all kinds of red in IntelliJ 🙈
If it's useful, here's what my code looks like at the moment:
```kotlin
fun stream() =
    IO.fx {
        val (records) = IO {
            producer.beginTransaction()
            consumer.poll(Duration.ofSeconds(1))
        }

        val result: List<Either<Exception, Future<RecordMetadata>>> = records.map { record ->
            someNullableOperation().toOption().fold(
                {
                    Exception("I want to blow up here!").left()
                },
                {
                    publishMessage(record).right()
                }
            )
        }

        result
    }.unsafeRunAsync { ioResult ->
        ioResult.fold(
            { e ->
                producer.abortTransaction()
                logger.error(e) { "Unhandled exception in stream: $e" }
                throw e
            },
            { result: List<Either<Exception, Future<RecordMetadata>>> ->
                producer.commitTransaction()
                consumer.commitSync()

                if (result.count() > 0) logger.info { "Published ${result.count()} messages" }
            }
        )
    }
```
I'm working with Kafka, if that's not clear
s
This is the equivalent of your code but just using `IO` instead of `Either` mixed with `Future` inside `List`. I'd need to write a better `Future` -> `IO` operator though. Give me a sec 🙂
Is `Future` the type you get back from `publishMessage`? The `Future` API doesn't offer very safe ways to listen to it.
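As a rough illustration of that point: a plain `java.util.concurrent.Future` has no callback hook, so the only ways to observe it are polling `isDone` or blocking on `get`. The sketch below uses only the JDK and the Kotlin standard library; `awaitResult` is a hypothetical helper name, not part of Arrow or the Kafka client. It blocks for a bounded time and turns the outcome into a value instead of a thrown exception.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

// Hypothetical helper: block on a Future for at most timeoutMs milliseconds
// and capture success or failure as a Result.
// runCatching catches every Throwable, so a timeout, a failed computation and
// an interruption all end up as Result.failure.
fun <T> Future<T>.awaitResult(timeoutMs: Long): Result<T> =
    runCatching { get(timeoutMs, TimeUnit.MILLISECONDS) }
```

This still blocks the calling thread, so in an effect system it would normally be wrapped in a suspended effect and run on a pool intended for blocking work.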
k
`Future<RecordMetadata>`
For the nullable, could I not use `.toOption` instead and then `.fold` over it?
I am more confused looking at this code, if I'm honest
Why are you doing `!records`?
a
the bang operator `!` is the same as destructuring an `IO` in an Fx block or calling `bind()`; it's just the way of extracting the value
k
Why do that instead of `val (name) = IO { ... }`?
a
it's the same, but destructuring has been deprecated because it has some issues if you ignore the value: with `val (_) = IO { ... }` the `IO` is not executed
k
Interesting
☝️ is a simpler take on what I originally provided
If that's useful
I've specified all of the types for different parts of it
`ConsumerRecords` is not a `List<T>`. It's a boxed object that has the following: `iterator`, `map`.
p
you shouldn't need to use `unsafeRunAsync` inside a function that's not the main entry point, I believe there should be another way of achieving this
if you're doing things sequentially but you don't want to block or act on the current execution, then `parMap` or `parTraverse` is what you're looking for
you also have the lower-level, more mechanical `fork()`
k
I am calling `stream()` in a loop on the main entry point
It's all that this service will do
p
yes. I believe some of the behavior you do in `unsafeRunAsync` can be done in either a `bracketCase`, `guarantee` or `onError` block instead. That way it'll still be an `IO` effect
k
does `bracketCase` work with the Kafka consumer/producer libraries? Or do I have to implement that?
p
for transactional stuff I believe `guaranteeCase` or `bracketCase` just needs to close the transaction you've opened before
so, pseudocode:
```kotlin
IO { beginTransaction() }
  .bracketCase({ case -> if (isSuccess(case)) endTransaction() else rollbackTransaction() }) { value ->
    IO.fx { thingsInTransaction() }
  }
```
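The shape of that pseudocode (acquire, use, then release differently depending on the outcome) can be sketched without any effect library. The version below is a plain, eager Kotlin sketch and not Arrow's `bracketCase`. The `Transactional` interface and `inTransaction` are hypothetical names for illustration; only the three method names are borrowed from the producer calls in the code earlier in the thread.

```kotlin
// Hypothetical stand-in for a transactional producer; this is NOT the Kafka client API.
interface Transactional {
    fun beginTransaction()
    fun commitTransaction()
    fun abortTransaction()
}

// Acquire the transaction, run `use`, then release it according to the outcome:
// commit when `use` returns normally, abort and rethrow when anything throws.
// Assumption: a failure in commitTransaction() is also answered with an abort.
fun <T> Transactional.inTransaction(use: () -> T): T {
    beginTransaction()
    try {
        val result = use()
        commitTransaction()
        return result
    } catch (e: Throwable) {
        abortTransaction()
        throw e
    }
}
```

Unlike an `IO`-based version, this runs immediately when called and knows nothing about cancellation; it is only meant to show where the commit and the abort go.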
k
Just wanted to say this was all really useful, and I think I've made some good progress. Thanks! ✌️