VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/aiomgr.cpp@ 57444

Last change on this file since 57444 was 57358, checked in by vboxsync, 9 years ago

*: scm cleanup run.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 42.0 KB
Line 
1/* $Id: aiomgr.cpp 57358 2015-08-14 15:16:38Z vboxsync $ */
2/** @file
3 * IPRT - Async I/O manager.
4 */
5
6/*
7 * Copyright (C) 2013-2015 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31
32#include <iprt/aiomgr.h>
33#include <iprt/err.h>
34#include <iprt/asm.h>
35#include <iprt/mem.h>
36#include <iprt/file.h>
37#include <iprt/list.h>
38#include <iprt/thread.h>
39#include <iprt/assert.h>
40#include <iprt/string.h>
41#include <iprt/critsect.h>
42#include <iprt/memcache.h>
43#include <iprt/semaphore.h>
44#include <iprt/queueatomic.h>
45
46#include "internal/magics.h"
47
48
49/*********************************************************************************************************************************
50* Structures and Typedefs *
51*********************************************************************************************************************************/
52
53/** Pointer to an internal async I/O file instance. */
54typedef struct RTAIOMGRFILEINT *PRTAIOMGRFILEINT;
55
56/**
57 * Blocking event types.
58 */
59typedef enum RTAIOMGREVENT
60{
61 /** Invalid tye */
62 RTAIOMGREVENT_INVALID = 0,
63 /** No event pending. */
64 RTAIOMGREVENT_NO_EVENT,
65 /** A file is added to the manager. */
66 RTAIOMGREVENT_FILE_ADD,
67 /** A file is about to be closed. */
68 RTAIOMGREVENT_FILE_CLOSE,
69 /** The async I/O manager is shut down. */
70 RTAIOMGREVENT_SHUTDOWN,
71 /** 32bit hack */
72 RTAIOMGREVENT_32BIT_HACK = 0x7fffffff
73} RTAIOMGREVENT;
74
75/**
76 * Async I/O manager instance data.
77 */
78typedef struct RTAIOMGRINT
79{
80 /** Magic value. */
81 uint32_t u32Magic;
82 /** Reference count. */
83 volatile uint32_t cRefs;
84 /** Async I/O context handle. */
85 RTFILEAIOCTX hAioCtx;
86 /** async I/O thread. */
87 RTTHREAD hThread;
88 /** List of files assigned to this manager. */
89 RTLISTANCHOR ListFiles;
90 /** Number of requests active currently. */
91 unsigned cReqsActive;
92 /** Number of maximum requests active. */
93 uint32_t cReqsActiveMax;
94 /** Memory cache for requests. */
95 RTMEMCACHE hMemCacheReqs;
96 /** Critical section protecting the blocking event handling. */
97 RTCRITSECT CritSectBlockingEvent;
98 /** Event semaphore for blocking external events.
99 * The caller waits on it until the async I/O manager
100 * finished processing the event. */
101 RTSEMEVENT hEventSemBlock;
102 /** Blocking event type */
103 volatile RTAIOMGREVENT enmBlockingEvent;
104 /** Event type data */
105 union
106 {
107 /** The file to be added */
108 volatile PRTAIOMGRFILEINT pFileAdd;
109 /** The file to be closed */
110 volatile PRTAIOMGRFILEINT pFileClose;
111 } BlockingEventData;
112} RTAIOMGRINT;
113/** Pointer to an internal async I/O manager instance. */
114typedef RTAIOMGRINT *PRTAIOMGRINT;
115
116/**
117 * Async I/O manager file instance data.
118 */
119typedef struct RTAIOMGRFILEINT
120{
121 /** Magic value. */
122 uint32_t u32Magic;
123 /** Reference count. */
124 volatile uint32_t cRefs;
125 /** Flags. */
126 uint32_t fFlags;
127 /** Opaque user data passed on creation. */
128 void *pvUser;
129 /** File handle. */
130 RTFILE hFile;
131 /** async I/O manager this file belongs to. */
132 PRTAIOMGRINT pAioMgr;
133 /** Work queue for new requests. */
134 RTQUEUEATOMIC QueueReqs;
135 /** Completion callback for this file. */
136 PFNRTAIOMGRREQCOMPLETE pfnReqCompleted;
137 /** Data for exclusive use by the assigned async I/O manager. */
138 struct
139 {
140 /** List node of assigned files for a async I/O manager. */
141 RTLISTNODE NodeAioMgrFiles;
142 /** List of requests waiting for submission. */
143 RTLISTANCHOR ListWaitingReqs;
144 /** Number of requests currently being processed for this endpoint
145 * (excluded flush requests). */
146 unsigned cReqsActive;
147 } AioMgr;
148} RTAIOMGRFILEINT;
149
150/** Flag whether the file is closed. */
151#define RTAIOMGRFILE_FLAGS_CLOSING RT_BIT_32(1)
152
153/**
154 * Request type.
155 */
156typedef enum RTAIOMGRREQTYPE
157{
158 /** Invalid request type. */
159 RTAIOMGRREQTYPE_INVALID = 0,
160 /** Read reques type. */
161 RTAIOMGRREQTYPE_READ,
162 /** Write request. */
163 RTAIOMGRREQTYPE_WRITE,
164 /** Flush request. */
165 RTAIOMGRREQTYPE_FLUSH,
166 /** Prefetech request. */
167 RTAIOMGRREQTYPE_PREFETCH,
168 /** 32bit hack. */
169 RTAIOMGRREQTYPE_32BIT_HACK = 0x7fffffff
170} RTAIOMGRREQTYPE;
171/** Pointer to a reques type. */
172typedef RTAIOMGRREQTYPE *PRTAIOMGRREQTYPE;
173
174/**
175 * Async I/O manager request.
176 */
177typedef struct RTAIOMGRREQ
178{
179 /** Atomic queue work item. */
180 RTQUEUEATOMICITEM WorkItem;
181 /** Node for a waiting list. */
182 RTLISTNODE NodeWaitingList;
183 /** Request flags. */
184 uint32_t fFlags;
185 /** Transfer type. */
186 RTAIOMGRREQTYPE enmType;
187 /** Assigned file request. */
188 RTFILEAIOREQ hReqIo;
189 /** File the request belongs to. */
190 PRTAIOMGRFILEINT pFile;
191 /** Opaque user data. */
192 void *pvUser;
193 /** Start offset */
194 RTFOFF off;
195 /** Data segment. */
196 RTSGSEG DataSeg;
197 /** When non-zero the segment uses a bounce buffer because the provided buffer
198 * doesn't meet host requirements. */
199 size_t cbBounceBuffer;
200 /** Pointer to the used bounce buffer if any. */
201 void *pvBounceBuffer;
202 /** Start offset in the bounce buffer to copy from. */
203 uint32_t offBounceBuffer;
204} RTAIOMGRREQ;
205/** Pointer to a I/O manager request. */
206typedef RTAIOMGRREQ *PRTAIOMGRREQ;
207
208/** Flag whether the request was prepared already. */
209#define RTAIOMGRREQ_FLAGS_PREPARED RT_BIT_32(0)
210
211
212/*********************************************************************************************************************************
213* Defined Constants And Macros *
214*********************************************************************************************************************************/
215
216/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
217#define RTAIOMGR_VALID_RETURN_RC(a_hAioMgr, a_rc) \
218 do { \
219 AssertPtrReturn((a_hAioMgr), (a_rc)); \
220 AssertReturn((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC, (a_rc)); \
221 } while (0)
222
223/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
224#define RTAIOMGR_VALID_RETURN(a_hAioMgr) RTAIOMGR_VALID_RETURN_RC((hAioMgr), VERR_INVALID_HANDLE)
225
226/** Validates a handle and returns (void) if not valid. */
227#define RTAIOMGR_VALID_RETURN_VOID(a_hAioMgr) \
228 do { \
229 AssertPtrReturnVoid(a_hAioMgr); \
230 AssertReturnVoid((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC); \
231 } while (0)
232
233
234/*********************************************************************************************************************************
235* Internal Functions *
236*********************************************************************************************************************************/
237
238static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
239 PRTFILEAIOREQ pahReqs, unsigned cReqs);
240
241/**
242 * Removes an endpoint from the currently assigned manager.
243 *
244 * @returns TRUE if there are still requests pending on the current manager for this endpoint.
245 * FALSE otherwise.
246 * @param pEndpointRemove The endpoint to remove.
247 */
248static bool rtAioMgrFileRemove(PRTAIOMGRFILEINT pFile)
249{
250 /* Make sure that there is no request pending on this manager for the endpoint. */
251 if (!pFile->AioMgr.cReqsActive)
252 {
253 RTListNodeRemove(&pFile->AioMgr.NodeAioMgrFiles);
254 return false;
255 }
256
257 return true;
258}
259
260/**
261 * Allocate a new I/O request.
262 *
263 * @returns Pointer to the allocated request or NULL if out of memory.
264 * @param pThis The async I/O manager instance.
265 */
266static PRTAIOMGRREQ rtAioMgrReqAlloc(PRTAIOMGRINT pThis)
267{
268 return (PRTAIOMGRREQ)RTMemCacheAlloc(pThis->hMemCacheReqs);
269}
270
271/**
272 * Frees an I/O request.
273 *
274 * @returns nothing.
275 * @param pThis The async I/O manager instance.
276 * @param pReq The request to free.
277 */
278static void rtAioMgrReqFree(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq)
279{
280 if (pReq->cbBounceBuffer)
281 {
282 AssertPtr(pReq->pvBounceBuffer);
283 RTMemPageFree(pReq->pvBounceBuffer, pReq->cbBounceBuffer);
284 pReq->pvBounceBuffer = NULL;
285 pReq->cbBounceBuffer = 0;
286 }
287 pReq->fFlags = 0;
288 RTAioMgrFileRelease(pReq->pFile);
289 RTMemCacheFree(pThis->hMemCacheReqs, pReq);
290}
291
292static void rtAioMgrReqCompleteRc(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq,
293 int rcReq, size_t cbTransfered)
294{
295 int rc = VINF_SUCCESS;
296 PRTAIOMGRFILEINT pFile;
297
298 pFile = pReq->pFile;
299 pThis->cReqsActive--;
300 pFile->AioMgr.cReqsActive--;
301
302 /*
303 * It is possible that the request failed on Linux with kernels < 2.6.23
304 * if the passed buffer was allocated with remap_pfn_range or if the file
305 * is on an NFS endpoint which does not support async and direct I/O at the same time.
306 * The endpoint will be migrated to a failsafe manager in case a request fails.
307 */
308 if (RT_FAILURE(rcReq))
309 {
310 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
311 rtAioMgrReqFree(pThis, pReq);
312 }
313 else
314 {
315 /*
316 * Restart an incomplete transfer.
317 * This usually means that the request will return an error now
318 * but to get the cause of the error (disk full, file too big, I/O error, ...)
319 * the transfer needs to be continued.
320 */
321 if (RT_UNLIKELY( cbTransfered < pReq->DataSeg.cbSeg
322 || ( pReq->cbBounceBuffer
323 && cbTransfered < pReq->cbBounceBuffer)))
324 {
325 RTFOFF offStart;
326 size_t cbToTransfer;
327 uint8_t *pbBuf = NULL;
328
329 Assert(cbTransfered % 512 == 0);
330
331 if (pReq->cbBounceBuffer)
332 {
333 AssertPtr(pReq->pvBounceBuffer);
334 offStart = (pReq->off & ~((RTFOFF)512-1)) + cbTransfered;
335 cbToTransfer = pReq->cbBounceBuffer - cbTransfered;
336 pbBuf = (uint8_t *)pReq->pvBounceBuffer + cbTransfered;
337 }
338 else
339 {
340 Assert(!pReq->pvBounceBuffer);
341 offStart = pReq->off + cbTransfered;
342 cbToTransfer = pReq->DataSeg.cbSeg - cbTransfered;
343 pbBuf = (uint8_t *)pReq->DataSeg.pvSeg + cbTransfered;
344 }
345
346 if ( pReq->enmType == RTAIOMGRREQTYPE_PREFETCH
347 || pReq->enmType == RTAIOMGRREQTYPE_READ)
348 {
349 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart,
350 pbBuf, cbToTransfer, pReq);
351 }
352 else
353 {
354 AssertMsg(pReq->enmType == RTAIOMGRREQTYPE_WRITE,
355 ("Invalid transfer type\n"));
356 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart,
357 pbBuf, cbToTransfer, pReq);
358 }
359 AssertRC(rc);
360
361 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
362 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
363 ("Unexpected return code rc=%Rrc\n", rc));
364 }
365 else if (pReq->enmType == RTAIOMGRREQTYPE_PREFETCH)
366 {
367 Assert(pReq->cbBounceBuffer);
368 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
369
370 memcpy(((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
371 pReq->DataSeg.pvSeg,
372 pReq->DataSeg.cbSeg);
373
374 /* Write it now. */
375 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
376 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
377
378 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
379 offStart, pReq->pvBounceBuffer, cbToTransfer, pReq);
380 AssertRC(rc);
381 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
382 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
383 ("Unexpected return code rc=%Rrc\n", rc));
384 }
385 else
386 {
387 if (RT_SUCCESS(rc) && pReq->cbBounceBuffer)
388 {
389 if (pReq->enmType == RTAIOMGRREQTYPE_READ)
390 memcpy(pReq->DataSeg.pvSeg,
391 ((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
392 pReq->DataSeg.cbSeg);
393 }
394
395 /* Call completion callback */
396 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
397 rtAioMgrReqFree(pThis, pReq);
398 }
399 } /* request completed successfully */
400}
401
402/**
403 * Wrapper around rtAioMgrReqCompleteRc().
404 */
405static void rtAioMgrReqComplete(PRTAIOMGRINT pThis, RTFILEAIOREQ hReq)
406{
407 size_t cbTransfered = 0;
408 int rcReq = RTFileAioReqGetRC(hReq, &cbTransfered);
409 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(hReq);
410
411 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, cbTransfered);
412}
413
414/**
415 * Wrapper around RTFIleAioCtxSubmit() which is also doing error handling.
416 */
417static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
418 PRTFILEAIOREQ pahReqs, unsigned cReqs)
419{
420 pThis->cReqsActive += cReqs;
421 pFile->AioMgr.cReqsActive += cReqs;
422
423 int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pahReqs, cReqs);
424 if (RT_FAILURE(rc))
425 {
426 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
427 {
428 /* Append any not submitted task to the waiting list. */
429 for (size_t i = 0; i < cReqs; i++)
430 {
431 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
432
433 if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
434 {
435 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
436
437 Assert(pReq->hReqIo == pahReqs[i]);
438 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReq->NodeWaitingList);
439 pThis->cReqsActive--;
440 pFile->AioMgr.cReqsActive--;
441 }
442 }
443
444 pThis->cReqsActiveMax = pThis->cReqsActive;
445 rc = VINF_SUCCESS;
446 }
447 else /* Another kind of error happened (full disk, ...) */
448 {
449 /* An error happened. Find out which one caused the error and resubmit all other tasks. */
450 for (size_t i = 0; i < cReqs; i++)
451 {
452 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
453 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
454
455 if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED)
456 {
457 /* We call ourself again to do any error handling which might come up now. */
458 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pahReqs[i], 1);
459 AssertRC(rc);
460 }
461 else if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
462 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, 0);
463 }
464 }
465 }
466
467 return VINF_SUCCESS;
468}
469
470/**
471 * Adds a list of requests to the waiting list.
472 *
473 * @returns nothing.
474 * @param pFile The file instance to add the requests to.
475 * @param pReqsHead The head of the request list to add.
476 */
477static void rtAioMgrFileAddReqsToWaitingList(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsHead)
478{
479 while (pReqsHead)
480 {
481 PRTAIOMGRREQ pReqCur = pReqsHead;
482
483 pReqsHead = (PRTAIOMGRREQ)pReqsHead->WorkItem.pNext;
484 pReqCur->WorkItem.pNext = NULL;
485 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReqCur->NodeWaitingList);
486 }
487}
488
489/**
490 * Prepare the native I/o request ensuring that all alignment prerequisites of
491 * the host are met.
492 *
493 * @returns IPRT statuse code.
494 * @param pFile The file instance data.
495 * @param pReq The request to prepare.
496 */
497static int rtAioMgrReqPrepareNonBuffered(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReq)
498{
499 int rc = VINF_SUCCESS;
500 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
501 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
502 void *pvBuf = pReq->DataSeg.pvSeg;
503 bool fAlignedReq = cbToTransfer == pReq->DataSeg.cbSeg
504 && offStart == pReq->off;
505
506 /*
507 * Check if the alignment requirements are met.
508 * Offset, transfer size and buffer address
509 * need to be on a 512 boundary.
510 */
511 if ( !fAlignedReq
512 /** @todo: || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */)
513 {
514 /* Create bounce buffer. */
515 pReq->cbBounceBuffer = cbToTransfer;
516
517 AssertMsg(pReq->off >= offStart, ("Overflow in calculation off=%llu offStart=%llu\n",
518 pReq->off, offStart));
519 pReq->offBounceBuffer = pReq->off - offStart;
520
521 /** @todo: I think we need something like a RTMemAllocAligned method here.
522 * Current assumption is that the maximum alignment is 4096byte
523 * (GPT disk on Windows)
524 * so we can use RTMemPageAlloc here.
525 */
526 pReq->pvBounceBuffer = RTMemPageAlloc(cbToTransfer);
527 if (RT_LIKELY(pReq->pvBounceBuffer))
528 {
529 pvBuf = pReq->pvBounceBuffer;
530
531 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
532 {
533 if ( RT_UNLIKELY(cbToTransfer != pReq->DataSeg.cbSeg)
534 || RT_UNLIKELY(offStart != pReq->off))
535 {
536 /* We have to fill the buffer first before we can update the data. */
537 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
538 }
539 else
540 memcpy(pvBuf, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg);
541 }
542 }
543 else
544 rc = VERR_NO_MEMORY;
545 }
546 else
547 pReq->cbBounceBuffer = 0;
548
549 if (RT_SUCCESS(rc))
550 {
551 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
552 {
553 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
554 offStart, pvBuf, cbToTransfer, pReq);
555 }
556 else /* Read or prefetch request. */
557 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile,
558 offStart, pvBuf, cbToTransfer, pReq);
559 AssertRC(rc);
560 pReq->fFlags |= RTAIOMGRREQ_FLAGS_PREPARED;
561 }
562
563 return rc;
564}
565
566/**
567 * Prepare a new request for enqueuing.
568 *
569 * @returns IPRT status code.
570 * @param pReq The request to prepare.
571 * @param phReqIo Where to store the handle to the native I/O request on success.
572 */
573static int rtAioMgrPrepareReq(PRTAIOMGRREQ pReq, PRTFILEAIOREQ phReqIo)
574{
575 int rc = VINF_SUCCESS;
576 PRTAIOMGRFILEINT pFile = pReq->pFile;
577
578 switch (pReq->enmType)
579 {
580 case RTAIOMGRREQTYPE_FLUSH:
581 {
582 rc = RTFileAioReqPrepareFlush(pReq->hReqIo, pFile->hFile, pReq);
583 break;
584 }
585 case RTAIOMGRREQTYPE_READ:
586 case RTAIOMGRREQTYPE_WRITE:
587 {
588 rc = rtAioMgrReqPrepareNonBuffered(pFile, pReq);
589 break;
590 }
591 default:
592 AssertMsgFailed(("Invalid transfer type %d\n", pReq->enmType));
593 } /* switch transfer type */
594
595 if (RT_SUCCESS(rc))
596 *phReqIo = pReq->hReqIo;
597
598 return rc;
599}
600
601/**
602 * Prepare newly submitted requests for processing.
603 *
604 * @returns IPRT status code
605 * @param pThis The async I/O manager instance data.
606 * @param pFile The file instance.
607 * @param pReqsNew The list of new requests to prepare.
608 */
609static int rtAioMgrPrepareNewReqs(PRTAIOMGRINT pThis,
610 PRTAIOMGRFILEINT pFile,
611 PRTAIOMGRREQ pReqsNew)
612{
613 RTFILEAIOREQ apReqs[20];
614 unsigned cRequests = 0;
615 int rc = VINF_SUCCESS;
616
617 /* Go through the list and queue the requests. */
618 while ( pReqsNew
619 && (pThis->cReqsActive + cRequests < pThis->cReqsActiveMax)
620 && RT_SUCCESS(rc))
621 {
622 PRTAIOMGRREQ pCurr = pReqsNew;
623 pReqsNew = (PRTAIOMGRREQ)pReqsNew->WorkItem.pNext;
624
625 pCurr->WorkItem.pNext = NULL;
626 AssertMsg(VALID_PTR(pCurr->pFile) && (pCurr->pFile == pFile),
627 ("Files do not match\n"));
628 AssertMsg(!(pCurr->fFlags & RTAIOMGRREQ_FLAGS_PREPARED),
629 ("Request on the new list is already prepared\n"));
630
631 rc = rtAioMgrPrepareReq(pCurr, &apReqs[cRequests]);
632 if (RT_FAILURE(rc))
633 rtAioMgrReqCompleteRc(pThis, pCurr, rc, 0);
634 else
635 cRequests++;
636
637 /* Queue the requests if the array is full. */
638 if (cRequests == RT_ELEMENTS(apReqs))
639 {
640 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
641 cRequests = 0;
642 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
643 ("Unexpected return code\n"));
644 }
645 }
646
647 if (cRequests)
648 {
649 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
650 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
651 ("Unexpected return code rc=%Rrc\n", rc));
652 }
653
654 if (pReqsNew)
655 {
656 /* Add the rest of the tasks to the pending list */
657 rtAioMgrFileAddReqsToWaitingList(pFile, pReqsNew);
658 }
659
660 /* Insufficient resources are not fatal. */
661 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
662 rc = VINF_SUCCESS;
663
664 return rc;
665}
666
667/**
668 * Queues waiting requests.
669 *
670 * @returns IPRT status code.
671 * @param pThis The async I/O manager instance data.
672 * @param pFile The file to get the requests from.
673 */
674static int rtAioMgrQueueWaitingReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
675{
676 RTFILEAIOREQ apReqs[20];
677 unsigned cRequests = 0;
678 int rc = VINF_SUCCESS;
679 PRTAIOMGRREQ pReqIt;
680 PRTAIOMGRREQ pReqItNext;
681
682 /* Go through the list and queue the requests. */
683 RTListForEachSafe(&pFile->AioMgr.ListWaitingReqs, pReqIt, pReqItNext, RTAIOMGRREQ, NodeWaitingList)
684 {
685 RTListNodeRemove(&pReqIt->NodeWaitingList);
686 AssertMsg(VALID_PTR(pReqIt->pFile) && (pReqIt->pFile == pFile),
687 ("Files do not match\n"));
688
689 if (!(pReqIt->fFlags & RTAIOMGRREQ_FLAGS_PREPARED))
690 {
691 rc = rtAioMgrPrepareReq(pReqIt, &apReqs[cRequests]);
692 if (RT_FAILURE(rc))
693 rtAioMgrReqCompleteRc(pThis, pReqIt, rc, 0);
694 else
695 cRequests++;
696 }
697 else
698 {
699 apReqs[cRequests] = pReqIt->hReqIo;
700 cRequests++;
701 }
702
703 /* Queue the requests if the array is full. */
704 if (cRequests == RT_ELEMENTS(apReqs))
705 {
706 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
707 cRequests = 0;
708 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
709 ("Unexpected return code\n"));
710 }
711 }
712
713 if (cRequests)
714 {
715 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
716 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
717 ("Unexpected return code rc=%Rrc\n", rc));
718 }
719
720 /* Insufficient resources are not fatal. */
721 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
722 rc = VINF_SUCCESS;
723
724 return rc;
725}
726
727/**
728 * Adds all pending requests for the given file.
729 *
730 * @returns IPRT status code.
731 * @param pThis The async I/O manager instance data.
732 * @param pFile The file to get the requests from.
733 */
734static int rtAioMgrQueueReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
735{
736 int rc = VINF_SUCCESS;
737 PRTAIOMGRFILEINT pReqsHead = NULL;
738
739 /* Check the pending list first */
740 if (!RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
741 rc = rtAioMgrQueueWaitingReqs(pThis, pFile);
742
743 if ( RT_SUCCESS(rc)
744 && RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
745 {
746 PRTAIOMGRREQ pReqsNew = (PRTAIOMGRREQ)RTQueueAtomicRemoveAll(&pFile->QueueReqs);
747
748 if (pReqsNew)
749 {
750 rc = rtAioMgrPrepareNewReqs(pThis, pFile, pReqsNew);
751 AssertRC(rc);
752 }
753 }
754
755 return rc;
756}
757
758/**
759 * Checks all files for new requests.
760 *
761 * @returns IPRT status code.
762 * @param pThis The I/O manager instance data.
763 */
764static int rtAioMgrCheckFiles(PRTAIOMGRINT pThis)
765{
766 int rc = VINF_SUCCESS;
767 PRTAIOMGRFILEINT pIt;
768
769 RTListForEach(&pThis->ListFiles, pIt, RTAIOMGRFILEINT, AioMgr.NodeAioMgrFiles)
770 {
771 rc = rtAioMgrQueueReqs(pThis, pIt);
772 if (RT_FAILURE(rc))
773 return rc;
774 }
775
776 return rc;
777}
778
779/**
780 * Process a blocking event from the outside.
781 *
782 * @returns IPRT status code.
783 * @param pThis The async I/O manager instance data.
784 */
785static int rtAioMgrProcessBlockingEvent(PRTAIOMGRINT pThis)
786{
787 int rc = VINF_SUCCESS;
788 bool fNotifyWaiter = false;
789
790 switch (pThis->enmBlockingEvent)
791 {
792 case RTAIOMGREVENT_NO_EVENT:
793 /* Nothing to do. */
794 break;
795 case RTAIOMGREVENT_FILE_ADD:
796 {
797 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileAdd, PRTAIOMGRFILEINT);
798 AssertMsg(VALID_PTR(pFile), ("Adding file event without a file to add\n"));
799
800 RTListAppend(&pThis->ListFiles, &pFile->AioMgr.NodeAioMgrFiles);
801 fNotifyWaiter = true;
802 break;
803 }
804 case RTAIOMGREVENT_FILE_CLOSE:
805 {
806 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileClose, PRTAIOMGRFILEINT);
807 AssertMsg(VALID_PTR(pFile), ("Close file event without a file to close\n"));
808
809 if (!(pFile->fFlags & RTAIOMGRFILE_FLAGS_CLOSING))
810 {
811 /* Make sure all requests finished. Process the queues a last time first. */
812 rc = rtAioMgrQueueReqs(pThis, pFile);
813 AssertRC(rc);
814
815 pFile->fFlags |= RTAIOMGRFILE_FLAGS_CLOSING;
816 fNotifyWaiter = !rtAioMgrFileRemove(pFile);
817 }
818 else if (!pFile->AioMgr.cReqsActive)
819 fNotifyWaiter = true;
820 break;
821 }
822 case RTAIOMGREVENT_SHUTDOWN:
823 {
824 if (!pThis->cReqsActive)
825 fNotifyWaiter = true;
826 break;
827 }
828 default:
829 AssertReleaseMsgFailed(("Invalid event type %d\n", pThis->enmBlockingEvent));
830 }
831
832 if (fNotifyWaiter)
833 {
834 /* Release the waiting thread. */
835 rc = RTSemEventSignal(pThis->hEventSemBlock);
836 AssertRC(rc);
837 }
838
839 return rc;
840}
841
842/**
843 * async I/O manager worker loop.
844 *
845 * @returns IPRT status code.
846 * @param hThreadSelf The thread handle this worker belongs to.
847 * @param pvUser Opaque user data (Pointer to async I/O manager instance).
848 */
849static DECLCALLBACK(int) rtAioMgrWorker(RTTHREAD hThreadSelf, void *pvUser)
850{
851 PRTAIOMGRINT pThis = (PRTAIOMGRINT)pvUser;
852 bool fRunning = true;
853 int rc = VINF_SUCCESS;
854
855 do
856 {
857 uint32_t cReqsCompleted = 0;
858 RTFILEAIOREQ ahReqsCompleted[32];
859 rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0],
860 RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted);
861 if (rc == VERR_INTERRUPTED)
862 {
863 /* Process external event. */
864 rtAioMgrProcessBlockingEvent(pThis);
865 rc = rtAioMgrCheckFiles(pThis);
866 }
867 else if (RT_FAILURE(rc))
868 {
869 /* Something bad happened. */
870 /** @todo: */
871 }
872 else
873 {
874 /* Requests completed. */
875 for (uint32_t i = 0; i < cReqsCompleted; i++)
876 rtAioMgrReqComplete(pThis, ahReqsCompleted[i]);
877
878 /* Check files for new requests and queue waiting requests. */
879 rc = rtAioMgrCheckFiles(pThis);
880 }
881 } while ( fRunning
882 && RT_SUCCESS(rc));
883
884 return rc;
885}
886
887/**
888 * Wakes up the async I/O manager.
889 *
890 * @returns IPRT status code.
891 * @param pThis The async I/O manager.
892 */
893static int rtAioMgrWakeup(PRTAIOMGRINT pThis)
894{
895 return RTFileAioCtxWakeup(pThis->hAioCtx);
896}
897
898/**
899 * Waits until the async I/O manager handled the given event.
900 *
901 * @returns IPRT status code.
902 * @param pThis The async I/O manager.
903 * @param enmEvent The event to pass to the manager.
904 */
905static int rtAioMgrWaitForBlockingEvent(PRTAIOMGRINT pThis, RTAIOMGREVENT enmEvent)
906{
907 Assert(pThis->enmBlockingEvent == RTAIOMGREVENT_NO_EVENT);
908 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, enmEvent);
909
910 /* Wakeup the async I/O manager */
911 int rc = rtAioMgrWakeup(pThis);
912 if (RT_FAILURE(rc))
913 return rc;
914
915 /* Wait for completion. */
916 rc = RTSemEventWait(pThis->hEventSemBlock, RT_INDEFINITE_WAIT);
917 AssertRC(rc);
918
919 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, RTAIOMGREVENT_NO_EVENT);
920
921 return rc;
922}
923
924/**
925 * Add a given file to the given I/O manager.
926 *
927 * @returns IPRT status code.
928 * @param pThis The async I/O manager.
929 * @param pFile The file to add.
930 */
931static int rtAioMgrAddFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
932{
933 /* Update the assigned I/O manager. */
934 ASMAtomicWritePtr(&pFile->pAioMgr, pThis);
935
936 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
937 AssertRCReturn(rc, rc);
938
939 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileAdd, pFile);
940 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_ADD);
941 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileAdd);
942
943 RTCritSectLeave(&pThis->CritSectBlockingEvent);
944 return rc;
945}
946
947/**
948 * Removes a given file from the given I/O manager.
949 *
950 * @returns IPRT status code.
951 * @param pThis The async I/O manager.
952 * @param pFile The file to remove.
953 */
954static int rtAioMgrCloseFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
955{
956 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
957 AssertRCReturn(rc, rc);
958
959 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileClose, pFile);
960 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_CLOSE);
961 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileClose);
962
963 RTCritSectLeave(&pThis->CritSectBlockingEvent);
964
965 return rc;
966}
967
968/**
969 * Process a shutdown event.
970 *
971 * @returns IPRT status code.
972 * @param pThis The async I/O manager to shut down.
973 */
974static int rtAioMgrShutdown(PRTAIOMGRINT pThis)
975{
976 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
977 AssertRCReturn(rc, rc);
978
979 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_SHUTDOWN);
980 RTCritSectLeave(&pThis->CritSectBlockingEvent);
981
982 return rc;
983}
984
985/**
986 * Destroys an async I/O manager.
987 *
988 * @returns nothing.
989 * @param pThis The async I/O manager instance to destroy.
990 */
991static void rtAioMgrDestroy(PRTAIOMGRINT pThis)
992{
993 int rc;
994
995 rc = rtAioMgrShutdown(pThis);
996 AssertRC(rc);
997
998 rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL);
999 AssertRC(rc);
1000
1001 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1002 AssertRC(rc);
1003
1004 rc = RTMemCacheDestroy(pThis->hMemCacheReqs);
1005 AssertRC(rc);
1006
1007 pThis->hThread = NIL_RTTHREAD;
1008 pThis->hAioCtx = NIL_RTFILEAIOCTX;
1009 pThis->hMemCacheReqs = NIL_RTMEMCACHE;
1010 pThis->u32Magic = ~RTAIOMGR_MAGIC;
1011 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1012 RTSemEventDestroy(pThis->hEventSemBlock);
1013 RTMemFree(pThis);
1014}
1015
1016/**
1017 * Queues a new request for processing.
1018 */
1019static void rtAioMgrFileQueueReq(PRTAIOMGRFILEINT pThis, PRTAIOMGRREQ pReq)
1020{
1021 RTAioMgrFileRetain(pThis);
1022 RTQueueAtomicInsert(&pThis->QueueReqs, &pReq->WorkItem);
1023 rtAioMgrWakeup(pThis->pAioMgr);
1024}
1025
1026/**
1027 * Destroys an async I/O manager file.
1028 *
1029 * @returns nothing.
1030 * @param pThis The async I/O manager file.
1031 */
1032static void rtAioMgrFileDestroy(PRTAIOMGRFILEINT pThis)
1033{
1034 pThis->u32Magic = ~RTAIOMGRFILE_MAGIC;
1035 rtAioMgrCloseFile(pThis->pAioMgr, pThis);
1036 RTAioMgrRelease(pThis->pAioMgr);
1037 RTMemFree(pThis);
1038}
1039
1040/**
1041 * Queues a new I/O request.
1042 *
1043 * @returns IPRT status code.
1044 * @param hAioMgrFile The I/O manager file handle.
1045 * @param off Start offset of the I/o request.
1046 * @param pSgBuf Data S/G buffer.
1047 * @param cbIo How much to transfer.
1048 * @param pvUser Opaque user data.
1049 * @param enmType I/O direction type (read/write).
1050 */
1051static int rtAioMgrFileIoReqCreate(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf,
1052 size_t cbIo, void *pvUser, RTAIOMGRREQTYPE enmType)
1053{
1054 int rc;
1055 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1056 PRTAIOMGRINT pAioMgr;
1057
1058 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1059 pAioMgr = pFile->pAioMgr;
1060
1061 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1062 if (RT_LIKELY(pReq))
1063 {
1064 unsigned cSeg = 1;
1065 size_t cbSeg = RTSgBufSegArrayCreate(pSgBuf, &pReq->DataSeg, &cSeg, cbIo);
1066
1067 if (cbSeg == cbIo)
1068 {
1069 pReq->enmType = enmType;
1070 pReq->pFile = pFile;
1071 pReq->pvUser = pvUser;
1072 pReq->off = off;
1073 rtAioMgrFileQueueReq(pFile, pReq);
1074 rc = VERR_FILE_AIO_IN_PROGRESS;
1075 }
1076 else
1077 {
1078 /** @todo: Real S/G buffer support. */
1079 rtAioMgrReqFree(pAioMgr, pReq);
1080 rc = VERR_NOT_SUPPORTED;
1081 }
1082 }
1083 else
1084 rc = VERR_NO_MEMORY;
1085
1086 return rc;
1087}
1088
1089/**
1090 * Request constructor for the memory cache.
1091 *
1092 * @returns IPRT status code.
1093 * @param hMemCache The cache handle.
1094 * @param pvObj The memory object that should be initialized.
1095 * @param pvUser The user argument.
1096 */
1097static DECLCALLBACK(int) rtAioMgrReqCtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1098{
1099 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1100
1101 memset(pReq, 0, sizeof(RTAIOMGRREQ));
1102 return RTFileAioReqCreate(&pReq->hReqIo);
1103}
1104
1105/**
1106 * Request destructor for the memory cache.
1107 *
1108 * @param hMemCache The cache handle.
1109 * @param pvObj The memory object that should be destroyed.
1110 * @param pvUser The user argument.
1111 */
1112static DECLCALLBACK(void) rtAioMgrReqDtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1113{
1114 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1115 int rc = RTFileAioReqDestroy(pReq->hReqIo);
1116
1117 AssertRC(rc);
1118}
1119
1120RTDECL(int) RTAioMgrCreate(PRTAIOMGR phAioMgr, uint32_t cReqsMax)
1121{
1122 int rc = VINF_SUCCESS;
1123 PRTAIOMGRINT pThis;
1124
1125 AssertPtrReturn(phAioMgr, VERR_INVALID_POINTER);
1126 AssertReturn(cReqsMax > 0, VERR_INVALID_PARAMETER);
1127
1128 pThis = (PRTAIOMGRINT)RTMemAllocZ(sizeof(RTAIOMGRINT));
1129 if (pThis)
1130 {
1131 pThis->u32Magic = RTAIOMGR_MAGIC;
1132 pThis->cRefs = 1;
1133 pThis->enmBlockingEvent = RTAIOMGREVENT_NO_EVENT;
1134 RTListInit(&pThis->ListFiles);
1135 rc = RTCritSectInit(&pThis->CritSectBlockingEvent);
1136 if (RT_SUCCESS(rc))
1137 {
1138 rc = RTSemEventCreate(&pThis->hEventSemBlock);
1139 if (RT_SUCCESS(rc))
1140 {
1141 rc = RTMemCacheCreate(&pThis->hMemCacheReqs, sizeof(RTAIOMGRREQ),
1142 0, UINT32_MAX, rtAioMgrReqCtor, rtAioMgrReqDtor, NULL, 0);
1143 if (RT_SUCCESS(rc))
1144 {
1145 rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX
1146 ? RTFILEAIO_UNLIMITED_REQS
1147 : cReqsMax,
1148 RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS);
1149 if (RT_SUCCESS(rc))
1150 {
1151 rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO,
1152 RTTHREADFLAGS_WAITABLE, "AioMgr-%u", cReqsMax);
1153 if (RT_FAILURE(rc))
1154 {
1155 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1156 AssertRC(rc);
1157 }
1158 }
1159
1160 if (RT_FAILURE(rc))
1161 RTMemCacheDestroy(pThis->hMemCacheReqs);
1162 }
1163
1164 if (RT_FAILURE(rc))
1165 RTSemEventDestroy(pThis->hEventSemBlock);
1166 }
1167
1168 if (RT_FAILURE(rc))
1169 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1170 }
1171
1172 if (RT_FAILURE(rc))
1173 RTMemFree(pThis);
1174 }
1175 else
1176 rc = VERR_NO_MEMORY;
1177
1178 if (RT_SUCCESS(rc))
1179 *phAioMgr = pThis;
1180
1181 return rc;
1182}
1183
1184RTDECL(uint32_t) RTAioMgrRetain(RTAIOMGR hAioMgr)
1185{
1186 PRTAIOMGRINT pThis = hAioMgr;
1187 AssertReturn(hAioMgr != NIL_RTAIOMGR, UINT32_MAX);
1188 AssertPtrReturn(pThis, UINT32_MAX);
1189 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1190
1191 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1192 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1193 return cRefs;
1194}
1195
1196RTDECL(uint32_t) RTAioMgrRelease(RTAIOMGR hAioMgr)
1197{
1198 PRTAIOMGRINT pThis = hAioMgr;
1199 if (pThis == NIL_RTAIOMGR)
1200 return 0;
1201 AssertPtrReturn(pThis, UINT32_MAX);
1202 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1203
1204 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1205 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1206 if (cRefs == 0)
1207 rtAioMgrDestroy(pThis);
1208 return cRefs;
1209}
1210
1211RTDECL(int) RTAioMgrFileCreate(RTAIOMGR hAioMgr, RTFILE hFile, PFNRTAIOMGRREQCOMPLETE pfnReqComplete,
1212 void *pvUser, PRTAIOMGRFILE phAioMgrFile)
1213{
1214 int rc = VINF_SUCCESS;
1215 PRTAIOMGRFILEINT pThis;
1216
1217 AssertReturn(hAioMgr != NIL_RTAIOMGR, VERR_INVALID_HANDLE);
1218 AssertReturn(hFile != NIL_RTFILE, VERR_INVALID_HANDLE);
1219 AssertPtrReturn(pfnReqComplete, VERR_INVALID_POINTER);
1220 AssertPtrReturn(phAioMgrFile, VERR_INVALID_POINTER);
1221
1222 pThis = (PRTAIOMGRFILEINT)RTMemAllocZ(sizeof(RTAIOMGRFILEINT));
1223 if (pThis)
1224 {
1225 pThis->u32Magic = RTAIOMGRFILE_MAGIC;
1226 pThis->cRefs = 1;
1227 pThis->hFile = hFile;
1228 pThis->pAioMgr = hAioMgr;
1229 pThis->pvUser = pvUser;
1230 pThis->pfnReqCompleted = pfnReqComplete;
1231 RTQueueAtomicInit(&pThis->QueueReqs);
1232 RTListInit(&pThis->AioMgr.ListWaitingReqs);
1233 RTAioMgrRetain(hAioMgr);
1234 rc = RTFileAioCtxAssociateWithFile(pThis->pAioMgr->hAioCtx, hFile);
1235 if (RT_FAILURE(rc))
1236 rtAioMgrFileDestroy(pThis);
1237 else
1238 rtAioMgrAddFile(pThis->pAioMgr, pThis);
1239 }
1240 else
1241 rc = VERR_NO_MEMORY;
1242
1243 if (RT_SUCCESS(rc))
1244 *phAioMgrFile = pThis;
1245
1246 return rc;
1247}
1248
1249RTDECL(uint32_t) RTAioMgrFileRetain(RTAIOMGRFILE hAioMgrFile)
1250{
1251 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1252 AssertReturn(hAioMgrFile != NIL_RTAIOMGRFILE, UINT32_MAX);
1253 AssertPtrReturn(pThis, UINT32_MAX);
1254 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1255
1256 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1257 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1258 return cRefs;
1259}
1260
1261RTDECL(uint32_t) RTAioMgrFileRelease(RTAIOMGRFILE hAioMgrFile)
1262{
1263 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1264 if (pThis == NIL_RTAIOMGRFILE)
1265 return 0;
1266 AssertPtrReturn(pThis, UINT32_MAX);
1267 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1268
1269 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1270 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1271 if (cRefs == 0)
1272 rtAioMgrFileDestroy(pThis);
1273 return cRefs;
1274}
1275
1276RTDECL(void *) RTAioMgrFileGetUser(RTAIOMGRFILE hAioMgrFile)
1277{
1278 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1279
1280 AssertPtrReturn(pThis, NULL);
1281 return pThis->pvUser;
1282}
1283
1284RTDECL(int) RTAioMgrFileRead(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1285 PRTSGBUF pSgBuf, size_t cbRead, void *pvUser)
1286{
1287 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbRead, pvUser,
1288 RTAIOMGRREQTYPE_READ);
1289}
1290
1291RTDECL(int) RTAioMgrFileWrite(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1292 PRTSGBUF pSgBuf, size_t cbWrite, void *pvUser)
1293{
1294 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbWrite, pvUser,
1295 RTAIOMGRREQTYPE_WRITE);
1296}
1297
1298RTDECL(int) RTAioMgrFileFlush(RTAIOMGRFILE hAioMgrFile, void *pvUser)
1299{
1300 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1301 PRTAIOMGRINT pAioMgr;
1302
1303 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1304
1305 pAioMgr = pFile->pAioMgr;
1306
1307 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1308 if (RT_UNLIKELY(!pReq))
1309 return VERR_NO_MEMORY;
1310
1311 pReq->pFile = pFile;
1312 pReq->enmType = RTAIOMGRREQTYPE_FLUSH;
1313 pReq->pvUser = pvUser;
1314 rtAioMgrFileQueueReq(pFile, pReq);
1315
1316 return VERR_FILE_AIO_IN_PROGRESS;
1317}
1318
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette