Hi Guys, I have been tyring to write a streaming c...
# coroutines
a
Hi Guys, I have been tyring to write a streaming client in kotlin with Flow apis. Streaming server is TCP based. I have to send request to server for subscribing / unsubscribing and listen the for the updates. I am using callback flow to keep sending the updates back to the subscribbers. Take a look at what i have implemented so far.
private fun createConnection() {
val hostAdd = InetAddress.getByName(endpoint)
val connection = Socket(hostAdd, port)
this.socketConnection = connection
inputStream = DataInputStream(connection.inputStream)
outPutStream = DataOutputStream(connection.outputStream)
}
private fun sendStreamRequest(request: String) {
try {
outPutStream?.writeBytes(request)
outPutStream?.flush()
Log.d(tag, "sendStreamRequest $request")
} catch (ex: Exception) {
ex.printStackTrace()
}
Copy code
}
The below is the methods subcribes for the updates via callback flow
override fun subscribe(request: String):Flow<String> = callbackFlow {
sendStreamRequest(requestFormatter.formatSubscribeRequest(request))
try {
inputStream?.bufferedReader().takeUnless {
it == null
}.use {
it?.forEachLine { response ->
Log.d(tag, "Streaming $response")
val result = trySend(response)//trySendBlocking(response)
Log.d(tag, "result  ${result.isSuccess}")
}
}
} catch (ex: Exception) {
ex.printStackTrace()
}
awaitClose {
Log.d(tag, "awaitclose")
sendStreamRequest(requestFormatter.formatUnSubscribeRequest(request))
}
}
There are few problems that i see in the subscribe method 1. The whole code is written insider a singleton class so everytime i subscribe this code
inputStream?.bufferedReader()
will create addition objects (Let me know if i am right) 2. if i move the code `inputStream?.bufferedReader()`out of the subscribe method how can i update the subscribber ?? Can i use the produdeScope i get from callback flow builder and call trySend method from outside? 3. I also want to listen for network connection changes if the connetion looses i will establish the connection again and let the subscribber listen the updates again but not sure how to do that. Any guidance on this to improve would be of great help. I know listen streams using TCP may sound weird but it is a legacy code which i am trying to convert into Kotlin.
n
In future, please put big code blocks in a thread. 1. Yup, each subscriber creates a separate instance that runs the callbackFlow lambda. You can change this by using the
shareIn
operator which creates a single shared subscription. 2. Yes, but I wouldn't. If you want to separate stuff out, you can use a
MutableSharedFlow
but I'd just stick with
shareIn
operator. 3. If the source
Flow
throws, then you can use the
retry
operator to recover, the
retry
lambda can even suspend until there's a network connection available. Or:
Copy code
connection.flatMapLatest { isConnected -> 
    if (isConnected) {
        doYourNetworkStuffHere()
    } else {
        nothingReallyToDoHere()
    }
}
a
@Nick Allen Thanks for the suggestion ..i have now changed the architecture to use callback with shareIn . Though I had to change a lot of code as my understanding was limited with shareIn and scopes but got it working finally. Also for the connection, I keep retrying without having to suspend retry lambda. but if I have to do so in future, do I need to use
suspendCancellableCoroutine
?
n
do I need to use 
suspendCancellableCoroutine
?
If the thing you are waiting on uses a callback API: yes. Otherwise, no.
a
@Nick Allen Thanks again