I have Java code that writes a map field to a file and then reads it back using Apache Arrow. During the read, when loading the record batch, I get a "not all nodes and buffers were consumed" error. Below is the code:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

File file = new File("/temp/test.arrow");
Schema schema;
try (BufferAllocator allocator = new RootAllocator()) {
    Field keyField = new Field("id", FieldType.notNullable(new ArrowType.Int(64, true)), null);
    Field valueField = new Field("value", FieldType.nullable(new ArrowType.Int(64, true)), null);
    Field structField =
        new Field("entry", FieldType.notNullable(ArrowType.Struct.INSTANCE), List.of(keyField, valueField));
    Field mapIntToIntField =
        new Field("mapFieldIntToInt", FieldType.notNullable(new ArrowType.Map(false)), List.of(structField));
    schema = new Schema(Arrays.asList(mapIntToIntField));
    try (
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
        MapVector mapVector = (MapVector) vectorSchemaRoot.getVector("mapFieldIntToInt")) {
        UnionMapWriter mapWriter = mapVector.getWriter();
        mapWriter.setPosition(0);
        mapWriter.startMap();
        for (int i = 0; i < 3; i++) {
            mapWriter.startEntry();
            mapWriter.key().bigInt().writeBigInt(i);
            mapWriter.value().bigInt().writeBigInt(i * 7);
            mapWriter.endEntry();
        }
        mapWriter.endMap();
        mapWriter.setValueCount(1);
        vectorSchemaRoot.setRowCount(1);
        try (
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())) {
            writer.start();
            writer.writeBatch();
            writer.end();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
// Deserialize Arrow data from the file
try (
    BufferAllocator rootAllocator = new RootAllocator();
    FileInputStream fileInputStream = new FileInputStream(file);
    ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)) {
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock); // error thrown here
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
        int totalCount = vectorSchemaRootRecover.getRowCount();
        Schema readSchema = vectorSchemaRootRecover.getSchema();
        for (int i = 0; i < totalCount; i++) {
            for (Field field : readSchema.getFields()) {
                String fieldName = field.getName();
                Object arrowValue = vectorSchemaRootRecover.getVector(field).getObject(i);
                System.out.println("fieldName: " + fieldName + ", arrowValue: " + arrowValue);
            }
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}
Any hint would be appreciated. Thanks!
By debugging, the write appears to work as expected; I can see the correct data in vectorSchemaRoot. I'm not sure whether some setting is missing during the write that causes the error when loading the file.
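
To try to narrow it down, here is a minimal diagnostic sketch (reusing the schema and mapVector variables from the write block, placed after setRowCount(1) and before writer.start()). It prints the map field as declared in the schema next to the field the populated MapVector actually reports. As far as I understand, this error means the nodes/buffers in the serialized batch don't line up with what the schema says should be there, so any difference between the two printouts (e.g. child field names or nullability) would be a lead:

// Diagnostic only: compare the declared map field with the field
// the vector reports after the writer has populated it.
System.out.println("declared: " + schema.findField("mapFieldIntToInt"));
System.out.println("actual:   " + mapVector.getField());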