# arrow
a
Hello, I'm trying to learn how best to leverage Arrow core and fx. My days are typically spent working on long-running servers. This normally means something like the following to manage the lifecycle of the service:
```kotlin
import java.lang.Thread.currentThread
import java.lang.Thread.sleep
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

fun main() {
    val mainThread = currentThread()
    val shutdown = CountDownLatch(1)

    Runtime.getRuntime().addShutdownHook(thread(start = false) {
        println("Shutdown triggered...")
        shutdown.countDown()
        mainThread.join()
        println("Shutdown completed!")
    })

    println("Starting...")
    sleep(1000) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
    println("Started!")

    shutdown.await()

    println("Stopping...")
    sleep(2000) // graceful shutdown (i.e. request long running resource to stop)
    println("Stopped!")
}
```
I'm wondering how the above translates to Arrow fx.
p
```kotlin
import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.seconds
import arrow.fx.coroutines.sleep

suspend fun main() {
    val shutdown = Promise<Unit>()

    ForkConnected {
        println("Starting...")
        serverCode(shutdown) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
        println("Started!")
    }

    shutdown.get() // suspend until serverCode completes the promise

    println("Stopping...")
    sleep(2.seconds) // graceful shutdown (i.e. request long running resource to stop)
    println("Stopped!")
}

suspend fun serverCode(shutdown: Promise<Unit>) {
    // somewhere in your server code
    // shutdown.complete(Unit)
}
```
not that different
the `CountDownLatch` becomes a `Promise`, and you use suspension instead of blocking
`ForkConnected` is explicit “fire and forget” and can also be joined, like threads
r
In this case, depending on how the server is wired, you may want to use Flow + Resource or just Resource to manage the lifecycle and finalizers of services like the HTTP server: https://arrow-kt.io/docs/next/apidocs/arrow-fx-coroutines/arrow.fx.coroutines/-resource/
a
Thanks @pakoito. Your port of the code snippet is quite similar to the example on Promise Doc. I had tried that approach:
```kotlin
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.seconds
import arrow.fx.coroutines.sleep
import kotlin.concurrent.thread

suspend fun main() {
    val running = Promise<Unit>()
    val shutdown = Promise<Unit>()

    Runtime.getRuntime().addShutdownHook(thread(start = false) {
        println("Shutdown triggered...")
        running.complete(Unit)
        shutdown.get()
        println("Shutdown completed!")
    })

    println("Starting...")
    sleep(1.seconds) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
    println("Started!")

    running.get()

    println("Stopping...")
    sleep(2.seconds) // graceful shutdown (i.e. request long running resource to stop)
    println("Stopped!")

    shutdown.complete(Unit)
}
```
The trouble with this code: "Suspension functions can be called only within coroutine body". So then, how to call a suspension function from this new thread (which is required by Shutdown Hook)? With `kotlinx.coroutines` I would use `runBlocking`. I'm not sure how best to do that in Arrow fx. Should I use `IO` for these types of situations? You may also notice that I have swapped out the call to `mainThread.join()` with multiple `Promise` instances to ensure the Shutdown Hook thread doesn't complete before the main block exits. Would this be considered idiomatic in Arrow fx or are there better ways to achieve this behaviour? I was thinking about wrapping the Stopping block of code in `ForkConnected` and calling `join()` on it from the Shutdown Hook, but that would require creating the `ForkConnected` above the Shutdown Hook without starting the execution, so that it can be `join()`ed within the Shutdown Hook.
Thanks @raulraja, I had been looking at `Resource` to solve this problem also. As I think you identified, this code is entirely about managing the lifetime of long-running (i.e. lifetime of the process) application resources. I hit some problems there though. Let me try to articulate those in an example and come back...
@raulraja adapting the example from Resource, I think you'd be suggesting something like the following (trimmed unchanged code blocks):
```kotlin
val resourceProgram = suspend {
    val running = Promise<Unit>()
    val shutdown = Promise<Unit>()

    Runtime.getRuntime().addShutdownHook(thread(start = false) {
        println("Shutdown triggered...")
        running.complete(Unit)
        shutdown.get()
        println("Shutdown completed!")
    })

    println("Starting...")
    Resource(::createConsumer, ::closeConsumer)
        .zip(Resource(::createDBHandle, ::closeDBHandle))
        .flatMap { (consumer, handle) ->
            Resource({ createFancyService(consumer, handle) }, { service -> shutDownFancyService(service) })
        }.use {
            println("Started!")

            running.get()

            println("Stopping...")
        }
    println("Stopped!")

    shutdown.complete(Unit)

    Unit
}

suspend fun main(): Unit = resourceProgram.invoke()
```
The above still needs a solution to the Shutdown Hook problem before it will run. It also feels a little strange, as we're not actually using `Service` in the `use` block (use of a Resource typically occurs in an HTTP Handler for most of my work).
p
> So then, how to call a suspension function from this new thread (which is required by Shutdown Hook)? With `kotlinx.coroutines` I would use `runBlocking`. I’m not sure how best to do that in Arrow fx. Should I use `IO` for these types of situations?

that’s one solution, yes. There’s a tool similar to `runBlocking` on arrow-fx
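For readers wondering what such a sync/async bridge does underneath, here is a minimal sketch using only the Kotlin standard library. `runSuspendBlocking` is a hypothetical helper name, not an Arrow or kotlinx API, and it assumes the default empty coroutine context; it only illustrates the idea of blocking a plain thread (such as a shutdown hook) until a suspend block has finished.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper (not part of Arrow or kotlinx.coroutines): runs a
// suspend block and blocks the calling thread until the block completes.
fun <T> runSuspendBlocking(block: suspend () -> T): T {
    val done = CountDownLatch(1)
    var outcome: Result<T>? = null

    // Start the coroutine. The completion callback fires once it finishes,
    // possibly on another thread if the block suspended along the way.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        outcome = result
        done.countDown()
    })

    // Block the caller (e.g. the shutdown-hook thread) until completion.
    done.await()
    // Return the value, or rethrow whatever the block threw.
    return outcome!!.getOrThrow()
}
```

With a helper of this shape, the body of the shutdown hook thread could wrap its suspend calls, for example `thread(start = false) { runSuspendBlocking { /* complete and await promises */ } }`.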
> I was thinking about wrapping the Stopping block of code in `ForkConnected` and `join()` on it from the Shutdown Hook, but that would require creating the `ForkConnected` above the Shutdown Hook without starting the execution so that it can be `join()` within the Shutdown Hook.

That’s the power of laziness: write the code such that the operation won’t start until you indicate so by joining.
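The same "describe now, run later" idea can be sketched with plain JVM threads, which are also lazy until started. This is an assumption-laden stand-in for `ForkConnected`, not Arrow code: `buildShutdownHook` and the `log` parameter are hypothetical names, and the log list is expected to be thread-safe.

```kotlin
import kotlin.concurrent.thread

// Hypothetical sketch using plain threads in place of Arrow's ForkConnected.
// The stopping work is defined up front but runs only when the hook starts it.
fun buildShutdownHook(log: MutableList<String>): Thread {
    // start = false: the thread is created but nothing executes yet.
    val stopping = thread(start = false) {
        log += "Stopping..."
        log += "Stopped!"
    }
    // The hook itself is also unstarted; the JVM (or a caller) starts it later.
    return thread(start = false) {
        log += "Shutdown triggered..."
        stopping.start() // the deferred work begins only now
        stopping.join()  // wait for it so the hook does not return early
        log += "Shutdown completed!"
    }
}
```

The returned thread could be passed to `Runtime.getRuntime().addShutdownHook(...)`; until the JVM starts it, neither the hook body nor the stopping block has run.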
a
Wrapping the Shutdown Hook block of code in `IO { ... }.unsafeRunSync()` seems to work, reliably producing the following expected output:
```
Starting...
Started!
Shutdown triggered...
Stopping...
Stopped!
Shutdown completed!
```
Noted that `unsafeRunSync()` is not really recommended in the docs though.
p
it’s the bridge between async and sync. You could also do `unsafeRunAsync` and handle any potential errors
the point is that it happens in a context that doesn’t support async
and I don’t know the system/runtime well enough to know why it needs to happen like that
we have other finalisers that may not work when the VM closes tho
a
I've always thought of `Runtime.getRuntime().addShutdownHook()` as the idiomatic way to handle requests for the process to exit (typically from the OS sending a SIGTERM signal), although I've seen other approaches (e.g. open a port and listen for a shutdown command on that port).
p
Now I’m the one learning 😄
😀 1
what do you have to do during shutdown in a server like this? close all connections? send a specific signal?
a
So typically our services have a number of long-lived (i.e. lifetime of the service) resources. Things like:
• Web Server (we like HTTP4K for its functional idioms): For offering up endpoints for other services and UIs
• DB (of some kind): For any state management the service might require
• Message Broker (typically Kafka): For publishing business events

When it's time for the service to shut down (e.g. Kubernetes decides to kill the container), we aim to do that in an orderly fashion so that any pending work (e.g. serving an HTTP request) can be completed and new work is rejected. We also want to ensure that any non-daemon threads exit, to avoid the process hanging because some threads are still running.

When left up to me (i.e. not wrapped up in some framework), I've looked to trigger the shutdown from the Java Shutdown Hook, which can be set by passing a `java.lang.Thread` to `Runtime.getRuntime().addShutdownHook()`. The JVM starts that thread when it receives a signal from the OS, typically a SIGTERM in Linux land (I believe) for an orderly shutdown.

The way I've chosen to manage the lifecycle of the Service (i.e. startup - initialising resources; shutdown - cleanup resources) is on a single thread (typically main). The main thread initialises the resources when it starts and then blocks (see `shutdown.await()` in example); the shutdown hook thread (when called) unblocks the main thread and resource cleanup is run; the shutdown hook thread joins the main thread to avoid the JVM exiting before the main thread has completed its cleanup work.
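The lifecycle described above (start resources in order, park the main thread, release in reverse order once the hook fires) can be sketched with the standard library alone. The names `runService` and `shutdownHookFor`, and the use of plain strings to stand in for real resources, are illustrative assumptions rather than anything from the thread.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical sketch: strings stand in for real resources (DB, broker, HTTP server).
fun runService(resources: List<String>, log: MutableList<String>, shutdown: CountDownLatch) {
    val started = mutableListOf<String>()
    try {
        for (name in resources) {
            log += "start $name"
            started += name
        }
        shutdown.await() // the service thread parks here for the life of the process
    } finally {
        // Release in reverse order so dependants stop before their dependencies.
        for (name in started.asReversed()) log += "stop $name"
    }
}

// Builds the (unstarted) hook thread for a given service thread.
fun shutdownHookFor(serviceThread: Thread, shutdown: CountDownLatch): Thread =
    thread(start = false) {
        shutdown.countDown()  // unblock the service thread
        serviceThread.join()  // keep the JVM alive until cleanup has finished
    }
```

In a real process the hook would be registered with `Runtime.getRuntime().addShutdownHook(shutdownHookFor(Thread.currentThread(), shutdown))` before `runService` is called on the main thread.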
p
Then yeah, completing the promise with `unsafeRunAsync` is your best option. If I understood correctly, that SIGTERM will not kill the VM outright even after the thread you passed is completed, right? Your server would continue regardless
a
That would seem to make sense; however, the Javadoc indicates that once all the registered Shutdown Hook threads exit, the JVM halts. Hence, you should `join()` on the main thread (or any thread for that matter) if you want it to exit normally. So if I use `unsafeRunAsync`, the JVM halts before any of the cleanup code gets executed and I only see the following printed to the terminal:
```
Starting...
Started!
```
`unsafeRunSync()` does work though and I see the following:
```
Starting...
Started!
Shutdown triggered...
Stopping...
Stopped!
Shutdown completed!
```
👍🏼 1
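The difference between the two outcomes can be illustrated without Arrow. In this sketch a worker thread stands in for the cleanup work, and the `blocking` flag stands in for the choice between a run-and-wait call and a fire-and-return call; this is an assumption for illustration, not how Arrow implements `unsafeRunSync` or `unsafeRunAsync`. `runHookBody` is a hypothetical name.

```kotlin
import kotlin.concurrent.thread

// Hypothetical sketch of a shutdown hook body. Returns the worker thread so a
// caller can observe it; in a real hook the JVM halts once the body returns.
fun runHookBody(blocking: Boolean, log: MutableList<String>): Thread {
    val worker = thread(start = false) {
        Thread.sleep(200) // simulated graceful shutdown work
        log += "Stopped!"
    }
    worker.start()
    if (blocking) worker.join() // sync style: wait for the cleanup to finish
    log += "hook returned"      // async style reaches this line before cleanup ends
    return worker
}
```

With `blocking = false` the hook body returns while the cleanup is still sleeping, which matches the truncated output reported above: on a real JVM the process would halt at that point and "Stopped!" would never be printed.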
On a separate but related note: After reviewing the docs again, I'm wondering if `Environment().unsafeRunSync { ... }` is more idiomatic than `IO { ... }.unsafeRunSync()`. It seems to be recommended when you need to integrate with frameworks that don't offer a facility to run suspend functions (like Java APIs).