I am trying to write a data pipeline in Java using Apache Beam version 2.56.0.
In the first step, I read from a database to create `Row` objects. Then I need to group these rows before applying a transformation. However, when I apply the `GroupByKey.create()` transform, the pipeline hangs and never completes.
Below is a simplified example of my data pipeline that exhibits the same behavior:
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DataPipeline {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        Schema schema = Schema.builder().addStringField("id").addInt32Field("val").build();
        RowCoder rowCoder = RowCoder.of(schema);
        KvCoder<String, Row> kvCoder = KvCoder.of(StringUtf8Coder.of(), rowCoder);

        Row row1 = Row.withSchema(schema).addValues("1", 12).build();
        Row row2 = Row.withSchema(schema).addValues("1", 13).build();

        PCollection<Row> rows = pipeline.apply(Create.of(row1, row2).withRowSchema(schema));

        PCollection<KV<String, Row>> step1 = rows
                .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.rows()))
                        .via((Row row) -> KV.of(row.getString("id"), row)))
                .setCoder(kvCoder);

        PCollection<KV<String, Iterable<Row>>> step2 = step1.apply(GroupByKey.create());

        PCollection<Void> step3 = step2.apply(ParDo.of(new DoFn<KV<String, Iterable<Row>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                KV<String, Iterable<Row>> kv = context.element();
                System.out.println("Key: " + kv.getKey());
                for (Row row : kv.getValue()) {
                    System.out.println(row);
                }
            }
        }));

        /*
        // if I avoid the group by key step, then the code completes
        PCollection<Void> step2 = step1.apply(ParDo.of(new DoFn<KV<String, Row>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                KV<String, Row> kv = context.element();
                System.out.println("Key: " + kv.getKey() + " value: " + kv.getValue());
            }
        }));
        */

        pipeline.run().waitUntilFinish();
    }
}
```
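For reference, this is the grouping the `GroupByKey` step is expected to produce for the two sample rows, sketched in plain Java without Beam. `ExpectedGrouping`, `SampleRow`, and `groupById` are hypothetical names used only for illustration (`SampleRow` stands in for a Beam `Row` with the same `id` and `val` fields), and records require Java 16 or later:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ExpectedGrouping {
    // Hypothetical stand-in for a Beam Row with fields "id" (string) and "val" (int32).
    record SampleRow(String id, int val) {}

    // Groups rows by their "id" field, mirroring the KV<String, Iterable<Row>> shape
    // that GroupByKey emits. TreeMap only makes the key order deterministic here.
    static Map<String, List<SampleRow>> groupById(List<SampleRow> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(SampleRow::id, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<SampleRow> rows = List.of(new SampleRow("1", 12), new SampleRow("1", 13));
        // Print in the same "Key: ..." format as the DoFn in the pipeline above.
        groupById(rows).forEach((key, values) -> {
            System.out.println("Key: " + key);
            values.forEach(System.out::println);
        });
    }
}
```

With the same two rows, this prints `Key: 1` followed by both rows. Note that Beam itself does not guarantee the order of values inside the grouped `Iterable`.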
When I remove the `GroupByKey` step, the code runs to completion. I can't figure out what is going wrong. Is there something I am missing?