I am developing an asynchronous I/O framework for Windows LLM inference. Suppose I have 8 threads executing computation tasks. When a computation task finds that the required neurons for the next stage are not in the memory pool, the computation thread places the task closure into the I/O queue. An I/O thread then retrieves it, allocates a buffer from the memory pool using a lock-free LRU, and generates a series of I/O closures (usually hundreds for each task) containing the buffer, size, and offset, which are then placed into the I/O execution queue.
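For reference, the I/O closure and the I/O execution queue referred to above look roughly like this. This is only a simplified sketch, not my exact definitions; the field names match the code below, and the file offset is carried in the OVERLAPPED structure:

#include <windows.h>

// Sketch of the I/O closure: buffer, size, and (via overlap.Offset/OffsetHigh) the file offset.
typedef struct iocb {
    OVERLAPPED overlap;   // CONTAINING_RECORD() resolves the closure from this member
    char      *buffer;    // destination buffer from the memory pool
    DWORD      size;      // number of bytes to read
} iocb_t;

// I/O execution queue of iocb_t* (lock + condition variable, see the observations below)
typedef struct queue queue_t;
void  queue_init(queue_t *q, size_t capacity);
void  queue_push(queue_t *q, void *item);
void *queue_wait_and_pop_ptr(queue_t *q);  // blocks until an item is available
void *queue_try_pop_ptr(queue_t *q);       // returns NULL immediately if empty
void  queue_destroy(queue_t *q);

extern queue_t queue;             // shared I/O execution queue
static volatile LONG64 finished;  // completed-request counter, incremented atomically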
My asynchronous I/O framework fetches closures from this I/O execution queue, performs the reads, and updates atomic counters; once all I/O operations for a task are complete, the task is placed back into the computation queue to continue execution. Below is my asynchronous I/O framework:
static inline bool check_complete(HANDLE hCompletionPort, bool wait) {
    DWORD bytesTransferred;
    ULONG_PTR completionKey;
    LPOVERLAPPED lpOverlapped;
    BOOL result = GetQueuedCompletionStatus(hCompletionPort, &bytesTransferred, &completionKey,
                                            &lpOverlapped, wait ? INFINITE : 1);
    if (result) {
        struct iocb *iocb_task = CONTAINING_RECORD(lpOverlapped, struct iocb, overlap);
        finished += 1; // atomic
        free(iocb_task->buffer); free(iocb_task);
        return true; // got a completed I/O closure
    }
    return false;
}
static inline bool do_read(HANDLE hFile, bool wait) {
    iocb_t *iocb_task = NULL;
    if (wait) {
        iocb_task = (iocb_t *)queue_wait_and_pop_ptr(&queue);
    } else {
        iocb_task = (iocb_t *)queue_try_pop_ptr(&queue);
        if (iocb_task == NULL) {
            return false;
        }
    }
    BOOL result = ReadFile(hFile, iocb_task->buffer, iocb_task->size, NULL, &iocb_task->overlap);
    if (result) {
        // ReadFile completed synchronously
        finished += 1; // atomic
        free(iocb_task->buffer); free(iocb_task);
        return false;
    } else {
        if (GetLastError() != ERROR_IO_PENDING) { // ...
        }
    }
    return true;
}
DWORD WINAPI prepare_io(LPVOID param) {
    int key = (int)(INT_PTR)param, ioPending = 0;
    HANDLE hFile = CreateFileA( // over *60G*, large file
        TARGET_FILE_PATH, GENERIC_READ, FILE_SHARE_READ,
        NULL, OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL);
    HANDLE hCompletionPort = CreateIoCompletionPort(hFile, NULL, 0, 1);
    while (true) {
        while (ioPending > NUM_QUEUE_SIZE) { // max queue depth reached, wait for completions
            check_complete(hCompletionPort, true);
            --ioPending;
        }
        if (ioPending == 0) {
            do_read(hFile, true);
            ++ioPending;
        } else {
            if (do_read(hFile, false))
                ++ioPending;
            if (check_complete(hCompletionPort, false))
                --ioPending;
        }
    }
    // close ...
    return 0;
}
int main() {
    HANDLE ioThreads[NUM_IO_THREADS]; // 16 threads and 64 queue size
    queue_init(&queue, NUM_QUEUE_SIZE * NUM_IO_THREADS);
    for (int i = 0; i < NUM_IO_THREADS; ++i) {
        ioThreads[i] = CreateThread(NULL, 64 * 1024, prepare_io, (LPVOID)(INT_PTR)i, 0, NULL);
    }
    LONGLONG totalRequests = TOTAL_DATA_SIZE / BLOCK_SIZE;
    DWORD startTime = GetTickCount();
    for (LONGLONG i = 0; i < totalRequests; i++) {
        char *buffer = (char *)malloc(BLOCK_SIZE);
        iocb_t *iocb = (iocb_t *)malloc(sizeof(iocb_t));
        memset(iocb, 0, sizeof(iocb_t));
        LONGLONG offset = ((LONGLONG)rand() * rand()) % (fileInfo.nFileSizeLow - BLOCK_SIZE);
        offset = (offset / BLOCK_SIZE) * BLOCK_SIZE; // Align to 4K
        iocb->buffer = buffer;
        iocb->overlap.Offset = offset & 0xFFFFFFFF;
        iocb->overlap.OffsetHigh = offset >> 32;
        iocb->size = BLOCK_SIZE;
        queue_push(&queue, iocb);
    }
    while (finished < totalRequests) {
    }
    DWORD endTime = GetTickCount();
    DWORD totalTime = endTime - startTime;
    printf("Total time: %lu ms\n", totalTime);
    printf("Average throughput: %.2f MB/s\n", (double)TOTAL_DATA_SIZE / (1024 * 1024) / (totalTime / 1000.0));
    queue_destroy(&queue);
    return 0;
}
The I/O framework above achieves 2.6 GB/s on my SSD, which matches the actual performance of the disk.
However, when I ported this framework into the pipeline described earlier, throughput dropped to only 50 MB/s. That is a 50x difference between the microbenchmark and the real scenario, even though the target file, number of threads, queue depth, and code are exactly the same.
Here are some of my observations:
- Reducing the number of I/O threads to 1 still results in only 50 MB/s, which indicates the problem is not contention between the I/O threads and the computation threads.
- The queue implementation uses locks and condition variables, and the same code is used in both the microbenchmark and the real scenario (a simplified sketch is shown after this list).
- Removing FILE_FLAG_NO_BUFFERING raises throughput to 1100 MB/s, indicating that the bottleneck is not in the queue or the I/O closure handling.
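For completeness, the queue is roughly the following shape. This is only a simplified sketch (it assumes an SRWLOCK plus a CONDITION_VARIABLE and omits bounded-capacity handling), not the exact code:

#include <windows.h>
#include <stdlib.h>

typedef struct queue {
    void             **items;
    size_t             capacity, head, tail, count;
    SRWLOCK            lock;
    CONDITION_VARIABLE not_empty;
} queue_t;

void queue_init(queue_t *q, size_t capacity) {
    q->items = (void **)malloc(capacity * sizeof(void *));
    q->capacity = capacity;
    q->head = q->tail = q->count = 0;
    InitializeSRWLock(&q->lock);
    InitializeConditionVariable(&q->not_empty);
}

void queue_push(queue_t *q, void *item) {
    AcquireSRWLockExclusive(&q->lock);
    q->items[q->tail] = item;
    q->tail = (q->tail + 1) % q->capacity;
    q->count++;
    ReleaseSRWLockExclusive(&q->lock);
    WakeConditionVariable(&q->not_empty);  // wake one waiting consumer
}

void *queue_wait_and_pop_ptr(queue_t *q) {
    AcquireSRWLockExclusive(&q->lock);
    while (q->count == 0)                  // block until an item is available
        SleepConditionVariableSRW(&q->not_empty, &q->lock, INFINITE, 0);
    void *item = q->items[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    ReleaseSRWLockExclusive(&q->lock);
    return item;
}

void *queue_try_pop_ptr(queue_t *q) {
    void *item = NULL;
    AcquireSRWLockExclusive(&q->lock);
    if (q->count > 0) {                    // non-blocking: return NULL if empty
        item = q->items[q->head];
        q->head = (q->head + 1) % q->capacity;
        q->count--;
    }
    ReleaseSRWLockExclusive(&q->lock);
    return item;
}

void queue_destroy(queue_t *q) { free(q->items); }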
Does anyone have any good suggestions?