kevin.cianfarini
08/25/2022, 1:01 PM
2.0.3 with cURL on K/N with the new memory manager on linuxX64. It always happens on the second network request for a given "service". Here's an abbreviated stacktrace.
Uncaught Kotlin exception: kotlinx.coroutines.JobCancellationException: Parent job is Completed; job=JobImpl{Completed}@162aaa80
at 0 permit-data.kexe 0x42fd70 kfun:kotlin.Exception#<init>(kotlin.String?;kotlin.Throwable?){} + 128
at 1 permit-data.kexe 0x42ff20 kfun:kotlin.RuntimeException#<init>(kotlin.String?;kotlin.Throwable?){} + 128
at 2 permit-data.kexe 0x4304a0 kfun:kotlin.IllegalStateException#<init>(kotlin.String?;kotlin.Throwable?){} + 128
at 3 permit-data.kexe 0x43bf30 kfun:kotlin.coroutines.cancellation.CancellationException#<init>(kotlin.String?;kotlin.Throwable?){} + 128
Full stacktrace is in thread. I'm going to keep digging in. Is this perhaps a problem with the Ktor client?
Aleksei Tirman [JB]
08/25/2022, 1:02 PM
kevin.cianfarini
08/25/2022, 1:04 PM
val httpClient = HttpClient(Curl) {
    install(HttpTimeout)
}

class GeocodeService(
    private val baseUrl: String,
    private val client: HttpClient,
    private val json: Json,
) {
    suspend fun search(
        streetAddress: String,
        postcode: String,
    ): GeocodeResponse? {
        val response = client.get("$baseUrl/search") {
            parameter("street", streetAddress)
            parameter("postalcode", postcode)
            parameter("addressdetails", 1)
        }
        return json.decodeFromString(
            deserializer = ListSerializer(GeocodeResponse.serializer()),
            string = response.bodyAsText(),
        ).firstOrNull()
    }
}
Aleksei Tirman [JB]
08/25/2022, 1:05 PM
GeocodeService?
kevin.cianfarini
08/25/2022, 1:06 PM
private fun createGeocodingFlow(
    source: Flow<PermitResult>,
    client: HttpClient,
    geocodeProgress: Animation<Address>,
): Flow<Pair<PermitResult, GeocodeResponse?>> {
    val service = GeocodeService(nominatimUrl, client, json)
    return source.withIndex().map { (index, record) ->
        record.address?.let { geocodeProgress.update(it) }
        val geocodeResult = record.address?.let { address ->
            service.search(
                address.streetAddress,
                address.postalCode,
            )
        }
        Pair(record, geocodeResult)
    }
}
source is a channelFlow
private fun createPermitDataFlow(client: HttpClient): Flow<PermitResult> = channelFlow {
    PermitService(client, json).use { service ->
        coroutineScope {
            PermitType.values().forEach { type ->
                launch {
                    service.getPermitData(type).result.permitResults.forEach { permit ->
                        send(permit)
                    }
                }
            }
        }
    }
}.buffer(Channel.UNLIMITED)
awaitClose on the channelFlow
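
For reference, here is a minimal sketch of one general way kotlinx.coroutines ends up cancelling a coroutine because its parent job has already completed, which is the state named in the exception message at the top of the thread. It is not a diagnosis of the Curl engine or of the code pasted here. The function name `causeOfChildLaunchedIntoCompletedParent` is made up for illustration, and the only dependency assumed is kotlinx-coroutines-core.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches a child into a parent Job that has already
// completed normally, and returns the cause the child completed with.
fun causeOfChildLaunchedIntoCompletedParent(): Throwable? = runBlocking {
    val parent = Job()
    parent.complete() // the parent has no children, so it completes normally
    parent.join()     // wait until the parent has fully completed

    var cause: Throwable? = null
    val child = launch(parent) {
        // Not expected to run: the child is cancelled when it attaches
        // to a parent that is no longer active.
    }
    // The handler receives null on normal completion, or the failure or
    // cancellation cause otherwise.
    child.invokeOnCompletion { cause = it }
    child.join() // join() does not rethrow the child's cancellation
    cause
}

fun main() {
    val cause = causeOfChildLaunchedIntoCompletedParent()
    println(cause is CancellationException)
    println(cause?.message)
}
```

If something reuses a job after it has completed (for example, a job tied to an earlier request), new work attached to that job would be cancelled in this way. That would be consistent with a failure that only appears on the second request, but for the code in this thread it is only a guess.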