17e509d50SXin LI /* 27e509d50SXin LI * Copyright (c) Meta Platforms, Inc. and affiliates. 37e509d50SXin LI * All rights reserved. 47e509d50SXin LI * 57e509d50SXin LI * This source code is licensed under both the BSD-style license (found in the 67e509d50SXin LI * LICENSE file in the root directory of this source tree) and the GPLv2 (found 77e509d50SXin LI * in the COPYING file in the root directory of this source tree). 87e509d50SXin LI * You may select, at your option, one of the above-listed licenses. 97e509d50SXin LI */ 107e509d50SXin LI 117e509d50SXin LI /* 127e509d50SXin LI * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously. 137e509d50SXin LI * Current implementation relies on having one thread that reads and one that 147e509d50SXin LI * writes. 157e509d50SXin LI * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but 167e509d50SXin LI * are performed serially by the appropriate worker thread. 177e509d50SXin LI * Most systems exposes better primitives to perform asynchronous IO, such as 187e509d50SXin LI * io_uring on newer linux systems. The API is built in such a way that in the 197e509d50SXin LI * future we could replace the threads with better solutions when available. 207e509d50SXin LI */ 217e509d50SXin LI 227e509d50SXin LI #ifndef ZSTD_FILEIO_ASYNCIO_H 237e509d50SXin LI #define ZSTD_FILEIO_ASYNCIO_H 247e509d50SXin LI 257e509d50SXin LI #include "../lib/common/mem.h" /* U32, U64 */ 267e509d50SXin LI #include "fileio_types.h" 277e509d50SXin LI #include "platform.h" 287e509d50SXin LI #include "util.h" 297e509d50SXin LI #include "../lib/common/pool.h" 307e509d50SXin LI #include "../lib/common/threading.h" 317e509d50SXin LI 327e509d50SXin LI #define MAX_IO_JOBS (10) 337e509d50SXin LI 347e509d50SXin LI typedef struct { 357e509d50SXin LI /* These struct fields should be set only on creation and not changed afterwards */ 367e509d50SXin LI POOL_ctx* threadPool; 377e509d50SXin LI int threadPoolActive; 387e509d50SXin LI int totalIoJobs; 397e509d50SXin LI const FIO_prefs_t* prefs; 407e509d50SXin LI POOL_function poolFunction; 417e509d50SXin LI 427e509d50SXin LI /* Controls the file we currently write to, make changes only by using provided utility functions */ 437e509d50SXin LI FILE* file; 447e509d50SXin LI 457e509d50SXin LI /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should 467e509d50SXin LI * only be mutated after locking the mutex */ 477e509d50SXin LI ZSTD_pthread_mutex_t ioJobsMutex; 487e509d50SXin LI void* availableJobs[MAX_IO_JOBS]; 497e509d50SXin LI int availableJobsCount; 507e509d50SXin LI size_t jobBufferSize; 517e509d50SXin LI } IOPoolCtx_t; 527e509d50SXin LI 537e509d50SXin LI typedef struct { 547e509d50SXin LI IOPoolCtx_t base; 557e509d50SXin LI 567e509d50SXin LI /* State regarding the currently read file */ 577e509d50SXin LI int reachedEof; 587e509d50SXin LI U64 nextReadOffset; 597e509d50SXin LI U64 waitingOnOffset; 607e509d50SXin LI 617e509d50SXin LI /* We may hold an IOJob object as needed if we actively expose its buffer. */ 627e509d50SXin LI void *currentJobHeld; 637e509d50SXin LI 647e509d50SXin LI /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in 657e509d50SXin LI * the first of them. Shouldn't be accessed from outside ot utility functions. */ 667e509d50SXin LI U8 *coalesceBuffer; 677e509d50SXin LI 687e509d50SXin LI /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might 697e509d50SXin LI * change when consuming / refilling buffer. */ 707e509d50SXin LI U8 *srcBuffer; 717e509d50SXin LI size_t srcBufferLoaded; 727e509d50SXin LI 737e509d50SXin LI /* We need to know what tasks completed so we can use their buffers when their time comes. 747e509d50SXin LI * Should only be accessed after locking base.ioJobsMutex . */ 757e509d50SXin LI void* completedJobs[MAX_IO_JOBS]; 767e509d50SXin LI int completedJobsCount; 777e509d50SXin LI ZSTD_pthread_cond_t jobCompletedCond; 787e509d50SXin LI } ReadPoolCtx_t; 797e509d50SXin LI 807e509d50SXin LI typedef struct { 817e509d50SXin LI IOPoolCtx_t base; 827e509d50SXin LI unsigned storedSkips; 837e509d50SXin LI } WritePoolCtx_t; 847e509d50SXin LI 857e509d50SXin LI typedef struct { 867e509d50SXin LI /* These fields are automatically set and shouldn't be changed by non WritePool code. */ 877e509d50SXin LI void *ctx; 887e509d50SXin LI FILE* file; 897e509d50SXin LI void *buffer; 907e509d50SXin LI size_t bufferSize; 917e509d50SXin LI 927e509d50SXin LI /* This field should be changed before a job is queued for execution and should contain the number 937e509d50SXin LI * of bytes to write from the buffer. */ 947e509d50SXin LI size_t usedBufferSize; 957e509d50SXin LI U64 offset; 967e509d50SXin LI } IOJob_t; 977e509d50SXin LI 987e509d50SXin LI /* AIO_supported: 997e509d50SXin LI * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ 1007e509d50SXin LI int AIO_supported(void); 1017e509d50SXin LI 1027e509d50SXin LI 1037e509d50SXin LI /* AIO_WritePool_releaseIoJob: 1047e509d50SXin LI * Releases an acquired job back to the pool. Doesn't execute the job. */ 1057e509d50SXin LI void AIO_WritePool_releaseIoJob(IOJob_t *job); 1067e509d50SXin LI 1077e509d50SXin LI /* AIO_WritePool_acquireJob: 1087e509d50SXin LI * Returns an available write job to be used for a future write. */ 1097e509d50SXin LI IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx); 1107e509d50SXin LI 1117e509d50SXin LI /* AIO_WritePool_enqueueAndReacquireWriteJob: 1127e509d50SXin LI * Enqueues a write job for execution and acquires a new one. 1137e509d50SXin LI * After execution `job`'s pointed value would change to the newly acquired job. 1147e509d50SXin LI * Make sure to set `usedBufferSize` to the wanted length before call. 1157e509d50SXin LI * The queued job shouldn't be used directly after queueing it. */ 1167e509d50SXin LI void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job); 1177e509d50SXin LI 1187e509d50SXin LI /* AIO_WritePool_sparseWriteEnd: 1197e509d50SXin LI * Ends sparse writes to the current file. 1207e509d50SXin LI * Blocks on completion of all current write jobs before executing. */ 1217e509d50SXin LI void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx); 1227e509d50SXin LI 1237e509d50SXin LI /* AIO_WritePool_setFile: 1247e509d50SXin LI * Sets the destination file for future writes in the pool. 1257e509d50SXin LI * Requires completion of all queues write jobs and release of all otherwise acquired jobs. 1267e509d50SXin LI * Also requires ending of sparse write if a previous file was used in sparse mode. */ 1277e509d50SXin LI void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); 1287e509d50SXin LI 1297e509d50SXin LI /* AIO_WritePool_getFile: 1307e509d50SXin LI * Returns the file the writePool is currently set to write to. */ 1317e509d50SXin LI FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx); 1327e509d50SXin LI 1337e509d50SXin LI /* AIO_WritePool_closeFile: 1347e509d50SXin LI * Ends sparse write and closes the writePool's current file and sets the file to NULL. 1357e509d50SXin LI * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ 1367e509d50SXin LI int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); 1377e509d50SXin LI 1387e509d50SXin LI /* AIO_WritePool_create: 1397e509d50SXin LI * Allocates and sets and a new write pool including its included jobs. 1407e509d50SXin LI * bufferSize should be set to the maximal buffer we want to write to at a time. */ 1417e509d50SXin LI WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize); 1427e509d50SXin LI 1437e509d50SXin LI /* AIO_WritePool_free: 1447e509d50SXin LI * Frees and releases a writePool and its resources. Closes destination file. */ 1457e509d50SXin LI void AIO_WritePool_free(WritePoolCtx_t* ctx); 1467e509d50SXin LI 1477e509d50SXin LI /* AIO_WritePool_setAsync: 1487e509d50SXin LI * Allows (de)activating async mode, to be used when the expected overhead 1497e509d50SXin LI * of asyncio costs more than the expected gains. */ 1507e509d50SXin LI void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async); 1517e509d50SXin LI 1527e509d50SXin LI /* AIO_ReadPool_create: 1537e509d50SXin LI * Allocates and sets and a new readPool including its included jobs. 1547e509d50SXin LI * bufferSize should be set to the maximal buffer we want to read at a time, will also be used 1557e509d50SXin LI * as our basic read size. */ 1567e509d50SXin LI ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); 1577e509d50SXin LI 1587e509d50SXin LI /* AIO_ReadPool_free: 1597e509d50SXin LI * Frees and releases a readPool and its resources. Closes source file. */ 1607e509d50SXin LI void AIO_ReadPool_free(ReadPoolCtx_t* ctx); 1617e509d50SXin LI 1627e509d50SXin LI /* AIO_ReadPool_setAsync: 1637e509d50SXin LI * Allows (de)activating async mode, to be used when the expected overhead 1647e509d50SXin LI * of asyncio costs more than the expected gains. */ 1657e509d50SXin LI void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async); 1667e509d50SXin LI 1677e509d50SXin LI /* AIO_ReadPool_consumeBytes: 1687e509d50SXin LI * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ 1697e509d50SXin LI void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n); 1707e509d50SXin LI 1717e509d50SXin LI /* AIO_ReadPool_fillBuffer: 1727e509d50SXin LI * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize). 1737e509d50SXin LI * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. 1747e509d50SXin LI * Return value is the number of bytes added to the buffer. 1757e509d50SXin LI * Note that srcBuffer might have up to 2 times bufferSize bytes. */ 1767e509d50SXin LI size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n); 1777e509d50SXin LI 1787e509d50SXin LI /* AIO_ReadPool_consumeAndRefill: 1797e509d50SXin LI * Consumes the current buffer and refills it with bufferSize bytes. */ 1807e509d50SXin LI size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx); 1817e509d50SXin LI 1827e509d50SXin LI /* AIO_ReadPool_setFile: 1837e509d50SXin LI * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. 1847e509d50SXin LI * Waits for all current enqueued tasks to complete if a previous file was set. */ 1857e509d50SXin LI void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file); 1867e509d50SXin LI 1877e509d50SXin LI /* AIO_ReadPool_getFile: 1887e509d50SXin LI * Returns the current file set for the read pool. */ 1897e509d50SXin LI FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx); 1907e509d50SXin LI 1917e509d50SXin LI /* AIO_ReadPool_closeFile: 1927e509d50SXin LI * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ 1937e509d50SXin LI int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx); 1947e509d50SXin LI 1957e509d50SXin LI #endif /* ZSTD_FILEIO_ASYNCIO_H */ 196