# ktor
g
Can anyone help? I am stuck with a `Ktor` `Websocket` issue where the connection becomes non-responsive after receiving a push notification message from the server. The same code using `okhttp3.OkHttpClient` is fine. Details in the 🧵 below 👇:
What I am attempting:
• Launch a WebSocketClient from an Android Foreground Service that connects to a WebSocket server on the local Wi-Fi to receive local push notifications.
What is the problem?
• I am able to authenticate, subscribe to the push notification service, send ping requests and receive pongs.
• Each of these steps receives a confirmation from the server.
• However, the problem starts once the Android client receives a push notification from the WebSocket server. The client sends a receipt confirmation, the server sends a confirmation of the receipt confirmation, but this is not received by the client.
• The client continues to send pings, but the server's pong responses are not logged in the client.
• Inspecting the traffic via Charles Proxy, I can see that the server does send the receipt confirmation and pong responses; it is just that the WebSocketClient is non-responsive.
• I have put logging in a ton of places, but I do not see any errors or issues that could lead me to the problem.
Does it actually work?
• Yes. 👌
• I rebuilt the WebSocketClient with `okhttp3.OkHttpClient`, reusing as much of the existing Ktor code as possible, and it all works as expected.
How I am attempting:
• A `Dispatchers.IO + SupervisorJob()` coroutine observes an appConfig and then starts the `KtorWebSocketClient`.
• The `KtorWebSocketClient` is launched on `Dispatchers.IO + Job()`.
• `KtorWebSocketClient` creates an instance of `WebSocketConnectionManager` that handles connecting to the WebSocket.
• `WebSocketConnectionManager` takes a handle-session argument, enabling us to pass the session handling back to `KtorWebSocketClient`.
• `KtorWebSocketClient` then launches two coroutines to handle incoming and outgoing messages.
• There I am using a message queue to send messages via a private extension function, `DefaultClientWebSocketSession.sendMessages()`.
As mentioned, I do not understand what I am doing wrong, as the `okhttp3.OkHttpClient` version works as expected. I could roll with that, but I would prefer to use the Ktor/WebSocket version.
Code below:
@AndroidEntryPoint
class RunningForegroundService : Service() {

    @Inject
    @KtorWSClient
    lateinit var webSocketClient: WebSocketClient

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        super.onStartCommand(intent, flags, startId)
        when (intent?.action) {
            SERVICE_START -> start()
            SERVICE_STOP -> stop()
        }
        return START_STICKY
    }

    private fun start() {
        startRunningService()
        appConfigFlow = appConfigRepository.observeAppConfig()
            .map { entity ->
                entity.toDomain()
            }
            .stateIn(
                scope = ioSupervisorScope,
                initialValue = AppConfigEntity(),
                started = SharingStarted.Eagerly
            )

        ioSupervisorScope.launch {
            try {
                appConfigFlow.collect { config -> webSocketClient(config) }
            } catch (e: Throwable) {
                Timber.e(e, "Error collecting appConfig ${e.message}")
            }
        }
    }

    private fun stop() {
        // Omitted
    }

    private fun startRunningService() {
        startForeground(/* ... */)
    }

    private fun webSocketClient(config: AppConfigEntity) {
        webSocketJob = websocketJobScope.launch {
            webSocketClient.initialize(/* ... */)
        }        
    }

    companion object {
        const val SERVICE_START = "RunningForegroundService.SERVICE_START"
        const val SERVICE_STOP = "RunningForegroundService.SERVICE_STOP"

        private val handler = CoroutineExceptionHandler { _, exception ->
            Timber.e(exception, "CoroutineExceptionHandler: ${exception.message}")
        }
        private val ioSupervisorScope =
            CoroutineScope(Dispatchers.IO + SupervisorJob() + CoroutineName("ForegroundServiceJob") + handler)
        private val websocketJobScope =
            CoroutineScope(Dispatchers.IO + Job() + CoroutineName("WebSocketJob") + handler)
    }
}
class KtorWebSocketClient @Inject constructor(
    private val httpClient: HttpClient,
    private val notificationBuilder: NotificationBuilder,
) : WebSocketClient {

    private var notificationCallback: NotificationCallback? = null

    private val hlsUrlDeferredMap = mutableMapOf<Long, CompletableDeferred<String>>()
    private val booleanDeferredMap = mutableMapOf<Long, CompletableDeferred<Boolean>>()
    private val messageQueue = Channel<JsonObject>(Channel.UNLIMITED)
    private val idGenerator = AtomicLong(1)

    private lateinit var scope: CoroutineScope
    private lateinit var appConfig: AppConfigEntity
    private lateinit var messageDispatcher: MessageDispatcher
    private lateinit var messageHandler: MessageHandler
    private lateinit var connectionManager: WebSocketConnectionManager
   

    override suspend fun initialize(
        appConfig: AppConfigEntity,
        scope: CoroutineScope,
        notificationCallback: NotificationCallback,
    ) {
        this.connectionManager = WebSocketConnectionManager(httpClient, appConfig)
        this.notificationCallback = notificationCallback
        this.appConfig = appConfig
        this.scope = scope
        start()
    }

    override suspend fun refreshConnection(
        appConfig: AppConfigEntity?,
        scope: CoroutineScope?,
    ) {
        start()
    }

    override fun close() {
        scope.launch {
            connectionManager.disconnect()
            notificationCallback = null
        }
    }

    private suspend fun start() {
        connectionManager.connectWebSocket {
            // val session: DefaultClientWebSocketSession = this
            val notificationManager =
                NotificationManager(
                    scope = scope,
                    notificationBuilder = notificationBuilder
                )
            messageDispatcher = MessageDispatcher( /* ... */ )
            messageHandler = MessageHandler( /* ... */ )

            val incomingMessagesJob = launch(CoroutineName("IncomingMessagesJob")) { incomingMessages() }

            val sendMessageJob = launch(CoroutineName("SendMessageJob")) { sendMessages() }

            sendMessageJob.join() // Wait for completion or error
            incomingMessagesJob.cancelAndJoin()
        }
    }

    private fun handleWebSocketClose(frame: Frame.Close) {
        Timber.e("WebSocket closed...")
    }

    private suspend fun DefaultClientWebSocketSession.sendMessages() {
        try {
            for (message in messageQueue) {
                if (!isActive) break
                send(Frame.Text(message.toString()))
            }
        } catch (e: Exception) {
            Timber.e(e, ("Error sending WebSocket message: ${e.message}"))
        }
    }

    private suspend fun DefaultClientWebSocketSession.incomingMessages() {
        try {
            while (isActive) {
                incoming.consumeEach { frame ->
                    when (frame) {
                        is Frame.Text -> messageHandler.handleTextFrame(frame, scope)
                        is Frame.Binary -> messageHandler.handleBinaryFrame(frame)
                        is Frame.Ping, is Frame.Pong -> messageHandler.handlePingPong(frame)
                        is Frame.Close -> handleWebSocketClose(frame)
                        else -> Timber.d("Other type of frame received ${frame.frameType}")
                    }
                }
            }
        } catch (e: Exception) {
            Timber.e(e, "Error during WebSocket communication: ${e.message}")
            if (e is CancellationException) {
                Timber.e(e,"WebSocket session cancelled.")
                throw e
            }
        } finally {
            Timber.i("Cleaning up WebSocket session")

        }
    }

    companion object {
        // omitted 
    }
}
class WebSocketConnectionManager(
    private val httpClient: HttpClient,
    private var appConfig: AppConfigEntity,
) {
    private val connectedMutex = Mutex()

    suspend fun connectWebSocket(handleSession: suspend DefaultClientWebSocketSession.() -> Unit) {
        connectedMutex.withLock {
            val hostAddress = appConfig.hostAddress
                ?.replace("http://", "")
                ?.replace("https://", "")

            try {
                return httpClient.webSocket(
                    host = hostAddress,
                    path = PATH_WEBSOCKET
                ) {
                    val session: DefaultClientWebSocketSession = this
                    handleSession(session)
                }
            } catch (e: Exception) {
                Timber.e(e, "Failed to connect to WebSocket: ${e.message}")
                if (e is CancellationException) {
                    throw e
                }
            }
        }
    }

    suspend fun disconnect() {
        try {
            connectedMutex.withLock {
                httpClient.close()
            }
        } catch (e: Exception) {
            Timber.e(e, "Failed to disconnect WebSocket: ${e.message}")
        }
    }
}
class MessageDispatcher(/* ... */) {
    suspend fun authenticate(accessToken: String?) {
        if (accessToken.isNullOrEmpty())
            throw Exception("Unable to authenticate as Access Token is null or empty")
        val serviceCall = buildJsonObject {
            // Omitted
        }
        sendMessage(serviceCall)
    }

    suspend fun subscribeToPushNotificationChannel(webhookId: String?) {
        if (webhookId.isNullOrEmpty())
            throw Exception("Unable to subscribe to Push Notification Channel. Webhook is null or empty")

        val serviceCall = buildJsonObject {
            // Omitted
        }
        sendMessage(serviceCall)
    }

    suspend fun confirmPushNotificationReceipt(
        confirmId: String,
    ): Boolean {
        val messageIdVal = idGenerator.getAndIncrement()
        val serviceCall = buildJsonObject {
            // Omitted
        }
        val deferred = CompletableDeferred<Boolean>()
        booleanDeferredMap[messageIdVal] = deferred

        sendMessage(serviceCall)
        val result = deferred.await()
        booleanDeferredMap.remove(messageIdVal)
        return result
    }

    suspend fun sendMessage(serviceCall: JsonObject) {
        // messageQueue is consumed in `KtorWebSocketClient` by `DefaultClientWebSocketSession.sendMessages()`, which sends each message
        messageQueue.send(serviceCall)
    }
}
class MessageHandler(/* ... */) {
    suspend fun handleTextFrame(frame: Frame.Text, scope: CoroutineScope) {
        val frameText = frame.readText()
        val response = Json.parseToJsonElement(frameText).jsonObject
        val messageType = response[TYPE]?.toStringContent()

        when (messageType) {
			// Omitted - handle different message types... i.e.
			// AUTH_REQUIRED, AUTH_OK, AUTH_INVALID, EVENT, RESULT, PONG
			// messageDispatcher.authenticate(appConfig.accessToken)
			// messageDispatcher.subscribeToPushNotificationChannel(appConfig.webhookId)
			// handleEvent(response)
			// handleResult(response)
			// handlePong(response)
        }
    }

    suspend fun handleBinaryFrame(frame: Frame.Binary) {
        // Omitted
    }

    fun handlePingPong(frame: Frame) {
        // Omitted
    }

    private suspend fun handleResult(response: JsonObject) {
        val resultResponse = response[RESULT]
        val error = response[ERROR]?.jsonObject
        // Omitted
        if (error != null) {
            handleError(error)
        }
    }

    private suspend fun handleError(error: JsonObject) {
        val code = error[CODE]?.toStringContent()
        val message = error[MESSAGE]?.toStringContent()
    }

    private suspend fun handlePong(response: JsonObject) {
        // Omitted
    }

    private suspend fun handleEvent(response: JsonObject) {
        // Omitted
        // If the event is a push notification we need to confirm with an id.
        messageDispatcher.confirmPushNotificationReceipt(/* ... */)
    }

}
class NotificationManager(
    private val scope: CoroutineScope,
    private val notificationBuilder: NotificationBuilder,
) {

    suspend fun parseAndShowNotification(
        eventData: JsonObject,
        accessToken: String?,
        onNotification: (NotificationEntity) -> Unit,
    ): String {
        if (accessToken.isNullOrEmpty())
            throw Exception("Unable send notification as Access Token is null or empty")
        return notificationBuilder.parseAndShow( /* ... */ )
    }
}
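One thing that may be worth ruling out: in the code above, `handleEvent` is called from inside `incoming.consumeEach`, and `confirmPushNotificationReceipt` suspends on `deferred.await()`. If that reading is right, the coroutine that would deliver the RESULT frame (and every later PONG) is the same coroutine that is waiting for it, which would match the symptoms described. Below is a minimal simulation of that hypothesis using only kotlinx.coroutines, not Ktor. `PushEvent`, `Ack` and `runReadLoop` are made-up names, and the 500 ms timeout exists only so the demo terminates.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-ins for the frames in this thread: an event that needs a
// receipt confirmation, and the server's acknowledgement of that confirmation.
sealed interface Message
data class PushEvent(val replyId: Long) : Message
data class Ack(val id: Long, val success: Boolean) : Message

/**
 * Simulates the read loop. With awaitInline = true the loop suspends inside the
 * event branch (like awaiting the confirmation directly in the frame handler);
 * with false the wait runs in a child coroutine and the loop keeps reading.
 * Returns the confirmation result, or null if it did not arrive in time.
 */
suspend fun runReadLoop(awaitInline: Boolean): Boolean? = coroutineScope {
    val incoming = Channel<Message>(Channel.UNLIMITED)
    val pending = mutableMapOf<Long, CompletableDeferred<Boolean>>()
    val outcome = CompletableDeferred<Boolean?>()

    // The simulated server sends the event, then the acknowledgement, then closes.
    incoming.send(PushEvent(replyId = 1))
    incoming.send(Ack(id = 1, success = true))
    incoming.close()

    for (message in incoming) {
        when (message) {
            is PushEvent -> {
                val reply = CompletableDeferred<Boolean>()
                pending[message.replyId] = reply
                val awaitReply: suspend () -> Boolean? = {
                    withTimeoutOrNull(500) { reply.await() } // timeout only for the demo
                }
                if (awaitInline) {
                    outcome.complete(awaitReply()) // the loop cannot read the Ack while waiting
                } else {
                    launch { outcome.complete(awaitReply()) } // the loop stays free to read the Ack
                }
            }
            is Ack -> pending.remove(message.id)?.complete(message.success)
        }
    }
    outcome.await()
}

fun main() = runBlocking {
    println("inline await:   ${runReadLoop(awaitInline = true)}")
    println("launched await: ${runReadLoop(awaitInline = false)}")
}
```

Under these assumptions the inline variant prints `null` (the acknowledgement is only read after the wait gives up) and the launched variant prints `true`. In the real client the equivalent experiment would be to start the confirmation with `launch` instead of awaiting it inside the frame handler; that is something to test, not a confirmed fix.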
a
Is the problem reproducible with any client engines that support WebSockets (`OkHttp`, `Java` and `CIO`)?
g
I just tried switching to `CIO`, but unfortunately it seems like it is not even connecting to the WebSocket.
@Provides
    @Singleton
    @OptIn(ExperimentalSerializationApi::class)
    fun providesNetworkClient(): HttpClient {
        return HttpClient(CIO) {
            install(WebSockets) {
                //pingInterval = 5_000L
                //maxFrameSize = Long.MAX_VALUE
            }
            install(ContentNegotiation) {
                json(Json {
                    prettyPrint = true
                    isLenient = true
                    ignoreUnknownKeys = true
                    explicitNulls = false
                })
            }
            install(HttpTimeout) {
                socketTimeoutMillis = 60_000
                requestTimeoutMillis = 60_000
            }
            install(HttpRequestRetry) {
                retryIf(5) { _, httpResponse ->
                    when {
                        httpResponse.status.value in 500..599 -> true
                        httpResponse.status == HttpStatusCode.TooManyRequests -> true
                        else -> false
                    }
                }
            }
            Logging {
                logger = Logger.DEFAULT
                level = LogLevel.ALL
                logger = object : Logger {
                    override fun log(message: String) {
                        Timber.v("Logger Ktor => $message")
                    }
                }
            }
            ResponseObserver { response ->
                Timber.v("HTTP status: ${response.status.value}")
            }
            defaultRequest {
                header(HttpHeaders.ContentType, ContentType.Application.Json)
               
            }
        }
    }
a
That's weird. It should connect.
g
I have tried switching back and forth between `OkHttp` and `CIO`; `OkHttp` connects 🤔
I found the issue I had with getting it to work with `CIO`. Unfortunately, the behaviour is the same as with `OkHttp`.
a
Can you try to simplify the environment or your code to make the problem easier to diagnose?
g
This is an older version of the code, from before I abstracted it; the issue is the same with this:
class KtorWebSocketClient @Inject constructor(
    private val httpClient: HttpClient,
    private val heartbeatService: HeartbeatService,
    private val notificationBuilder: NotificationBuilder,
): WebSocketClient {
    private lateinit var scope: CoroutineScope
    private lateinit var appConfig: AppConfigEntity

    private var notificationCallback: NotificationCallback? = null
    private var idGenerator = AtomicLong(1)

    private val hlsUrlDeferredMap = mutableMapOf<Long, CompletableDeferred<String>>()
    private val booleanDeferredMap = mutableMapOf<Long, CompletableDeferred<Boolean>>()
    private val connectedMutex = Mutex()

    private val messageQueue = Channel<JsonObject>(Channel.UNLIMITED)

    override suspend fun initialize(
        appConfig: AppConfigEntity,
        scope: CoroutineScope,
        notificationCallback: NotificationCallback,
    ) {
        this.notificationCallback = notificationCallback
        this.appConfig = appConfig
        this.scope = scope
        this.scope.launch {
            try {
                connectWebSocket()
            } catch (e: Exception) {
                Timber.e(e, "Failed to initialize WebSocket: ${e.message}")
                if (e is CancellationException) {
                    throw e
                }
            }
        }
    }

    override suspend fun refreshConnection(
        appConfig: AppConfigEntity?,
        scope: CoroutineScope?,
    ) {
        heartbeatService.stopHeartbeat()
        hlsUrlDeferredMap.clear()
        booleanDeferredMap.clear()
        appConfig?.let { this.appConfig = it }
        scope?.let { this.scope = it }
        this.scope.launch {
            delay(3_000)
            try {
                if (connectedMutex.isLocked)
                    connectedMutex.unlock()
                this@KtorWebSocketClient.connectWebSocket()
            } catch (e: Exception) {
                Timber.e(e, "Failed to initialize WebSocket: ${e.message}")
                if (e is CancellationException) {
                    throw e
                }
            }
        }
    }

    override fun close() {
        scope.launch {
            heartbeatService.stopHeartbeat()
            httpClient.close()
            notificationCallback = null
        }
    }

    override suspend fun getHlsFeed(entityId: String): String {
        return withContext(scope.coroutineContext) {
            Timber.d("getHlsFeed:\n\tentityId=$entityId\n\t${kotlin.coroutines.coroutineContext[CoroutineName]}")
            val messageIdVal = idGenerator.getAndIncrement()
            val serviceCall = buildJsonObject {
                // Omitted
            }
            val deferred = CompletableDeferred<String>()
            hlsUrlDeferredMap[messageIdVal] = deferred
            sendMessage(serviceCall)
            val hlsUrl = deferred.await()
            hlsUrlDeferredMap.remove(messageIdVal)
            hlsUrl
        }
    }

    private suspend fun connectWebSocket() {
        Timber.i("1. connectWebSocket\n\t${coroutineContext[CoroutineName]}")
        connectedMutex.withLock {
            val url = Url(appConfig.hostAddress)
              
            Timber.i("2. connectWebSocket\n\t${appConfig.hostAddress}\n\t${coroutineContext[CoroutineName]}")
            httpClient.webSocket(
                host = url.host,
                port = url.port,
                path = PATH_WEBSOCKET
            ) {
                val incomingMessagesJob =
                    launch(CoroutineName("IncomingMessagesJob")) { incomingMessages() }

                val sendMessageJob =
                    launch(CoroutineName("SendMessageJob")) { sendMessages() }

                sendMessageJob.join() // Wait for completion or error
                incomingMessagesJob.cancelAndJoin()
            }
        }
    }

    private fun cleanUpWebSocket() {
        heartbeatService.stopHeartbeat()
    }

    private fun handleWebSocketClose(frame: Frame.Close) {
        Timber.e("WebSocket closed...")
        heartbeatService.stopHeartbeat()
    }

    private suspend fun authenticateAppAgainstWebSocket(accessToken: String?) {
        if (accessToken.isNullOrEmpty())
            throw Exception("Unable to authenticate as Access Token is null or empty")
        val serviceCall = buildJsonObject {
            // Omitted
        }
        sendMessage(serviceCall)
    }

    private suspend fun subscribeToPushNotificationChannel(webhookId: String?) {
        if (webhookId.isNullOrEmpty())
            throw Exception("Unable to subscribe to Push Notification Channel. Webhook is null or empty")

        val serviceCall = buildJsonObject {
            // Omitted
        }
        sendMessage(serviceCall)
    }

    private suspend fun handleBinaryFrame(frame: Frame.Binary) {
        val bytes = frame.readBytes()
        Timber.d("Received binary frame of size: ${bytes.size}")
    }

    private fun handlePingPong(frame: Frame) {
        Timber.d("WebSocket ${frame.frameType}: ${frame.data.size}, ${frame.data.toReadableString()}")
    }

    private suspend fun handleTextFrame(frame: Frame.Text, scope: CoroutineScope) {
        val frameText = frame.readText()
        val response = Json.parseToJsonElement(frameText).jsonObject
        Timber.d("handleTextFrame frame text raw:\n\t$response\n\t${coroutineContext[CoroutineName]}")
        val messageType = response[TYPE]?.toStringContent()

        when (messageType) {
            AUTH_REQUIRED -> {
                authenticateAppAgainstWebSocket(appConfig.accessToken)
            }
            AUTH_OK -> {
                subscribeToPushNotificationChannel(appConfig.webhookId)
                startHeartbeat(scope)
            }
            AUTH_INVALID -> {
                Timber.w("AUTH_INVALID type received => ${response[MESSAGE]?.toStringContent()}")
            }
            EVENT -> {
                handleEvent(response)
            }
            RESULT -> {
                handleResult(response)
            }
            PONG -> {
                unconfirmedMessages = 0
                handlePong(response)
            }
        }
    }

    private suspend fun handleResult(response: JsonObject) {
        val resultResponse = response[RESULT]
        val responseId = response[ID]?.toLong()
        val success = response[SUCCESS]?.toBoolean()
            ?: throw Exception("Success key expected but not found.")
        val error = response[ERROR]?.jsonObject

        if (success) {
            booleanDeferredMap[responseId]?.complete(success)
        } else {
            booleanDeferredMap[responseId]?.completeExceptionally(Exception("Confirmation failed"))
        }

        if (success && resultResponse !is JsonNull) {
            val subscriptionId = response[ID]?.toLong()
            val result = resultResponse?.jsonObject
            val url = result?.get(URL)?.toStringContent()
            if (url.isNullOrEmpty().not()) {
                completeHlsUrl(subscriptionId, url)
            }
        }

        if (error != null) {
            handleError(error)
        }
    }

    private suspend fun handleError(error: JsonObject) {
        val code = error[CODE]?.toStringContent()
        val message = error[MESSAGE]?.toStringContent()
        Timber.e("ERROR received => code=$code, message=$message")
    }

    private suspend fun handlePong(response: JsonObject) {
        //Timber.d("PONG type received")
    }

    private suspend fun handleEvent(response: JsonObject) {
        // Omitted
		confirmPushNotificationReceipt(messageIdVal, confirmId = confirmId)
    }

    private suspend fun parseAndShowNotification(
        eventData: JsonObject,
        accessToken: String?,
        onNotification: (NotificationEntity) -> Unit,
    ): String {
        if (accessToken.isNullOrEmpty())
            throw Exception("Unable send notification as Access Token is null or empty")
        return notificationBuilder.parseAndShow(
            // Omitted
        )
    }

    private suspend fun confirmPushNotificationReceipt(
        subscriptionId: Long?,
        confirmId: String,
    ): Boolean {
        val messageIdVal = idGenerator.getAndIncrement()
        val serviceCall = buildJsonObject {
            // Omitted
        }
        val deferred = CompletableDeferred<Boolean>()
        booleanDeferredMap[messageIdVal] = deferred

        sendMessage(serviceCall)
        val result = deferred.await()
        booleanDeferredMap.remove(messageIdVal)
        return result
    }

    private suspend fun completeHlsUrl(subscriptionId: Long?, url: String?) {
        if (subscriptionId == null)
            throw Exception("subscriptionId is null")
        if (url.isNullOrEmpty()) {
            hlsUrlDeferredMap[subscriptionId]?.completeExceptionally(Exception("URL missing in response"))
            hlsUrlDeferredMap.remove(subscriptionId)
        } else {
            val hlsUrl = "${appConfig.hostAddress}$url"
            hlsUrlDeferredMap[subscriptionId]?.complete(hlsUrl)
            hlsUrlDeferredMap.remove(subscriptionId)
        }
    }

    private suspend fun sendMessage(serviceCall: JsonObject) {
        messageQueue.send(serviceCall)
    }


    private suspend fun startHeartbeat(scope: CoroutineScope) {
        heartbeatService.startHeartbeat(
            scope = scope,
            interval = 30_000L,
            idGenerator = idGenerator,
            sender = { serviceCall -> sendMessage(serviceCall) },
            onError = {
                /*retryWithExponentialBackoff(
                    operation = ::refreshConnection
                )*/
            }
        )
    }

    private suspend fun <T> retryWithExponentialBackoff(
        maxAttempts: Int = 5,
        initialDelay: Long = 1_000L,  // 1 second
        maxDelay: Long = 32_000L,     // 32 seconds
        factor: Double = 2.0,
        operation: suspend () -> T,
    ): T {
        return operation()
    }


    private suspend fun DefaultClientWebSocketSession.sendMessages() {
        try {
            for (message in messageQueue) { // This loops until the channel is closed
                if (!isActive) break // Stop processing if the coroutine is no longer active
                Timber.i("Sending Stream Request:")
                Timber.i("\t${message}\n\t${kotlin.coroutines.coroutineContext[CoroutineName]}")
                send(Frame.Text(message.toString())) // Send the JSON object as a string
            }
        } catch (e: Exception) {
            Timber.e(e, ("Error sending WebSocket message: ${e.message}"))
        }
    }

    private suspend fun DefaultClientWebSocketSession.incomingMessages() {
        try {
            while (isActive) {
                incoming.consumeEach { frame ->
                    when (frame) {
                        is Frame.Text -> handleTextFrame(frame, scope)
                        is Frame.Binary -> handleBinaryFrame(frame)
                        is Frame.Close -> handleWebSocketClose(frame)
                        is Frame.Ping, is Frame.Pong -> handlePingPong(frame)
                        else -> Timber.d("Other type of frame received ${frame.frameType}")
                    }
                }
            }
        } catch (e: Exception) {
            Timber.e(e, "Error during WebSocket communication: ${e.message}")
            heartbeatService.stopHeartbeat()
            if (e is CancellationException) {
                throw e
            }
        } finally {
            Timber.i("Cleaning up WebSocket session")
            cleanUpWebSocket()
        }
    }

    companion object {
        // Omitted
    }
}
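A side note on the `booleanDeferredMap` / `hlsUrlDeferredMap` pattern above: the maps are plain `mutableMapOf` instances touched from several coroutines on `Dispatchers.IO`, and `deferred.await()` has no upper bound, so a lost reply waits forever. The sketch below shows one way the bookkeeping could be wrapped, assuming a thread-safe map and a timeout are acceptable. `PendingReplies` is a hypothetical helper, not part of the code in this thread.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield

// Hypothetical helper: tracks requests that are waiting for a reply with a matching id.
class PendingReplies<T> {
    private val pending = ConcurrentHashMap<Long, CompletableDeferred<T>>()
    private val ids = AtomicLong(1)

    /** Reserves an id, runs [send] with it, then waits up to [timeoutMillis] for the reply. */
    suspend fun request(timeoutMillis: Long, send: suspend (Long) -> Unit): T? {
        val id = ids.getAndIncrement()
        val deferred = CompletableDeferred<T>()
        pending[id] = deferred
        return try {
            send(id)
            withTimeoutOrNull(timeoutMillis) { deferred.await() }
        } finally {
            pending.remove(id) // always clean up, even on timeout or cancellation
        }
    }

    /** Called from the read loop when a reply arrives; returns false if nobody is waiting. */
    fun complete(id: Long, value: T): Boolean = pending.remove(id)?.complete(value) ?: false

    fun size(): Int = pending.size
}

fun main() = runBlocking {
    val replies = PendingReplies<Boolean>()
    var sentId = -1L
    val answered = async { replies.request(1_000) { id -> sentId = id } }
    yield() // let the request register itself before the reply is delivered
    replies.complete(sentId, true)
    println(answered.await())                                   // reply arrived
    println(replies.request(50) { /* no reply ever arrives */ }) // timed out
}
```

The first call prints `true` and the second prints `null` after the 50 ms timeout. Note that a timeout only bounds the wait; it does not help if the caller of `request` is the same coroutine that reads incoming frames.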
With `CIO` I can't seem to sniff the traffic with Charles Proxy like I can with `OkHttp`, so I have to go by the logs I create in Android Studio. Is there a way to add more in-depth logging so that I can see what is happening to the WebSocket connection? I don't understand what I could be doing incorrectly, especially as it is partially working.
a
What prevents you from configuring the proxy using the CIO engine (https://ktor.io/docs/client-proxy.html#configure_proxy)?
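As a rough illustration of the proxy setting from the linked docs page, a CIO client could be pointed at Charles along these lines. The address is an assumption: it presumes Charles listens on port 8888 of the host machine and that the Android emulator reaches the host as 10.0.2.2. Whether WebSocket traffic is actually visible through the proxy with CIO would still need to be verified.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.ProxyBuilder
import io.ktor.client.engine.cio.CIO
import io.ktor.client.engine.http
import io.ktor.client.plugins.websocket.WebSockets

// Assumption: Charles Proxy on the host at port 8888, reached from the
// Android emulator via 10.0.2.2. Adjust both values for your setup.
val proxiedClient = HttpClient(CIO) {
    engine {
        proxy = ProxyBuilder.http("http://10.0.2.2:8888/")
    }
    install(WebSockets)
}
```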
g
I didn't see that. TBH, I have been using an Android Emulator proxying through my Mac running Charles Proxy, and it worked fine with `OkHttp`. Anyway, this doesn't really worry me; I would be more interested in:
Is there a way to add more in-depth logging so that I can see what is happening to the WebSocket connection?
a
For logging on the Ktor side, you can configure the logger provider to log messages with the TRACE level to get maximum information about the WebSocket communication.
g
a
Ktor uses SLF4J so you need to configure the logger provider.
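To make that concrete: with only `slf4j-api` on the classpath and no provider, SLF4J falls back to a no-op logger, so TRACE output from Ktor would be silently dropped. A minimal sketch, assuming `slf4j-simple` is chosen as the provider (any SLF4J 2.x provider would do, and the choice here is only for illustration), could look like this. `enableSlf4jTrace` is a made-up helper name.

```kotlin
// build.gradle.kts (assumption: slf4j-simple as the SLF4J provider)
// implementation("org.slf4j:slf4j-simple:2.0.13")

/**
 * Raises slf4j-simple's default level to TRACE.
 * Must run before the first logger is created, for example at the very start
 * of Application.onCreate(), because slf4j-simple reads these properties once.
 */
fun enableSlf4jTrace() {
    System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "trace")
    System.setProperty("org.slf4j.simpleLogger.showDateTime", "true")
}

fun main() {
    enableSlf4jTrace()
    println(System.getProperty("org.slf4j.simpleLogger.defaultLogLevel"))
}
```

`slf4j-simple` writes to `System.err`, which on Android should end up in Logcat; a provider that targets Logcat directly may be more convenient.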
g
But from what I explained here, does it sound like I am setting it up correctly? https://kotlinlang.slack.com/archives/C0A974TJ9/p1715095660653719?thread_ts=1715095522.318669&cid=C0A974TJ9
a
I can't see anything wrong here.
g
So, it seems I am already logging:
Logging {
  logger = Logger.DEFAULT
  level = LogLevel.ALL
  logger = object : Logger {
     override fun log(message: String) {
        Timber.v("Logger Ktor => $message")
     }
  }
}
As Gradle dependency:
implementation("io.ktor:ktor-client-logging:2.3.10")
implementation ("org.slf4j:slf4j-api:2.0.13")
I did have it here too: https://kotlinlang.slack.com/archives/C0A974TJ9/p1715152915570769?thread_ts=1715095522.318669&cid=C0A974TJ9
But looking at the logs I do not see any reason why I would be getting the issue I describe.
Logger Ktor => REQUEST: http://192.168.86.57:8123/auth/token
METHOD: HttpMethod(value=POST)
COMMON HEADERS
-> Accept: application/json
-> Accept-Charset: UTF-8
CONTENT HEADERS
-> Content-Length: 219
-> Content-Type: application/x-www-form-urlencoded; charset=UTF-8
BODY Content-Type: application/x-www-form-urlencoded; charset=UTF-8
BODY START
grant_type=refresh_token&efresh_token=redacted&client_id=https%3A%2F%2Fmy-app.o%2Fandroid
BODY END


Logger Ktor => RESPONSE: 200 OK
METHOD: HttpMethod(value=POST)
FROM: http://192.168.86.57:8123/auth/token
COMMON HEADERS
-> Cache-Control: no-store
-> Content-Length: 242
-> Content-Type: application/json
-> Date: Sat, 11 May 2024 16:12:07 GMT
-> Pragma: no-cache
-> Referrer-Policy: no-referrer
-> Server: 
-> Set-Cookie: C="redacted"; HttpOnly; Max-Age=7776000; Path=/; SameSite=Lax
-> X-Content-Type-Options: nosniff
-> X-Frame-Options: SAMEORIGIN


BODY Content-Type: application/json
BODY START
{"access_token":"redacted","token_type":"Bearer","expires_in":1800}
BODY END


Logger Ktor => REQUEST: ws://192.168.86.57:8123/api/websocket
METHOD: HttpMethod(value=GET)
COMMON HEADERS
-> Accept: application/json
-> Accept-Charset: UTF-8
-> Content-Type: application/json
CONTENT HEADERS
-> Connection: Upgrade
-> Sec-WebSocket-Key: YmIzMGE0NjdkOGY4YWI5ZA==
-> Sec-WebSocket-Version: 13
-> Upgrade: websocket
BODY Content-Type: null
BODY START

BODY END
Logger Ktor => RESPONSE: 101 Switching Protocols
METHOD: HttpMethod(value=GET)
FROM: ws://192.168.86.57:8123/api/websocket
COMMON HEADERS
-> Connection: upgrade
-> Date: Sat, 11 May 2024 16:12:08 GMT
-> Sec-WebSocket-Accept: tHbYYwnNMMELoUsR0O41TIEXOUo=
-> Server: Python/3.12 aiohttp/3.9.5
-> Upgrade: websocket
BODY Content-Type: null
BODY START

BODY END
TBH, I expected to see more in the logs, but there is no indication that anything happens. If I send a message from the server, nothing is logged.
So now I am quite confused. Have I found a bug with Ktor & WebSocket? It seems so, or am I mistaken?
e
I have the same problem with WebSocket logging and haven't found a solution yet...
g
Thanks, so I am not alone 😞
a
So now I am quite confused. Have I found a bug with Ktor & WebSocket? It seems so, or am I mistaken?
It might be the case, but we need to be able to reproduce it on our side to diagnose and fix the issue. Can you try to simplify the environment to make the reproduction easier?
g
I think I have described the scenario and have given abstracted and consolidated code examples above. The server I am connecting to is a local instance of Home Assistant: https://developers.home-assistant.io/docs/api/native-app-integration/notifications/ and its WebSocket API: https://developers.home-assistant.io/docs/api/websocket
This should give the team something to go on. If someone wants to pair with me, please DM me, I am happy to help. As mentioned, the `OkHttp3` alternative works as expected.
a
Can you share the exact steps required to reproduce the problem and a working sample project?