otang
12/06/2016, 6:54 PMval momentSource: FlinkKafkaConsumer09[Moment] = kafkaFactory.createSpecific(classOf[Moment])`
Calling Kotlin Code:
fun <T : SpecificRecordBase> createSpecific(avroType: Class<T>): FlinkKafkaConsumer09<T> {
val name = SpecificData.get().getSchema(avroType).fullName
return FlinkKafkaConsumer09(name, SerdesSpecificAvroDeserializer(avroType), properties)
}