kioba
12/20/2019, 5:50 PMFunctor instance Queue
ticket. I know this should be a fairly simple task sorry if I ask something obbvious :S ,
I have no previous experience with the Queue
solution so I looked into the mentioned ZIO Queue
(to be precise the trait ZQueue[-RA, +EA, -RB, +EB, -A, +B]
) implementation.
If I understand correctly we currently dont have a ZQueue
equivalent implementation in :arrow:, our queue is representing a general zio.internal.MutableConcurrentQueue
.
To be able to implement the Functor
we have to make sure the Queue captures the previous queue reference and the f function to be able to deplete the original queue.
TL;DR: we have to create a ZQueue
equivalent in :arrow: and implement Functor
on it instead of Queue.
I believe it is a slightly more work than a simple typeclass. I would be super happy to implement it, but before that I just want to make sure I am on the right path here 🙂 @simon.vergauwenImran/Malic
12/20/2019, 6:13 PMclass ForOp private constructor() {
companion object
}
typealias OpOf<A, X> = arrow.Kind2<ForOp, A, X>
typealias OpPartialOf<A> = arrow.Kind<ForEither, A>
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
inline fun <A, X> OpOf<A, X>.fix(): Op<A, X> =
this as Op<A, X>
@higherkind
data class Op<A, X>(val f: (X) -> A) {
fun <B> compose(other: Op<B, A>): Op<B, X> =
Op(other.f compose f)
fun <B> compose(g: (A) -> B): Op<B, X> =
compose(Op(g))
}
inline fun <A, X> Op<A, X>.k(): Kind2<ForOp, A, X> = TODO() // does that make sense?
now I want to define:
interface OpContravariant<C> : Contravariant<Conested<ForOp, C>>
How does that look like. I tried it, but I could not find something tangible.
cc @AriostonjJannis
12/22/2019, 10:48 PMStateT<F, S, A>
is defined with Kind<F, (S) -> Kind<F, Tuple2<S, A>>>
? The double Kind<F, *>
are weird. I am currently rewriting some tests for higher coverage and noticed a bunch of failures with monad-applicative-consistency (monad derived ap doing different things than normal ap) and StateT
is the last one that still fails.Attila Domokos
12/24/2019, 4:52 PM> Task :arrow-free-data:test FAILED
with latest commit.
Using Mac OS 10.14.6, using Java 8.0.232.j9-adpt
.
I tried to look at the build server, but couldn't find the latest build.
Is this expected? If not, what could I do to make this test pass?Imran/Malic
12/31/2019, 9:49 AMJannis
01/09/2020, 6:32 PMImran/Malic
01/10/2020, 11:28 AMImran/Malic
01/10/2020, 6:03 PMmtl-data
they’re fine locally, but not in CI. https://github.com/arrow-kt/arrow/pull/1921/checks?check_run_id=383792628
Or am I doing something wrong.pakoito
01/14/2020, 11:51 PMcalvellido
01/15/2020, 11:05 AM./gradlew dokka
Gives me:
> Task :arrow-core:kaptKotlin FAILED
e: [kapt] An exception occurred: java.lang.StackOverflowError
at arrow.meta.encoder.jvm.KotlinPoetEncoder$DefaultImpls.toMeta(KotlinPoetEncoder.kt:88)
(...)
Tried several things already, like increasing the Java heap size, starting from a fresh repo, and even removing all of ~/.gradle/
directory with no luck 😢, and I'm not able to overcome that yet. Has anybody faced something like this recently? Might this be related to this issue: https://github.com/arrow-kt/arrow/issues/1927?Rachel
01/21/2020, 10:26 AMthanerian
01/21/2020, 10:35 AMxxx
, WDYT? there is a technical issue about structure in that way?Jannis
01/21/2020, 10:41 AMPhBastiani
01/27/2020, 10:53 AM2020-01-27T10:24:35.4063911Z arrow.fx.IOTest > IORacePair should be stack safe FAILED
2020-01-27T10:24:35.4064309Z java.lang.StackOverflowError
2020-01-27T10:24:35.4064546Z at <http://arrow.fx.IO|arrow.fx.IO>$Companion.racePair(IO.kt:53)
2020-01-27T10:24:35.4727789Z arrow.fx.IOTest > IORaceTriple should be stack safe FAILED
2020-01-27T10:24:35.4727909Z java.lang.StackOverflowError
2020-01-27T10:24:35.4728010Z at <http://arrow.fx.IO|arrow.fx.IO>$Companion.raceTriple(IO.kt:53)
...
PhBastiani
01/30/2020, 12:22 PMHiosdra
01/30/2020, 7:01 PMtoIO()
(mentioned today in #arrow), I saw that attempt
is like:
fun attempt(): IO<E, Either<Throwable, A>>
While in Scala Cats there is:
val x: IO[Either[Int, String]] = IO[Either[Int, String]] { Left(1) }
val y: IO[Either[Throwable, Either[Int, String]]] = x.attempt
def attempt: IO[Either[Throwable, A]]
Won’t be more intuitive to type it like this?
Also in actual form I am unable to call IO.attempt().unsafeRunSync()
, because unsafeRunSync
is like:
fun <A> IOOf<Nothing, A>.unsafeRunSync(): A
Arkadii Ivanov
02/03/2020, 11:05 AMArkadii Ivanov
02/07/2020, 10:40 AMracePair
implementation for Maybe
. I implemented CoroutineContext
->`Scheduler` interop properly and now race conditions affect tests more often. Already spend ~8h of debugging, ended up with a different implementation that works.
So what is going on (will explain in terms of the existing RxJava2
implementation and the racePairCanJoinLeft
test):
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
asScheduler().let { scheduler ->
Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
val sa = ReplaySubject.create<A>()
val sb = ReplaySubject.create<B>()
val dda = fa.value()
/*
* 1:
* We subscribe to "fa".
* This has "subscribeOn" operator somewhere in the upstream.
* So the subscription is asynchronous.
*/
.subscribe(sa::onNext, sa::onError)
val ddb = fb.value().subscribe(sb::onNext, sb::onError)
emitter.setCancellable {
/*
* 3:
* This is called after emission in Step 2 is done.
* Here is the race condition.
* If this is called BEFORE Step 1 is actually subscribed
* and value is delivered to "sa", then test fails with timeout.
*/
dda.dispose(); ddb.dispose()
}
val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
sa.subscribe({
emitter.onSuccess(RacePair.First(it, ffb))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
sb.subscribe({
/*
* 2:
* We receive a value here and emit it to downstream.
* This eventually disposes this Maybe after emission is done.
*/
emitter.onSuccess(RacePair.Second(ffa, it))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
}.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
}
So I ended up with the following implementation:
override fun <A, B> CoroutineContext.racePair(fa: Kind<ForMaybeK, A>, fb: Kind<ForMaybeK, B>): Kind<ForMaybeK, RacePair<ForMaybeK, A, B>> {
val scheduler = asScheduler()
val coa: ConnectableObservable<A> = fa.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
val cob: ConnectableObservable<B> = fb.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
lateinit var da: RxDisposable
coa.connect { da = it }
lateinit var db: RxDisposable
cob.connect { db = it }
val ma = coa.firstOrComplete()
val mb = cob.firstOrComplete()
return merge(
ma.map { RacePairResult.First(it) },
mb.map { RacePairResult.Second(it) }
)
.firstOrComplete()
.map {
when (it) {
is RacePairResult.First -> RacePair.First(winner = it.value, fiberB = Fiber(join = mb.k(), cancel = MaybeK(db::dispose)))
is RacePairResult.Second -> RacePair.Second(fiberA = Fiber(join = ma.k(), cancel = MaybeK(da::dispose)), winner = it.value)
}
}
.k()
}
private sealed class RacePairResult<out A, out B> {
class First<A>(val value: A) : RacePairResult<A, Nothing>()
class Second<B>(val value: B) : RacePairResult<Nothing, B>()
}
pakoito
02/07/2020, 11:06 AMArkadii Ivanov
02/07/2020, 4:40 PMmaybek.kt
we have the following function:
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
asScheduler().let { scheduler ->
Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
val sa = ReplaySubject.create<A>()
val sb = ReplaySubject.create<B>()
val dda = fa.value().subscribe(sa::onNext, sa::onError)
val ddb = fb.value().subscribe(sb::onNext, sb::onError)
emitter.setCancellable { dda.dispose(); ddb.dispose() } // <-- This is the line in question
val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
sa.subscribe({
emitter.onSuccess(RacePair.First(it, ffb))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
sb.subscribe({
emitter.onSuccess(RacePair.Second(ffa, it))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
}.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
}
I pointed the line that I would like to discuss. I don't think this is correct to dispose BOTH subscriptions when the main stream is succeeded. If a Winner
produced a value then the main Maybe
completes and this callback gets executed. So the Loser
terminates as well. In my Reaktive
implementation I fixed this but now racePairCanCancelsLoser
fails since:
val loser = s.release().bracket(use = { never<String>() }, release = { p.complete(i) })
The Loser
never completes because no one cancels it and it is never
completes by itself.
Such a change fixes the test and looks. What do you think?addamsson
02/12/2020, 12:17 PMRachel
02/12/2020, 1:53 PMRachel
02/14/2020, 5:48 PMRachel
02/14/2020, 5:48 PMsimon.vergauwen
02/18/2020, 1:24 PMQueue
which was based on ZIO
to how MVar
is build in Arrow and Cats-effects which uses lower level constructs such as atomic
with tailrec
instead of Ref
.
You can find the PR here, https://github.com/arrow-kt/arrow/pull/2081, there is some impact on the API. Since we can offer a more powerful API but 2 use-cases are removed. Dropping and bounded queue with capacity 0.
Looking forward to your feedback on the PR 🙂Rachel
02/19/2020, 8:08 PMRachel
02/20/2020, 2:30 PMAdrianRaFo
02/20/2020, 3:45 PMAdrianRaFo
02/20/2020, 3:45 PMArkadii Ivanov
02/20/2020, 5:48 PMarrow-fx
repo. I can't add the Reaktive dependencies because of the following error:
A problem occurred configuring project ':arrow-fx-reaktive'.
> Could not resolve all files for configuration ':arrow-fx-reaktive:runtimeClasspath'.
> Could not resolve com.badoo.reaktive:reaktive-jvm:"1.1.10".
Required by:
project :arrow-fx-reaktive
> Could not resolve com.badoo.reaktive:reaktive-jvm:"1.1.10".
> Could not get resource '<https://oss.jfrog.org/artifactory/oss-snapshot-local/com/badoo/reaktive/reaktive-jvm/%221.1.10%22/reaktive-jvm-%221.1.10%22.pom>'.
> Could not GET '<https://oss.jfrog.org/artifactory/oss-snapshot-local/com/badoo/reaktive/reaktive-jvm/%221.1.10%22/reaktive-jvm-%221.1.10%22.pom>'. Received status code 409 from server:
In the old repo it is resolved fine.
Changing order of the declared maven repositories does not help. The following trick does not help as well:
maven {
url "<https://oss.jfrog.org/artifactory/oss-snapshot-local/>"
content {
excludeGroup "com.badoo.reaktive"
}
}
Do someone have any suggestions?Arkadii Ivanov
02/20/2020, 5:48 PMarrow-fx
repo. I can't add the Reaktive dependencies because of the following error:
A problem occurred configuring project ':arrow-fx-reaktive'.
> Could not resolve all files for configuration ':arrow-fx-reaktive:runtimeClasspath'.
> Could not resolve com.badoo.reaktive:reaktive-jvm:"1.1.10".
Required by:
project :arrow-fx-reaktive
> Could not resolve com.badoo.reaktive:reaktive-jvm:"1.1.10".
> Could not get resource '<https://oss.jfrog.org/artifactory/oss-snapshot-local/com/badoo/reaktive/reaktive-jvm/%221.1.10%22/reaktive-jvm-%221.1.10%22.pom>'.
> Could not GET '<https://oss.jfrog.org/artifactory/oss-snapshot-local/com/badoo/reaktive/reaktive-jvm/%221.1.10%22/reaktive-jvm-%221.1.10%22.pom>'. Received status code 409 from server:
In the old repo it is resolved fine.
Changing order of the declared maven repositories does not help. The following trick does not help as well:
maven {
url "<https://oss.jfrog.org/artifactory/oss-snapshot-local/>"
content {
excludeGroup "com.badoo.reaktive"
}
}
Do someone have any suggestions?Rachel
02/20/2020, 6:06 PMarrow-fx-reaktive
so a repositories
section should be added in the arrow-fx-reaktive/build.gradle
arrow-benchmarks-fx
repositories
section with that repository:Arkadii Ivanov
02/20/2020, 6:08 PMRachel
02/20/2020, 6:09 PMArkadii Ivanov
02/20/2020, 6:10 PM<https://oss.jfrog.org/artifactory/oss-snapshot-local/>
repositoryRachel
02/20/2020, 6:11 PMArkadii Ivanov
02/20/2020, 6:12 PMRachel
02/20/2020, 6:12 PMArkadii Ivanov
02/20/2020, 6:15 PMRachel
02/20/2020, 6:15 PMCould not resolve com.badoo.reaktive:reaktive-jvm:"1.1.10"
Could not resolve com.badoo.reaktive:reaktive-jvm:1.1.10
...com/badoo/reaktive/reaktive-jvm/%221.1.10%22/reaktive-jvm-%221.1.10%22.pom'
REAKTIVE_VERSION = "1.1.10"
REAKTIVE_VERSION=1.1.10
Arkadii Ivanov
02/20/2020, 6:29 PMRachel
02/20/2020, 6:29 PMArkadii Ivanov
02/20/2020, 6:29 PMRachel
02/20/2020, 6:29 PM