Alfi
01/03/2021, 12:17 AMimport java.lang.Thread.currentThread
import java.lang.Thread.sleep
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
fun main() {
val mainThread = currentThread()
val shutdown = CountDownLatch(1)
Runtime.getRuntime().addShutdownHook(thread(start = false) {
println("Shutdown triggered...")
shutdown.countDown()
mainThread.join()
println("Shutdown completed!")
})
println("Starting...")
sleep(1000) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
println("Started!")
shutdown.await()
println("Stopping...")
sleep(2000) // graceful shutdown (i.e. request long running resource to stop)
println("Stopped!")
}
I'm wondering how the above translates to Arrow fx.pakoito
01/03/2021, 1:01 PMsuspend fun main() {
val shutdown = Promise<Unit>()
ForkConnected {
println("Starting...")
serverCode(shutdown) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
println("Started!")
}
shutdown.get() // Suspend until fiber finishes
println("Stopping...")
delay(2000) // graceful shutdown (i.e. request long running resource to stop)
println("Stopped!")
}
suspend fun serverCode(shutdown: Promise) {
// somewhere in your server code
// shutdown.complete(Unit)
}
pakoito
01/03/2021, 1:01 PMpakoito
01/03/2021, 1:02 PMpakoito
01/03/2021, 1:02 PMForkConnected
is explicit “fire and forget” and can also be joined, like threadspakoito
01/03/2021, 1:05 PMraulraja
01/03/2021, 1:43 PMAlfi
01/04/2021, 4:24 AMimport arrow.fx.coroutines.Promise
import arrow.fx.coroutines.seconds
import arrow.fx.coroutines.sleep
import kotlin.concurrent.thread
suspend fun main() {
val running = Promise<Unit>()
val shutdown = Promise<Unit>()
Runtime.getRuntime().addShutdownHook(thread(start = false) {
println("Shutdown triggered...")
running.complete(Unit)
shutdown.get()
println("Shutdown completed!")
})
println("Starting...")
sleep(1.seconds) // bootstrap long running resources (e.g. DB Connection, HTTP Server, HTTP Client)
println("Started!")
running.get()
println("Stopping...")
sleep(2.seconds) // graceful shutdown (i.e. request long running resource to stop)
println("Stopped!")
shutdown.complete(Unit)
}
The trouble with this code: "Suspension functions can be called only within coroutine body". So then, how to call a suspension function from this new thread (which is required by Shutdown Hook)? With kotlinx.coroutines
I would use runBlocking
. I'm not sure how best to do that Arrow fx. Should I use IO
for these type of situations?
You may also notice that I have swapped out the call to mainThread.join()
with multiple Promise
to ensure the Shutdown Hook thread doesn't complete before the main block exits. Would this be considered idiomatic in Arrow fx or are there better ways to achieve this behaviour?
I was thinking about wrapping the Stopping block of code ForkConnected
and join()
on it from the Shutdown Hook, but that would require creating the ForkConnected
above the Shutdown Hook without starting the execution so that it can be join()
within the Shutdown Hook.Alfi
01/04/2021, 5:04 AMResource
to solve this problem also. As I think you identified, this code is entirely about managing the lifetime of long running (i.e. lifetime of the process) application resources. I hit some problems there though. Let try and articulate those in an example and come back...Alfi
01/04/2021, 5:56 AMval resourceProgram = suspend {
val running = Promise<Unit>()
val shutdown = Promise<Unit>()
Runtime.getRuntime().addShutdownHook(thread(start = false) {
println("Shutdown triggered...")
running.complete(Unit)
shutdown.get()
println("Shutdown completed!")
})
println("Starting...")
Resource(::createConsumer, ::closeConsumer)
.zip(Resource(::createDBHandle, ::closeDBHandle))
.flatMap { (consumer, handle) ->
Resource({ createFancyService(consumer, handle) }, { service -> shutDownFancyService(service) })
}.use {
println("Started!")
running.get()
println("Stopping...")
}
println("Stopped!")
shutdown.complete(Unit)
Unit
}
suspend fun main(): Unit = resourceProgram.invoke()
The above still needs a solution to the Shutdown Hook to see it run. Also feels a little strange as we're not actually using Service
in the use
block (use of a Resource typically occurs in HTTP Handler for most of my work).pakoito
01/04/2021, 11:24 AMSo then, how to call a suspension function from this new thread (which is required by Shutdown Hook)? Withthat’s one solution, yes. There’s a tool similar toI would usekotlinx.coroutines
. I’m not sure how best to do that Arrow fx. Should I userunBlocking
for these type of situations?IO
runBlocking
on arrow-fxpakoito
01/04/2021, 11:25 AMI was thinking about wrapping the Stopping block of codeThat’s the power of laziness, write the code such that the operation won’t start until you indicate so by joining.andForkConnected
on it from the Shutdown Hook, but that would require creating thejoin()
above the Shutdown Hook without starting the execution so that it can beForkConnected
within the Shutdown Hook. (edited)join()
Alfi
01/04/2021, 12:46 PMStarting...
Started!
Shutdown triggered...
Stopping...
Stopped!
Shutdown completed!
Noted that unsafeRunSync()
is not really recommended in the docs though.pakoito
01/04/2021, 1:17 PMunsafeRunAsync
and handle any potential errorspakoito
01/04/2021, 1:17 PMpakoito
01/04/2021, 1:18 PMpakoito
01/04/2021, 1:19 PMAlfi
01/05/2021, 1:23 AMRuntime.getRuntime().addShutdownHook()
as the idiomatic way to handle requests for the process to exit (typically from the OS sending SIGTERM signal), although I've seen other approaches (e.g. open port and listen for shutdown command on that port).pakoito
01/05/2021, 12:41 PMpakoito
01/05/2021, 12:42 PMAlfi
01/06/2021, 2:59 AMjava.lang.Thread
to Runtime.getRuntime().addShutdownHook()
which it starts when it receives a Signal from the OS, typically a SIGTERM in Linux land (I believe) for an orderly shutdown.
The way I've chosen to manage the lifecycle of the Service (i.e. startup - initialising resources; shutdown - cleanup resources) is on a single thread (typically main). The main thread initialises the resources when it starts and then blocks (see shutdown.await()
in example); shutdown hook thread (when called) unblocks the main thread and resources cleanup is run; shutdown hook thread joins the main thread to complete to avoid JVM exiting before the main thread has completed it's cleanup work.pakoito
01/06/2021, 10:28 AMunsafeRunAsync
is your best option. If I understood correctly, that SIGTERM will not kill the VM outright even after the threadd you passed is completed, right? Your server would continue regardlesAlfi
01/06/2021, 2:47 PMjoin()
on the main thread (or any thread for that matter) if you want it to exit normally. So if I use unsafeRunAsync
the JVM halts before the any of the cleanup code get's executed and I only see the following printed to the terminal:
Starting...
Started!
unsafeRunSync()
does work though and I see the following:
Starting...
Started!
Shutdown triggered...
Stopping...
Stopped!
Shutdown completed!
Alfi
01/06/2021, 2:56 PMEnvironment().unsafeRunSync { ... }
is more idiomatic than IO { ... }.unsafeRunSync()
. It seems to be recommend to be used when you need to integrate with frameworks that don't offer facility to run suspend functions (like Java APIs).