I am trying to serialize data which is of type JsonDocument to Kafka’s Schema Registry. Let’s assume that the data that I’m receiving is a Json string and I don’t have the CLR type.
<code>var schemaRegistryConfig = new SchemaRegistryConfig
Url = "http://localhost:8081"
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var producerConfig = new ProducerConfig
BootstrapServers = "localhost:9094"
var jsonSerializerConfig = new JsonSerializerConfig
AutoRegisterSchemas = true
using var producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(new JsonSerializer<byte[]>(schemaRegistry, jsonSerializerConfig))
const string jsonString = """{ "name": "John Doe", "age": 30 }""";
using var jsonDocument = JsonDocument.Parse(jsonString);
JsonElement rootElement = jsonDocument.RootElement;
var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(rootElement);
var message = new Message<string, byte[]>
var deliveryResult = await producer.ProduceAsync("your-topic", message);
Console.WriteLine($"Delivered message to '{deliveryResult.TopicPartitionOffset}'");
<code>var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:8081"
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9094"
};
var jsonSerializerConfig = new JsonSerializerConfig
{
AutoRegisterSchemas = true
};
using var producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(new JsonSerializer<byte[]>(schemaRegistry, jsonSerializerConfig))
.Build();
const string jsonString = """{ "name": "John Doe", "age": 30 }""";
using var jsonDocument = JsonDocument.Parse(jsonString);
JsonElement rootElement = jsonDocument.RootElement;
var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(rootElement);
var message = new Message<string, byte[]>
{
Key = "key1",
Value = jsonBytes
};
var deliveryResult = await producer.ProduceAsync("your-topic", message);
Console.WriteLine($"Delivered message to '{deliveryResult.TopicPartitionOffset}'");
</code>
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:8081"
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9094"
};
var jsonSerializerConfig = new JsonSerializerConfig
{
AutoRegisterSchemas = true
};
using var producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(new JsonSerializer<byte[]>(schemaRegistry, jsonSerializerConfig))
.Build();
const string jsonString = """{ "name": "John Doe", "age": 30 }""";
using var jsonDocument = JsonDocument.Parse(jsonString);
JsonElement rootElement = jsonDocument.RootElement;
var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(rootElement);
var message = new Message<string, byte[]>
{
Key = "key1",
Value = jsonBytes
};
var deliveryResult = await producer.ProduceAsync("your-topic", message);
Console.WriteLine($"Delivered message to '{deliveryResult.TopicPartitionOffset}'");
This does seem to work but it created a schema like this:
"$schema": "http://json-schema.org/draft-04/schema#",
<code>{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Byte[]",
"type": "string",
"format": "byte"
}
</code>
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Byte[]",
"type": "string",
"format": "byte"
}
And the value was encoded ‘eyJuYW1lIjoiSm9obiBEb2UiLCJhZ2UiOjMwfQ==’
I am able to consume and deserialize the record, but I am not sure if I did it correctly. Additionally, the schema and the value look weird. I would expect a properties field in the schema containing the name and age fields with their types. However, I am still unsure why this is working.