# opensavvy
d
Now I'm trying to figure out if I could first integrate ProgressiveState in my image generation classes, is there any builder for it like `out { }`? I'd maybe make it into a flow... I know I could probably use the cache's progress, but that's less finely grained (unless I couple caching into my generator...).
`progressive { }`... I couldn't find it in the kdocs... but thanks to IntelliJ's ctrl-click, I found it...
c
d
Oh... in the Arrow docs
And how do I report progress there?
there's no loading(...) in the dsl
c
What are you trying to do exactly?
d
Trying to track progress of the image generation
c
Is the image generation a `suspend fun`?
d
Yes. Also, I'm wondering: if I give a ProgressiveOutcome to the cache, will it use it to track the progress of the item being generated?
c
Then you can just do this. The `cache {}` builder will automatically capture calls to `report` and transform them into a `Flow`.
Basically your pipeline should look like:
```kotlin
private val imageCache = cache {
    // generate your image,
    // use 'report' to report progress
}.cachedInMemory(…)
    .expiresAfter(…)

fun getImage(id: String): ProgressiveFlow<Image> = imageCache[id]
    .map {
        if (it is ProgressiveOutcome.Loading) {
            yourDefaultImage()
        } else it
    }
```
d
I'm a bit afraid of the "magic" of saving things in the coroutine context... there's no indication in the function's declaration that progress is being managed, nor of what it's being managed for...
Also, the context might change when using another withContext or a new coroutine and mess up the whole thing, no?
c
> there's no indication in the function's declaration that there's progress being managed
Yes, that's the goal. If you don't register something that wants to know about progress events, they're just ignored. This way, you can insert progress events anywhere and you know it won't break existing code.
> Also, the context might change when using another withContext or a new coroutine and mess up the whole thing, no?
Worst case, you lose progress events. But the actual data (the result of the operation) is safe.
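To make the mechanism under discussion concrete, here is a minimal sketch of how implicit progress reporting through the coroutine context can work in principle. It is not Pedestal's implementation: `SketchReporter`, `sketchReport` and `generateImage` are hypothetical names invented for this illustration, progress is reduced to a plain `Double`, and the only dependency assumed is kotlinx.coroutines.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical context element carrying a progress listener (not a Pedestal type).
class SketchReporter(
    private val onProgress: (Double) -> Unit,
) : AbstractCoroutineContextElement(SketchReporter) {
    companion object Key : CoroutineContext.Key<SketchReporter>

    fun report(fraction: Double) = onProgress(fraction)
}

// Looks the listener up in the current coroutine context.
// If nobody registered one, the event is silently dropped.
suspend fun sketchReport(fraction: Double) {
    currentCoroutineContext()[SketchReporter]?.report(fraction)
}

// The signature says nothing about progress: reporting is purely opt-in for callers.
suspend fun generateImage(): String {
    sketchReport(0.0)
    // withContext only replaces the elements it is given (here the dispatcher),
    // so the reporter element is still visible inside this block.
    withContext(Dispatchers.Default) { sketchReport(0.5) }
    return "image"
}

fun main(): Unit = runBlocking {
    // No listener registered: the result is still produced, events are ignored.
    println(generateImage())

    // Listener registered: the same function now feeds progress events to it.
    val seen = mutableListOf<Double>()
    val result = withContext(SketchReporter { seen += it }) { generateImage() }
    println("$result, progress events: $seen")
}
```

In this sketch the result is identical with or without a listener, which matches the "worst case, you lose progress events" point: only work started in a scope that does not inherit the caller's context would stop reaching the listener.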
d
So there's no real way to do this more explicitly? And what about the `Outcome`? So I'd just use a simple `Outcome` instead of a `ProgressiveOutcome`? Would the cache track that too?
c
> So there's no real way to do this more explicitly?
You can pass `ProgressReporter` around (as a receiver or context parameter) if you prefer.
d
Isn't there `report(...)` inside the `progressive { }` block?
c
The `progressive {}` block is very new, it's not stable yet.
If you have feedback on it, don't hesitate.
I'm not sure what you're trying to do, but since you're interacting with the cache, you should just use the `cache {}` builder and you won't need to create progressive values yourself.
d
I think having a `report(...)` on it would be useful, since the current implementation requires calling a function that returns an Outcome or ProgressiveOutcome, and passes a Progress in the bind function... sometimes part of the progress is in the function with the `progressive { }` block itself.
I have multiple stages in the generation of images, and any one of those steps can fail, that's why I'm trying not to do it straight in the cache block. Progress isn't as critical here, but while I'm at it (I do need loading, failed or done at the very least), why not track that too for debugging and performance.
c
Can you give me an example of the actual code you have?
d
(started trying to integrate Progress):
```kotlin
class ImageFormatProviderImpl(
    private val targetBucket: String,
    private val formattedFolder: String,
    private val formatGenerator: FormatGenerator,
    private val formatStorage: ImageFormatStorage,
) : ImageFormatProvider {

    private fun String.toName(imageFormatting: ImageFormatting) =
        "$formattedFolder/${imageFormatting.folderName}/$this.png"

    @OptIn(ExperimentalProgressiveRaiseApi::class)
    override suspend fun provideFormats(
        source: OriginalSource,
        formats: Set<ImageFormatRequest>
    ): ProgressiveOutcome<Nothing, GeneratedImageFormats> {
        return progressive {
            val original = formatStorage.getOriginal(source)
            val generated = formatGenerator.generateFormat(original, formats)
            val originalContentSha1 = "ext/" + (original.toByteString().sha1().hex())

            generated.associate {
                val newKey = originalContentSha1.toName(it.first.imageFormatting)

                formatStorage.putGenerated(targetBucket, newKey, it.second)

                it.first.name to GeneratedImageFormat(targetBucket, newKey, it.first)
            }.let(::GeneratedImageFormats)
        }
    }
}
```
c
This doesn't use the cache, right?
`ProgressiveOutcome<Nothing, T>` is the same as `Progressive<T>`, since there are no possible errors.
Except for the return type `ProgressiveOutcome`, I don't see anything that comes from Pedestal here 🤔
d
> This doesn't use the cache, right?
Not yet.
> `ProgressiveOutcome<Nothing, T>` is the same as...
I haven't integrated error handling yet; I'm trying to figure out progress and caching first, so I started with Nothing to keep the project compiling.
👍 1
Once I get all the pieces together, I'd need to use `raise(...)` in all the places that can cause errors and `bind()` the results of all those functions.
I'm planning to use the cache at a different level. I didn't really want to mix responsibilities.
👍 1
Ok, it seems like `cache { }` doesn't take ProgressiveOutcome, only Outcome...
c
Yes, because it captures calls to `report` itself.
d
And asOutcome returns a nullable Outcome...
So I should just use Outcome and the "magical" report?
c
That's what I would do, yes. Have all your functions return `Outcome<Error, Success>` and use implicit progress reporting where it makes sense to.
Btw with context parameters, you won't need to use `Outcome<Error, Success>` at all, you'll be able to just use `Raise<Error>`! Now:
```kotlin
suspend fun foo(id: Int): Outcome<String, Int> = out {
    report(loading(0.0))
    ensure(id > 0) { "Too small" }
    report(loading(0.5))
    id.toString()
}
```
In the future:
```kotlin
context(_: Raise<String>)
suspend fun foo(id: Int): String {
    report(loading(0.0))
    ensure(id > 0) { "Too small" }
    report(loading(0.5))
    return id.toString()
}
```
d
I guess it would be nice to have the same for the ProgressReporter... to be able to declare an explicit one in the context parameters.
I'm not so sure that, in its current state, a user won't report a failure or done by accident, and that wouldn't really work... since the Outcome won't finish then... I really don't know why report doesn't ONLY take a loading progress.
c
> I guess it would be nice to have the same for the ProgressReporter... to be able to declare an explicit one in the context parameters.
That's what `ProgressReporter` is!
```kotlin
context(_: Raise<String>, reporter: ProgressReporter)
suspend fun foo(id: Int): String {
    reporter.report(loading(0.0))
    ensure(id > 0) { "Too small" }
    reporter.report(loading(0.5))
    return id.toString()
}
```
This is possible with just `dev.opensavvy.pedestal:progress`.
Personally, I prefer having it be implicit, which is provided by the bonus module `dev.opensavvy.pedestal:progress-coroutines`.
> I really don't know why report doesn't ONLY take a loading progress.
You don't need to `report(done())` basically ever, because all builders will do it for you.
d
But if you do, it doesn't really make sense... isn't it a functional programming principle to limit input to only what can be done in the current context? I guess it would complicate the code too much, and that's why you didn't do it though... but if the DSL had a report function, then it COULD be limited to only reporting Loading progress...
c
Honestly, I wonder if it makes sense to report `done()`. I haven't really thought about it much.
I'll need to scan my projects and see in which cases I do it.
At the very least, it must be possible in `ProgressReporter`, because the builders need to access that.
d
This seems to be wrong for some reason @CLOVIS :
```kotlin
cache.get(source)
    .map {
        when (it) {
            is ProgressiveOutcome.Unsuccessful<*> -> TODO()
            is ProgressiveOutcome.Success -> it.value.toImageVars(expiry)
        }
    }
    .firstOrNull()
```
I put in delay in the generation in my fake, and it's still not crashing from the TODO()...
AFAIK Unsuccessful includes the Incomplete state...
And the Cache should be returning that until it gets Outcome.Success...
c
Yeah, it should return Incomplete right up until the success. After that, it never returns Incomplete again.
d
It doesn't return Incomplete at all...
c
Weird. Which cache layers are you using? IIRC, the only one that touches these values at all is `MemoryCache`: https://gitlab.com/opensavvy/groundwork/pedestal/-/blob/main/cache/src/commonMain/kotlin/MemoryCache.kt?ref_type=heads#L76-L81
d
Even with memory cache...
c
Can you create a small reproducer and create an issue? I may have time to look into it tonight
d
I guess I'll have to get this to work without Pedestal then... I really appreciate all the time you took till now, but I really have to get this task done. Maybe for a future project I'll look back at it, thanks!
Anyways, `firstOrNull()` doesn't help here, I need to get the current result or stop the flow...
Even if this was fixed, I would still only always be getting Incomplete even if the task was finished fast enough for me to wait for it (under .5 seconds, say...)
In Futures, there's .get(timeout: ...)
c
I'm not exactly sure what you're searching for, but since everything is backed by `Flow`, you can use the `.timeout` operator, etc.
You can think of the entire cache as a fancy `Flow` operator, everything that works with `Flow` usually should continue to work.
d
I need a terminal operator that gets the latest value, maximum up till the timeout value
I'm really thinking that the whole concept of your cache is great for my use case, but I'm struggling with the above issue, and I'm not as comfortable with Flows as I should be... but the main thing is that issue of it not returning the Incomplete state even if I would find the proper way to do this with Flow.
Untitled.kt
Ok, that is how it finally worked @CLOVIS
But I needed the `.onStart { emit(ProgressiveOutcome.Incomplete()) }`, otherwise the cache wouldn't return it at all...
👍 1
You might want to add that `latestValueWithin` (with another name...) as an alternative to `now()`, which waits for the first successful value: here, users can wait up to a timeout for a successful value, and otherwise they get Incomplete and fall back to some kind of default like I did...
c
This seems to be about KotlinX.Coroutines, so I created a feature request there: https://github.com/Kotlin/kotlinx.coroutines/issues/4250
👍🏼 1
And yeah, `.onStart` is a great workaround until I fix this upstream
d
I think this should be on the cacheAdapter, not on the MemoryCache though, since it should also be tracking such progress...
Good point in that issue, but I think the timeout() implementation is a bit overkill for such a simple use case... that operator allows the Flow to continue, I just needed a terminal operator to do that (a bit like your convenient `now()` function...)
c
`now` is just `first { it.progress == done() }`, it does nothing complex (you can see its code)
d
That's exactly the point, it's better than putting that longer form in code using your cache, and conveys the idea that you want the value that's `now()`... but now that I think about it more, it might be better to have `successfulNow()` since `now()` implies whatever I have so far...
Which was truthfully a bit confusing in the beginning for me, and wasn't obvious until I saw the implementation
What I was proposing is `successfulNow(timeout...)`, which waits for a successful result until the timeout and otherwise returns whatever's there.
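For readers without access to the attached file, the kind of terminal operator described here can be sketched roughly as follows. This is an assumption about its shape, not the contents of Untitled.kt and not a Pedestal API: the `isFinal` predicate and the `String` states in `main` are hypothetical stand-ins, and only kotlinx.coroutines is assumed. With Pedestal the predicate would presumably resemble the `it.progress == done()` check quoted above.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical terminal operator: waits up to `timeout` for a value accepted by
// `isFinal`. If none arrives in time (or the flow ends first), it returns the
// most recent value seen so far, or null if nothing was emitted at all.
suspend fun <T : Any> Flow<T>.latestValueWithin(
    timeout: Duration,
    isFinal: (T) -> Boolean,
): T? {
    var latest: T? = null
    val final = withTimeoutOrNull(timeout) {
        this@latestValueWithin.firstOrNull { value ->
            latest = value      // remember every value as it goes by
            isFinal(value)      // stop collecting at the first final value
        }
    }
    return final ?: latest
}

fun main(): Unit = runBlocking {
    // Stand-ins for a cache flow: "loading" plays the role of Incomplete.
    val fast = flow { emit("loading"); delay(50.milliseconds); emit("done") }
    val slow = flow { emit("loading"); delay(10_000.milliseconds); emit("done") }

    println(fast.latestValueWithin(1000.milliseconds) { it == "done" }) // prints "done"
    println(slow.latestValueWithin(200.milliseconds) { it == "done" })  // prints "loading"
}
```

Because the timeout cancels the collection, the upstream flow is not kept running past the deadline, which is the difference from the non-terminal `.timeout` operator mentioned earlier.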
Is my concurrency problem as simple to solve as this:
```kotlin
class ConcurrencyLimiter<I, F, V>(
    val upstream: Cache<I, F, V>,
    val maxRequests: Int,
) : Cache<I, F, V> by upstream {
    val semaphore = Semaphore(maxRequests)
    override fun get(id: I): ProgressiveFlow<F, V> = flow {
        semaphore.withPermit {
            val result = upstream.get(id)

            emitAll(result)
        }
    }
}

fun <I, F, V> Cache<I, F, V>.withConcurrencyLimit(maxRequests: Int) =
    ConcurrencyLimiter(this, maxRequests)
```
You might want to add this too @CLOVIS...
c
Can you create an issue or MR for this? It sounds like it could be useful, yes
👍🏼 1