/* xref: /src/sys/contrib/zstd/programs/fileio_asyncio.h (revision c0d9a07101a1e72769ee0619a583f63a078fb391) */
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

 /*
  * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
  * The current implementation relies on having one thread that reads and one that
  * writes.
  * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
  * are performed serially by the appropriate worker thread.
  * Most systems expose better primitives to perform asynchronous IO, such as
  * io_uring on newer Linux systems. The API is built in such a way that in the
  * future we could replace the threads with better solutions when available.
  */

#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H

#include "../lib/common/mem.h"     /* U32, U64 */
#include "fileio_types.h"
#include "platform.h"
#include "util.h"
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"

/* MAX_IO_JOBS:
 * Upper bound on the number of jobs a single IO pool can hold; it sizes the
 * per-pool job arrays declared in this header. */
#define MAX_IO_JOBS          (10)

347e509d50SXin LI typedef struct {
357e509d50SXin LI     /* These struct fields should be set only on creation and not changed afterwards */
367e509d50SXin LI     POOL_ctx* threadPool;
377e509d50SXin LI     int threadPoolActive;
387e509d50SXin LI     int totalIoJobs;
397e509d50SXin LI     const FIO_prefs_t* prefs;
407e509d50SXin LI     POOL_function poolFunction;
417e509d50SXin LI 
427e509d50SXin LI     /* Controls the file we currently write to, make changes only by using provided utility functions */
437e509d50SXin LI     FILE* file;
447e509d50SXin LI 
457e509d50SXin LI     /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
467e509d50SXin LI      * only be mutated after locking the mutex */
477e509d50SXin LI     ZSTD_pthread_mutex_t ioJobsMutex;
487e509d50SXin LI     void* availableJobs[MAX_IO_JOBS];
497e509d50SXin LI     int availableJobsCount;
507e509d50SXin LI     size_t jobBufferSize;
517e509d50SXin LI } IOPoolCtx_t;
527e509d50SXin LI 
537e509d50SXin LI typedef struct {
547e509d50SXin LI     IOPoolCtx_t base;
557e509d50SXin LI 
567e509d50SXin LI     /* State regarding the currently read file */
577e509d50SXin LI     int reachedEof;
587e509d50SXin LI     U64 nextReadOffset;
597e509d50SXin LI     U64 waitingOnOffset;
607e509d50SXin LI 
617e509d50SXin LI     /* We may hold an IOJob object as needed if we actively expose its buffer. */
627e509d50SXin LI     void *currentJobHeld;
637e509d50SXin LI 
647e509d50SXin LI     /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
657e509d50SXin LI      * the first of them. Shouldn't be accessed from outside ot utility functions. */
667e509d50SXin LI     U8 *coalesceBuffer;
677e509d50SXin LI 
687e509d50SXin LI     /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
697e509d50SXin LI      * change when consuming / refilling buffer. */
707e509d50SXin LI     U8 *srcBuffer;
717e509d50SXin LI     size_t srcBufferLoaded;
727e509d50SXin LI 
737e509d50SXin LI     /* We need to know what tasks completed so we can use their buffers when their time comes.
747e509d50SXin LI      * Should only be accessed after locking base.ioJobsMutex . */
757e509d50SXin LI     void* completedJobs[MAX_IO_JOBS];
767e509d50SXin LI     int completedJobsCount;
777e509d50SXin LI     ZSTD_pthread_cond_t jobCompletedCond;
787e509d50SXin LI } ReadPoolCtx_t;
797e509d50SXin LI 
807e509d50SXin LI typedef struct {
817e509d50SXin LI     IOPoolCtx_t base;
827e509d50SXin LI     unsigned storedSkips;
837e509d50SXin LI } WritePoolCtx_t;
847e509d50SXin LI 
857e509d50SXin LI typedef struct {
867e509d50SXin LI     /* These fields are automatically set and shouldn't be changed by non WritePool code. */
877e509d50SXin LI     void *ctx;
887e509d50SXin LI     FILE* file;
897e509d50SXin LI     void *buffer;
907e509d50SXin LI     size_t bufferSize;
917e509d50SXin LI 
927e509d50SXin LI     /* This field should be changed before a job is queued for execution and should contain the number
937e509d50SXin LI      * of bytes to write from the buffer. */
947e509d50SXin LI     size_t usedBufferSize;
957e509d50SXin LI     U64 offset;
967e509d50SXin LI } IOJob_t;
977e509d50SXin LI 
/* AIO_supported:
 * Reports whether AsyncIO is available on this system.
 * @return : 1 if AsyncIO is supported, 0 otherwise. */
int AIO_supported(void);


