xref: /src/sys/contrib/zstd/programs/fileio_asyncio.c (revision c0d9a07101a1e72769ee0619a583f63a078fb391)
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */
107e509d50SXin LI 
117e509d50SXin LI #include "platform.h"
127e509d50SXin LI #include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
137e509d50SXin LI #include <stdlib.h>     /* malloc, free */
147e509d50SXin LI #include <assert.h>
157e509d50SXin LI #include <errno.h>      /* errno */
167e509d50SXin LI 
177e509d50SXin LI #if defined (_MSC_VER)
187e509d50SXin LI #  include <sys/stat.h>
197e509d50SXin LI #  include <io.h>
207e509d50SXin LI #endif
217e509d50SXin LI 
227e509d50SXin LI #include "fileio_asyncio.h"
237e509d50SXin LI #include "fileio_common.h"
247e509d50SXin LI 
/* **********************************************************************
 *  Sparse write
 ************************************************************************/
287e509d50SXin LI 
/** AIO_fwriteSparse() :
 *  Writes `buffer` to `file`. When sparse-file support is enabled, runs of
 *  zero bytes are not written; instead they are accumulated into `storedSkips`
 *  and later converted into a seek, letting the filesystem create holes.
 * @param file        destination stream (ignored in test mode)
 * @param buffer      data to write; assumed malloc'ed, hence size_t-aligned
 * @param bufferSize  number of bytes in buffer
 * @param prefs       read for testMode and sparseFileSupport flags
 * @param storedSkips zero bytes already skipped but not yet seeked past
 * @return : updated storedSkips,
 *           argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow : storedSkips is unsigned, keep it well below UINT_MAX
     * by materializing a 1 GB seek whenever it grows past 1 GB */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
        EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    /* scan the buffer one 32 KB segment at a time, word-wise */
    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes (in whole size_t words) */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros : convert the accumulated skip into a seek */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest of the segment */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    /* handle the trailing bytes that don't fill a whole size_t word */
    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            /* byte-wise leading-zero count over the tail */
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
            }   }   }

    return storedSkips;
}
1097e509d50SXin LI 
1107e509d50SXin LI static void
AIO_fwriteSparseEnd(const FIO_prefs_t * const prefs,FILE * file,unsigned storedSkips)1117e509d50SXin LI AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
1127e509d50SXin LI {
1137e509d50SXin LI     if (prefs->testMode) assert(storedSkips == 0);
1147e509d50SXin LI     if (storedSkips>0) {
1157e509d50SXin LI         assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
1167e509d50SXin LI         (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
1177e509d50SXin LI         if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
1187e509d50SXin LI             EXM_THROW(69, "Final skip error (sparse file support)");
1197e509d50SXin LI         /* last zero must be explicitly written,
1207e509d50SXin LI          * so that skipped ones get implicitly translated as zero by FS */
1217e509d50SXin LI         {   const char lastZeroByte[1] = { 0 };
1227e509d50SXin LI             if (fwrite(lastZeroByte, 1, 1, file) != 1)
1237e509d50SXin LI                 EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
1247e509d50SXin LI         }   }
1257e509d50SXin LI }
1267e509d50SXin LI 
1277e509d50SXin LI 
/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/
1317e509d50SXin LI 
/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise.
 * Async IO requires threading, so support follows ZSTD_MULTITHREAD. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    static const int asyncIoAvailable = 1;
#else
    static const int asyncIoAvailable = 0;
#endif
    return asyncIoAvailable;
}
1417e509d50SXin LI 
/* ***********************************
 *  Generic IoPool implementation
 *************************************/
