I want to use Kafka Schema Registry with Protobuf in my Kafka Streams application with Scala. I started with a very simple data schema, but I always get a ClassCastException. When producing a record I get the following error:
Can’t convert value of class com.my.example.protobuf.schema.User to class io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer specified in value.serializer.
java.lang.ClassCastException: class com.my.example.protobuf.schema.User cannot be cast to class com.google.protobuf.Message (com.my.example.protobuf.schema.User and com.google.protobuf.Message are in unnamed module of loader
Produce logic
private val DEFAULT_KAFKA_PRODUCER_PROPERTIES = collection.immutable.Map(
ProducerConfig.CLIENT_ID_CONFIG -> "myClientId",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> BOOTSTRAP_SERVER,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
)
...
val producer = new KafkaProducer[String, User](kafkaProducerConfigs)
val record: ProducerRecord[String, User] =
new ProducerRecord[String, User](
"test-topic",
"test-key",
User(
"Kafka"
)
)
producer.send(record).get(5L, TimeUnit.SECONDS)
user.proto
syntax = "proto3";
message User {
string name = 1;
}
Case class user.scala generated by ScalaPB
// Generated by the Scala Plugin for the Protocol Buffer Compiler.
// Do not edit!
//
// Protofile syntax: PROTO3
package com.my.example.protobuf.schema
@SerialVersionUID(0L)
final case class User(
name: _root_.scala.Predef.String = "",
unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[User] {
@transient
private[this] var __serializedSizeMemoized: _root_.scala.Int = 0
private[this] def __computeSerializedSize(): _root_.scala.Int = {
var __size = 0
{
val __value = name
if (!__value.isEmpty) {
__size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(1, __value)
}
};
__size += unknownFields.serializedSize
__size
}
override def serializedSize: _root_.scala.Int = {
var __size = __serializedSizeMemoized
if (__size == 0) {
__size = __computeSerializedSize() + 1
__serializedSizeMemoized = __size
}
__size - 1
}
def writeTo(`_output__`: _root_.com.google.protobuf.CodedOutputStream): _root_.scala.Unit = {
{
val __v = name
if (!__v.isEmpty) {
_output__.writeString(1, __v)
}
};
unknownFields.writeTo(_output__)
}
def withName(__v: _root_.scala.Predef.String): User = copy(name = __v)
def withUnknownFields(__v: _root_.scalapb.UnknownFieldSet) = copy(unknownFields = __v)
def discardUnknownFields = copy(unknownFields = _root_.scalapb.UnknownFieldSet.empty)
def getFieldByNumber(__fieldNumber: _root_.scala.Int): _root_.scala.Any = {
(__fieldNumber: @_root_.scala.unchecked) match {
case 1 => {
val __t = name
if (__t != "") __t else null
}
}
}
def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = {
_root_.scala.Predef.require(__field.containingMessage eq companion.scalaDescriptor)
(__field.number: @_root_.scala.unchecked) match {
case 1 => _root_.scalapb.descriptors.PString(name)
}
}
def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this)
def companion: User.type = User
// @@protoc_insertion_point(GeneratedMessage[User])
}
object User extends scalapb.GeneratedMessageCompanion[User] {
implicit def messageCompanion: scalapb.GeneratedMessageCompanion[User] = this
def parseFrom(`_input__`: _root_.com.google.protobuf.CodedInputStream): User = {
var __name: _root_.scala.Predef.String = ""
var `_unknownFields__`: _root_.scalapb.UnknownFieldSet.Builder = null
var _done__ = false
while (!_done__) {
val _tag__ = _input__.readTag()
_tag__ match {
case 0 => _done__ = true
case 10 =>
__name = _input__.readStringRequireUtf8()
case tag =>
if (_unknownFields__ == null) {
_unknownFields__ = new _root_.scalapb.UnknownFieldSet.Builder()
}
_unknownFields__.parseField(tag, _input__)
}
}
User(
name = __name,
unknownFields = if (_unknownFields__ == null) _root_.scalapb.UnknownFieldSet.empty else _unknownFields__.result()
)
}
implicit def messageReads: _root_.scalapb.descriptors.Reads[User] = _root_.scalapb.descriptors.Reads{
case _root_.scalapb.descriptors.PMessage(__fieldsMap) =>
_root_.scala.Predef.require(__fieldsMap.keys.forall(_.containingMessage eq scalaDescriptor), "FieldDescriptor does not match message type.")
User(
name = __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[_root_.scala.Predef.String]).getOrElse("")
)
case _ => throw new RuntimeException("Expected PMessage")
}
def javaDescriptor: _root_.com.google.protobuf.Descriptors.Descriptor = UserProto.javaDescriptor.getMessageTypes().get(0)
def scalaDescriptor: _root_.scalapb.descriptors.Descriptor = UserProto.scalaDescriptor.messages(0)
def messageCompanionForFieldNumber(__number: _root_.scala.Int): _root_.scalapb.GeneratedMessageCompanion[_] = throw new MatchError(__number)
lazy val nestedMessagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_ <: _root_.scalapb.GeneratedMessage]] = Seq.empty
def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber)
lazy val defaultInstance = User(
name = ""
)
implicit class UserLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, User]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, User](_l) {
def name: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.name)((c_, f_) => c_.copy(name = f_))
}
final val NAME_FIELD_NUMBER = 1
def of(
name: _root_.scala.Predef.String
): _root_.com.my.example.protobuf.schema.User = _root_.com.my.example.protobuf.schema.User(
name
)
// @@protoc_insertion_point(GeneratedMessageCompanion[User])
}
Class UserProto.scala generated by ScalaPB
// Generated by the Scala Plugin for the Protocol Buffer Compiler.
// Do not edit!
//
// Protofile syntax: PROTO3
package com.my.example.protobuf.schema
object UserProto extends _root_.scalapb.GeneratedFileObject {
lazy val dependencies: Seq[_root_.scalapb.GeneratedFileObject] = Seq.empty
lazy val messagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_ <: _root_.scalapb.GeneratedMessage]] =
Seq[_root_.scalapb.GeneratedMessageCompanion[_ <: _root_.scalapb.GeneratedMessage]](
User
)
private lazy val ProtoBytes: _root_.scala.Array[Byte] =
scalapb.Encoding.fromBase64(scala.collection.immutable.Seq(
"""Cgp1c2VyLnByb3RvIiUKBFVzZXISHQoEbmFtZRgBIAEoCUIJ4j8GEgRuYW1lUgRuYW1lYgZwcm90bzM="""
).mkString)
lazy val scalaDescriptor: _root_.scalapb.descriptors.FileDescriptor = {
val scalaProto = com.google.protobuf.descriptor.FileDescriptorProto.parseFrom(ProtoBytes)
_root_.scalapb.descriptors.FileDescriptor.buildFrom(scalaProto, dependencies.map(_.scalaDescriptor))
}
lazy val javaDescriptor: com.google.protobuf.Descriptors.FileDescriptor = {
val javaProto = com.google.protobuf.DescriptorProtos.FileDescriptorProto.parseFrom(ProtoBytes)
com.google.protobuf.Descriptors.FileDescriptor.buildFrom(javaProto, _root_.scala.Array(
))
}
@deprecated("Use javaDescriptor instead. In a future version this will refer to scalaDescriptor.", "ScalaPB 0.5.47")
def descriptor: com.google.protobuf.Descriptors.FileDescriptor = javaDescriptor
}