1037e509d50SXin LI /* AIO_WritePool_releaseIoJob:
1047e509d50SXin LI  * Releases an acquired job back to the pool. Doesn't execute the job. */
1057e509d50SXin LI void AIO_WritePool_releaseIoJob(IOJob_t *job);
1067e509d50SXin LI 
1077e509d50SXin LI /* AIO_WritePool_acquireJob:
1087e509d50SXin LI  * Returns an available write job to be used for a future write. */
1097e509d50SXin LI IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
1107e509d50SXin LI 
1117e509d50SXin LI /* AIO_WritePool_enqueueAndReacquireWriteJob:
1127e509d50SXin LI  * Enqueues a write job for execution and acquires a new one.
1137e509d50SXin LI  * After execution `job`'s pointed value would change to the newly acquired job.
1147e509d50SXin LI  * Make sure to set `usedBufferSize` to the wanted length before call.
1157e509d50SXin LI  * The queued job shouldn't be used directly after queueing it. */
1167e509d50SXin LI void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
1177e509d50SXin LI 
1187e509d50SXin LI /* AIO_WritePool_sparseWriteEnd:
1197e509d50SXin LI  * Ends sparse writes to the current file.
1207e509d50SXin LI  * Blocks on completion of all current write jobs before executing. */
1217e509d50SXin LI void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
1227e509d50SXin LI 
1237e509d50SXin LI /* AIO_WritePool_setFile:
1247e509d50SXin LI  * Sets the destination file for future writes in the pool.
1257e509d50SXin LI  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
1267e509d50SXin LI  * Also requires ending of sparse write if a previous file was used in sparse mode. */
1277e509d50SXin LI void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
1287e509d50SXin LI 
1297e509d50SXin LI /* AIO_WritePool_getFile:
1307e509d50SXin LI  * Returns the file the writePool is currently set to write to. */
1317e509d50SXin LI FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
1327e509d50SXin LI 
1337e509d50SXin LI /* AIO_WritePool_closeFile:
1347e509d50SXin LI  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
1357e509d50SXin LI  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
1367e509d50SXin LI int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
1377e509d50SXin LI 
1387e509d50SXin LI /* AIO_WritePool_create:
1397e509d50SXin LI  * Allocates and sets and a new write pool including its included jobs.
1407e509d50SXin LI  * bufferSize should be set to the maximal buffer we want to write to at a time. */
1417e509d50SXin LI WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
1427e509d50SXin LI 
1437e509d50SXin LI /* AIO_WritePool_free:
1447e509d50SXin LI  * Frees and releases a writePool and its resources. Closes destination file. */
1457e509d50SXin LI void AIO_WritePool_free(WritePoolCtx_t* ctx);
1467e509d50SXin LI 
1477e509d50SXin LI /* AIO_WritePool_setAsync:
1487e509d50SXin LI  * Allows (de)activating async mode, to be used when the expected overhead
1497e509d50SXin LI  * of asyncio costs more than the expected gains. */
1507e509d50SXin LI void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
1517e509d50SXin LI 
1527e509d50SXin LI /* AIO_ReadPool_create:
1537e509d50SXin LI  * Allocates and sets and a new readPool including its included jobs.
1547e509d50SXin LI  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
1557e509d50SXin LI  * as our basic read size. */
1567e509d50SXin LI ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
1577e509d50SXin LI 
1587e509d50SXin LI /* AIO_ReadPool_free:
1597e509d50SXin LI  * Frees and releases a readPool and its resources. Closes source file. */
1607e509d50SXin LI void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
1617e509d50SXin LI 
1627e509d50SXin LI /* AIO_ReadPool_setAsync:
1637e509d50SXin LI  * Allows (de)activating async mode, to be used when the expected overhead
1647e509d50SXin LI  * of asyncio costs more than the expected gains. */
1657e509d50SXin LI void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
1667e509d50SXin LI 
1677e509d50SXin LI /* AIO_ReadPool_consumeBytes:
1687e509d50SXin LI  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
1697e509d50SXin LI void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
1707e509d50SXin LI 
1717e509d50SXin LI /* AIO_ReadPool_fillBuffer:
1727e509d50SXin LI  * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
1737e509d50SXin LI  * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
1747e509d50SXin LI  * Return value is the number of bytes added to the buffer.
1757e509d50SXin LI  * Note that srcBuffer might have up to 2 times bufferSize bytes. */
1767e509d50SXin LI size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
1777e509d50SXin LI 
1787e509d50SXin LI /* AIO_ReadPool_consumeAndRefill:
1797e509d50SXin LI  * Consumes the current buffer and refills it with bufferSize bytes. */
1807e509d50SXin LI size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
1817e509d50SXin LI 
1827e509d50SXin LI /* AIO_ReadPool_setFile:
1837e509d50SXin LI  * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
1847e509d50SXin LI  * Waits for all current enqueued tasks to complete if a previous file was set. */
1857e509d50SXin LI void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
1867e509d50SXin LI 
1877e509d50SXin LI /* AIO_ReadPool_getFile:
1887e509d50SXin LI  * Returns the current file set for the read pool. */
1897e509d50SXin LI FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
1907e509d50SXin LI 
1917e509d50SXin LI /* AIO_ReadPool_closeFile:
1927e509d50SXin LI  * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
1937e509d50SXin LI int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
1947e509d50SXin LI 
#endif /* ZSTD_FILEIO_ASYNCIO_H */