1457e509d50SXin LI 
AIO_IOPool_createIoJob(IOPoolCtx_t * ctx,size_t bufferSize)1467e509d50SXin LI static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
1477e509d50SXin LI     IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
1487e509d50SXin LI     void* const buffer = malloc(bufferSize);
1497e509d50SXin LI     if(!job || !buffer)
1507e509d50SXin LI         EXM_THROW(101, "Allocation error : not enough memory");
1517e509d50SXin LI     job->buffer = buffer;
1527e509d50SXin LI     job->bufferSize = bufferSize;
1537e509d50SXin LI     job->usedBufferSize = 0;
1547e509d50SXin LI     job->file = NULL;
1557e509d50SXin LI     job->ctx = ctx;
1567e509d50SXin LI     job->offset = 0;
1577e509d50SXin LI     return job;
1587e509d50SXin LI }
1597e509d50SXin LI 
1607e509d50SXin LI 
1617e509d50SXin LI /* AIO_IOPool_createThreadPool:
1627e509d50SXin LI  * Creates a thread pool and a mutex for threaded IO pool.
1637e509d50SXin LI  * Displays warning if asyncio is requested but MT isn't available. */
AIO_IOPool_createThreadPool(IOPoolCtx_t * ctx,const FIO_prefs_t * prefs)1647e509d50SXin LI static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
1657e509d50SXin LI     ctx->threadPool = NULL;
1667e509d50SXin LI     ctx->threadPoolActive = 0;
1677e509d50SXin LI     if(prefs->asyncIO) {
1687e509d50SXin LI         if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
1697e509d50SXin LI             EXM_THROW(102,"Failed creating ioJobsMutex mutex");
1707e509d50SXin LI         /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
1717e509d50SXin LI          * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
1727e509d50SXin LI         assert(MAX_IO_JOBS >= 2);
1737e509d50SXin LI         ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
1747e509d50SXin LI         ctx->threadPoolActive = 1;
1757e509d50SXin LI         if (!ctx->threadPool)
1767e509d50SXin LI             EXM_THROW(104, "Failed creating I/O thread pool");
1777e509d50SXin LI     }
1787e509d50SXin LI }
1797e509d50SXin LI 
1807e509d50SXin LI /* AIO_IOPool_init:
1817e509d50SXin LI  * Allocates and sets and a new I/O thread pool including its included availableJobs. */
AIO_IOPool_init(IOPoolCtx_t * ctx,const FIO_prefs_t * prefs,POOL_function poolFunction,size_t bufferSize)1827e509d50SXin LI static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
1837e509d50SXin LI     int i;
1847e509d50SXin LI     AIO_IOPool_createThreadPool(ctx, prefs);
1857e509d50SXin LI     ctx->prefs = prefs;
1867e509d50SXin LI     ctx->poolFunction = poolFunction;
1877e509d50SXin LI     ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
1887e509d50SXin LI     ctx->availableJobsCount = ctx->totalIoJobs;
1897e509d50SXin LI     for(i=0; i < ctx->availableJobsCount; i++) {
1907e509d50SXin LI         ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
1917e509d50SXin LI     }
1927e509d50SXin LI     ctx->jobBufferSize = bufferSize;
1937e509d50SXin LI     ctx->file = NULL;
1947e509d50SXin LI }
1957e509d50SXin LI 
1967e509d50SXin LI 
1977e509d50SXin LI /* AIO_IOPool_threadPoolActive:
1987e509d50SXin LI  * Check if current operation uses thread pool.
1997e509d50SXin LI  * Note that in some cases we have a thread pool initialized but choose not to use it. */
AIO_IOPool_threadPoolActive(IOPoolCtx_t * ctx)2007e509d50SXin LI static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
2017e509d50SXin LI     return ctx->threadPool && ctx->threadPoolActive;
2027e509d50SXin LI }
2037e509d50SXin LI 
2047e509d50SXin LI 
2057e509d50SXin LI /* AIO_IOPool_lockJobsMutex:
2067e509d50SXin LI  * Locks the IO jobs mutex if threading is active */
AIO_IOPool_lockJobsMutex(IOPoolCtx_t * ctx)2077e509d50SXin LI static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
2087e509d50SXin LI     if(AIO_IOPool_threadPoolActive(ctx))
2097e509d50SXin LI         ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
2107e509d50SXin LI }
2117e509d50SXin LI 
2127e509d50SXin LI /* AIO_IOPool_unlockJobsMutex:
2137e509d50SXin LI  * Unlocks the IO jobs mutex if threading is active */
AIO_IOPool_unlockJobsMutex(IOPoolCtx_t * ctx)2147e509d50SXin LI static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
2157e509d50SXin LI     if(AIO_IOPool_threadPoolActive(ctx))
2167e509d50SXin LI         ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
2177e509d50SXin LI }
2187e509d50SXin LI 
2197e509d50SXin LI /* AIO_IOPool_releaseIoJob:
2207e509d50SXin LI  * Releases an acquired job back to the pool. Doesn't execute the job. */
AIO_IOPool_releaseIoJob(IOJob_t * job)2217e509d50SXin LI static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
2227e509d50SXin LI     IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
2237e509d50SXin LI     AIO_IOPool_lockJobsMutex(ctx);
2247e509d50SXin LI     assert(ctx->availableJobsCount < ctx->totalIoJobs);
2257e509d50SXin LI     ctx->availableJobs[ctx->availableJobsCount++] = job;
2267e509d50SXin LI     AIO_IOPool_unlockJobsMutex(ctx);
2277e509d50SXin LI }
2287e509d50SXin LI 
2297e509d50SXin LI /* AIO_IOPool_join:
2307e509d50SXin LI  * Waits for all tasks in the pool to finish executing. */
AIO_IOPool_join(IOPoolCtx_t * ctx)2317e509d50SXin LI static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
2327e509d50SXin LI     if(AIO_IOPool_threadPoolActive(ctx))
2337e509d50SXin LI         POOL_joinJobs(ctx->threadPool);
2347e509d50SXin LI }
2357e509d50SXin LI 
2367e509d50SXin LI /* AIO_IOPool_setThreaded:
2377e509d50SXin LI  * Allows (de)activating threaded mode, to be used when the expected overhead
2387e509d50SXin LI  * of threading costs more than the expected gains. */
AIO_IOPool_setThreaded(IOPoolCtx_t * ctx,int threaded)2397e509d50SXin LI static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
2407e509d50SXin LI     assert(threaded == 0 || threaded == 1);
2417e509d50SXin LI     assert(ctx != NULL);
2427e509d50SXin LI     if(ctx->threadPoolActive != threaded) {
2437e509d50SXin LI         AIO_IOPool_join(ctx);
2447e509d50SXin LI         ctx->threadPoolActive = threaded;
2457e509d50SXin LI     }
2467e509d50SXin LI }
2477e509d50SXin LI 
2487e509d50SXin LI /* AIO_IOPool_free:
2497e509d50SXin LI  * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
AIO_IOPool_destroy(IOPoolCtx_t * ctx)2507e509d50SXin LI static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
2517e509d50SXin LI     int i;
2527e509d50SXin LI     if(ctx->threadPool) {
2537e509d50SXin LI         /* Make sure we finish all tasks and then free the resources */
2547e509d50SXin LI         AIO_IOPool_join(ctx);
2557e509d50SXin LI         /* Make sure we are not leaking availableJobs */
2567e509d50SXin LI         assert(ctx->availableJobsCount == ctx->totalIoJobs);
2577e509d50SXin LI         POOL_free(ctx->threadPool);
2587e509d50SXin LI         ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
2597e509d50SXin LI     }
2607e509d50SXin LI     assert(ctx->file == NULL);
2617e509d50SXin LI     for(i=0; i<ctx->availableJobsCount; i++) {
2627e509d50SXin LI         IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
2637e509d50SXin LI         free(job->buffer);
2647e509d50SXin LI         free(job);
2657e509d50SXin LI     }
2667e509d50SXin LI }
2677e509d50SXin LI 
2687e509d50SXin LI /* AIO_IOPool_acquireJob:
2697e509d50SXin LI  * Returns an available io job to be used for a future io. */
AIO_IOPool_acquireJob(IOPoolCtx_t * ctx)2707e509d50SXin LI static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
2717e509d50SXin LI     IOJob_t* job;
2727e509d50SXin LI     assert(ctx->file != NULL || ctx->prefs->testMode);
2737e509d50SXin LI     AIO_IOPool_lockJobsMutex(ctx);
2747e509d50SXin LI     assert(ctx->availableJobsCount > 0);
2757e509d50SXin LI     job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
2767e509d50SXin LI     AIO_IOPool_unlockJobsMutex(ctx);
2777e509d50SXin LI     job->usedBufferSize = 0;
2787e509d50SXin LI     job->file = ctx->file;
2797e509d50SXin LI     job->offset = 0;
2807e509d50SXin LI     return job;
2817e509d50SXin LI }
2827e509d50SXin LI 
2837e509d50SXin LI 
2847e509d50SXin LI /* AIO_IOPool_setFile:
2857e509d50SXin LI  * Sets the destination file for future files in the pool.
2867e509d50SXin LI  * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
AIO_IOPool_setFile(IOPoolCtx_t * ctx,FILE * file)2877e509d50SXin LI static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
2887e509d50SXin LI     assert(ctx!=NULL);
2897e509d50SXin LI     AIO_IOPool_join(ctx);
2907e509d50SXin LI     assert(ctx->availableJobsCount == ctx->totalIoJobs);
2917e509d50SXin LI     ctx->file = file;
2927e509d50SXin LI }
2937e509d50SXin LI 
AIO_IOPool_getFile(const IOPoolCtx_t * ctx)2947e509d50SXin LI static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
2957e509d50SXin LI     return ctx->file;
2967e509d50SXin LI }
2977e509d50SXin LI 
2987e509d50SXin LI /* AIO_IOPool_enqueueJob:
2997e509d50SXin LI  * Enqueues an io job for execution.
3007e509d50SXin LI  * The queued job shouldn't be used directly after queueing it. */
AIO_IOPool_enqueueJob(IOJob_t * job)3017e509d50SXin LI static void AIO_IOPool_enqueueJob(IOJob_t* job) {
3027e509d50SXin LI     IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
3037e509d50SXin LI     if(AIO_IOPool_threadPoolActive(ctx))
3047e509d50SXin LI         POOL_add(ctx->threadPool, ctx->poolFunction, job);
3057e509d50SXin LI     else
3067e509d50SXin LI         ctx->poolFunction(job);
3077e509d50SXin LI }
3087e509d50SXin LI 
/* ***********************************
 *  WritePool implementation
 *************************************/
