Anshulupadhyay03
12/26/2021, 6:36 PM
private fun createConnection() {
    val hostAdd = InetAddress.getByName(endpoint)
    val connection = Socket(hostAdd, port)
    this.socketConnection = connection
    inputStream = DataInputStream(connection.inputStream)
    outPutStream = DataOutputStream(connection.outputStream)
}

private fun sendStreamRequest(request: String) {
    try {
        outPutStream?.writeBytes(request)
        outPutStream?.flush()
        Log.d(tag, "sendStreamRequest $request")
    } catch (ex: Exception) {
        ex.printStackTrace()
    }
}
Below is the method that subscribes to updates via a callback flow:
override fun subscribe(request: String): Flow<String> = callbackFlow {
    sendStreamRequest(requestFormatter.formatSubscribeRequest(request))
    try {
        inputStream?.bufferedReader().takeUnless {
            it == null
        }.use {
            it?.forEachLine { response ->
                Log.d(tag, "Streaming $response")
                val result = trySend(response) // trySendBlocking(response)
                Log.d(tag, "result ${result.isSuccess}")
            }
        }
    } catch (ex: Exception) {
        ex.printStackTrace()
    }
    awaitClose {
        Log.d(tag, "awaitclose")
        sendStreamRequest(requestFormatter.formatUnSubscribeRequest(request))
    }
}
There are a few problems that I see in the subscribe method:
1. The whole code is written inside a singleton class, so every time I subscribe, the call `inputStream?.bufferedReader()` will create additional objects (let me know if I am right).
2. If I move the code `inputStream?.bufferedReader()` out of the subscribe method, how can I update the subscriber? Can I use the ProducerScope I get from the callbackFlow builder and call the trySend method from outside?
3. I also want to listen for network connection changes: if the connection is lost, I will establish the connection again and let the subscriber listen for updates again, but I am not sure how to do that.
Any guidance on improving this would be of great help. I know listening to streams over TCP may sound weird, but it is legacy code that I am trying to convert to Kotlin.
Nick Allen
12/27/2021, 2:04 AM
1. Use the shareIn operator, which creates a single shared subscription.
2. Yes, but I wouldn't. If you want to separate stuff out, you can use a MutableSharedFlow, but I'd just stick with the shareIn operator.
3. If the source Flow throws, then you can use the retry operator to recover; the retry lambda can even suspend until there's a network connection available. Or:
connection.flatMapLatest { isConnected ->
    if (isConnected) {
        doYourNetworkStuffHere()
    } else {
        nothingReallyToDoHere()
    }
}
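The shareIn and retry suggestions above could be combined roughly as follows. This is only a minimal sketch under stated assumptions: `socketLines()` is a hypothetical stand-in for the socket-backed callbackFlow from the question (here it simply fails twice and then emits two lines), and `awaitNetwork()` is a hypothetical placeholder for whatever suspends until connectivity returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the upstream was (re)started; for illustration only.
val attempts = AtomicInteger(0)

// Hypothetical stand-in for the socket-backed flow in the question:
// the first two collections fail, the third one emits two lines.
fun socketLines(): Flow<String> = flow {
    if (attempts.incrementAndGet() < 3) {
        throw IOException("connection lost (simulated)")
    }
    emit("tick-1")
    emit("tick-2")
}

// Hypothetical placeholder: a real version would suspend until the
// network is reported as available again.
suspend fun awaitNetwork() = delay(10)

// One upstream subscription shared by all collectors, restarted on I/O errors.
fun sharedUpdates(scope: CoroutineScope): SharedFlow<String> =
    socketLines()
        .retry { cause ->
            if (cause is IOException) {
                awaitNetwork() // the retry predicate is allowed to suspend
                true           // then resubscribe to the upstream
            } else {
                false          // any other failure is not retried
            }
        }
        .shareIn(scope, SharingStarted.WhileSubscribed(), replay = 0)

fun main(): Unit = runBlocking {
    // The shared flow lives in its own scope, so cancel it when done.
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val updates = sharedUpdates(scope)
    println(updates.take(2).toList())
    scope.cancel()
}
```

With `SharingStarted.WhileSubscribed()`, the upstream is collected only while at least one subscriber is present, so the reader would not be created again for every additional collector.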
Anshulupadhyay03
12/28/2021, 7:46 PM
Do I need to use suspendCancellableCoroutine?
Nick Allen
12/28/2021, 9:48 PM
"do I need to use suspendCancellableCoroutine?"
If the thing you are waiting on uses a callback API: yes. Otherwise, no.
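For the callback case mentioned in this reply, a wrapper might look something like the sketch below. `NetworkMonitor`, its `Listener` interface, and `simulateAvailable()` are hypothetical names invented for illustration; they are not part of Android or of the code in the question. Only `suspendCancellableCoroutine`, `invokeOnCancellation`, and `resume` are real kotlinx.coroutines / Kotlin APIs.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume

// Hypothetical callback-style API standing in for a real connectivity monitor.
class NetworkMonitor {
    fun interface Listener { fun onAvailable() }

    private val listeners = mutableListOf<Listener>()

    fun addListener(l: Listener) { synchronized(listeners) { listeners += l } }
    fun removeListener(l: Listener) { synchronized(listeners) { listeners -= l } }

    // Test hook: pretends the network just came back.
    fun simulateAvailable() {
        val snapshot = synchronized(listeners) { listeners.toList() }
        snapshot.forEach { it.onAvailable() }
    }
}

// Suspends until the monitor reports that the network is available.
suspend fun NetworkMonitor.awaitAvailable(): Unit =
    suspendCancellableCoroutine { cont ->
        val listener = object : NetworkMonitor.Listener {
            override fun onAvailable() {
                removeListener(this)                 // one-shot: unregister first
                if (cont.isActive) cont.resume(Unit) // then wake the coroutine
            }
        }
        addListener(listener)
        // If the waiting coroutine is cancelled, do not leak the listener.
        cont.invokeOnCancellation { removeListener(listener) }
    }

fun main(): Unit = runBlocking {
    val monitor = NetworkMonitor()
    val waiter = launch {
        monitor.awaitAvailable()
        println("network available, reconnecting")
    }
    yield()                     // let the waiter register its listener
    monitor.simulateAvailable() // fire the callback
    waiter.join()
}
```

A function like `awaitAvailable()` could then be called from inside the retry lambda, since that lambda is allowed to suspend.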
Anshulupadhyay03
12/29/2021, 6:35 PM