VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 74762

Last change on this file since 74762 was 69111, checked in by vboxsync, 7 years ago

(C) year

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 41.1 KB
Line 
1/* $Id: reqpool.cpp 69111 2017-10-17 14:26:02Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2017 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/list.h>
38#include <iprt/log.h>
39#include <iprt/mem.h>
40#include <iprt/string.h>
41#include <iprt/time.h>
42#include <iprt/semaphore.h>
43#include <iprt/thread.h>
44
45#include "internal/req.h"
46#include "internal/magics.h"
47
48
49/*********************************************************************************************************************************
50* Defined Constants And Macros *
51*********************************************************************************************************************************/
52/** The max number of worker threads. */
53#define RTREQPOOL_MAX_THREADS UINT32_C(16384)
54/** The max number of milliseconds to push back. */
55#define RTREQPOOL_PUSH_BACK_MAX_MS RT_MS_1MIN
56/** The max number of free requests to keep around. */
57#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
58
59
60/*********************************************************************************************************************************
61* Structures and Typedefs *
62*********************************************************************************************************************************/
63typedef struct RTREQPOOLTHREAD
64{
65 /** Node in the RTREQPOOLINT::IdleThreads list. */
66 RTLISTNODE IdleNode;
67 /** Node in the RTREQPOOLINT::WorkerThreads list. */
68 RTLISTNODE ListNode;
69
70 /** The submit timestamp of the pending request. */
71 uint64_t uPendingNanoTs;
72 /** The submit timestamp of the request processing. */
73 uint64_t uProcessingNanoTs;
74 /** When this CPU went idle the last time. */
75 uint64_t uIdleNanoTs;
76 /** The number of requests processed by this thread. */
77 uint64_t cReqProcessed;
78 /** Total time the requests processed by this thread took to process. */
79 uint64_t cNsTotalReqProcessing;
80 /** Total time the requests processed by this thread had to wait in
81 * the queue before being scheduled. */
82 uint64_t cNsTotalReqQueued;
83 /** The CPU this was scheduled last time we checked. */
84 RTCPUID idLastCpu;
85
86 /** The submitter will put an incoming request here when scheduling an idle
87 * thread. */
88 PRTREQINT volatile pTodoReq;
89 /** The request the thread is currently processing. */
90 PRTREQINT volatile pPendingReq;
91
92 /** The thread handle. */
93 RTTHREAD hThread;
94 /** Nano seconds timestamp representing the birth time of the thread. */
95 uint64_t uBirthNanoTs;
96 /** Pointer to the request thread pool instance the thread is associated
97 * with. */
98 struct RTREQPOOLINT *pPool;
99} RTREQPOOLTHREAD;
100/** Pointer to a worker thread. */
101typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
102
103/**
104 * Request thread pool instance data.
105 */
106typedef struct RTREQPOOLINT
107{
108 /** Magic value (RTREQPOOL_MAGIC). */
109 uint32_t u32Magic;
110 /** The request pool name. */
111 char szName[12];
112
113 /** @name Config
114 * @{ */
115 /** The worker thread type. */
116 RTTHREADTYPE enmThreadType;
117 /** The maximum number of worker threads. */
118 uint32_t cMaxThreads;
119 /** The minimum number of worker threads. */
120 uint32_t cMinThreads;
121 /** The number of milliseconds a thread needs to be idle before it is
122 * considered for retirement. */
123 uint32_t cMsMinIdle;
124 /** cMsMinIdle in nano seconds. */
125 uint64_t cNsMinIdle;
126 /** The idle thread sleep interval in milliseconds. */
127 RTMSINTERVAL cMsIdleSleep;
128 /** The number of threads which should be spawned before throttling kicks
129 * in. */
130 uint32_t cThreadsPushBackThreshold;
131 /** The max number of milliseconds to push back a submitter before creating
132 * a new worker thread once the threshold has been reached. */
133 uint32_t cMsMaxPushBack;
134 /** The minimum number of milliseconds to push back a submitter before
135 * creating a new worker thread once the threshold has been reached. */
136 uint32_t cMsMinPushBack;
137 /** The max number of free requests in the recycle LIFO. */
138 uint32_t cMaxFreeRequests;
139 /** @} */
140
141 /** Signaled by terminating worker threads. */
142 RTSEMEVENTMULTI hThreadTermEvt;
143
144 /** Destruction indicator. The worker threads checks in their loop. */
145 bool volatile fDestructing;
146
147 /** The current submitter push back in milliseconds.
148 * This is recalculated when worker threads come and go. */
149 uint32_t cMsCurPushBack;
150 /** The current number of worker threads. */
151 uint32_t cCurThreads;
152 /** Statistics: The total number of threads created. */
153 uint32_t cThreadsCreated;
154 /** Statistics: The timestamp when the last thread was created. */
155 uint64_t uLastThreadCreateNanoTs;
156 /** Linked list of worker threads. */
157 RTLISTANCHOR WorkerThreads;
158
159 /** The number of requests processed and counted in the time totals. */
160 uint64_t cReqProcessed;
161 /** Total time the requests processed by this thread took to process. */
162 uint64_t cNsTotalReqProcessing;
163 /** Total time the requests processed by this thread had to wait in
164 * the queue before being scheduled. */
165 uint64_t cNsTotalReqQueued;
166
167 /** Reference counter. */
168 uint32_t volatile cRefs;
169 /** The number of idle thread or threads in the process of becoming
170 * idle. This is increased before the to-be-idle thread tries to enter
171 * the critical section and add itself to the list. */
172 uint32_t volatile cIdleThreads;
173 /** Linked list of idle threads. */
174 RTLISTANCHOR IdleThreads;
175
176 /** Head of the request FIFO. */
177 PRTREQINT pPendingRequests;
178 /** Where to insert the next request. */
179 PRTREQINT *ppPendingRequests;
180 /** The number of requests currently pending. */
181 uint32_t cCurPendingRequests;
182 /** The number of requests currently being executed. */
183 uint32_t volatile cCurActiveRequests;
184 /** The number of requests submitted. */
185 uint64_t cReqSubmitted;
186
187 /** Head of the request recycling LIFO. */
188 PRTREQINT pFreeRequests;
189 /** The number of requests in the recycling LIFO. This is read without
190 * entering the critical section, thus volatile. */
191 uint32_t volatile cCurFreeRequests;
192
193 /** Critical section serializing access to members of this structure. */
194 RTCRITSECT CritSect;
195
196} RTREQPOOLINT;
197
198
199/**
200 * Used by exiting thread and the pool destruction code to cancel unexpected
201 * requests.
202 *
203 * @param pReq The request.
204 */
205static void rtReqPoolCancelReq(PRTREQINT pReq)
206{
207 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
208 pReq->enmState = RTREQSTATE_COMPLETED;
209 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
210 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
211 RTSemEventMultiSignal(pReq->hPushBackEvt);
212 RTSemEventSignal(pReq->EventSem);
213
214 RTReqRelease(pReq);
215}
216
217
218/**
219 * Recalculate the max pushback interval when adding or removing worker threads.
220 *
221 * @param pPool The pool. cMsCurPushBack will be changed.
222 */
223static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
224{
225 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
226 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
227 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
228
229 uint32_t cMsCurPushBack;
230 if ((cMsRange >> 2) >= cSteps)
231 cMsCurPushBack = cMsRange / cSteps * iStep;
232 else
233 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
234 cMsCurPushBack += pPool->cMsMinPushBack;
235
236 pPool->cMsCurPushBack = cMsCurPushBack;
237}
238
239
240
241/**
242 * Performs thread exit.
243 *
244 * @returns Thread termination status code (VINF_SUCCESS).
245 * @param pPool The pool.
246 * @param pThread The thread.
247 * @param fLocked Whether we are inside the critical section
248 * already.
249 */
250static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
251{
252 if (!fLocked)
253 RTCritSectEnter(&pPool->CritSect);
254
255 /* Get out of the idle list. */
256 if (!RTListIsEmpty(&pThread->IdleNode))
257 {
258 RTListNodeRemove(&pThread->IdleNode);
259 Assert(pPool->cIdleThreads > 0);
260 ASMAtomicDecU32(&pPool->cIdleThreads);
261 }
262
263 /* Get out of the thread list. */
264 RTListNodeRemove(&pThread->ListNode);
265 Assert(pPool->cCurThreads > 0);
266 pPool->cCurThreads--;
267 rtReqPoolRecalcPushBack(pPool);
268
269 /* This shouldn't happen... */
270 PRTREQINT pReq = pThread->pTodoReq;
271 if (pReq)
272 {
273 AssertFailed();
274 pThread->pTodoReq = NULL;
275 rtReqPoolCancelReq(pReq);
276 }
277
278 /* If we're the last thread terminating, ping the destruction thread before
279 we leave the critical section. */
280 if ( RTListIsEmpty(&pPool->WorkerThreads)
281 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
282 RTSemEventMultiSignal(pPool->hThreadTermEvt);
283
284 RTCritSectLeave(&pPool->CritSect);
285
286 return VINF_SUCCESS;
287}
288
289
290
291/**
292 * Process one request.
293 *
294 * @param pPool The pool.
295 * @param pThread The worker thread.
296 * @param pReq The request to process.
297 */
298static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
299{
300 /*
301 * Update thread state.
302 */
303 pThread->uProcessingNanoTs = RTTimeNanoTS();
304 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
305 pThread->pPendingReq = pReq;
306 ASMAtomicIncU32(&pPool->cCurActiveRequests);
307 Assert(pReq->u32Magic == RTREQ_MAGIC);
308
309 /*
310 * Do the actual processing.
311 */
312 rtReqProcessOne(pReq);
313
314 /*
315 * Update thread statistics and state.
316 */
317 ASMAtomicDecU32(&pPool->cCurActiveRequests);
318 pThread->pPendingReq = NULL;
319 uint64_t const uNsTsEnd = RTTimeNanoTS();
320 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
321 pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
322 pThread->cReqProcessed++;
323}
324
325
326
327/**
328 * The Worker Thread Procedure.
329 *
330 * @returns VINF_SUCCESS.
331 * @param hThreadSelf The thread handle (unused).
332 * @param pvArg Pointer to the thread data.
333 */
334static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
335{
336 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
337 PRTREQPOOLINT pPool = pThread->pPool;
338
339 /*
340 * The work loop.
341 */
342 uint64_t cReqPrevProcessedIdle = UINT64_MAX;
343 uint64_t cReqPrevProcessedStat = 0;
344 uint64_t cNsPrevTotalReqProcessing = 0;
345 uint64_t cNsPrevTotalReqQueued = 0;
346 while (!pPool->fDestructing)
347 {
348 /*
349 * Process pending work.
350 */
351
352 /* Check if anything is scheduled directly to us. */
353 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
354 if (pReq)
355 {
356 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
357 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
358 continue;
359 }
360
361 ASMAtomicIncU32(&pPool->cIdleThreads);
362 RTCritSectEnter(&pPool->CritSect);
363
364 /* Update the global statistics. */
365 if (cReqPrevProcessedStat != pThread->cReqProcessed)
366 {
367 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
368 cReqPrevProcessedStat = pThread->cReqProcessed;
369 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
370 cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
371 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
372 cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
373 }
374
375 /* Recheck the todo request pointer after entering the critsect. */
376 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
377 if (pReq)
378 {
379 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
380 RTCritSectLeave(&pPool->CritSect);
381
382 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
383 continue;
384 }
385
386 /* Any pending requests in the queue? */
387 pReq = pPool->pPendingRequests;
388 if (pReq)
389 {
390 pPool->pPendingRequests = pReq->pNext;
391 if (pReq->pNext == NULL)
392 pPool->ppPendingRequests = &pPool->pPendingRequests;
393 Assert(pPool->cCurPendingRequests > 0);
394 pPool->cCurPendingRequests--;
395
396 /* Un-idle ourselves and process the request. */
397 if (!RTListIsEmpty(&pThread->IdleNode))
398 {
399 RTListNodeRemove(&pThread->IdleNode);
400 RTListInit(&pThread->IdleNode);
401 ASMAtomicDecU32(&pPool->cIdleThreads);
402 }
403 ASMAtomicDecU32(&pPool->cIdleThreads);
404 RTCritSectLeave(&pPool->CritSect);
405
406 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
407 continue;
408 }
409
410 /*
411 * Nothing to do, go idle.
412 */
413 if (cReqPrevProcessedIdle != pThread->cReqProcessed)
414 {
415 cReqPrevProcessedIdle = pThread->cReqProcessed;
416 pThread->uIdleNanoTs = RTTimeNanoTS();
417 }
418 else if (pPool->cCurThreads > pPool->cMinThreads)
419 {
420 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
421 if (cNsIdle >= pPool->cNsMinIdle)
422 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
423 }
424
425 if (RTListIsEmpty(&pThread->IdleNode))
426 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
427 else
428 ASMAtomicDecU32(&pPool->cIdleThreads);
429 RTThreadUserReset(hThreadSelf);
430 uint32_t const cMsSleep = pPool->cMsIdleSleep;
431
432 RTCritSectLeave(&pPool->CritSect);
433
434 RTThreadUserWait(hThreadSelf, cMsSleep);
435 }
436
437 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
438}
439
440
441/**
442 * Create a new worker thread.
443 *
444 * @param pPool The pool needing new worker thread.
445 * @remarks Caller owns the critical section
446 */
447static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
448{
449 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
450 if (!pThread)
451 return;
452
453 pThread->uBirthNanoTs = RTTimeNanoTS();
454 pThread->pPool = pPool;
455 pThread->idLastCpu = NIL_RTCPUID;
456 pThread->hThread = NIL_RTTHREAD;
457 RTListInit(&pThread->IdleNode);
458 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
459 pPool->cCurThreads++;
460 pPool->cThreadsCreated++;
461
462 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
463 pPool->enmThreadType, 0 /*fFlags*/, "%s%02u", pPool->szName, pPool->cThreadsCreated);
464 if (RT_SUCCESS(rc))
465 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
466 else
467 {
468 pPool->cCurThreads--;
469 RTListNodeRemove(&pThread->ListNode);
470 RTMemFree(pThread);
471 }
472}
473
474
475/**
476 * Repel the submitter, giving the worker threads a chance to process the
477 * incoming request.
478 *
479 * @returns Success if a worker picked up the request, failure if not. The
480 * critical section has been left on success, while we'll be inside it
481 * on failure.
482 * @param pPool The pool.
483 * @param pReq The incoming request.
484 */
485static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
486{
487 /*
488 * Lazily create the push back semaphore that we'll be blociing on.
489 */
490 int rc;
491 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
492 if (hEvt == NIL_RTSEMEVENTMULTI)
493 {
494 rc = RTSemEventMultiCreate(&hEvt);
495 if (RT_FAILURE(rc))
496 return rc;
497 pReq->hPushBackEvt = hEvt;
498 }
499
500 /*
501 * Prepare the request and semaphore.
502 */
503 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
504 pReq->fSignalPushBack = true;
505 RTReqRetain(pReq);
506 RTSemEventMultiReset(hEvt);
507
508 RTCritSectLeave(&pPool->CritSect);
509
510 /*
511 * Block.
512 */
513 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
514 if (RT_FAILURE(rc))
515 {
516 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
517 RTCritSectEnter(&pPool->CritSect);
518 }
519 RTReqRelease(pReq);
520 return rc;
521}
522
523
524
525DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
526{
527 RTCritSectEnter(&pPool->CritSect);
528
529 pPool->cReqSubmitted++;
530
531 /*
532 * Try schedule the request to a thread that's currently idle.
533 */
534 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
535 if (pThread)
536 {
537 /** @todo CPU affinity??? */
538 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
539
540 RTListNodeRemove(&pThread->IdleNode);
541 RTListInit(&pThread->IdleNode);
542 ASMAtomicDecU32(&pPool->cIdleThreads);
543
544 RTThreadUserSignal(pThread->hThread);
545
546 RTCritSectLeave(&pPool->CritSect);
547 return;
548 }
549 Assert(RTListIsEmpty(&pPool->IdleThreads));
550
551 /*
552 * Put the request in the pending queue.
553 */
554 pReq->pNext = NULL;
555 *pPool->ppPendingRequests = pReq;
556 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
557 pPool->cCurPendingRequests++;
558
559 /*
560 * If there is an incoming worker thread already or we've reached the
561 * maximum number of worker threads, we're done.
562 */
563 if ( pPool->cIdleThreads > 0
564 || pPool->cCurThreads >= pPool->cMaxThreads)
565 {
566 RTCritSectLeave(&pPool->CritSect);
567 return;
568 }
569
570 /*
571 * Push back before creating a new worker thread.
572 */
573 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
574 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
575 {
576 int rc = rtReqPoolPushBack(pPool, pReq);
577 if (RT_SUCCESS(rc))
578 return;
579 }
580
581 /*
582 * Create a new thread for processing the request.
583 * For simplicity, we don't bother leaving the critical section while doing so.
584 */
585 rtReqPoolCreateNewWorker(pPool);
586
587 RTCritSectLeave(&pPool->CritSect);
588 return;
589}
590
591
592/**
593 * Frees a requst.
594 *
595 * @returns true if recycled, false if not.
596 * @param pPool The request thread pool.
597 * @param pReq The request.
598 */
599DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
600{
601 if ( pPool
602 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
603 {
604 RTCritSectEnter(&pPool->CritSect);
605 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
606 {
607 pReq->pNext = pPool->pFreeRequests;
608 pPool->pFreeRequests = pReq;
609 ASMAtomicIncU32(&pPool->cCurFreeRequests);
610
611 RTCritSectLeave(&pPool->CritSect);
612 return true;
613 }
614
615 RTCritSectLeave(&pPool->CritSect);
616 }
617 return false;
618}
619
620
621RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
622 uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
623 const char *pszName, PRTREQPOOL phPool)
624{
625 /*
626 * Validate and massage the config.
627 */
628 if (cMaxThreads == UINT32_MAX)
629 cMaxThreads = RTREQPOOL_MAX_THREADS;
630 AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
631 uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
632
633 if (cThreadsPushBackThreshold == 0)
634 cThreadsPushBackThreshold = cMinThreads;
635 else if (cThreadsPushBackThreshold == UINT32_MAX)
636 cThreadsPushBackThreshold = cMaxThreads;
637 AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
638
639 if (cMsMaxPushBack == UINT32_MAX)
640 cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
641 AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
642 uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
643
644 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
645 size_t cchName = strlen(pszName);
646 AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
647 Assert(cchName <= 10);
648
649 AssertPtrReturn(phPool, VERR_INVALID_POINTER);
650
651 /*
652 * Create and initialize the pool.
653 */
654 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
655 if (!pPool)
656 return VERR_NO_MEMORY;
657
658 pPool->u32Magic = RTREQPOOL_MAGIC;
659 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
660
661 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
662 pPool->cMaxThreads = cMaxThreads;
663 pPool->cMinThreads = cMinThreads;
664 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
665 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
666 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
667 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
668 pPool->cMsMaxPushBack = cMsMaxPushBack;
669 pPool->cMsMinPushBack = cMsMinPushBack;
670 pPool->cMaxFreeRequests = cMaxThreads * 2;
671 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
672 pPool->fDestructing = false;
673 pPool->cMsCurPushBack = 0;
674 pPool->cCurThreads = 0;
675 pPool->cThreadsCreated = 0;
676 pPool->uLastThreadCreateNanoTs = 0;
677 RTListInit(&pPool->WorkerThreads);
678 pPool->cReqProcessed = 0;
679 pPool->cNsTotalReqProcessing= 0;
680 pPool->cNsTotalReqQueued = 0;
681 pPool->cRefs = 1;
682 pPool->cIdleThreads = 0;
683 RTListInit(&pPool->IdleThreads);
684 pPool->pPendingRequests = NULL;
685 pPool->ppPendingRequests = &pPool->pPendingRequests;
686 pPool->cCurPendingRequests = 0;
687 pPool->cCurActiveRequests = 0;
688 pPool->cReqSubmitted = 0;
689 pPool->pFreeRequests = NULL;
690 pPool->cCurFreeRequests = 0;
691
692 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
693 if (RT_SUCCESS(rc))
694 {
695 rc = RTCritSectInit(&pPool->CritSect);
696 if (RT_SUCCESS(rc))
697 {
698 *phPool = pPool;
699 return VINF_SUCCESS;
700 }
701
702 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
703 }
704 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
705 RTMemFree(pPool);
706 return rc;
707}
708
709
710
711RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
712{
713 PRTREQPOOLINT pPool = hPool;
714 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
715 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
716 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
717
718 RTCritSectEnter(&pPool->CritSect);
719
720 bool fWakeUpIdleThreads = false;
721 int rc = VINF_SUCCESS;
722 switch (enmVar)
723 {
724 case RTREQPOOLCFGVAR_THREAD_TYPE:
725 AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
726 ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
727
728 pPool->enmThreadType = (RTTHREADTYPE)uValue;
729 break;
730
731 case RTREQPOOLCFGVAR_MIN_THREADS:
732 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
733 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
734 pPool->cMinThreads = (uint32_t)uValue;
735 if (pPool->cMinThreads > pPool->cMaxThreads)
736 pPool->cMaxThreads = pPool->cMinThreads;
737 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
738 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
739 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
740 rtReqPoolRecalcPushBack(pPool);
741 break;
742
743 case RTREQPOOLCFGVAR_MAX_THREADS:
744 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
745 pPool->cMaxThreads = (uint32_t)uValue;
746 if (pPool->cMaxThreads < pPool->cMinThreads)
747 {
748 pPool->cMinThreads = pPool->cMaxThreads;
749 fWakeUpIdleThreads = true;
750 }
751 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
752 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
753 rtReqPoolRecalcPushBack(pPool);
754 break;
755
756 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
757 AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
758 if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
759 {
760 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
761 pPool->cMsMinIdle = (uint32_t)uValue;
762 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
763 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
764 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
765 }
766 else
767 {
768 pPool->cMsMinIdle = UINT32_MAX;
769 pPool->cNsMinIdle = UINT64_MAX;
770 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
771 }
772 break;
773
774 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
775 AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
776 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
777 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
778 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
779 {
780 pPool->cMsMinIdle = UINT32_MAX;
781 pPool->cNsMinIdle = UINT64_MAX;
782 }
783 break;
784
785 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
786 if (uValue == UINT64_MAX)
787 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
788 else if (uValue == 0)
789 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
790 else
791 {
792 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
793 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
794 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
795 }
796 break;
797
798 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
799 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
800 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
801 else
802 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
803 pPool->cMsMinPushBack = (uint32_t)uValue;
804 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
805 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
806 rtReqPoolRecalcPushBack(pPool);
807 break;
808
809 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
810 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
811 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
812 else
813 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
814 pPool->cMsMaxPushBack = (uint32_t)uValue;
815 if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
816 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
817 rtReqPoolRecalcPushBack(pPool);
818 break;
819
820 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
821 if (uValue == UINT64_MAX)
822 {
823 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
824 if (pPool->cMaxFreeRequests < 16)
825 pPool->cMaxFreeRequests = 16;
826 }
827 else
828 {
829 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
830 pPool->cMaxFreeRequests = (uint32_t)uValue;
831 }
832
833 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
834 {
835 PRTREQINT pReq = pPool->pFreeRequests;
836 pPool->pFreeRequests = pReq->pNext;
837 ASMAtomicDecU32(&pPool->cCurFreeRequests);
838 rtReqFreeIt(pReq);
839 }
840 break;
841
842 default:
843 AssertFailed();
844 rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
845 }
846
847 /* Wake up all idle threads if required. */
848 if (fWakeUpIdleThreads)
849 {
850 Assert(rc == VINF_SUCCESS);
851 PRTREQPOOLTHREAD pThread;
852 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
853 {
854 RTThreadUserSignal(pThread->hThread);
855 }
856 }
857
858 RTCritSectLeave(&pPool->CritSect);
859
860 return rc;
861}
862RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
863
864
865RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
866{
867 PRTREQPOOLINT pPool = hPool;
868 AssertPtrReturn(pPool, UINT64_MAX);
869 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
870 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
871
872 RTCritSectEnter(&pPool->CritSect);
873
874 uint64_t u64;
875 switch (enmVar)
876 {
877 case RTREQPOOLCFGVAR_THREAD_TYPE:
878 u64 = pPool->enmThreadType;
879 break;
880
881 case RTREQPOOLCFGVAR_MIN_THREADS:
882 u64 = pPool->cMinThreads;
883 break;
884
885 case RTREQPOOLCFGVAR_MAX_THREADS:
886 u64 = pPool->cMaxThreads;
887 break;
888
889 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
890 u64 = pPool->cMsMinIdle;
891 break;
892
893 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
894 u64 = pPool->cMsIdleSleep;
895 break;
896
897 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
898 u64 = pPool->cThreadsPushBackThreshold;
899 break;
900
901 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
902 u64 = pPool->cMsMinPushBack;
903 break;
904
905 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
906 u64 = pPool->cMsMaxPushBack;
907 break;
908
909 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
910 u64 = pPool->cMaxFreeRequests;
911 break;
912
913 default:
914 AssertFailed();
915 u64 = UINT64_MAX;
916 break;
917 }
918
919 RTCritSectLeave(&pPool->CritSect);
920
921 return u64;
922}
923RT_EXPORT_SYMBOL(RTReqGetQueryCfgVar);
924
925
926RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
927{
928 PRTREQPOOLINT pPool = hPool;
929 AssertPtrReturn(pPool, UINT64_MAX);
930 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
931 AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
932
933 RTCritSectEnter(&pPool->CritSect);
934
935 uint64_t u64;
936 switch (enmStat)
937 {
938 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
939 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
940 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
941 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
942 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
943 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
944 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
945 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
946 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
947 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
948 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
949 default:
950 AssertFailed();
951 u64 = UINT64_MAX;
952 break;
953 }
954
955 RTCritSectLeave(&pPool->CritSect);
956
957 return u64;
958}
959RT_EXPORT_SYMBOL(RTReqPoolGetStat);
960
961
962RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
963{
964 PRTREQPOOLINT pPool = hPool;
965 AssertPtrReturn(pPool, UINT32_MAX);
966 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
967
968 return ASMAtomicIncU32(&pPool->cRefs);
969}
970RT_EXPORT_SYMBOL(RTReqPoolRetain);
971
972
973RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
974{
975 /*
976 * Ignore NULL and validate the request.
977 */
978 if (!hPool)
979 return 0;
980 PRTREQPOOLINT pPool = hPool;
981 AssertPtrReturn(pPool, UINT32_MAX);
982 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
983
984 /*
985 * Drop a reference, free it when it reaches zero.
986 */
987 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
988 if (cRefs == 0)
989 {
990 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
991
992 RTCritSectEnter(&pPool->CritSect);
993#ifdef RT_STRICT
994 RTTHREAD const hSelf = RTThreadSelf();
995#endif
996
997 /* Indicate to the worker threads that we're shutting down. */
998 ASMAtomicWriteBool(&pPool->fDestructing, true);
999 PRTREQPOOLTHREAD pThread;
1000 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1001 {
1002 Assert(pThread->hThread != hSelf);
1003 RTThreadUserSignal(pThread->hThread);
1004 }
1005
1006 /* Cancel pending requests. */
1007 Assert(!pPool->pPendingRequests);
1008 while (pPool->pPendingRequests)
1009 {
1010 PRTREQINT pReq = pPool->pPendingRequests;
1011 pPool->pPendingRequests = pReq->pNext;
1012 rtReqPoolCancelReq(pReq);
1013 }
1014 pPool->ppPendingRequests = NULL;
1015 pPool->cCurPendingRequests = 0;
1016
1017 /* Wait for the workers to shut down. */
1018 while (!RTListIsEmpty(&pPool->WorkerThreads))
1019 {
1020 RTCritSectLeave(&pPool->CritSect);
1021 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1022 RTCritSectEnter(&pPool->CritSect);
1023 /** @todo should we wait forever here? */
1024 }
1025
1026 /* Free recycled requests. */
1027 for (;;)
1028 {
1029 PRTREQINT pReq = pPool->pFreeRequests;
1030 if (!pReq)
1031 break;
1032 pPool->pFreeRequests = pReq->pNext;
1033 pPool->cCurFreeRequests--;
1034 rtReqFreeIt(pReq);
1035 }
1036
1037 /* Finally, free the critical section and pool instance. */
1038 RTCritSectLeave(&pPool->CritSect);
1039 RTCritSectDelete(&pPool->CritSect);
1040 RTMemFree(pPool);
1041 }
1042
1043 return cRefs;
1044}
1045RT_EXPORT_SYMBOL(RTReqPoolRelease);
1046
1047
1048RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
1049{
1050 PRTREQPOOLINT pPool = hPool;
1051 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1052 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1053
1054 /*
1055 * Try recycle old requests.
1056 */
1057 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1058 {
1059 RTCritSectEnter(&pPool->CritSect);
1060 PRTREQINT pReq = pPool->pFreeRequests;
1061 if (pReq)
1062 {
1063 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1064 pPool->pFreeRequests = pReq->pNext;
1065
1066 RTCritSectLeave(&pPool->CritSect);
1067
1068 Assert(pReq->fPoolOrQueue);
1069 Assert(pReq->uOwner.hPool == pPool);
1070
1071 int rc = rtReqReInit(pReq, enmType);
1072 if (RT_SUCCESS(rc))
1073 {
1074 *phReq = pReq;
1075 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
1076 return rc;
1077 }
1078 }
1079 else
1080 RTCritSectLeave(&pPool->CritSect);
1081 }
1082
1083 /*
1084 * Allocate a new request.
1085 */
1086 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
1087 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
1088 return rc;
1089}
1090RT_EXPORT_SYMBOL(RTReqPoolAlloc);
1091
1092
1093RTDECL(int) RTReqPoolCallEx( RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
1094{
1095 va_list va;
1096 va_start(va, cArgs);
1097 int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
1098 va_end(va);
1099 return rc;
1100}
1101RT_EXPORT_SYMBOL(RTReqPoolCallEx);
1102
1103
1104RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
1105{
1106 /*
1107 * Check input.
1108 */
1109 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
1110 AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
1111 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1112 {
1113 AssertPtrReturn(phReq, VERR_INVALID_POINTER);
1114 *phReq = NIL_RTREQ;
1115 }
1116
1117 PRTREQINT pReq = NULL;
1118 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
1119
1120 /*
1121 * Allocate and initialize the request.
1122 */
1123 int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
1124 if (RT_FAILURE(rc))
1125 return rc;
1126 pReq->fFlags = fFlags;
1127 pReq->u.Internal.pfn = pfnFunction;
1128 pReq->u.Internal.cArgs = cArgs;
1129 for (unsigned iArg = 0; iArg < cArgs; iArg++)
1130 pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
1131
1132 /*
1133 * Submit the request.
1134 */
1135 rc = RTReqSubmit(pReq, cMillies);
1136 if ( rc != VINF_SUCCESS
1137 && rc != VERR_TIMEOUT)
1138 {
1139 Assert(rc != VERR_INTERRUPTED);
1140 RTReqRelease(pReq);
1141 pReq = NULL;
1142 }
1143
1144 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1145 {
1146 *phReq = pReq;
1147 LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
1148 }
1149 else
1150 LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
1151 return rc;
1152}
1153RT_EXPORT_SYMBOL(RTReqPoolCallExV);
1154
1155
1156RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1157{
1158 PRTREQINT pReq;
1159 va_list va;
1160 va_start(va, cArgs);
1161 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
1162 pfnFunction, cArgs, va);
1163 va_end(va);
1164 if (RT_SUCCESS(rc))
1165 rc = pReq->iStatusX;
1166 RTReqRelease(pReq);
1167 return rc;
1168}
1169RT_EXPORT_SYMBOL(RTReqPoolCallWait);
1170
1171
1172RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1173{
1174 va_list va;
1175 va_start(va, cArgs);
1176 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
1177 pfnFunction, cArgs, va);
1178 va_end(va);
1179 return rc;
1180}
1181RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);
1182
1183
1184RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1185{
1186 PRTREQINT pReq;
1187 va_list va;
1188 va_start(va, cArgs);
1189 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
1190 pfnFunction, cArgs, va);
1191 va_end(va);
1192 if (RT_SUCCESS(rc))
1193 rc = pReq->iStatusX;
1194 RTReqRelease(pReq);
1195 return rc;
1196}
1197RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);
1198
1199
1200RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1201{
1202 va_list va;
1203 va_start(va, cArgs);
1204 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
1205 pfnFunction, cArgs, va);
1206 va_end(va);
1207 return rc;
1208}
1209RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1210
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette