I worked out a solution, with help from Googling and asking an AI, for an error I was getting when executing Dataflow Flex Templates built with the Java Apache Beam SDK:
Error message from worker: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
This error happens because in my PCollection<KV<String, CustomClass>> the key can be null, so I found that I needed to set up a NullableCoder for the key, like so:
Coder<String> nullableStringCoder = NullableCoder.of(StringUtf8Coder.of());
But since it is a KV, I also need to specify a coder for the value, and since the value is a CustomClass, I need to create a custom Coder for it, right? While searching I found the answer "How can I code nullable objects in Google Cloud Dataflow?", which has a very simple approach: annotate CustomClass with the DefaultCoder annotation pointing at AvroCoder, and then annotate the nullable fields with Nullable. I could not follow that approach because, again, in my case I have a KV, so instead I created a custom Coder, which I can then pass to the pipeline step with .withCoder(customCoder);
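For reference, the annotation-based approach from that linked answer looks roughly like this. This is only a sketch, not what I ended up using; the exact AvroCoder import path depends on the Beam version (newer versions moved it to org.apache.beam.sdk.extensions.avro.coders.AvroCoder), and storing the timestamp as epoch millis instead of java.time.Instant is my assumption, since Instant is not directly Avro-reflect-friendly:

```java
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Sketch: let Beam infer an Avro-based coder for the class,
// with the id field explicitly marked as nullable.
@DefaultCoder(AvroCoder.class)
public class CustomClass {
  @Nullable private String id;     // Avro's @Nullable makes this field optional
  private Integer successfulOps;
  private Integer failedOps;
  private Long generatedAtMillis;  // assumption: epoch millis instead of java.time.Instant
  // getters/setters omitted
}
```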
This solution is working, but I was wondering if there are other ways to do this. For example, at the beginning of the pipeline maybe I could do:
pipeline.getCoderRegistry().registerCoderForClass(String.class, NullableCoder.of(StringUtf8Coder.of()));
pipeline.getCoderRegistry().registerCoderForClass(CustomClass.class, AvroCoder.of(CustomClass.class));
Would this give me the same result as my first approach of creating a custom Coder?
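One more alternative along the same lines (untested, and assuming CustomClass is Avro-compatible): compose the KV coder inline from stock coders instead of writing a coder class at all:

```java
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.KV;

// Untested sketch: same null-tolerant key coder as before, but the
// value side is inferred by Avro rather than handwritten.
Coder<KV<String, CustomClass>> kvCoder =
    KvCoder.of(
        NullableCoder.of(StringUtf8Coder.of()),
        AvroCoder.of(CustomClass.class));
// then: typedRead.withCoder(kvCoder);
```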
Anyway, here is the working solution with the custom Coder.
How I use the CustomClassCoder:
PTransform<PBegin, PCollection<KV<String, CustomClass>>> readFromBigquery(
    TypedRead<KV<String, CustomClass>> typedRead) {
  Coder<String> nullableStringCoder = NullableCoder.of(StringUtf8Coder.of());
  Coder<CustomClass> customClassCoder = new CustomClassCoder();
  Coder<KV<String, CustomClass>> customCoder =
      KvCoder.of(nullableStringCoder, customClassCoder);
  return typedRead
      .fromQuery(queryString)
      .usingStandardSql()
      .withoutValidation()
      .withKmsKey(kmsKey)
      .withCoder(customCoder);
}
CustomClassCoder:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class CustomClassCoder extends AtomicCoder<CustomClass> {

  private final Coder<String> idCoder = NullableCoder.of(StringUtf8Coder.of());
  private final Coder<Integer> successfulOperationsCoder = BigEndianIntegerCoder.of();
  private final Coder<Integer> failedOperationsCoder = BigEndianIntegerCoder.of();
  // java.time.Instant is stored as epoch milliseconds (Beam's built-in
  // InstantCoder only covers org.joda.time.Instant).
  private final Coder<Long> generationTimestampCoder = BigEndianLongCoder.of();

  @Override
  public void encode(CustomClass value, OutputStream outStream) throws IOException {
    idCoder.encode(value.getId(), outStream);
    successfulOperationsCoder.encode(value.getSuccessfulOps(), outStream);
    failedOperationsCoder.encode(value.getFailedOps(), outStream);
    // Encode Instant as milliseconds
    generationTimestampCoder.encode(value.getGeneratedAt().toEpochMilli(), outStream);
  }

  @Override
  public CustomClass decode(InputStream inStream) throws IOException {
    String id = idCoder.decode(inStream);
    Integer successfulOps = successfulOperationsCoder.decode(inStream);
    Integer failedOps = failedOperationsCoder.decode(inStream);
    // Decode Instant from milliseconds
    long epochMilli = generationTimestampCoder.decode(inStream);
    Instant generatedAt = Instant.ofEpochMilli(epochMilli);

    CustomClass customClass = new CustomClass();
    customClass.setId(id);
    customClass.setSuccessfulOps(successfulOps);
    customClass.setFailedOps(failedOps);
    customClass.setGeneratedAt(generatedAt);
    return customClass;
  }
}
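A quick way to sanity-check the coder locally (this is how I would verify it; CustomClass and its accessors are as above) is an encode/decode round trip with a null id, which is exactly the case the NullableCoder is there for:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.time.Instant;

public class CoderRoundTripCheck {
  public static void main(String[] args) throws Exception {
    CustomClass in = new CustomClass();
    in.setId(null); // the problematic null value
    in.setSuccessfulOps(3);
    in.setFailedOps(1);
    in.setGeneratedAt(Instant.ofEpochMilli(1700000000000L));

    CustomClassCoder coder = new CustomClassCoder();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(in, out);
    CustomClass back = coder.decode(new ByteArrayInputStream(out.toByteArray()));

    // The null id and every other field should survive the round trip.
    assert back.getId() == null;
    assert back.getSuccessfulOps() == 3;
    assert back.getFailedOps() == 1;
    assert back.getGeneratedAt().equals(in.getGeneratedAt());
  }
}
```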
I have done some Dataflow executions and this works, but I would appreciate it if someone could confirm whether this approach is okay, or whether there is another way to handle this scenario (allowing a null key in a KV). Thanks!