I have a simple Arrow Flight JS client that reads Flight RPC data from a Python server over gRPC-Web. I can retrieve the raw bytes from the server fine, but I'm running into a few issues:
- Converting raw bytes to a Table: I can't work out how to convert the raw bytes into an Arrow Table correctly. I can get the bytes, but when I pass them to tableFromIPC, it doesn't produce a table.
- Releasing the stream on the server: When the stream finishes, the Python server never releases its stream (doGet), so it keeps consuming RAM until it crashes (e.g., 40 GB for a table with 20 rows).
- Optimisations to avoid memory copies: Any additional feedback on avoiding memory copies would be much appreciated, particularly on not copying the data into JS arrays where possible. The PyArrow documentation is extensive, but the JS documentation is mostly API reference (endpoints and function definitions), so guidance here would be really helpful.
Thanks heaps for your help.
Regards,
PB
client.ts:
```typescript
import * as grpcWeb from 'grpc-web';
import { FlightServiceClient } from '../proto/arrow_flight/Arrow_flightServiceClientPb';
import { Ticket, FlightData, FlightDescriptor, FlightInfo, Criteria } from '../proto/arrow_flight/arrow_flight_pb';
import { tableFromIPC } from 'apache-arrow';

export class FlightClient {
  private client: FlightServiceClient;

  constructor(serverUrl: string) {
    const grpcOptions = {
      withCredentials: true,
    };
    this.client = new FlightServiceClient(serverUrl, null, grpcOptions);
  }

  // Attach logging handlers to a server-streaming call and forward each message to processData.
  private handleStreamEvents<T>(
    stream: grpcWeb.ClientReadableStream<T>,
    processData: (response: T) => void,
    onEnd?: () => void,
  ): void {
    stream.on('data', (response: T) => {
      processData(response);
    });
    stream.on('status', (status: grpcWeb.Status) => {
      if (status.code === grpcWeb.StatusCode.OK) {
        console.log('Stream finished successfully');
      } else {
        console.error('Stream finished with error:', status);
      }
    });
    stream.on('error', (err: grpcWeb.RpcError) => {
      console.error('Stream encountered an error:', err);
      console.error('Error code:', err.code);
      console.error('Error message:', err.message);
      console.error('Error metadata:', err.metadata);
    });
    stream.on('end', () => {
      console.log('Stream ended');
      if (onEnd) {
        onEnd();
      }
    });
  }

  public doGet(ticketValue: string): void {
    const ticket = new Ticket();
    // Ticket.ticket is a protobuf `bytes` field; google-protobuf treats a string value
    // as base64, so encode the ticket text to bytes explicitly.
    ticket.setTicket(new TextEncoder().encode(ticketValue));
    console.log('Sending ticket:', ticketValue);
    const stream = this.client.doGet(ticket);
    const chunks: Uint8Array[] = [];
    this.handleStreamEvents(
      stream,
      (response: FlightData) => {
        // Collect the data_body of each FlightData message as it arrives.
        const dataBody = response.getDataBody_asU8();
        chunks.push(dataBody);
      },
      () => {
        // Concatenate all collected bodies into one buffer and try to decode it as Arrow IPC.
        const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0);
        const completeData = new Uint8Array(totalLength);
        let offset = 0;
        for (const chunk of chunks) {
          completeData.set(chunk, offset);
          offset += chunk.length;
        }
        try {
          const table = tableFromIPC(completeData);
          console.log('Received table:', table);
        } catch (error) {
          console.error('Failed to decode table:', error);
        }
      },
    );
  }
}
```
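A likely reason tableFromIPC fails here: the client keeps only data_body from each FlightData message. In Arrow Flight, each FlightData carries the flatbuffer-encoded IPC message metadata in data_header (the first DoGet message is normally the schema, with an empty body) and the buffers in data_body. tableFromIPC expects the encapsulated IPC stream format, where every message is prefixed by a 0xFFFFFFFF continuation marker and an int32 metadata length (including padding to 8 bytes), and the stream ends with 0xFFFFFFFF followed by a zero length. The following is a minimal sketch that re-frames header/body pairs into that layout. `FlightMessage` and `buildIpcStream` are hypothetical names, not apache-arrow APIs, and the sketch assumes the server already pads each body to 8 bytes, as Arrow IPC writers normally do.

```typescript
// Hypothetical shape: one entry per FlightData message received from DoGet.
interface FlightMessage {
  header: Uint8Array; // FlightData.data_header: flatbuffer-encoded IPC Message metadata
  body: Uint8Array; // FlightData.data_body: the message body buffers (empty for the schema)
}

const CONTINUATION_MARKER = 0xffffffff;

// Round n up to the next multiple of 8, the alignment the IPC format uses for metadata.
function align8(n: number): number {
  return Math.ceil(n / 8) * 8;
}

// Re-frame Flight messages as an Arrow IPC stream:
//   <0xFFFFFFFF><int32 metadata length><metadata + zero padding><body>
// per message, followed by the end-of-stream marker <0xFFFFFFFF><0x00000000>.
function buildIpcStream(messages: FlightMessage[]): Uint8Array {
  // Messages without a header (e.g. app_metadata-only messages) carry no IPC data.
  const framed = messages.filter((m) => m.header.length > 0);

  let total = 8; // room for the end-of-stream marker
  for (const m of framed) {
    total += 8 + align8(m.header.length) + m.body.length;
  }

  const out = new Uint8Array(total); // zero-initialised, so padding bytes are already 0
  const view = new DataView(out.buffer);
  let offset = 0;
  for (const m of framed) {
    const metadataLength = align8(m.header.length);
    view.setUint32(offset, CONTINUATION_MARKER, true);
    view.setInt32(offset + 4, metadataLength, true);
    out.set(m.header, offset + 8);
    offset += 8 + metadataLength;
    // Assumes the body is already 8-byte padded by the server.
    out.set(m.body, offset);
    offset += m.body.length;
  }
  view.setUint32(offset, CONTINUATION_MARKER, true);
  view.setInt32(offset + 4, 0, true);
  return out;
}
```

In doGet, the data callback could push `{ header: response.getDataHeader_asU8(), body: response.getDataBody_asU8() }` for each response, and the end callback could call `tableFromIPC(buildIpcStream(messages))`. This still makes one copy into the combined buffer, the same as the existing concatenation.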
index.ts:
```typescript
import { FlightClient } from './client';

// Configure XMLHttpRequest for grpc-web in a Node.js environment
(global as any).XMLHttpRequest = require('xhr2');

const client = new FlightClient('http://localhost:8080'); // Pointing to Envoy proxy

// Test doGet
client.doGet('1'); // Hack to get the data immediately
```
On the server side, my do_get method looks like this:
flight_server.py:
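```python
import pandas as pd
import pyarrow as pa
import pyarrow.flight  # makes pa.flight available

# Method of the server's pa.flight.FlightServerBase subclass
def do_get(self, context, ticket):
    df = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
    table = pa.Table.from_pandas(df)
    def generate_batches(table):
        for batch in table.to_batches():
            yield batch

    return pa.flight.GeneratorStream(table.schema, generate_batches(table))
```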