VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/vfs/vfsreadahead.cpp@ 66598

Last change on this file since 66598 was 66598, checked in by vboxsync, 8 years ago

iprt: Added 'pull' VFS chain element provider/factory using the existing read-ahead code.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 33.0 KB
Line 
1/* $Id: vfsreadahead.cpp 66598 2017-04-17 15:52:56Z vboxsync $ */
2/** @file
3 * IPRT - Virtual File System, Read-Ahead Thread.
4 */
5
6/*
7 * Copyright (C) 2010-2016 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#define LOG_GROUP RTLOGGROUP_VFS
32#include "internal/iprt.h"
33#include <iprt/vfs.h>
34
35#include <iprt/asm.h>
36#include <iprt/assert.h>
37#include <iprt/err.h>
38#include <iprt/file.h>
39#include <iprt/list.h>
40#include <iprt/log.h>
41#include <iprt/poll.h>
42#include <iprt/string.h>
43#include <iprt/vfslowlevel.h>
44
45
46
47/*********************************************************************************************************************************
48* Header Files *
49*********************************************************************************************************************************/
50#include "internal/iprt.h"
51#include <iprt/vfs.h>
52
53#include <iprt/critsect.h>
54#include <iprt/err.h>
55#include <iprt/mem.h>
56#include <iprt/thread.h>
57
58
59/*********************************************************************************************************************************
60* Structures and Typedefs *
61*********************************************************************************************************************************/
62/**
63 * Read-ahead buffer descriptor (sits on either the free list or the consumer list).
64 */
65typedef struct RTVFSREADAHEADBUFDESC
66{
67 /** List entry (RTVFSREADAHEAD::FreeList or RTVFSREADAHEAD::ConsumerList). */
68 RTLISTNODE ListEntry;
69 /** The offset in the source stream that the buffer content was read from. */
70 uint64_t off;
71 /** The amount of the buffer that has been filled.
72 * (Buffer size is RTVFSREADAHEAD::cbBuffer.) */
73 uint32_t cbFilled;
74 /** Reserved; not referenced by any of the code visible in this file. */
75 uint32_t volatile fReserved;
76 /** Pointer to the buffer (a slice of RTVFSREADAHEAD::pbAllBuffers). */
77 uint8_t *pbBuffer;
78} RTVFSREADAHEADBUFDESC;
79/** Pointer to a read-ahead buffer descriptor. */
80typedef RTVFSREADAHEADBUFDESC *PRTVFSREADAHEADBUFDESC;
81
82/**
83 * Read ahead file or I/O stream instance data.
84 */
85typedef struct RTVFSREADAHEAD
86{
87 /** The I/O critical section (serializes use of hIos/hFile and their position).
88 * The thread doing I/O or seeking always needs to own this. */
89 RTCRITSECT IoCritSect;
90
91 /** The critical section protecting the buffer lists and offConsumer.
92 *
93 * This can be taken while holding IoCritSect as that eliminates a race
94 * condition between the read ahead thread inserting into ConsumerList and
95 * a consumer thread deciding to do a direct read. */
96 RTCRITSECT BufferCritSect;
97 /** List of buffers available for consumption, sorted by offset.
98 * The producer thread (hThread) puts buffers into this list once it's done
99 * reading into them. The consumer moves them to the FreeList once the
100 * current position has passed beyond each buffer. */
101 RTLISTANCHOR ConsumerList;
102 /** List of buffers available for the producer. */
103 RTLISTANCHOR FreeList;
104
105 /** The current file position from the consumer point of view. */
106 uint64_t offConsumer;
107
108 /** The end-of-file(/stream) offset. This is initially UINT64_MAX and later
109 * set when a read returns VINF_EOF. */
110 uint64_t offEof;
111
112 /** The read ahead thread. */
113 RTTHREAD hThread;
114 /** Set when we want the thread to terminate. */
115 bool volatile fTerminateThread;
116 /** Creation flags (the creation code currently only accepts zero). */
117 uint32_t fFlags;
118
119 /** The I/O stream we read from. */
120 RTVFSIOSTREAM hIos;
121 /** The file face of hIos, if we're fronting for an actual file. */
122 RTVFSFILE hFile;
123 /** The buffer size. */
124 uint32_t cbBuffer;
125 /** The number of buffers. */
126 uint32_t cBuffers;
127 /** Single big buffer allocation, cBuffers * cbBuffer in size. */
128 uint8_t *pbAllBuffers;
129 /** Array of buffer descriptors (cBuffers in size, allocated as part of the instance). */
130 RTVFSREADAHEADBUFDESC aBufDescs[1];
131} RTVFSREADAHEAD;
132/** Pointer to a read-ahead instance. */
133typedef RTVFSREADAHEAD *PRTVFSREADAHEAD;
134
135
136
137/**
138 * @interface_method_impl{RTVFSOBJOPS,pfnClose}
139 */
140static DECLCALLBACK(int) rtVfsReadAhead_Close(void *pvThis)
141{
142 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
143 int rc;
144
145 /*
146 * Stop the read-ahead thread.
147 */
148 if (pThis->hThread != NIL_RTTHREAD)
149 {
150 ASMAtomicWriteBool(&pThis->fTerminateThread, true);
151 rc = RTThreadUserSignal(pThis->hThread);
152 AssertRC(rc);
153 rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL);
154 AssertRCReturn(rc, rc);
155 pThis->hThread = NIL_RTTHREAD;
156 }
157
158 /*
159 * Release the upstream objects.
160 */
161 RTCritSectEnter(&pThis->IoCritSect);
162
163 RTVfsIoStrmRelease(pThis->hIos);
164 pThis->hIos = NIL_RTVFSIOSTREAM;
165 RTVfsFileRelease(pThis->hFile);
166 pThis->hFile = NIL_RTVFSFILE;
167
168 RTCritSectLeave(&pThis->IoCritSect);
169
170 /*
171 * Free the buffers.
172 */
173 RTCritSectEnter(&pThis->BufferCritSect);
174 if (pThis->pbAllBuffers)
175 {
176 RTMemPageFree(pThis->pbAllBuffers, pThis->cBuffers * pThis->cbBuffer);
177 pThis->pbAllBuffers = NULL;
178 }
179 RTCritSectLeave(&pThis->BufferCritSect);
180
181 /*
182 * Destroy the critical sections.
183 */
184 RTCritSectDelete(&pThis->BufferCritSect);
185 RTCritSectDelete(&pThis->IoCritSect);
186
187 return VINF_SUCCESS;
188}
189
190
191/**
192 * @interface_method_impl{RTVFSOBJOPS,pfnQueryInfo}
193 */
194static DECLCALLBACK(int) rtVfsReadAhead_QueryInfo(void *pvThis, PRTFSOBJINFO pObjInfo, RTFSOBJATTRADD enmAddAttr)
195{
196 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
197 return RTVfsIoStrmQueryInfo(pThis->hIos, pObjInfo, enmAddAttr);
198}
199
200
201/**
202 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnRead}
 *
 * Serves the read from the buffers on the consumer list where possible and
 * reads whatever remains directly from the source stream while owning the
 * I/O critical section.
 *
 * @returns VINF_SUCCESS when the buffers satisfied the whole request;
 *          VINF_EOF (pcbRead given) or VERR_EOF (pcbRead is NULL) when the
 *          position is at or beyond the recorded end-of-stream offset;
 *          otherwise the status of the direct read.
 * @param   pvThis      The read-ahead instance.
 * @param   off         The offset to read at, -1 for the current consumer position.
 * @param   pSgBuf      The destination, only the first segment is used.
 * @param   fBlocking   Passed on to the direct read.
 * @param   pcbRead     Where to return the total number of bytes read. Optional.
203 */
204static DECLCALLBACK(int) rtVfsReadAhead_Read(void *pvThis, RTFOFF off, PCRTSGBUF pSgBuf, bool fBlocking, size_t *pcbRead)
205{
206 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
207
208 Assert(pSgBuf->cSegs == 1); /* Caller deals with multiple SGs. */
209
210 /*
211 * We loop here to repeat the buffer search after entering the I/O critical
212 * section, just in case a buffer got inserted while we were waiting for it.
213 */
214 int rc = VINF_SUCCESS;
215 uint8_t *pbDst = (uint8_t *)pSgBuf->paSegs[0].pvSeg;
216 size_t cbDst = pSgBuf->paSegs[0].cbSeg;
217 size_t cbTotalRead = 0;
218 bool fPokeReader = false;
219 bool fOwnsIoCritSect = false;
220 RTCritSectEnter(&pThis->BufferCritSect);
221 for (;;)
222 {
223 /*
224 * Try to satisfy the read from the buffers.
225 */
226 uint64_t offCur = pThis->offConsumer;
227 if (off != -1)
228 {
229 offCur = (uint64_t)off;
230 if (pThis->offConsumer != offCur)
231 fPokeReader = true; /* If the current position changed, poke it in case it stopped at EOF. */
232 pThis->offConsumer = offCur;
233 }
234
235 PRTVFSREADAHEADBUFDESC pBufDesc, pNextBufDesc;
236 RTListForEachSafe(&pThis->ConsumerList, pBufDesc, pNextBufDesc, RTVFSREADAHEADBUFDESC, ListEntry)
237 {
238 /* The buffers are sorted and reads must start in a buffer if
239 anything should be taken from the buffer (at least for now). */
240 if (offCur < pBufDesc->off)
241 break;
242
243 /* Anything we can read from this buffer? */
244 uint64_t offCurBuf = offCur - pBufDesc->off;
245 if (offCurBuf < pBufDesc->cbFilled)
246 {
247 size_t const cbFromCurBuf = RT_MIN(pBufDesc->cbFilled - offCurBuf, cbDst);
248 memcpy(pbDst, pBufDesc->pbBuffer + offCurBuf, cbFromCurBuf);
249 pbDst += cbFromCurBuf;
250 cbDst -= cbFromCurBuf;
251 cbTotalRead += cbFromCurBuf;
252 offCur += cbFromCurBuf;
253 }
254
255 /* Discard buffers we've read past. */
256 if (pBufDesc->off + pBufDesc->cbFilled <= offCur)
257 {
258 RTListNodeRemove(&pBufDesc->ListEntry);
259 RTListAppend(&pThis->FreeList, &pBufDesc->ListEntry);
260 fPokeReader = true; /* Poke it as there are now more buffers available. */
261 }
262
263 /* Stop if we're done. */
264 if (!cbDst)
265 break;
266 }
267
268 pThis->offConsumer = offCur;
269 if (off != -1)
270 off = offCur; /* Keeps an explicit offset in step for the retry and the direct read below. */
271
272 if (!cbDst)
273 break;
274
275 /*
276 * Check if we've reached the end of the file/stream.
277 */
278 if (offCur >= pThis->offEof)
279 {
280 rc = pcbRead ? VINF_EOF : VERR_EOF;
281 Log(("rtVfsReadAhead_Read: ret %Rrc; offCur=%#llx offEof=%#llx\n", rc, offCur, pThis->offEof));
282 break;
283 }
284
285
286 /*
287 * First time around we don't own the I/O critsect and need to take it
288 * and repeat the above buffer reading code. (BufferCritSect is dropped
289 * first so that IoCritSect is always the one taken first.)
289 */
290 if (!fOwnsIoCritSect)
291 {
292 RTCritSectLeave(&pThis->BufferCritSect);
293 RTCritSectEnter(&pThis->IoCritSect);
294 RTCritSectEnter(&pThis->BufferCritSect);
295 fOwnsIoCritSect = true;
296 continue;
297 }
298
299
300 /*
301 * Do a direct read of the remaining data. If the source stream isn't
 * positioned at our offset, an explicit offset is passed instead of -1.
302 */
303 if (off == -1)
304 {
305 RTFOFF offActual = RTVfsIoStrmTell(pThis->hIos);
306 if (offActual >= 0 && (uint64_t)offActual != offCur)
307 off = offCur;
308 }
309 RTSGSEG TmpSeg = { pbDst, cbDst };
310 RTSGBUF TmpSgBuf;
311 RTSgBufInit(&TmpSgBuf, &TmpSeg, 1);
312 size_t cbThisRead = cbDst; /* Stays at cbDst when the caller didn't pass pcbRead. */
313 rc = RTVfsIoStrmSgRead(pThis->hIos, off, &TmpSgBuf, fBlocking, pcbRead ? &cbThisRead : NULL);
314 if (RT_SUCCESS(rc))
315 {
316 cbTotalRead += cbThisRead;
317 offCur += cbThisRead;
318 pThis->offConsumer = offCur;
319 if (rc != VINF_EOF)
320 fPokeReader = true;
321 else
322 {
323 pThis->offEof = offCur;
324 Log(("rtVfsReadAhead_Read: EOF %llu (%#llx)\n", pThis->offEof, pThis->offEof));
325 }
326 }
327 /* else if (rc == VERR_EOF): hard to say where exactly the current position
328 is here as we cannot have had a non-NULL pcbRead. Set offEof later. */
329 break;
330 }
331 RTCritSectLeave(&pThis->BufferCritSect);
332 if (fOwnsIoCritSect)
333 RTCritSectLeave(&pThis->IoCritSect);
334 if (fPokeReader && rc != VINF_EOF && rc != VERR_EOF)
335 RTThreadUserSignal(pThis->hThread);
336
337 if (pcbRead)
338 *pcbRead = cbTotalRead;
339 Assert(cbTotalRead <= pSgBuf->paSegs[0].cbSeg);
340
341 return rc;
342}
343
344
345/**
346 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnWrite}
347 */
348static DECLCALLBACK(int) rtVfsReadAhead_Write(void *pvThis, RTFOFF off, PCRTSGBUF pSgBuf, bool fBlocking, size_t *pcbWritten)
349{
350 RT_NOREF_PV(pvThis); RT_NOREF_PV(off); RT_NOREF_PV(pSgBuf); RT_NOREF_PV(fBlocking); RT_NOREF_PV(pcbWritten);
351 AssertFailed();
352 return VERR_ACCESS_DENIED;
353}
354
355
356/**
357 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnFlush}
358 */
359static DECLCALLBACK(int) rtVfsReadAhead_Flush(void *pvThis)
360{
361 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
362 return RTVfsIoStrmFlush(pThis->hIos);
363}
364
365
366/**
367 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnPollOne}
368 */
369static DECLCALLBACK(int) rtVfsReadAhead_PollOne(void *pvThis, uint32_t fEvents, RTMSINTERVAL cMillies, bool fIntr,
370 uint32_t *pfRetEvents)
371{
372 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
373 if (pThis->hThread != NIL_RTTHREAD)
374 {
375 /** @todo poll one with read-ahead thread. */
376 return VERR_NOT_IMPLEMENTED;
377 }
378 return RTVfsIoStrmPoll(pThis->hIos, fEvents, cMillies, fIntr, pfRetEvents);
379}
380
381
382/**
383 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnTell}
384 */
385static DECLCALLBACK(int) rtVfsReadAhead_Tell(void *pvThis, PRTFOFF poffActual)
386{
387 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
388
389 RTCritSectEnter(&pThis->BufferCritSect);
390 *poffActual = pThis->offConsumer;
391 RTCritSectLeave(&pThis->BufferCritSect);
392
393 return VINF_SUCCESS;
394}
395
396
397/**
398 * @interface_method_impl{RTVFSOBJSETOPS,pfnMode}
399 */
400static DECLCALLBACK(int) rtVfsReadAhead_SetMode(void *pvThis, RTFMODE fMode, RTFMODE fMask)
401{
402 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
403 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
404
405 RTCritSectEnter(&pThis->IoCritSect);
406 RT_NOREF_PV(fMode); RT_NOREF_PV(fMask); /// @todo int rc = RTVfsFileSetMode(pThis->hFile, fMode, fMask);
407 int rc = VERR_NOT_SUPPORTED;
408 RTCritSectLeave(&pThis->IoCritSect);
409
410 return rc;
411}
412
413
414/**
415 * @interface_method_impl{RTVFSOBJSETOPS,pfnSetTimes}
416 */
417static DECLCALLBACK(int) rtVfsReadAhead_SetTimes(void *pvThis, PCRTTIMESPEC pAccessTime, PCRTTIMESPEC pModificationTime,
418 PCRTTIMESPEC pChangeTime, PCRTTIMESPEC pBirthTime)
419{
420 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
421 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
422
423 RTCritSectEnter(&pThis->IoCritSect);
424 RT_NOREF_PV(pAccessTime); RT_NOREF_PV(pModificationTime); RT_NOREF_PV(pChangeTime); RT_NOREF_PV(pBirthTime);
425 /// @todo int rc = RTVfsFileSetTimes(pThis->hFile, pAccessTime, pModificationTime, pChangeTime, pBirthTime);
426 int rc = VERR_NOT_SUPPORTED;
427 RTCritSectLeave(&pThis->IoCritSect);
428
429 return rc;
430}
431
432
433/**
434 * @interface_method_impl{RTVFSOBJSETOPS,pfnSetOwner}
435 */
436static DECLCALLBACK(int) rtVfsReadAhead_SetOwner(void *pvThis, RTUID uid, RTGID gid)
437{
438 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
439 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
440
441 RTCritSectEnter(&pThis->IoCritSect);
442 RT_NOREF_PV(uid); RT_NOREF_PV(gid);
443 /// @todo int rc = RTVfsFileSetOwner(pThis->hFile, uid, gid);
444 int rc = VERR_NOT_SUPPORTED;
445 RTCritSectLeave(&pThis->IoCritSect);
446
447 return rc;
448}
449
450
451/**
452 * @interface_method_impl{RTVFSFILEOPS,pfnSeek}
453 */
454static DECLCALLBACK(int) rtVfsReadAhead_Seek(void *pvThis, RTFOFF offSeek, unsigned uMethod, PRTFOFF poffActual)
455{
456 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
457 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
458
459 RTCritSectEnter(&pThis->IoCritSect); /* protects against concurrent I/O using the offset. */
460 RTCritSectEnter(&pThis->BufferCritSect); /* protects offConsumer */
461
462 uint64_t offActual = UINT64_MAX;
463 int rc = RTVfsFileSeek(pThis->hFile, offSeek, uMethod, &offActual);
464 if (RT_SUCCESS(rc))
465 {
466 pThis->offConsumer = offActual;
467 if (poffActual)
468 *poffActual = offActual;
469 }
470
471 RTCritSectLeave(&pThis->BufferCritSect);
472 RTCritSectLeave(&pThis->IoCritSect);
473
474 return rc;
475}
476
477
478/**
479 * @interface_method_impl{RTVFSFILEOPS,pfnQuerySize}
480 */
481static DECLCALLBACK(int) rtVfsReadAhead_QuerySize(void *pvThis, uint64_t *pcbFile)
482{
483 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
484 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
485
486 RTCritSectEnter(&pThis->IoCritSect); /* paranoia */
487 int rc = RTVfsFileGetSize(pThis->hFile, pcbFile);
488 RTCritSectLeave(&pThis->IoCritSect);
489
490 return rc;
491}
492
493
494/**
495 * Read ahead I/O stream operations (used when the source offers no file interface).
496 */
497DECL_HIDDEN_CONST(const RTVFSIOSTREAMOPS) g_VfsReadAheadIosOps =
498{ /* Stream */
499 { /* Obj */
500 RTVFSOBJOPS_VERSION,
501 RTVFSOBJTYPE_IO_STREAM,
502 "Read ahead I/O stream",
503 rtVfsReadAhead_Close,
504 rtVfsReadAhead_QueryInfo,
505 RTVFSOBJOPS_VERSION
506 },
507 RTVFSIOSTREAMOPS_VERSION,
508 RTVFSIOSTREAMOPS_FEAT_NO_SG,
509 rtVfsReadAhead_Read,
510 rtVfsReadAhead_Write, /* Always fails with VERR_ACCESS_DENIED. */
511 rtVfsReadAhead_Flush,
512 rtVfsReadAhead_PollOne,
513 rtVfsReadAhead_Tell,
514 NULL /*Skip*/,
515 NULL /*ZeroFill*/,
516 RTVFSIOSTREAMOPS_VERSION,
517};
518
519
520/**
521 * Read ahead file operations (used when the source also offers a file interface).
522 */
523DECL_HIDDEN_CONST(const RTVFSFILEOPS) g_VfsReadAheadFileOps =
524{
525 { /* Stream */
526 { /* Obj */
527 RTVFSOBJOPS_VERSION,
528 RTVFSOBJTYPE_FILE,
529 "Read ahead file",
530 rtVfsReadAhead_Close,
531 rtVfsReadAhead_QueryInfo,
532 RTVFSOBJOPS_VERSION
533 },
534 RTVFSIOSTREAMOPS_VERSION,
535 RTVFSIOSTREAMOPS_FEAT_NO_SG,
536 rtVfsReadAhead_Read,
537 rtVfsReadAhead_Write, /* Always fails with VERR_ACCESS_DENIED. */
538 rtVfsReadAhead_Flush,
539 rtVfsReadAhead_PollOne,
540 rtVfsReadAhead_Tell,
541 NULL /*Skip*/,
542 NULL /*ZeroFill*/,
543 RTVFSIOSTREAMOPS_VERSION,
544 },
545 RTVFSFILEOPS_VERSION,
546 /*RTVFSIOFILEOPS_FEAT_NO_AT_OFFSET*/ 0,
547 { /* ObjSet */
548 RTVFSOBJSETOPS_VERSION,
549 RT_OFFSETOF(RTVFSFILEOPS, Stream.Obj) - RT_OFFSETOF(RTVFSFILEOPS, ObjSet),
550 rtVfsReadAhead_SetMode, /* The three setters currently all return VERR_NOT_SUPPORTED. */
551 rtVfsReadAhead_SetTimes,
552 rtVfsReadAhead_SetOwner,
553 RTVFSOBJSETOPS_VERSION
554 },
555 rtVfsReadAhead_Seek,
556 rtVfsReadAhead_QuerySize,
557 RTVFSFILEOPS_VERSION
558};
559
560
561/**
562 * @callback_method_impl{PFNRTTHREAD, Read ahead thread procedure}
563 */
564static DECLCALLBACK(int) rtVfsReadAheadThreadProc(RTTHREAD hThreadSelf, void *pvUser)
565{
566 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvUser;
567 Assert(pThis);
568
569 while (!pThis->fTerminateThread)
570 {
571 int rc;
572
573 /*
574 * Is there a buffer handy for reading ahead.
575 */
576 PRTVFSREADAHEADBUFDESC pBufDesc = NULL;
577 RTCritSectEnter(&pThis->BufferCritSect);
578 if (!pThis->fTerminateThread)
579 pBufDesc = RTListRemoveFirst(&pThis->FreeList, RTVFSREADAHEADBUFDESC, ListEntry);
580 RTCritSectLeave(&pThis->BufferCritSect);
581
582 if (pBufDesc)
583 {
584 /*
585 * Got a buffer, take the I/O lock and read into it.
586 */
587 rc = VERR_CALLBACK_RETURN;
588 RTCritSectEnter(&pThis->IoCritSect);
589 if (!pThis->fTerminateThread)
590 {
591
592 pBufDesc->off = RTVfsIoStrmTell(pThis->hIos);
593 size_t cbRead = 0;
594 rc = RTVfsIoStrmRead(pThis->hIos, pBufDesc->pbBuffer, pThis->cbBuffer, true /*fBlocking*/, &cbRead);
595 if (RT_SUCCESS(rc))
596 {
597 if (rc == VINF_EOF)
598 {
599 pThis->offEof = pBufDesc->off + cbRead;
600 Log(("rtVfsReadAheadThreadProc: EOF %llu (%#llx)\n", pThis->offEof, pThis->offEof));
601 }
602 pBufDesc->cbFilled = (uint32_t)cbRead;
603
604 /*
605 * Put back the buffer. The consumer list is sorted by offset, but
606 * we should usually end up appending the buffer.
607 */
608 RTCritSectEnter(&pThis->BufferCritSect);
609 PRTVFSREADAHEADBUFDESC pAfter = RTListGetLast(&pThis->ConsumerList, RTVFSREADAHEADBUFDESC, ListEntry);
610 if (!pAfter || pAfter->off <= pBufDesc->off)
611 RTListAppend(&pThis->ConsumerList, &pBufDesc->ListEntry);
612 else
613 {
614 do
615 pAfter = RTListGetPrev(&pThis->ConsumerList, pAfter, RTVFSREADAHEADBUFDESC, ListEntry);
616 while (pAfter && pAfter->off > pBufDesc->off);
617 if (!pAfter)
618 RTListPrepend(&pThis->ConsumerList, &pBufDesc->ListEntry);
619 else
620 {
621 Assert(pAfter->off <= pBufDesc->off);
622 RTListNodeInsertAfter(&pAfter->ListEntry, &pBufDesc->ListEntry);
623 }
624 }
625 RTCritSectLeave(&pThis->BufferCritSect);
626 pBufDesc = NULL;
627
628#ifdef RT_STRICT
629 /* Verify the list ordering. */
630 unsigned cAsserted = 0;
631 uint64_t offAssert = 0;
632 PRTVFSREADAHEADBUFDESC pAssertCur;
633 RTListForEach(&pThis->ConsumerList, pAssertCur, RTVFSREADAHEADBUFDESC, ListEntry)
634 {
635 Assert(offAssert <= pAssertCur->off);
636 offAssert = pAssertCur->off;
637 Assert(cAsserted < pThis->cBuffers);
638 cAsserted++;
639 }
640#endif
641 }
642 else
643 Assert(rc != VERR_EOF);
644 }
645 RTCritSectLeave(&pThis->IoCritSect);
646
647 /*
648 * If we succeeded and we didn't yet reach the end of the stream,
649 * loop without delay to start processing the next buffer.
650 */
651 if (RT_LIKELY(!pBufDesc && rc != VINF_EOF))
652 continue;
653
654 /* Put any unused buffer back in the free list (termination/failure, not EOF). */
655 if (pBufDesc)
656 {
657 RTCritSectEnter(&pThis->BufferCritSect);
658 RTListPrepend(&pThis->FreeList, &pBufDesc->ListEntry);
659 RTCritSectLeave(&pThis->BufferCritSect);
660 }
661 if (pThis->fTerminateThread)
662 break;
663 }
664
665 /*
666 * Wait for more to do.
667 */
668 rc = RTThreadUserWait(hThreadSelf, RT_MS_1MIN);
669 if (RT_SUCCESS(rc))
670 rc = RTThreadUserReset(hThreadSelf);
671 }
672
673 return VINF_SUCCESS;
674}
675
676
677static int rtVfsCreateReadAheadInstance(RTVFSIOSTREAM hVfsIosSrc, RTVFSFILE hVfsFileSrc, uint32_t fFlags,
678 uint32_t cBuffers, uint32_t cbBuffer, PRTVFSIOSTREAM phVfsIos, PRTVFSFILE phVfsFile)
679{
680 /*
681 * Validate input a little.
682 */
683 int rc = VINF_SUCCESS;
684 AssertStmt(cBuffers < _4K, rc = VERR_OUT_OF_RANGE);
685 if (cBuffers == 0)
686 cBuffers = 4;
687 AssertStmt(cbBuffer <= _4M, rc = VERR_OUT_OF_RANGE);
688 if (cbBuffer == 0)
689 cbBuffer = _256K / cBuffers;
690 AssertStmt(cbBuffer * cBuffers < (ARCH_BITS < 64 ? _64M : _256M), rc = VERR_OUT_OF_RANGE);
691 AssertStmt(!fFlags, rc = VERR_INVALID_FLAGS);
692
693 if (RT_SUCCESS(rc))
694 {
695 /*
696 * Create a file or I/O stream instance.
697 */
698 RTVFSFILE hVfsFileReadAhead = NIL_RTVFSFILE;
699 RTVFSIOSTREAM hVfsIosReadAhead = NIL_RTVFSIOSTREAM;
700 PRTVFSREADAHEAD pThis;
701 size_t cbThis = RT_OFFSETOF(RTVFSREADAHEAD, aBufDescs[cBuffers]);
702 if (hVfsFileSrc != NIL_RTVFSFILE)
703 rc = RTVfsNewFile(&g_VfsReadAheadFileOps, cbThis, RTFILE_O_READ, NIL_RTVFS, NIL_RTVFSLOCK,
704 &hVfsFileReadAhead, (void **)&pThis);
705 else
706 rc = RTVfsNewIoStream(&g_VfsReadAheadIosOps, cbThis, RTFILE_O_READ, NIL_RTVFS, NIL_RTVFSLOCK,
707 &hVfsIosReadAhead, (void **)&pThis);
708 if (RT_SUCCESS(rc))
709 {
710 RTListInit(&pThis->ConsumerList);
711 RTListInit(&pThis->FreeList);
712 pThis->hThread = NIL_RTTHREAD;
713 pThis->fTerminateThread = false;
714 pThis->fFlags = fFlags;
715 pThis->hFile = hVfsFileSrc;
716 pThis->hIos = hVfsIosSrc;
717 pThis->cBuffers = cBuffers;
718 pThis->cbBuffer = cbBuffer;
719 pThis->offEof = UINT64_MAX;
720 pThis->offConsumer = RTVfsIoStrmTell(hVfsIosSrc);
721 if ((RTFOFF)pThis->offConsumer >= 0)
722 {
723 rc = RTCritSectInit(&pThis->IoCritSect);
724 if (RT_SUCCESS(rc))
725 rc = RTCritSectInit(&pThis->BufferCritSect);
726 if (RT_SUCCESS(rc))
727 {
728 pThis->pbAllBuffers = (uint8_t *)RTMemPageAlloc(pThis->cbBuffer * pThis->cBuffers);
729 if (pThis->pbAllBuffers)
730 {
731 for (uint32_t i = 0; i < cBuffers; i++)
732 {
733 pThis->aBufDescs[i].cbFilled = 0;
734 pThis->aBufDescs[i].off = UINT64_MAX / 2;
735 pThis->aBufDescs[i].pbBuffer = &pThis->pbAllBuffers[cbBuffer * i];
736 RTListAppend(&pThis->FreeList, &pThis->aBufDescs[i].ListEntry);
737 }
738
739 /*
740 * Create thread.
741 */
742 rc = RTThreadCreate(&pThis->hThread, rtVfsReadAheadThreadProc, pThis, 0, RTTHREADTYPE_DEFAULT,
743 RTTHREADFLAGS_WAITABLE, "vfsreadahead");
744 if (RT_SUCCESS(rc))
745 {
746 /*
747 * We're good.
748 */
749 if (phVfsFile)
750 *phVfsFile = hVfsFileReadAhead;
751 else if (hVfsFileReadAhead == NIL_RTVFSFILE)
752 *phVfsIos = hVfsIosReadAhead;
753 else
754 {
755 *phVfsIos = RTVfsFileToIoStream(hVfsFileReadAhead);
756 RTVfsFileRelease(hVfsFileReadAhead);
757 AssertReturn(*phVfsIos != NIL_RTVFSIOSTREAM, VERR_INTERNAL_ERROR_5);
758 }
759 return VINF_SUCCESS;
760 }
761 }
762 }
763 }
764 else
765 rc = (int)pThis->offConsumer;
766 }
767 }
768
769 RTVfsFileRelease(hVfsFileSrc);
770 RTVfsIoStrmRelease(hVfsIosSrc);
771 return rc;
772}
773
774
775RTDECL(int) RTVfsCreateReadAheadForIoStream(RTVFSIOSTREAM hVfsIos, uint32_t fFlags, uint32_t cBuffers, uint32_t cbBuffer,
776 PRTVFSIOSTREAM phVfsIos)
777{
778 AssertPtrReturn(phVfsIos, VERR_INVALID_POINTER);
779 *phVfsIos = NIL_RTVFSIOSTREAM;
780
781 /*
782 * Retain the input stream, trying to obtain a file handle too so we can
783 * fully mirror it.
784 */
785 uint32_t cRefs = RTVfsIoStrmRetain(hVfsIos);
786 AssertReturn(cRefs != UINT32_MAX, VERR_INVALID_HANDLE);
787 RTVFSFILE hVfsFile = RTVfsIoStrmToFile(hVfsIos);
788
789 /*
790 * Do the job. (This always consumes the above retained references.)
791 */
792 return rtVfsCreateReadAheadInstance(hVfsIos, hVfsFile, fFlags, cBuffers, cbBuffer, phVfsIos, NULL);
793}
794
795
796RTDECL(int) RTVfsCreateReadAheadForFile(RTVFSFILE hVfsFile, uint32_t fFlags, uint32_t cBuffers, uint32_t cbBuffer,
797 PRTVFSFILE phVfsFile)
798{
799 AssertPtrReturn(phVfsFile, VERR_INVALID_POINTER);
800 *phVfsFile = NIL_RTVFSFILE;
801
802 /*
803 * Retain the input file and cast it o an I/O stream.
804 */
805 RTVFSIOSTREAM hVfsIos = RTVfsFileToIoStream(hVfsFile);
806 AssertReturn(hVfsIos != NIL_RTVFSIOSTREAM, VERR_INVALID_HANDLE);
807 uint32_t cRefs = RTVfsFileRetain(hVfsFile);
808 AssertReturnStmt(cRefs != UINT32_MAX, RTVfsIoStrmRelease(hVfsIos), VERR_INVALID_HANDLE);
809
810 /*
811 * Do the job. (This always consumes the above retained references.)
812 */
813 return rtVfsCreateReadAheadInstance(hVfsIos, hVfsFile, fFlags, cBuffers, cbBuffer, NULL, phVfsFile);
814}
815
816
817/**
818 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnValidate}
819 */
820static DECLCALLBACK(int) rtVfsChainReadAhead_Validate(PCRTVFSCHAINELEMENTREG pProviderReg, PRTVFSCHAINSPEC pSpec,
821 PRTVFSCHAINELEMSPEC pElement, uint32_t *poffError)
822{
823 RT_NOREF(pProviderReg, poffError);
824
825 /*
826 * Basics.
827 */
828 if (pElement->enmTypeIn == RTVFSOBJTYPE_INVALID)
829 return VERR_VFS_CHAIN_CANNOT_BE_FIRST_ELEMENT;
830 if (pElement->cArgs > 2)
831 return VERR_VFS_CHAIN_AT_MOST_TWO_ARGS;
832 if ( pElement->enmType != RTVFSOBJTYPE_FILE
833 && pElement->enmType != RTVFSOBJTYPE_IO_STREAM)
834 return VERR_VFS_CHAIN_ONLY_FILE_OR_IOS;
835 if (pSpec->fOpenFile & RTFILE_O_WRITE)
836 return VERR_VFS_CHAIN_READ_ONLY_IOS;
837
838 /*
839 * Parse the two optional arguments.
840 */
841 uint32_t cBuffers = 0;
842 if (pElement->cArgs > 0)
843 {
844 const char *psz = pElement->paArgs[0].psz;
845 if (*psz)
846 {
847 int rc = RTStrToUInt32Full(psz, 0, &cBuffers);
848 if (RT_FAILURE(rc))
849 {
850 *poffError = pElement->paArgs[0].offSpec;
851 return VERR_VFS_CHAIN_INVALID_ARGUMENT;
852 }
853 }
854 }
855
856 uint32_t cbBuffer = 0;
857 if (pElement->cArgs > 1)
858 {
859 const char *psz = pElement->paArgs[1].psz;
860 if (*psz)
861 {
862 int rc = RTStrToUInt32Full(psz, 0, &cbBuffer);
863 if (RT_FAILURE(rc))
864 {
865 *poffError = pElement->paArgs[1].offSpec;
866 return VERR_VFS_CHAIN_INVALID_ARGUMENT;
867 }
868 }
869 }
870
871 /*
872 * Save the parsed arguments in the spec since their both optional.
873 */
874 pSpec->uProvider = RT_MAKE_U64(cBuffers, cbBuffer);
875
876 return VINF_SUCCESS;
877}
878
879
880/**
881 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnInstantiate}
882 */
883static DECLCALLBACK(int) rtVfsChainReadAhead_Instantiate(PCRTVFSCHAINELEMENTREG pProviderReg, PCRTVFSCHAINSPEC pSpec,
884 PCRTVFSCHAINELEMSPEC pElement, RTVFSOBJ hPrevVfsObj,
885 PRTVFSOBJ phVfsObj, uint32_t *poffError)
886{
887 RT_NOREF(pProviderReg, pSpec, pElement, poffError);
888 AssertReturn(hPrevVfsObj != NIL_RTVFSOBJ, VERR_VFS_CHAIN_IPE);
889
890 /* Try for a file if we can. */
891 int rc;
892 RTVFSFILE hVfsFileIn = RTVfsObjToFile(hPrevVfsObj);
893 if (hVfsFileIn != NIL_RTVFSFILE)
894 {
895 RTVFSFILE hVfsFile = NIL_RTVFSFILE;
896 rc = RTVfsCreateReadAheadForFile(hVfsFileIn, 0 /*fFlags*/, RT_LO_U32(pSpec->uProvider),
897 RT_HI_U32(pSpec->uProvider), &hVfsFile);
898 RTVfsFileRelease(hVfsFileIn);
899 if (RT_SUCCESS(rc))
900 {
901 *phVfsObj = RTVfsObjFromFile(hVfsFile);
902 RTVfsFileRelease(hVfsFile);
903 if (*phVfsObj != NIL_RTVFSOBJ)
904 return VINF_SUCCESS;
905 rc = VERR_VFS_CHAIN_CAST_FAILED;
906 }
907 }
908 else if (pElement->enmType == RTVFSOBJTYPE_IO_STREAM)
909 {
910 RTVFSIOSTREAM hVfsIosIn = RTVfsObjToIoStream(hPrevVfsObj);
911 if (hVfsIosIn != NIL_RTVFSIOSTREAM)
912 {
913 RTVFSIOSTREAM hVfsIos = NIL_RTVFSIOSTREAM;
914 rc = RTVfsCreateReadAheadForIoStream(hVfsIosIn, 0 /*fFlags*/, RT_LO_U32(pSpec->uProvider),
915 RT_HI_U32(pSpec->uProvider), &hVfsIos);
916 RTVfsIoStrmRelease(hVfsIosIn);
917 if (RT_SUCCESS(rc))
918 {
919 *phVfsObj = RTVfsObjFromIoStream(hVfsIos);
920 RTVfsIoStrmRelease(hVfsIos);
921 if (*phVfsObj != NIL_RTVFSOBJ)
922 return VINF_SUCCESS;
923 rc = VERR_VFS_CHAIN_CAST_FAILED;
924 }
925 }
926 else
927 rc = VERR_VFS_CHAIN_CAST_FAILED;
928 }
929 else
930 rc = VERR_VFS_CHAIN_CAST_FAILED;
931 return rc;
932}
933
934
935/**
936 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnCanReuseElement}
937 */
938static DECLCALLBACK(bool) rtVfsChainReadAhead_CanReuseElement(PCRTVFSCHAINELEMENTREG pProviderReg,
939 PCRTVFSCHAINSPEC pSpec, PCRTVFSCHAINELEMSPEC pElement,
940 PCRTVFSCHAINSPEC pReuseSpec, PCRTVFSCHAINELEMSPEC pReuseElement)
941{
942 RT_NOREF(pProviderReg, pSpec, pElement, pReuseSpec, pReuseElement);
943 return false;
944}
945
946
947/** VFS chain element 'pull' (read-ahead on top of a file or I/O stream), auto-registered below. */
948static RTVFSCHAINELEMENTREG g_rtVfsChainReadAheadReg =
949{
950 /* uVersion = */ RTVFSCHAINELEMENTREG_VERSION,
951 /* fReserved = */ 0,
952 /* pszName = */ "pull",
953 /* ListEntry = */ { NULL, NULL },
954 /* pszHelp = */ "Takes an I/O stream or file and provides read-ahead caching.\n"
955 "Optional first argument specifies how many buffers to use, 0 indicating the default.\n"
956 "Optional second argument specifies the buffer size, 0 indicating the default.",
957 /* pfnValidate = */ rtVfsChainReadAhead_Validate,
958 /* pfnInstantiate = */ rtVfsChainReadAhead_Instantiate,
959 /* pfnCanReuseElement = */ rtVfsChainReadAhead_CanReuseElement,
960 /* uEndMarker = */ RTVFSCHAINELEMENTREG_VERSION
961};
962
963RTVFSCHAIN_AUTO_REGISTER_ELEMENT_PROVIDER(&g_rtVfsChainReadAheadReg, rtVfsChainReadAheadReg);
964
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette