kushalp
03/31/2020, 10:12 AMList<Either<A,B>> into Either<List<A>, List<B>> ?aballano
03/31/2020, 10:13 AMaballano
03/31/2020, 10:14 AMaballano
03/31/2020, 10:14 AMkushalp
03/31/2020, 10:19 AMA (if any) and return?kushalp
03/31/2020, 10:25 AMkushalp
03/31/2020, 10:30 AMfun stream() =
IO.fx {
val (records) = IO {
producer.beginTransaction()
consumer.poll(Duration.ofSeconds(1))
}
val result: List<Either<Exception, Future<RecordMetadata>>> = records.map { record ->
someNullableOperation().toOption().fold(
{
Exception("I want to blow up here!").left()
},
{
publishMessage(record).right()
}
)
}
result
}.unsafeRunAsync { ioResult ->
ioResult.fold(
{ e ->
producer.abortTransaction()
logger.error(e) { "Unhandled exception in stream: $e" }
throw e
},
{ result: List<Either<Exception, Future<RecordMetadata>>> ->
producer.commitTransaction()
consumer.commitSync()
if (result.count() > 0) <http://logger.info|logger.info> { "Published ${result.count()} messages" }
}
)
}kushalp
03/31/2020, 10:30 AMsimon.vergauwen
03/31/2020, 10:53 AMIO instead of Either mixed with Future inside List.
I’d need to write a better Future -> IO operator tho. Give me a sec 🙂simon.vergauwen
03/31/2020, 10:58 AMFuture the type you get back from publishMessage? The Future API doens’t offer very safe ways to listen to it.kushalp
03/31/2020, 10:59 AMFuture<RecordMetadata>kushalp
03/31/2020, 11:02 AM.toOption instead and then .fold over it?kushalp
03/31/2020, 11:06 AMkushalp
03/31/2020, 11:06 AM!records ?aballano
03/31/2020, 11:18 AM! is the same as destructuring an IO in an Fx block or calling bind()aballano
03/31/2020, 11:18 AMkushalp
03/31/2020, 11:18 AMval (name) = IO { ... } ?aballano
03/31/2020, 11:21 AMval (_) = IO { ... } <- this is not executedkushalp
03/31/2020, 11:25 AMkushalp
03/31/2020, 11:25 AMkushalp
03/31/2020, 11:26 AMkushalp
03/31/2020, 11:26 AMkushalp
03/31/2020, 11:27 AMkushalp
03/31/2020, 11:29 AMConsumerRecords is not a List<T>. It's a boxed object, that has the following: iterator, map.pakoito
03/31/2020, 11:33 AMunsafeRunAsync inside a function that's not the main entry point, I believe there should be another way of achieving thispakoito
03/31/2020, 11:34 AMparMap or parTraverse is what you're looking forpakoito
03/31/2020, 11:34 AMfork()kushalp
03/31/2020, 11:35 AMstream() in a loop on the main entry pointkushalp
03/31/2020, 11:35 AMpakoito
03/31/2020, 11:36 AMunsafeRunAsync can be done in either a bracketCase, guarantee or onError block instead. That way it'll still be an IO effectkushalp
03/31/2020, 11:48 AMbacketCase work with the Kafka consumer/producer libraries? Or do I have to implement that?pakoito
03/31/2020, 11:52 AMguaranteeCase or bracketCase just need to close the transaction you've opened beforepakoito
03/31/2020, 11:52 AMpakoito
03/31/2020, 11:53 AMIO { beginTransaction() }
.bracketCase({ case -> if (isSuccess(case)) endTransaction() else rollbackTransaction() }) { value ->
IO.fx { thingsInTransaction() }
}kushalp
03/31/2020, 1:15 PM