3127e509d50SXin LI 
3137e509d50SXin LI /* AIO_WritePool_acquireJob:
3147e509d50SXin LI  * Returns an available write job to be used for a future write. */
AIO_WritePool_acquireJob(WritePoolCtx_t * ctx)3157e509d50SXin LI IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
3167e509d50SXin LI     return AIO_IOPool_acquireJob(&ctx->base);
3177e509d50SXin LI }
3187e509d50SXin LI 
3197e509d50SXin LI /* AIO_WritePool_enqueueAndReacquireWriteJob:
3207e509d50SXin LI  * Queues a write job for execution and acquires a new one.
3217e509d50SXin LI  * After execution `job`'s pointed value would change to the newly acquired job.
3227e509d50SXin LI  * Make sure to set `usedBufferSize` to the wanted length before call.
3237e509d50SXin LI  * The queued job shouldn't be used directly after queueing it. */
AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t ** job)3247e509d50SXin LI void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
3257e509d50SXin LI     AIO_IOPool_enqueueJob(*job);
3267e509d50SXin LI     *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
3277e509d50SXin LI }
3287e509d50SXin LI 
3297e509d50SXin LI /* AIO_WritePool_sparseWriteEnd:
3307e509d50SXin LI  * Ends sparse writes to the current file.
3317e509d50SXin LI  * Blocks on completion of all current write jobs before executing. */
AIO_WritePool_sparseWriteEnd(WritePoolCtx_t * ctx)3327e509d50SXin LI void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
3337e509d50SXin LI     assert(ctx != NULL);
3347e509d50SXin LI     AIO_IOPool_join(&ctx->base);
3357e509d50SXin LI     AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
3367e509d50SXin LI     ctx->storedSkips = 0;
3377e509d50SXin LI }
3387e509d50SXin LI 
3397e509d50SXin LI /* AIO_WritePool_setFile:
3407e509d50SXin LI  * Sets the destination file for future writes in the pool.
3417e509d50SXin LI  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
3427e509d50SXin LI  * Also requires ending of sparse write if a previous file was used in sparse mode. */
AIO_WritePool_setFile(WritePoolCtx_t * ctx,FILE * file)3437e509d50SXin LI void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
3447e509d50SXin LI     AIO_IOPool_setFile(&ctx->base, file);
3457e509d50SXin LI     assert(ctx->storedSkips == 0);
3467e509d50SXin LI }
3477e509d50SXin LI 
3487e509d50SXin LI /* AIO_WritePool_getFile:
3497e509d50SXin LI  * Returns the file the writePool is currently set to write to. */
AIO_WritePool_getFile(const WritePoolCtx_t * ctx)3507e509d50SXin LI FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
3517e509d50SXin LI     return AIO_IOPool_getFile(&ctx->base);
3527e509d50SXin LI }
3537e509d50SXin LI 
3547e509d50SXin LI /* AIO_WritePool_releaseIoJob:
3557e509d50SXin LI  * Releases an acquired job back to the pool. Doesn't execute the job. */
AIO_WritePool_releaseIoJob(IOJob_t * job)3567e509d50SXin LI void AIO_WritePool_releaseIoJob(IOJob_t* job) {
3577e509d50SXin LI     AIO_IOPool_releaseIoJob(job);
3587e509d50SXin LI }
3597e509d50SXin LI 
3607e509d50SXin LI /* AIO_WritePool_closeFile:
3617e509d50SXin LI  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
3627e509d50SXin LI  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
AIO_WritePool_closeFile(WritePoolCtx_t * ctx)3637e509d50SXin LI int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
3647e509d50SXin LI     FILE* const dstFile = ctx->base.file;
3657e509d50SXin LI     assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
3667e509d50SXin LI     AIO_WritePool_sparseWriteEnd(ctx);
3677e509d50SXin LI     AIO_IOPool_setFile(&ctx->base, NULL);
3687e509d50SXin LI     return fclose(dstFile);
3697e509d50SXin LI }
3707e509d50SXin LI 
/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    /* storedSkips carries the pending sparse zero-run across consecutive writes */
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    /* recycle the job (and its buffer) only after the write has completed */
    AIO_IOPool_releaseIoJob(job);
}
3797e509d50SXin LI 
3807e509d50SXin LI /* AIO_WritePool_create:
3817e509d50SXin LI  * Allocates and sets and a new write pool including its included jobs. */
AIO_WritePool_create(const FIO_prefs_t * prefs,size_t bufferSize)3827e509d50SXin LI WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
3837e509d50SXin LI     WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
3847e509d50SXin LI     if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
3857e509d50SXin LI     AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
3867e509d50SXin LI     ctx->storedSkips = 0;
3877e509d50SXin LI     return ctx;
3887e509d50SXin LI }
3897e509d50SXin LI 
3907e509d50SXin LI /* AIO_WritePool_free:
3917e509d50SXin LI  * Frees and releases a writePool and its resources. Closes destination file if needs to. */
AIO_WritePool_free(WritePoolCtx_t * ctx)3927e509d50SXin LI void AIO_WritePool_free(WritePoolCtx_t* ctx) {
3937e509d50SXin LI     /* Make sure we finish all tasks and then free the resources */
3947e509d50SXin LI     if(AIO_WritePool_getFile(ctx))
3957e509d50SXin LI         AIO_WritePool_closeFile(ctx);
3967e509d50SXin LI     AIO_IOPool_destroy(&ctx->base);
3977e509d50SXin LI     assert(ctx->storedSkips==0);
3987e509d50SXin LI     free(ctx);
3997e509d50SXin LI }
4007e509d50SXin LI 
4017e509d50SXin LI /* AIO_WritePool_setAsync:
4027e509d50SXin LI  * Allows (de)activating async mode, to be used when the expected overhead
4037e509d50SXin LI  * of asyncio costs more than the expected gains. */
AIO_WritePool_setAsync(WritePoolCtx_t * ctx,int async)4047e509d50SXin LI void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
4057e509d50SXin LI     AIO_IOPool_setThreaded(&ctx->base, async);
4067e509d50SXin LI }
4077e509d50SXin LI 
4087e509d50SXin LI 
/* ***********************************
 *  ReadPool implementation
 *************************************/
