VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp@ 79983

Last change on this file since 79983 was 79983, checked in by vboxsync, 6 years ago

Runtime/RTIoQueue: Updates

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 17.1 KB
Line 
1/* $Id: ioqueue-stdfile-provider.cpp 79983 2019-07-25 17:21:24Z vboxsync $ */
2/** @file
3 * IPRT - I/O queue, Standard file provider.
4 */
5
6/*
7 * Copyright (C) 2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#define LOG_GROUP RTLOGGROUP_IOQUEUE
32#include <iprt/ioqueue.h>
33
34#include <iprt/asm.h>
35#include <iprt/errcore.h>
36#include <iprt/file.h>
37#include <iprt/log.h>
38#include <iprt/mem.h>
39#include <iprt/semaphore.h>
40#include <iprt/string.h>
41#include <iprt/thread.h>
42
43#include "internal/ioqueue.h"
44
45
46/*********************************************************************************************************************************
47* Defined Constants And Macros *
48*********************************************************************************************************************************/
49
50/** The I/O queue worker thread needs to wake up the waiting thread when requests completed. */
51#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP RT_BIT(0)
52/** The waiting thread was interrupted by the external wakeup call. */
53#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR RT_BIT(1)
54/** The I/O queue worker thread needs to be woken up to process new requests. */
55#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP RT_BIT(2)
56#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT 2
57
58
59/*********************************************************************************************************************************
60* Structures and Typedefs *
61*********************************************************************************************************************************/
62
63
64/**
65 * Submission queue entry.
66 */
67typedef struct RTIOQUEUESSQENTRY
68{
69 /** The file to work on. */
70 RTFILE hFile;
71 /** I/O operation. */
72 RTIOQUEUEOP enmOp;
73 /** Start offset. */
74 uint64_t off;
75 /** Additional request flags. */
76 uint32_t fReqFlags;
77 /** Size of the request. */
78 size_t cbReq;
79 /** Opaque user data passed on completion. */
80 void *pvUser;
81 /** Flag whether this is a S/G or standard request. */
82 bool fSg;
83 /** Type dependent data. */
84 union
85 {
86 /** Pointer to buffer for non S/G requests. */
87 void *pvBuf;
88 /** Pointer to S/G buffer. */
89 PCRTSGBUF pSgBuf;
90 } u;
91} RTIOQUEUESSQENTRY;
92/** Pointer to a submission queue entry. */
93typedef RTIOQUEUESSQENTRY *PRTIOQUEUESSQENTRY;
94/** Pointer to a constant submission queue entry. */
95typedef const RTIOQUEUESSQENTRY *PCRTIOQUEUESSQENTRY;
96
97
98/**
99 * Internal I/O queue provider instance data.
100 */
101typedef struct RTIOQUEUEPROVINT
102{
103 /** Size of the submission queue in entries. */
104 uint32_t cSqEntries;
105 /** Size of the completion queue in entries. */
106 uint32_t cCqEntries;
107 /** Pointer to the submission queue base. */
108 PRTIOQUEUESSQENTRY paSqEntryBase;
109 /** Submission queue producer index. */
110 volatile uint32_t idxSqProd;
111 /** Submission queue consumer index. */
112 volatile uint32_t idxSqCons;
113 /** Pointer to the completion queue base. */
114 PRTIOQUEUECEVT paCqEntryBase;
115 /** Completion queue producer index. */
116 volatile uint32_t idxCqProd;
117 /** Completion queue consumer index. */
118 volatile uint32_t idxCqCons;
119 /** Various state flags for synchronizing the worker thread with other participants. */
120 volatile uint32_t fState;
121 /** The worker thread handle. */
122 RTTHREAD hThrdWork;
123 /** Event semaphore the worker thread waits on for work. */
124 RTSEMEVENT hSemEvtWorker;
125 /** Event semaphore the caller waits for completion events. */
126 RTSEMEVENT hSemEvtWaitEvts;
127 /** Flag whether to shutdown the worker thread. */
128 volatile bool fShutdown;
129} RTIOQUEUEPROVINT;
130/** Pointer to the internal I/O queue provider instance data. */
131typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT;
132
133
134/*********************************************************************************************************************************
135* Internal Functions *
136*********************************************************************************************************************************/
137
138
139/**
140 * Processes the given submission queue entry and reports back the result in the completion queue.
141 *
142 * @returns nothing.
143 * @param pSqEntry The submission queue entry to process.
144 * @param pCqEntry The comppletion queue entry to store the result in.
145 */
146static void rtIoQueueStdFileProv_SqEntryProcess(PCRTIOQUEUESSQENTRY pSqEntry, PRTIOQUEUECEVT pCqEntry)
147{
148 int rcReq = VINF_SUCCESS;
149
150 switch (pSqEntry->enmOp)
151 {
152 case RTIOQUEUEOP_READ:
153 if (!pSqEntry->fSg)
154 rcReq = RTFileReadAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
155 else
156 {
157 RTSGBUF SgBuf;
158 RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
159 rcReq = RTFileSgReadAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
160 }
161 break;
162 case RTIOQUEUEOP_WRITE:
163 if (!pSqEntry->fSg)
164 rcReq = RTFileWriteAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
165 else
166 {
167 RTSGBUF SgBuf;
168 RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
169 rcReq = RTFileSgWriteAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
170 }
171 break;
172 case RTIOQUEUEOP_SYNC:
173 rcReq = RTFileFlush(pSqEntry->hFile);
174 break;
175 default:
176 AssertMsgFailedReturnVoid(("Invalid I/O queue operation: %d\n", pSqEntry->enmOp));
177 }
178
179 /* Write the result back into the completion queue. */
180 pCqEntry->rcReq = rcReq;
181 pCqEntry->pvUser = pSqEntry->pvUser;
182}
183
184
185/**
186 * The main I/O queue worker loop which processes the incoming I/O requests.
187 */
188static DECLCALLBACK(int) rtIoQueueStdFileProv_WorkerLoop(RTTHREAD hThrdSelf, void *pvUser)
189{
190 PRTIOQUEUEPROVINT pThis = (PRTIOQUEUEPROVINT)pvUser;
191
192 /* Signal that we started up. */
193 int rc = RTThreadUserSignal(hThrdSelf);
194 AssertRC(rc);
195
196 while (!ASMAtomicReadBool(&pThis->fShutdown))
197 {
198 /* Wait for some work. */
199 ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP);
200 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
201 uint32_t idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
202 uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
203
204 if (idxSqCons == idxSqProd)
205 {
206 rc = RTSemEventWait(pThis->hSemEvtWorker, RT_INDEFINITE_WAIT);
207 AssertRC(rc);
208
209 idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
210 idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
211 idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
212 }
213
214 ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
215
216 /* Process all requests. */
217 do
218 {
219 while ( idxSqCons != idxSqProd
220 && idxCqCons != pThis->idxCqProd)
221 {
222 PCRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqCons];
223 PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[pThis->idxCqProd];
224
225 rtIoQueueStdFileProv_SqEntryProcess(pSqEntry, pCqEntry);
226 ASMWriteFence();
227
228 idxSqCons = (idxSqCons + 1) % pThis->cSqEntries;
229 pThis->idxCqProd = (pThis->idxCqProd + 1) % pThis->cCqEntries;
230 ASMWriteFence();
231 if (ASMAtomicReadU32(&pThis->fState) & RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP)
232 {
233 rc = RTSemEventSignal(pThis->hSemEvtWaitEvts);
234 AssertRC(rc);
235 }
236 }
237
238 ASMWriteFence();
239 ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons);
240 idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
241 idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
242 } while (idxSqCons != idxSqProd);
243 }
244
245 return VINF_SUCCESS;
246}
247
248
249/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */
250static DECLCALLBACK(bool) rtIoQueueStdFileProv_IsSupported(void)
251{
252 /* The common code/public API already checked for the proper handle type. */
253 return true;
254}
255
256
257/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */
258static DECLCALLBACK(int) rtIoQueueStdFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags,
259 uint32_t cSqEntries, uint32_t cCqEntries)
260{
261 RT_NOREF(fFlags);
262
263 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
264 int rc = VINF_SUCCESS;
265
266 pThis->cSqEntries = cSqEntries;
267 pThis->cCqEntries = cCqEntries;
268 pThis->idxSqProd = 0;
269 pThis->idxSqCons = 0;
270 pThis->idxCqProd = 0;
271 pThis->idxCqCons = 0;
272 pThis->fShutdown = false;
273 pThis->fState = 0;
274
275 pThis->paSqEntryBase = (PRTIOQUEUESSQENTRY)RTMemAllocZ(cSqEntries * sizeof(RTIOQUEUESSQENTRY));
276 if (RT_LIKELY(pThis->paSqEntryBase))
277 {
278 pThis->paCqEntryBase = (PRTIOQUEUECEVT)RTMemAllocZ(cCqEntries * sizeof(RTIOQUEUECEVT));
279 if (RT_LIKELY(pThis->paSqEntryBase))
280 {
281 rc = RTSemEventCreate(&pThis->hSemEvtWorker);
282 if (RT_SUCCESS(rc))
283 {
284 rc = RTSemEventCreate(&pThis->hSemEvtWaitEvts);
285 if (RT_SUCCESS(rc))
286 {
287 /* Spin up the worker thread. */
288 rc = RTThreadCreate(&pThis->hThrdWork, rtIoQueueStdFileProv_WorkerLoop, pThis, 0, RTTHREADTYPE_IO, RTTHREADFLAGS_WAITABLE,
289 "IoQ-StdFile");
290 if (RT_SUCCESS(rc))
291 {
292 rc = RTThreadUserWait(pThis->hThrdWork, 10 * RT_MS_1SEC);
293 AssertRC(rc);
294
295 return VINF_SUCCESS;
296 }
297
298 RTSemEventDestroy(pThis->hSemEvtWaitEvts);
299 }
300
301 RTSemEventDestroy(pThis->hSemEvtWorker);
302 }
303
304 RTMemFree(pThis->paCqEntryBase);
305 }
306 else
307 rc = VERR_NO_MEMORY;
308
309 RTMemFree(pThis->paSqEntryBase);
310 }
311 else
312 rc = VERR_NO_MEMORY;
313
314 return rc;
315}
316
317
318/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */
319static DECLCALLBACK(void) rtIoQueueStdFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv)
320{
321 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
322
323 ASMAtomicXchgBool(&pThis->fShutdown, true);
324 RTSemEventSignal(pThis->hSemEvtWorker);
325
326 int rc = RTThreadWait(pThis->hThrdWork, 60 * RT_MS_1SEC, NULL);
327 AssertRC(rc);
328
329 RTSemEventDestroy(pThis->hSemEvtWaitEvts);
330 RTSemEventDestroy(pThis->hSemEvtWorker);
331 RTMemFree(pThis->paCqEntryBase);
332 RTMemFree(pThis->paSqEntryBase);
333 RT_BZERO(pThis, sizeof(*pThis));
334}
335
336
337/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */
338static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
339{
340 RT_NOREF(hIoQueueProv, pHandle);
341
342 /* Nothing to do here. */
343 return VINF_SUCCESS;
344}
345
346
347/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */
348static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
349{
350 RT_NOREF(hIoQueueProv, pHandle);
351
352 /* Nothing to do here. */
353 return VINF_SUCCESS;
354}
355
356
357/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */
358static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
359 uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags,
360 void *pvUser)
361{
362 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
363 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
364 PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
365
366 pSqEntry->hFile = pHandle->u.hFile;
367 pSqEntry->enmOp = enmOp;
368 pSqEntry->off = off;
369 pSqEntry->fReqFlags = fReqFlags;
370 pSqEntry->cbReq = cbBuf;
371 pSqEntry->pvUser = pvUser;
372 pSqEntry->fSg = false;
373 pSqEntry->u.pvBuf = pvBuf;
374 ASMWriteFence();
375
376 idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
377 ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
378 return VINF_SUCCESS;
379}
380
381
382/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepareSg} */
383static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepareSg(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
384 uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags,
385 void *pvUser)
386{
387 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
388 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
389 PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
390
391 pSqEntry->hFile = pHandle->u.hFile;
392 pSqEntry->enmOp = enmOp;
393 pSqEntry->off = off;
394 pSqEntry->fReqFlags = fReqFlags;
395 pSqEntry->cbReq = cbSg;
396 pSqEntry->pvUser = pvUser;
397 pSqEntry->fSg = true;
398 pSqEntry->u.pSgBuf = pSgBuf;
399 ASMWriteFence();
400
401 idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
402 ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
403 return VINF_SUCCESS;
404}
405
406
407/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */
408static DECLCALLBACK(int) rtIoQueueStdFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted)
409{
410 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
411 RT_NOREF(pcReqsCommitted);
412
413 return RTSemEventSignal(pThis->hSemEvtWorker);
414}
415
416
417/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */
418static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt,
419 uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags)
420{
421 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
422 RT_NOREF(pThis, paCEvt, cCEvt, cMinWait, pcCEvt, fFlags);
423
424 return VERR_NOT_IMPLEMENTED;
425}
426
427
428/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */
429static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv)
430{
431 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
432
433 ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR);
434 return RTSemEventSignal(pThis->hSemEvtWaitEvts);
435}
436
437
438/**
439 * Standard file I/O queue provider virtual method table.
440 */
441RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueStdFileProv =
442{
443 /** uVersion */
444 RTIOQUEUEPROVVTABLE_VERSION,
445 /** pszId */
446 "StdFile",
447 /** cbIoQueueProv */
448 sizeof(RTIOQUEUEPROVINT),
449 /** enmHnd */
450 RTHANDLETYPE_FILE,
451 /** fFlags */
452 0,
453 /** pfnIsSupported */
454 rtIoQueueStdFileProv_IsSupported,
455 /** pfnQueueInit */
456 rtIoQueueStdFileProv_QueueInit,
457 /** pfnQueueDestroy */
458 rtIoQueueStdFileProv_QueueDestroy,
459 /** pfnHandleRegister */
460 rtIoQueueStdFileProv_HandleRegister,
461 /** pfnHandleDeregister */
462 rtIoQueueStdFileProv_HandleDeregister,
463 /** pfnReqPrepare */
464 rtIoQueueStdFileProv_ReqPrepare,
465 /** pfnReqPrepareSg */
466 rtIoQueueStdFileProv_ReqPrepareSg,
467 /** pfnCommit */
468 rtIoQueueStdFileProv_Commit,
469 /** pfnEvtWait */
470 rtIoQueueStdFileProv_EvtWait,
471 /** pfnEvtWaitWakeup */
472 rtIoQueueStdFileProv_EvtWaitWakeup,
473 /** uEndMarker */
474 RTIOQUEUEPROVVTABLE_VERSION
475};
476
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette