I am using DataFusion to query an Arrow file that is being appended to in real time. Here is my v1 version:
// Imports used by both tests. These paths assume anyhow for `Error` and a
// DataFusion version that still has `FileType::ARROW` and
// `SessionContext::new_with_config_rt`; adjust them to your version as needed.
use std::sync::Arc;

use anyhow::Error;
use datafusion::common::{FileType, GetExt};
use datafusion::datasource::file_format::arrow::ArrowFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::test]
pub async fn arrow_file_search_v1() -> Result<(), Error> {
    // init datafusion context
    let config = SessionConfig::from_env()?
        .with_information_schema(true)
        .with_batch_size(8 * 1024);
    let runtime_env = RuntimeEnv::new(RuntimeConfig::default())?;
    let context = SessionContext::new_with_config_rt(config, Arc::new(runtime_env));
    // register the file as a listing table using the Arrow IPC file format
    let file_format = ArrowFormat::default();
    let listing_options =
        ListingOptions::new(Arc::new(file_format)).with_file_extension(FileType::ARROW.get_ext());
    let config = ListingTableConfig::new(ListingTableUrl::parse("./1715856177.arrow")?)
        .with_listing_options(listing_options)
        .with_schema(Arc::from(schema())); // schema() builds the table schema (defined elsewhere)
    let table = ListingTable::try_new(config)?;
    context.register_table("tbl", Arc::new(table))?;
    // exec query
    let sql = "SELECT * FROM tbl";
    let df = match context.sql(sql).await {
        Ok(df) => df,
        _ => return Err(Error::msg("exec sql error")),
    };
    let _batches = df.collect().await?;
    Ok(())
}
Running it reports the following error:
Error: Arrow error: Parser error: Arrow file does not contain correct footer
Caused by:
Parser error: Arrow file does not contain correct footer
My guess is that, because the Arrow file is being appended to in real time, the footer has not been written yet, so the file-format reader cannot parse it.
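One way to confirm that guess: a complete Arrow IPC file ends with the magic bytes ARROW1, while a stream-format file (or a file whose footer has not been flushed yet) does not. Here is a minimal sketch of such a check (the helper name is my own, not from any library):

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

// Returns true if the file ends with the Arrow IPC file magic b"ARROW1",
// i.e. the footer has been written and the file-format reader can parse it.
fn has_arrow_file_footer(path: &str) -> std::io::Result<bool> {
    let mut f = File::open(path)?;
    if f.metadata()?.len() < 6 {
        return Ok(false);
    }
    f.seek(SeekFrom::End(-6))?;
    let mut magic = [0u8; 6];
    f.read_exact(&mut magic)?;
    Ok(&magic == b"ARROW1")
}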
So I wrote a v2 version that uses StreamReader to read the record batches as a stream and then registers them in a MemTable for DataFusion to query:
// Additional imports for v2:
use std::fs::File;
use std::io::BufReader;

use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;

#[tokio::test]
pub async fn arrow_file_search_v2() -> Result<(), Error> {
    // init datafusion context
    let config = SessionConfig::from_env()?
        .with_information_schema(true)
        .with_batch_size(8 * 1024);
    let runtime_env = RuntimeEnv::new(RuntimeConfig::default())?;
    let context = SessionContext::new_with_config_rt(config, Arc::new(runtime_env));
    // read record batches with the IPC StreamReader, which needs no footer
    let file = File::open("./1715856177.arrow")?;
    let reader = StreamReader::try_new(BufReader::new(file), None)?;
    let schema_ref = reader.schema();
    let mut in_record_batches: Vec<RecordBatch> = vec![];
    for batch in reader {
        in_record_batches.push(batch?);
    }
    // register the batches as an in-memory table
    let mem_table = Arc::new(MemTable::try_new(schema_ref, vec![in_record_batches])?);
    context.register_table("tbl", mem_table)?;
    // exec query
    let sql = "SELECT * FROM tbl";
    let df = match context.sql(sql).await {
        Ok(df) => df,
        _ => return Err(Error::msg("exec sql error")),
    };
    let _batches = df.collect().await?;
    Ok(())
}
The code ran successfully!
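One note on this design: the MemTable holds a fixed snapshot of the batches read at registration time, so to see rows appended after that point the file has to be re-read and the table re-registered.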
My question is: if I want to use DataFusion to query an Arrow file that is being appended to in real time, is there a better way to do it?