# serialization
r
I'm trying to use Avro4k with Apache Beam by creating a generic `CustomCoder` implementation that can handle any `@Serializable` class. Avro4k needs a `KSerializer<T>`. How can I obtain the `KSerializer` at runtime? Keep in mind that a Beam `Coder` needs to be serializable itself, so a `KSerializer` or `KType` cannot be a property of the coder, i.e. this does not work, nor does passing in a `KType`:
```kotlin
class Avro4kCoder<T: Any>(private val serializer: KSerializer<T>): CustomCoder<T>() {
  override fun encode(value: T, outStream: OutputStream) {
    Avro.default.openOutputStream(serializer) {
      encodeFormat = AvroEncodeFormat.Binary
    }.to(outStream).write(value).flush()
  }

  override fun decode(inStream: InputStream): T? {
    return Avro.default.openInputStream(serializer) {
      decodeFormat = AvroDecodeFormat.Binary(Avro.default.schema(serializer))
    }.from(inStream).next()
  }
}
```
v
Call `.serializer()` on it?
r
The problem is that the return value is not itself serializable. Apparently Beam needs coders to be serializable, so you end up with errors like:
```
Caused by: java.io.NotSerializableException: Foo$$serializer
```
Oh, there is no `.serializer()` on `Class`. There is on `KClass`, but that is an internal API.
v
What is internal? `KClass` is not internal. And the `serializer` methods are generated by the kotlinx.serialization compiler plugin, afair.
r
`KClass.serializer()` is internal.
v
Do you want a serializer for a class or for an instance of a class?
I don't think the former makes too much sense, and the latter is, like I said, generated by the compiler plugin, afair.
r
I want a serializer for an instance. However, I'm trying to create a generic CustomCoder which can handle any type which is @Serializable.
This is what the coder looks like now:
```kotlin
class Avro4kCoder<T: Any>(private val serializer: KSerializer<T>): CustomCoder<T>() {
  override fun encode(value: T, outStream: OutputStream) {
    Avro.default.openOutputStream(serializer) {
      encodeFormat = AvroEncodeFormat.Binary
    }.to(outStream).write(value).flush()
  }

  override fun decode(inStream: InputStream): T? {
    return Avro.default.openInputStream(serializer) {
      decodeFormat = AvroDecodeFormat.Binary(Avro.default.schema(serializer))
    }.from(inStream).next()
  }
}
```
But that doesn't work because `KSerializer<T>` is not serializable, which causes a `NotSerializableException` at runtime.
So I was thinking if I could pass a `KClass` in the constructor instead, and create the serializer at execution time, it would work. However, as I said, that API is internal.
Tried to pass in a `KType` instead -- looks like the public API allows one to call `serializer(KType)` to get an instance of `KSerializer`. Unfortunately, Beam complains at runtime about that too:
```
Caused by: java.io.NotSerializableException: kotlin.reflect.jvm.internal.KTypeImpl
```
I edited the original question for clarity...
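For reference, the constraint described above can be reproduced without Beam or Avro4k. The sketch below uses only the JDK and the Kotlin standard library; `Helper`, `Holder` and `roundTrip` are hypothetical names, with `Helper` standing in for a non-serializable `KSerializer`. It assumes the usual Java serialization rules: a `Class<T>` field is serializable, and a `@Transient` field is skipped and comes back as `null`, so it has to be rebuilt on first use.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for KSerializer: deliberately NOT java.io.Serializable.
class Helper(val typeName: String)

// Hypothetical stand-in for the coder: only the Class<T> is serialized.
class Holder<T : Any>(private val type: Class<T>) : Serializable {
    // Skipped by Java serialization; null again after deserialization.
    @Transient
    private var cached: Helper? = null

    // Rebuilds the helper from the Class on first access.
    val helper: Helper
        get() = cached ?: Helper(type.name).also { cached = it }
}

// Serializes and deserializes a value the way a runner would ship it to a worker.
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { it.writeObject(value) }
    @Suppress("UNCHECKED_CAST")
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray()))
        .use { it.readObject() } as T
}

fun main() {
    val holder = Holder(String::class.java)
    holder.helper // populate the cache before serializing
    val copy = roundTrip(holder)
    println(copy.helper.typeName) // prints "java.lang.String"
}
```

Note that `by lazy` is not a drop-in replacement for the manual cache here: the delegate object is itself a field of the class and would be serialized along with it.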
g
To get a `KSerializer` at runtime from a Java context, I have done this:
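```kotlin
import java.lang.reflect.Modifier.isStatic
import kotlinx.serialization.KSerializer

fun getKSerializerOrNull(klass: Class<*>): KSerializer<*>? {
    // The compiler plugin exposes serializer() on the companion of a @Serializable class.
    val companionField = klass.declaredFields.find {
        it.name == "Companion" && isStatic(it.modifiers)
    } ?: return null
    // Companion is a static field, so the receiver passed to get() is ignored.
    val companion = companionField.get(null)
    val serializerMethod = try {
        companion::class.java.getMethod("serializer")
    } catch (e: NoSuchMethodException) {
        return null
    }
    // Guard against an unrelated user-defined serializer() method.
    if (serializerMethod.returnType != KSerializer::class.java) {
        return null
    }
    return serializerMethod.invoke(companion) as KSerializer<*>
}
```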
But I agree it's not satisfying.
In a similar situation, I also wrote a simple function:
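```kotlin
import kotlin.reflect.KClass
import kotlinx.serialization.KSerializer

// MyClass1..MyClass3 are the @Serializable classes the caller is known to use.
@Suppress("UNCHECKED_CAST")
fun <T : Any> kserializer(klass: KClass<T>): KSerializer<T> = when (klass) {
    MyClass1::class -> MyClass1.serializer()
    MyClass2::class -> MyClass2.serializer()
    MyClass3::class -> MyClass3.serializer()
    else -> throw RuntimeException("This should not happen: apply kserializer with ${klass.qualifiedName}")
} as KSerializer<T>
```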
as the set of classes being called was limited (not very elegant either, but at least there is no reflection here).
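Putting the pieces of this thread together, one possible shape for the coder is sketched below. It is an untested sketch under several assumptions: it keeps only a `Class<T>` as a serialized field and resolves the `KSerializer` on first use through the JVM-only `serializer(java.lang.reflect.Type)` function from kotlinx.serialization, which may require opting in to `ExperimentalSerializationApi` on older versions. The Avro4k calls are copied from the coder in the question, and the Avro4k import paths are a guess that depends on the Avro4k version in use. `resolveSerializer` and `cached` are hypothetical names.

```kotlin
import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroDecodeFormat
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import org.apache.beam.sdk.coders.CustomCoder
import java.io.InputStream
import java.io.OutputStream

// Hypothetical variant of the Avro4kCoder from the question: the only serialized
// state is the Class<T>; the KSerializer is transient and rebuilt on each worker.
class Avro4kCoder<T : Any>(private val type: Class<T>) : CustomCoder<T>() {
    // Skipped by Java serialization, so it is null again after the coder is shipped.
    @Transient
    private var cached: KSerializer<T>? = null

    // Looks up the plugin-generated serializer for `type` on first use.
    @Suppress("UNCHECKED_CAST")
    private fun resolveSerializer(): KSerializer<T> =
        cached ?: (serializer(type) as KSerializer<T>).also { cached = it }

    override fun encode(value: T, outStream: OutputStream) {
        Avro.default.openOutputStream(resolveSerializer()) {
            encodeFormat = AvroEncodeFormat.Binary
        }.to(outStream).write(value).flush()
    }

    override fun decode(inStream: InputStream): T? {
        val s = resolveSerializer()
        return Avro.default.openInputStream(s) {
            decodeFormat = AvroDecodeFormat.Binary(Avro.default.schema(s))
        }.from(inStream).next()
    }
}
```

With this shape the coder would be created as `Avro4kCoder(Foo::class.java)`. A plain `Class<T>` carries no type arguments, so generic `@Serializable` classes would need a different key, and the lookup throws if the class has no generated serializer.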