kushalp
03/31/2020, 10:12 AMList<Either<A,B>>
into Either<List<A>, List<B>>
?aballano
03/31/2020, 10:13 AMaballano
03/31/2020, 10:14 AMaballano
03/31/2020, 10:14 AMkushalp
03/31/2020, 10:19 AMA
(if any) and return?kushalp
03/31/2020, 10:25 AMkushalp
03/31/2020, 10:30 AMfun stream() =
IO.fx {
val (records) = IO {
producer.beginTransaction()
consumer.poll(Duration.ofSeconds(1))
}
val result: List<Either<Exception, Future<RecordMetadata>>> = records.map { record ->
someNullableOperation().toOption().fold(
{
Exception("I want to blow up here!").left()
},
{
publishMessage(record).right()
}
)
}
result
}.unsafeRunAsync { ioResult ->
ioResult.fold(
{ e ->
producer.abortTransaction()
logger.error(e) { "Unhandled exception in stream: $e" }
throw e
},
{ result: List<Either<Exception, Future<RecordMetadata>>> ->
producer.commitTransaction()
consumer.commitSync()
if (result.count() > 0) <http://logger.info|logger.info> { "Published ${result.count()} messages" }
}
)
}
kushalp
03/31/2020, 10:30 AMsimon.vergauwen
03/31/2020, 10:53 AMIO
instead of Either
mixed with Future
inside List
.
I’d need to write a better Future
-> IO
operator tho. Give me a sec 🙂simon.vergauwen
03/31/2020, 10:58 AMFuture
the type you get back from publishMessage
? The Future
API doens’t offer very safe ways to listen to it.kushalp
03/31/2020, 10:59 AMFuture<RecordMetadata>
kushalp
03/31/2020, 11:02 AM.toOption
instead and then .fold
over it?kushalp
03/31/2020, 11:06 AMkushalp
03/31/2020, 11:06 AM!records
?aballano
03/31/2020, 11:18 AM!
is the same as destructuring an IO in an Fx block or calling bind()
aballano
03/31/2020, 11:18 AMkushalp
03/31/2020, 11:18 AMval (name) = IO { ... }
?aballano
03/31/2020, 11:21 AMval (_) = IO { ... }
<- this is not executedkushalp
03/31/2020, 11:25 AMkushalp
03/31/2020, 11:25 AMkushalp
03/31/2020, 11:26 AMkushalp
03/31/2020, 11:26 AMkushalp
03/31/2020, 11:27 AMkushalp
03/31/2020, 11:29 AMConsumerRecords
is not a List<T>
. It's a boxed object, that has the following: iterator
, map
.pakoito
03/31/2020, 11:33 AMunsafeRunAsync
inside a function that's not the main entry point, I believe there should be another way of achieving thispakoito
03/31/2020, 11:34 AMparMap
or parTraverse
is what you're looking forpakoito
03/31/2020, 11:34 AMfork()
kushalp
03/31/2020, 11:35 AMstream()
in a loop on the main entry pointkushalp
03/31/2020, 11:35 AMpakoito
03/31/2020, 11:36 AMunsafeRunAsync
can be done in either a bracketCase,
guarantee
or onError
block instead. That way it'll still be an IO
effectkushalp
03/31/2020, 11:48 AMbacketCase
work with the Kafka consumer/producer libraries? Or do I have to implement that?pakoito
03/31/2020, 11:52 AMguaranteeCase
or bracketCase
just need to close the transaction you've opened beforepakoito
03/31/2020, 11:52 AMpakoito
03/31/2020, 11:53 AMIO { beginTransaction() }
.bracketCase({ case -> if (isSuccess(case)) endTransaction() else rollbackTransaction() }) { value ->
IO.fx { thingsInTransaction() }
}
kushalp
03/31/2020, 1:15 PM