/* AIO_ReadPool_releaseAllCompletedJobs:
 * Returns every job sitting in the completed list back to the pool's
 * free-list, emptying the completed list. */
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int jobNb;
    for (jobNb = 0; jobNb < ctx->completedJobsCount; jobNb++)
        AIO_IOPool_releaseIoJob((IOJob_t*) ctx->completedJobs[jobNb]);
    ctx->completedJobsCount = 0;
}
4207e509d50SXin LI 
/* AIO_ReadPool_addJobToCompleted:
 * Appends a finished read job to the completed list and wakes any reader
 * blocked in AIO_ReadPool_getNextCompletedJob(). */
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const readCtx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&readCtx->base);
    assert(readCtx->completedJobsCount < MAX_IO_JOBS);
    readCtx->completedJobs[readCtx->completedJobsCount] = job;
    readCtx->completedJobsCount += 1;
    if (AIO_IOPool_threadPoolActive(&readCtx->base)) {
        ZSTD_pthread_cond_signal(&readCtx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&readCtx->base);
}
4317e509d50SXin LI 
4327e509d50SXin LI /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
4337e509d50SXin LI  * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
4347e509d50SXin LI  * if job wasn't found returns NULL.
4357e509d50SXin LI  * IMPORTANT: assumes ioJobsMutex is locked. */
AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t * ctx)4367e509d50SXin LI static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
4377e509d50SXin LI     IOJob_t *job = NULL;
4387e509d50SXin LI     int i;
4397e509d50SXin LI     /* This implementation goes through all completed jobs and looks for the one matching the next offset.
4407e509d50SXin LI      * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
4417e509d50SXin LI      * reads to be completed in order) this implementation was chosen as it better fits other asyncio
4427e509d50SXin LI      * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
4437e509d50SXin LI     for (i=0; i<ctx->completedJobsCount; i++) {
4447e509d50SXin LI         job = (IOJob_t *) ctx->completedJobs[i];
4457e509d50SXin LI         if (job->offset == ctx->waitingOnOffset) {
4467e509d50SXin LI             ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
4477e509d50SXin LI             return job;
4487e509d50SXin LI         }
4497e509d50SXin LI     }
4507e509d50SXin LI     return NULL;
4517e509d50SXin LI }
4527e509d50SXin LI 
4537e509d50SXin LI /* AIO_ReadPool_numReadsInFlight:
4547e509d50SXin LI  * Returns the number of IO read jobs currently in flight. */
AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t * ctx)4557e509d50SXin LI static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
4567e509d50SXin LI     const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
4577e509d50SXin LI     return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
4587e509d50SXin LI }
4597e509d50SXin LI 
/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * Returns NULL when nothing matches and nothing is in flight (i.e. end of input).
 * Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        /* cond_wait releases ioJobsMutex while sleeping and re-acquires it on wakeup;
         * AIO_ReadPool_addJobToCompleted() signals jobCompletedCond */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        /* next call will wait for the chunk immediately following this one */
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}
4847e509d50SXin LI 
4857e509d50SXin LI 
4867e509d50SXin LI /* AIO_ReadPool_executeReadJob:
4877e509d50SXin LI  * Executes a read job synchronously. Can be used as a function for a thread pool. */
AIO_ReadPool_executeReadJob(void * opaque)4887e509d50SXin LI static void AIO_ReadPool_executeReadJob(void* opaque){
4897e509d50SXin LI     IOJob_t* const job = (IOJob_t*) opaque;
4907e509d50SXin LI     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
4917e509d50SXin LI     if(ctx->reachedEof) {
4927e509d50SXin LI         job->usedBufferSize = 0;
4937e509d50SXin LI         AIO_ReadPool_addJobToCompleted(job);
4947e509d50SXin LI         return;
4957e509d50SXin LI     }
4967e509d50SXin LI     job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
4977e509d50SXin LI     if(job->usedBufferSize < job->bufferSize) {
4987e509d50SXin LI         if(ferror(job->file)) {
4997e509d50SXin LI             EXM_THROW(37, "Read error");
5007e509d50SXin LI         } else if(feof(job->file)) {
5017e509d50SXin LI             ctx->reachedEof = 1;
5027e509d50SXin LI         } else {
5037e509d50SXin LI             EXM_THROW(37, "Unexpected short read");
5047e509d50SXin LI         }
5057e509d50SXin LI     }
5067e509d50SXin LI     AIO_ReadPool_addJobToCompleted(job);
5077e509d50SXin LI }
5087e509d50SXin LI 
/* AIO_ReadPool_enqueueRead:
 * Acquires a free job, assigns it the next sequential read offset,
 * and submits it for execution. */
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const readJob = AIO_IOPool_acquireJob(&ctx->base);
    /* each job reads one full buffer at consecutive file offsets */
    readJob->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += readJob->bufferSize;
    AIO_IOPool_enqueueJob(readJob);
}
5157e509d50SXin LI 
/* AIO_ReadPool_startReading:
 * Saturates the pool with read-ahead: queues one read per free job. */
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while (ctx->base.availableJobsCount > 0)
        AIO_ReadPool_enqueueRead(ctx);
}
5217e509d50SXin LI 
/* AIO_ReadPool_setFile:
 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    /* Drain all in-flight read jobs before touching shared state. */
    AIO_IOPool_join(&ctx->base);
    /* Return completed-but-unconsumed jobs to the free list. */
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        /* Release the job whose buffer the consumer was still holding. */
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    /* Reset read-progress state for the new file. */
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}
5427e509d50SXin LI 
5437e509d50SXin LI /* AIO_ReadPool_create:
5447e509d50SXin LI  * Allocates and sets and a new readPool including its included jobs.
5457e509d50SXin LI  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
5467e509d50SXin LI  * as our basic read size. */
AIO_ReadPool_create(const FIO_prefs_t * prefs,size_t bufferSize)5477e509d50SXin LI ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
5487e509d50SXin LI     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
5497e509d50SXin LI     if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
5507e509d50SXin LI     AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
5517e509d50SXin LI 
5527e509d50SXin LI     ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
5537e509d50SXin LI     if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
5547e509d50SXin LI     ctx->srcBuffer = ctx->coalesceBuffer;
5557e509d50SXin LI     ctx->srcBufferLoaded = 0;
5567e509d50SXin LI     ctx->completedJobsCount = 0;
5577e509d50SXin LI     ctx->currentJobHeld = NULL;
5587e509d50SXin LI 
5597e509d50SXin LI     if(ctx->base.threadPool)
5607e509d50SXin LI         if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
5617e509d50SXin LI             EXM_THROW(103,"Failed creating jobCompletedCond cond");
5627e509d50SXin LI 
5637e509d50SXin LI     return ctx;
5647e509d50SXin LI }
5657e509d50SXin LI 
5667e509d50SXin LI /* AIO_ReadPool_free:
5677e509d50SXin LI  * Frees and releases a readPool and its resources. Closes source file. */
AIO_ReadPool_free(ReadPoolCtx_t * ctx)5687e509d50SXin LI void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
5697e509d50SXin LI     if(AIO_ReadPool_getFile(ctx))
5707e509d50SXin LI         AIO_ReadPool_closeFile(ctx);
5717e509d50SXin LI     if(ctx->base.threadPool)
5727e509d50SXin LI         ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
5737e509d50SXin LI     AIO_IOPool_destroy(&ctx->base);
5747e509d50SXin LI     free(ctx->coalesceBuffer);
5757e509d50SXin LI     free(ctx);
5767e509d50SXin LI }
5777e509d50SXin LI 
5787e509d50SXin LI /* AIO_ReadPool_consumeBytes:
5797e509d50SXin LI  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
AIO_ReadPool_consumeBytes(ReadPoolCtx_t * ctx,size_t n)5807e509d50SXin LI void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
5817e509d50SXin LI     assert(n <= ctx->srcBufferLoaded);
5827e509d50SXin LI     ctx->srcBufferLoaded -= n;
5837e509d50SXin LI     ctx->srcBuffer += n;
5847e509d50SXin LI }
5857e509d50SXin LI 
5867e509d50SXin LI /* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
5877e509d50SXin LI  * Release the current held job and get the next one, returns NULL if no next job available. */
AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t * ctx)5887e509d50SXin LI static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
5897e509d50SXin LI     if (ctx->currentJobHeld) {
5907e509d50SXin LI         AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
5917e509d50SXin LI         ctx->currentJobHeld = NULL;
5927e509d50SXin LI         AIO_ReadPool_enqueueRead(ctx);
5937e509d50SXin LI     }
5947e509d50SXin LI     ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
5957e509d50SXin LI     return (IOJob_t*) ctx->currentJobHeld;
5967e509d50SXin LI }
5977e509d50SXin LI 
5987e509d50SXin LI /* AIO_ReadPool_fillBuffer:
5997e509d50SXin LI  * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
6007e509d50SXin LI  * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
6017e509d50SXin LI  * Return value is the number of bytes added to the buffer.
6027e509d50SXin LI  * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
AIO_ReadPool_fillBuffer(ReadPoolCtx_t * ctx,size_t n)6037e509d50SXin LI size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
6047e509d50SXin LI     IOJob_t *job;
6057e509d50SXin LI     int useCoalesce = 0;
6067e509d50SXin LI     if(n > ctx->base.jobBufferSize)
6077e509d50SXin LI         n = ctx->base.jobBufferSize;
6087e509d50SXin LI 
6097e509d50SXin LI     /* We are good, don't read anything */
6107e509d50SXin LI     if (ctx->srcBufferLoaded >= n)
6117e509d50SXin LI         return 0;
6127e509d50SXin LI 
6137e509d50SXin LI     /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
6147e509d50SXin LI      * and coalesce the remaining bytes with the next job's buffer */
6157e509d50SXin LI     if (ctx->srcBufferLoaded > 0) {
6167e509d50SXin LI         useCoalesce = 1;
6177e509d50SXin LI         memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
6187e509d50SXin LI         ctx->srcBuffer = ctx->coalesceBuffer;
6197e509d50SXin LI     }
6207e509d50SXin LI 
6217e509d50SXin LI     /* Read the next chunk */
6227e509d50SXin LI     job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
6237e509d50SXin LI     if(!job)
6247e509d50SXin LI         return 0;
6257e509d50SXin LI     if(useCoalesce) {
6267e509d50SXin LI         assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
6277e509d50SXin LI         memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
6287e509d50SXin LI         ctx->srcBufferLoaded += job->usedBufferSize;
6297e509d50SXin LI     }
6307e509d50SXin LI     else {
6317e509d50SXin LI         ctx->srcBuffer = (U8 *) job->buffer;
6327e509d50SXin LI         ctx->srcBufferLoaded = job->usedBufferSize;
6337e509d50SXin LI     }
6347e509d50SXin LI     return job->usedBufferSize;
6357e509d50SXin LI }
6367e509d50SXin LI 
6377e509d50SXin LI /* AIO_ReadPool_consumeAndRefill:
6387e509d50SXin LI  * Consumes the current buffer and refills it with bufferSize bytes. */
AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t * ctx)6397e509d50SXin LI size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
6407e509d50SXin LI     AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
6417e509d50SXin LI     return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
6427e509d50SXin LI }
6437e509d50SXin LI 
6447e509d50SXin LI /* AIO_ReadPool_getFile:
6457e509d50SXin LI  * Returns the current file set for the read pool. */
AIO_ReadPool_getFile(const ReadPoolCtx_t * ctx)6467e509d50SXin LI FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
6477e509d50SXin LI     return AIO_IOPool_getFile(&ctx->base);
6487e509d50SXin LI }
6497e509d50SXin LI 
6507e509d50SXin LI /* AIO_ReadPool_closeFile:
6517e509d50SXin LI  * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
AIO_ReadPool_closeFile(ReadPoolCtx_t * ctx)6527e509d50SXin LI int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
6537e509d50SXin LI     FILE* const file = AIO_ReadPool_getFile(ctx);
6547e509d50SXin LI     AIO_ReadPool_setFile(ctx, NULL);
6557e509d50SXin LI     return fclose(file);
6567e509d50SXin LI }
6577e509d50SXin LI 
6587e509d50SXin LI /* AIO_ReadPool_setAsync:
6597e509d50SXin LI  * Allows (de)activating async mode, to be used when the expected overhead
6607e509d50SXin LI  * of asyncio costs more than the expected gains. */
AIO_ReadPool_setAsync(ReadPoolCtx_t * ctx,int async)6617e509d50SXin LI void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
6627e509d50SXin LI     AIO_IOPool_setThreaded(&ctx->base, async);
6637e509d50SXin LI }
664