# arrow
j
With the existing operators/constructors, I could probably only implement an "on/off" switch that is evaluated in the function passed to `repeat` (think `AtomicBoolean` or something similar). So the Schedule actually continues, but the effect run by `repeat` has to evaluate that switch inside its own logic. What I am looking for is something like a `pause` operator, which completely stops the Schedule until some condition becomes true again. Is this semantic possible with the current operators somehow, or could it be added in the future?
s
It's definitely possible, but I don't fully understand your requirements.
The easiest way to pause is to `await` a `CompletableDeferred`. Depending on when you need it to pause, you can implement it differently. If you need to pause/resume multiple times it might be trickier, but you can probably use a `Channel(RENDEZVOUS)` instead of a `CompletableDeferred`, or both combined.
j
Start a Schedule by repeat, stop it, restart again
s
Stop & restart it on every loop?
j
No, just once, twice, ... from 'outside'.
Keep its config and the called fx; just stop and restart it, e.g. from a web controller.
s
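```kotlin
// Rendezvous channel: no buffer, so send and receive have to meet.
val pauser = Channel<Unit>(Channel.RENDEZVOUS)

launch {
  delay(10.seconds)
  pauser.send(Unit) // releases the loop once it is waiting in receive()
}

Schedule.forever<Unit>().repeat {
  println("Running")
  pauser.receive() // suspends here until someone calls send(Unit)
}
```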
`pauser.receive()` will wait until there is a value available through `send(Unit)`, but `send(Unit)` will also suspend until `receive` takes it off the `Channel`.
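The hand-off described above can be observed in isolation with a small sketch that uses only kotlinx.coroutines and no Schedule. The function name `rendezvousDemo` and the event strings are made up for illustration, and the sketch assumes it is called from `runBlocking` so that everything runs on a single thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: records the order in which send and receive make progress.
suspend fun rendezvousDemo(): List<String> = coroutineScope {
    val pauser = Channel<Unit>(Channel.RENDEZVOUS)
    val events = mutableListOf<String>()

    val sender = launch {
        events += "send started"
        pauser.send(Unit) // suspends: nobody is receiving yet
        events += "send completed"
    }

    delay(100) // give the sender time to reach send() and suspend there
    events += "receive called"
    pauser.receive() // takes the element and lets the sender continue
    sender.join()
    events
}

fun main() = runBlocking {
    rendezvousDemo().forEach(::println)
}
```

The sender only gets past `send(Unit)` after the other side has called `receive()`, which is the property the pausing trick relies on.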
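So if it's triggered from a Ktor endpoint:
```kotlin
fun Application.pauser(
  channel: Channel<Unit>
): Route =
  routing {
    post("/pauser") {
      // Send from a separate coroutine so the request does not wait for the rendezvous.
      launch { channel.send(Unit) }
      call.respond(HttpStatusCode.Accepted)
    }
  }

// `pauser` is the same Channel<Unit> instance that is passed to Application.pauser.
Schedule.forever<Unit>().repeat {
  println("Running")
  pauser.receive()
}
```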
j
Yes, that is possible already, but I would like `pauser.receive()` not to be a concern of the fx that's repeated.
s
Okay, then you can do this and compose it with the `Schedule`:
```kotlin
Schedule.invoke({ Unit }) { _,_ ->
   pauser.receive()
}
```
This function will be called after the user function.
So it will be like manually putting it at the end.
j
interesting! and that receive will suspend all activity in the Schedule until the send is called, right?
s
Yes, but the `Channel` needs to use the `RENDEZVOUS` strategy. That means each element has to be handed off directly from sender to receiver, so it behaves like a `Queue` with capacity = 0.
j
But I need to use an AtomicBoolean instead of a channel, because that seems better for my purpose.
The Schedule should not depend on someone always calling send.
So what I was looking for was `invoke`, I guess 👍
I will look it up in the documentation.
Is the fn passed to invoke called after each repetition of the user fn?
I don't know if this is what you are suggesting:
```kotlin
val switch = AtomicBoolean(true)

@OptIn(ExperimentalTime::class)
fun repeater(): Schedule<Unit, Int> =
    Schedule.spaced<Unit>(1.seconds).compose(Schedule.invoke({}) { input, state ->
        if (switch.get())
            Schedule.Decision.Companion.cont(1.seconds, input, Eval.Always {})
        else
            // What goes here ?
    })
```
but I can't figure out 'What goes here ?'
s
Now I am really confused as to what you want to do 🤔 I updated the snippets to match what I thought your requirements were.
j
Your Ktor snippet is what I intend to do... it's just that I want to keep the decision to continue (i.e. `pauser.receive()` from your snippet) outside, preferably in the Schedule composition. I understand that you propose using `invoke` for that, but I guess I do not understand the Schedule.Decision mechanism.
btw, in my snippet with the AtomicBoolean, that AtomicBoolean would be set to true/false by any web controller, like your Ktor route does.
...and Schedule.invoke() should evaluate the AtomicBoolean and somehow decide to stop (what Decision value would that be?) or to continue (Schedule unchanged).
s
```kotlin
val pauser = Channel<Boolean>(Channel.RENDEZVOUS)

fun Application.pauser(): Route =
  routing {
    post("/pauser") {
      // true = continue; sending false would end the schedule
      launch { pauser.send(true) }
      call.respond(HttpStatusCode.Accepted)
    }
  }

val pausingSchedule = Schedule.invoke({ Unit }) { _,_ ->
   // Suspends after each run of the user function until a value is sent.
   val shouldContinue = pauser.receive()
   Decision(shouldContinue, 0.seconds, Unit, Eval.now(Unit))
}

val repeater: Schedule<Unit, Int> =
    Schedule.spaced<Unit>(1.seconds) zipLeft pausingSchedule
```
j
that doesn't typecheck for me
Type mismatch: inferred type is Unit but Schedule.Decision<Unit, ???> was expected
s
I changed `Channel<Unit>` to `Channel<Boolean>`, but you could also add an `AtomicBoolean` somewhere. The boolean goes into the `Decision`. I updated the snippet; I think it will type-check now.
j
That just terminates, as soon as shouldContinue is false
s
Yes
j
What I would like is to be able to restart again
without calling .repeat {} again
I guess that internally, there is no inherent 'heartbeat' (or channel or ...) in Schedule (independent of any Duration configured). So there is probably nothing to stop/start the scheduling independent of the Schedule-config in the first place.
Thanks for your support! I can easily stop (basically your pauser), and on 'restart' just create another schedule. In order not to create the schedule all the time, I could also store it before each invocation of repeat... at least I hope that's possible. Thank you!
Arrow Schedule rocks
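For readers with the same requirement (pause and resume from outside, without the loop depending on someone always calling `send`), here is a minimal sketch of one possible gate. It is not an Arrow operator and not what was proposed in the thread: it swaps the channel for a `MutableStateFlow<Boolean>` from kotlinx.coroutines. `PauseGate`, `pause`, `resume`, and `awaitOpen` are hypothetical names, and a plain loop stands in for `Schedule.repeat`.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of Arrow: a gate that is either open or closed.
class PauseGate(initiallyOpen: Boolean = true) {
    private val open = MutableStateFlow(initiallyOpen)

    fun pause() { open.value = false }  // e.g. called from a web controller
    fun resume() { open.value = true }  // e.g. called from a web controller

    // Returns immediately while the gate is open; suspends while it is closed.
    suspend fun awaitOpen() { open.first { it } }
}

fun main() = runBlocking {
    val gate = PauseGate()

    // Stand-in for the repeated effect; a Schedule is deliberately not used here.
    val job = launch {
        repeat(6) { i ->
            gate.awaitOpen()
            println("Running $i")
            delay(100)
        }
    }

    delay(250)
    gate.pause()   // the loop stops at its next awaitOpen()
    delay(500)
    gate.resume()  // the loop picks up where it left off
    job.join()
}
```

Unlike the rendezvous channel, the gate stays open until it is explicitly closed, so nothing has to be sent per iteration. In the thread's setup, `gate.awaitOpen()` would be the call placed where `pauser.receive()` was.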