rocketraman
12/01/2020, 2:52 PM
`CustomCoder` implementation that can handle any `@Serializable`. Avro4k needs a `KSerializer<T>`. How can I obtain the `KSerializer` at runtime? Keep in mind that in Beam a `Coder` needs to be serializable itself, so a `KSerializer` or `KType` cannot be a property of the coder, i.e. this does not work, nor does passing in `KType`:
class Avro4kCoder<T : Any>(private val serializer: KSerializer<T>) : CustomCoder<T>() {
    override fun encode(value: T, outStream: OutputStream) {
        Avro.default.openOutputStream(serializer) {
            encodeFormat = AvroEncodeFormat.Binary
        }.to(outStream).write(value).flush()
    }

    override fun decode(inStream: InputStream): T? {
        return Avro.default.openInputStream(serializer) {
            decodeFormat = AvroDecodeFormat.Binary(Avro.default.schema(serializer))
        }.from(inStream).next()
    }
}
Vampire
12/01/2020, 3:00 PM
`.serializer()` on it?
rocketraman
12/01/2020, 3:02 PM
Caused by: java.io.NotSerializableException: Foo$$serializer
rocketraman
12/01/2020, 3:07 PM
There is no `.serializer()` on `Class`. There is one on `KClass`, but that is an internal API.
Vampire
12/01/2020, 3:09 PM
`KClass` is not internal.
And the `serializer` methods are generated by the kotlinx.serialization compiler plugin, afair.
rocketraman
12/01/2020, 3:09 PM
`KClass.serializer()` is internal.
rocketraman
12/01/2020, 3:23 PM
class Avro4kCoder<T : Any>(private val serializer: KSerializer<T>) : CustomCoder<T>() {
    override fun encode(value: T, outStream: OutputStream) {
        Avro.default.openOutputStream(serializer) {
            encodeFormat = AvroEncodeFormat.Binary
        }.to(outStream).write(value).flush()
    }

    override fun decode(inStream: InputStream): T? {
        return Avro.default.openInputStream(serializer) {
            decodeFormat = AvroDecodeFormat.Binary(Avro.default.schema(serializer))
        }.from(inStream).next()
    }
}
rocketraman
12/01/2020, 3:24 PM
`KSerializer<T>` is not serializable, which causes a `NotSerializableException` at runtime.
rocketraman
12/01/2020, 3:25 PM
If I could pass `KClass` in the constructor instead, and create the serializer at execution time, it would work. However, as I said, that API is internal.
rocketraman
12/01/2020, 3:33 PM
`KType` instead -- looks like the public API allows one to call `serializer(KType)` to get an instance of `KSerializer`. Unfortunately, Beam complains at runtime about that too:
Caused by: java.io.NotSerializableException: kotlin.reflect.jvm.internal.KTypeImpl
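Both failures above share one cause: Beam Java-serializes the coder, and neither the generated `KSerializer` nor `KTypeImpl` implements `java.io.Serializable`. `java.lang.Class` does, so one possible workaround is to keep only a `Class<T>` token as serialized state and rebuild the non-serializable object lazily in a `@Transient` field. The sketch below shows that pattern with the JDK only. `Helper`, `LazyHolder` and `roundTrip` are hypothetical names, and `Helper` merely stands in for a `KSerializer<T>`; resolving a real serializer from the class token is not shown.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for a non-serializable object such as KSerializer<T>.
class Helper<T : Any>(val type: Class<T>)

// Only the Class<T> token is written during Java serialization.
// The helper is transient, so it is dropped on write and rebuilt on first use.
class LazyHolder<T : Any>(private val type: Class<T>) : Serializable {
    @Transient
    private var cached: Helper<T>? = null

    fun helper(): Helper<T> = cached ?: Helper(type).also { cached = it }
}

// Serializes and deserializes a value in memory, as a distributed runner would
// do when shipping an object to a worker.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray())).use {
        it.readObject() as T
    }
}

fun main() {
    val original = LazyHolder(String::class.java)
    original.helper() // populate the cache before serializing

    val copy = roundTrip(original) // no NotSerializableException
    println(copy.helper().type.name) // prints "java.lang.String"
}
```

After the round trip the cached helper is gone, because transient fields are not written, and the first call to `helper()` recreates it from the class token.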
Gilles Barbier
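12/04/2020, 4:15 PM
import java.lang.reflect.Modifier.isStatic
import kotlinx.serialization.KSerializer

// Looks up the generated Companion.serializer() of a class via Java reflection.
fun getKSerializerOrNull(klass: Class<*>): KSerializer<*>? {
    // Kotlin exposes a default-named companion object as a static field called "Companion".
    val companionField = klass.declaredFields.find {
        it.name == "Companion" && isStatic(it.modifiers)
    } ?: return null
    val companion = companionField.get(klass)
    val serializerMethod = try {
        companion::class.java.getMethod("serializer")
    } catch (e: NoSuchMethodException) {
        return null
    }
    // Reject an unrelated method that merely happens to be named serializer().
    if (serializerMethod.returnType.name != KSerializer::class.qualifiedName) {
        return null
    }
    @Suppress("UNCHECKED_CAST")
    return serializerMethod.invoke(companion) as KSerializer<*>
}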
Gilles Barbier
12/04/2020, 4:24 PM
fun <T : Any> kserializer(klass: KClass<T>) = when (klass) {
    MyClass1::class -> MyClass1.serializer()
    MyClass2::class -> MyClass2.serializer()
    MyClass3::class -> MyClass3.serializer()
    else -> throw RuntimeException("This should not happen: apply kserializer with ${klass.qualifiedName}")
} as KSerializer<T>
as the set of classes being called was limited (not very elegant either, but at least I do not have any reflection here)
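If the set of classes grows, the `when` above could be replaced by a lookup table that is filled once at startup. A minimal sketch, again without reflection; `Codec` and `CodecRegistry` are hypothetical names, with `Codec` standing in for `KSerializer`:

```kotlin
import kotlin.reflect.KClass

// Hypothetical stand-in for KSerializer<T>.
interface Codec<T : Any> {
    fun encode(value: T): String
}

// Maps each supported class to its codec; entries are registered explicitly.
class CodecRegistry {
    private val codecs = mutableMapOf<KClass<*>, Codec<*>>()

    fun <T : Any> register(klass: KClass<T>, codec: Codec<T>) {
        codecs[klass] = codec
    }

    // The cast is safe as long as entries are only added through register().
    @Suppress("UNCHECKED_CAST")
    fun <T : Any> lookup(klass: KClass<T>): Codec<T> =
        (codecs[klass] as? Codec<T>)
            ?: throw IllegalArgumentException("No codec registered for ${klass.java.name}")
}

fun main() {
    val registry = CodecRegistry()
    registry.register(String::class, object : Codec<String> {
        override fun encode(value: String) = "text:$value"
    })

    println(registry.lookup(String::class).encode("hello")) // prints "text:hello"
}
```

Compared with the `when`, an unknown class still fails at lookup time, but adding a class becomes a single `register` call instead of a new branch.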