VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@76346

Last change on this file since 76346 was 76346, checked in by vboxsync, 6 years ago

*: Preparing for iprt/string.h, iprt/json.h and iprt/serialport.h no longer including iprt/err.h and string.h no longer including latin1.h (it needs err.h). bugref:9344

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 41.1 KB
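
Usage sketch: a minimal example of how the request pool API implemented below is typically driven. The worker function, pool name and parameter values here are illustrative only; IPRT runtime initialization and error handling are omitted.

#include <iprt/req.h>
#include <iprt/err.h>

/* Illustrative worker: functions are passed as PFNRT and receive their
   arguments as uintptr_t-sized values; with RTREQFLAGS_IPRT_STATUS the
   return value is an IPRT status code. */
static DECLCALLBACK(int) exampleWorker(uintptr_t uArg)
{
    RT_NOREF(uArg);
    return VINF_SUCCESS;
}

static int exampleUsePool(void)
{
    RTREQPOOL hPool;
    /* Up to 8 workers, retire a worker after ~10s of idling, default push back. */
    int rc = RTReqPoolCreate(8 /*cMaxThreads*/, 10 * RT_MS_1SEC /*cMsMinIdle*/,
                             UINT32_MAX /*cThreadsPushBackThreshold*/,
                             UINT32_MAX /*cMsMaxPushBack*/, "ExPool", &hPool);
    if (RT_SUCCESS(rc))
    {
        /* Queue a call and wait for its completion status. */
        rc = RTReqPoolCallWait(hPool, (PFNRT)exampleWorker, 1, (uintptr_t)42);
        RTReqPoolRelease(hPool);
    }
    return rc;
}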
1/* $Id: reqpool.cpp 76346 2018-12-22 00:51:28Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2017 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/err.h>
38#include <iprt/list.h>
39#include <iprt/log.h>
40#include <iprt/mem.h>
41#include <iprt/string.h>
42#include <iprt/time.h>
43#include <iprt/semaphore.h>
44#include <iprt/thread.h>
45
46#include "internal/req.h"
47#include "internal/magics.h"
48
49
50/*********************************************************************************************************************************
51* Defined Constants And Macros *
52*********************************************************************************************************************************/
53/** The max number of worker threads. */
54#define RTREQPOOL_MAX_THREADS UINT32_C(16384)
55/** The max number of milliseconds to push back. */
56#define RTREQPOOL_PUSH_BACK_MAX_MS RT_MS_1MIN
57/** The max number of free requests to keep around. */
58#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
59
60
61/*********************************************************************************************************************************
62* Structures and Typedefs *
63*********************************************************************************************************************************/
64typedef struct RTREQPOOLTHREAD
65{
66 /** Node in the RTREQPOOLINT::IdleThreads list. */
67 RTLISTNODE IdleNode;
68 /** Node in the RTREQPOOLINT::WorkerThreads list. */
69 RTLISTNODE ListNode;
70
71 /** The submit timestamp of the pending request. */
72 uint64_t uPendingNanoTs;
73 /** Timestamp of when processing of the current request started. */
74 uint64_t uProcessingNanoTs;
75 /** When this thread last went idle. */
76 uint64_t uIdleNanoTs;
77 /** The number of requests processed by this thread. */
78 uint64_t cReqProcessed;
79 /** Total time the requests processed by this thread took to process. */
80 uint64_t cNsTotalReqProcessing;
81 /** Total time the requests processed by this thread had to wait in
82 * the queue before being scheduled. */
83 uint64_t cNsTotalReqQueued;
84 /** The CPU this thread was scheduled on the last time we checked. */
85 RTCPUID idLastCpu;
86
87 /** The submitter will put an incoming request here when scheduling an idle
88 * thread. */
89 PRTREQINT volatile pTodoReq;
90 /** The request the thread is currently processing. */
91 PRTREQINT volatile pPendingReq;
92
93 /** The thread handle. */
94 RTTHREAD hThread;
95 /** Nanosecond timestamp representing the birth time of the thread. */
96 uint64_t uBirthNanoTs;
97 /** Pointer to the request thread pool instance the thread is associated
98 * with. */
99 struct RTREQPOOLINT *pPool;
100} RTREQPOOLTHREAD;
101/** Pointer to a worker thread. */
102typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
103
104/**
105 * Request thread pool instance data.
106 */
107typedef struct RTREQPOOLINT
108{
109 /** Magic value (RTREQPOOL_MAGIC). */
110 uint32_t u32Magic;
111 /** The request pool name. */
112 char szName[12];
113
114 /** @name Config
115 * @{ */
116 /** The worker thread type. */
117 RTTHREADTYPE enmThreadType;
118 /** The maximum number of worker threads. */
119 uint32_t cMaxThreads;
120 /** The minimum number of worker threads. */
121 uint32_t cMinThreads;
122 /** The number of milliseconds a thread needs to be idle before it is
123 * considered for retirement. */
124 uint32_t cMsMinIdle;
125 /** cMsMinIdle in nanoseconds. */
126 uint64_t cNsMinIdle;
127 /** The idle thread sleep interval in milliseconds. */
128 RTMSINTERVAL cMsIdleSleep;
129 /** The number of threads which should be spawned before throttling kicks
130 * in. */
131 uint32_t cThreadsPushBackThreshold;
132 /** The max number of milliseconds to push back a submitter before creating
133 * a new worker thread once the threshold has been reached. */
134 uint32_t cMsMaxPushBack;
135 /** The minimum number of milliseconds to push back a submitter before
136 * creating a new worker thread once the threshold has been reached. */
137 uint32_t cMsMinPushBack;
138 /** The max number of free requests in the recycle LIFO. */
139 uint32_t cMaxFreeRequests;
140 /** @} */
141
142 /** Signaled by terminating worker threads. */
143 RTSEMEVENTMULTI hThreadTermEvt;
144
145 /** Destruction indicator. The worker threads check this in their loop. */
146 bool volatile fDestructing;
147
148 /** The current submitter push back in milliseconds.
149 * This is recalculated when worker threads come and go. */
150 uint32_t cMsCurPushBack;
151 /** The current number of worker threads. */
152 uint32_t cCurThreads;
153 /** Statistics: The total number of threads created. */
154 uint32_t cThreadsCreated;
155 /** Statistics: The timestamp when the last thread was created. */
156 uint64_t uLastThreadCreateNanoTs;
157 /** Linked list of worker threads. */
158 RTLISTANCHOR WorkerThreads;
159
160 /** The number of requests processed and counted in the time totals. */
161 uint64_t cReqProcessed;
162 /** Total time the requests processed by the pool's worker threads took to process. */
163 uint64_t cNsTotalReqProcessing;
164 /** Total time the requests processed by the pool's worker threads had to wait
165 * in the queue before being scheduled. */
166 uint64_t cNsTotalReqQueued;
167
168 /** Reference counter. */
169 uint32_t volatile cRefs;
170 /** The number of idle threads or threads in the process of becoming
171 * idle. This is increased before the to-be-idle thread tries to enter
172 * the critical section and add itself to the list. */
173 uint32_t volatile cIdleThreads;
174 /** Linked list of idle threads. */
175 RTLISTANCHOR IdleThreads;
176
177 /** Head of the request FIFO. */
178 PRTREQINT pPendingRequests;
179 /** Where to insert the next request. */
180 PRTREQINT *ppPendingRequests;
181 /** The number of requests currently pending. */
182 uint32_t cCurPendingRequests;
183 /** The number of requests currently being executed. */
184 uint32_t volatile cCurActiveRequests;
185 /** The number of requests submitted. */
186 uint64_t cReqSubmitted;
187
188 /** Head of the request recycling LIFO. */
189 PRTREQINT pFreeRequests;
190 /** The number of requests in the recycling LIFO. This is read without
191 * entering the critical section, thus volatile. */
192 uint32_t volatile cCurFreeRequests;
193
194 /** Critical section serializing access to members of this structure. */
195 RTCRITSECT CritSect;
196
197} RTREQPOOLINT;
198
199
200/**
201 * Used by an exiting worker thread and the pool destruction code to cancel unprocessed
202 * requests.
203 *
204 * @param pReq The request.
205 */
206static void rtReqPoolCancelReq(PRTREQINT pReq)
207{
208 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
209 pReq->enmState = RTREQSTATE_COMPLETED;
210 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
211 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
212 RTSemEventMultiSignal(pReq->hPushBackEvt);
213 RTSemEventSignal(pReq->EventSem);
214
215 RTReqRelease(pReq);
216}
217
218
219/**
220 * Recalculate the current push back interval when adding or removing worker threads.
221 *
222 * @param pPool The pool. cMsCurPushBack will be changed.
223 */
224static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
225{
226 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
227 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
228 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
229
230 uint32_t cMsCurPushBack;
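    /* The push back ramps linearly from cMsMinPushBack (when cCurThreads is at
       the threshold) up towards cMsMaxPushBack (when cCurThreads reaches
       cMaxThreads).  The 64-bit branch below preserves precision when the
       millisecond range is small compared to the number of steps. */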
231 if ((cMsRange >> 2) >= cSteps)
232 cMsCurPushBack = cMsRange / cSteps * iStep;
233 else
234 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
235 cMsCurPushBack += pPool->cMsMinPushBack;
236
237 pPool->cMsCurPushBack = cMsCurPushBack;
238}
239
240
241
242/**
243 * Performs thread exit.
244 *
245 * @returns Thread termination status code (VINF_SUCCESS).
246 * @param pPool The pool.
247 * @param pThread The thread.
248 * @param fLocked Whether we are inside the critical section
249 * already.
250 */
251static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
252{
253 if (!fLocked)
254 RTCritSectEnter(&pPool->CritSect);
255
256 /* Get out of the idle list. */
257 if (!RTListIsEmpty(&pThread->IdleNode))
258 {
259 RTListNodeRemove(&pThread->IdleNode);
260 Assert(pPool->cIdleThreads > 0);
261 ASMAtomicDecU32(&pPool->cIdleThreads);
262 }
263
264 /* Get out of the thread list. */
265 RTListNodeRemove(&pThread->ListNode);
266 Assert(pPool->cCurThreads > 0);
267 pPool->cCurThreads--;
268 rtReqPoolRecalcPushBack(pPool);
269
270 /* This shouldn't happen... */
271 PRTREQINT pReq = pThread->pTodoReq;
272 if (pReq)
273 {
274 AssertFailed();
275 pThread->pTodoReq = NULL;
276 rtReqPoolCancelReq(pReq);
277 }
278
279 /* If we're the last thread terminating, ping the destruction thread before
280 we leave the critical section. */
281 if ( RTListIsEmpty(&pPool->WorkerThreads)
282 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
283 RTSemEventMultiSignal(pPool->hThreadTermEvt);
284
285 RTCritSectLeave(&pPool->CritSect);
286
287 return VINF_SUCCESS;
288}
289
290
291
292/**
293 * Process one request.
294 *
295 * @param pPool The pool.
296 * @param pThread The worker thread.
297 * @param pReq The request to process.
298 */
299static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
300{
301 /*
302 * Update thread state.
303 */
304 pThread->uProcessingNanoTs = RTTimeNanoTS();
305 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
306 pThread->pPendingReq = pReq;
307 ASMAtomicIncU32(&pPool->cCurActiveRequests);
308 Assert(pReq->u32Magic == RTREQ_MAGIC);
309
310 /*
311 * Do the actual processing.
312 */
313 rtReqProcessOne(pReq);
314
315 /*
316 * Update thread statistics and state.
317 */
318 ASMAtomicDecU32(&pPool->cCurActiveRequests);
319 pThread->pPendingReq = NULL;
320 uint64_t const uNsTsEnd = RTTimeNanoTS();
321 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
322 pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
323 pThread->cReqProcessed++;
324}
325
326
327
328/**
329 * The Worker Thread Procedure.
330 *
331 * @returns VINF_SUCCESS.
332 * @param hThreadSelf The thread handle.
333 * @param pvArg Pointer to the thread data.
334 */
335static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
336{
337 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
338 PRTREQPOOLINT pPool = pThread->pPool;
339
340 /*
341 * The work loop.
342 */
343 uint64_t cReqPrevProcessedIdle = UINT64_MAX;
344 uint64_t cReqPrevProcessedStat = 0;
345 uint64_t cNsPrevTotalReqProcessing = 0;
346 uint64_t cNsPrevTotalReqQueued = 0;
347 while (!pPool->fDestructing)
348 {
349 /*
350 * Process pending work.
351 */
352
353 /* Check if anything is scheduled directly to us. */
354 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
355 if (pReq)
356 {
357 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
358 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
359 continue;
360 }
361
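        /* Advertise that we are about to go idle before taking the critsect, so
           that rtReqPoolSubmit sees cIdleThreads > 0 and queues the request
           rather than spawning another worker. */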
362 ASMAtomicIncU32(&pPool->cIdleThreads);
363 RTCritSectEnter(&pPool->CritSect);
364
365 /* Update the global statistics. */
366 if (cReqPrevProcessedStat != pThread->cReqProcessed)
367 {
368 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
369 cReqPrevProcessedStat = pThread->cReqProcessed;
370 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
371 cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
372 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
373 cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
374 }
375
376 /* Recheck the todo request pointer after entering the critsect. */
377 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
378 if (pReq)
379 {
380 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
381 RTCritSectLeave(&pPool->CritSect);
382
383 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
384 continue;
385 }
386
387 /* Any pending requests in the queue? */
388 pReq = pPool->pPendingRequests;
389 if (pReq)
390 {
391 pPool->pPendingRequests = pReq->pNext;
392 if (pReq->pNext == NULL)
393 pPool->ppPendingRequests = &pPool->pPendingRequests;
394 Assert(pPool->cCurPendingRequests > 0);
395 pPool->cCurPendingRequests--;
396
397 /* Un-idle ourselves and process the request. */
398 if (!RTListIsEmpty(&pThread->IdleNode))
399 {
400 RTListNodeRemove(&pThread->IdleNode);
401 RTListInit(&pThread->IdleNode);
402 ASMAtomicDecU32(&pPool->cIdleThreads);
403 }
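            /* If we were on the idle list, the decrement above undid the list
               accounting; the unconditional decrement below undoes the
               increment done at the top of this loop iteration. */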
404 ASMAtomicDecU32(&pPool->cIdleThreads);
405 RTCritSectLeave(&pPool->CritSect);
406
407 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
408 continue;
409 }
410
411 /*
412 * Nothing to do, go idle.
413 */
414 if (cReqPrevProcessedIdle != pThread->cReqProcessed)
415 {
416 cReqPrevProcessedIdle = pThread->cReqProcessed;
417 pThread->uIdleNanoTs = RTTimeNanoTS();
418 }
419 else if (pPool->cCurThreads > pPool->cMinThreads)
420 {
421 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
422 if (cNsIdle >= pPool->cNsMinIdle)
423 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
424 }
425
426 if (RTListIsEmpty(&pThread->IdleNode))
427 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
428 else
429 ASMAtomicDecU32(&pPool->cIdleThreads);
430 RTThreadUserReset(hThreadSelf);
431 uint32_t const cMsSleep = pPool->cMsIdleSleep;
432
433 RTCritSectLeave(&pPool->CritSect);
434
435 RTThreadUserWait(hThreadSelf, cMsSleep);
436 }
437
438 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
439}
440
441
442/**
443 * Create a new worker thread.
444 *
445 * @param pPool The pool needing new worker thread.
446 * @remarks Caller owns the critical section
447 */
448static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
449{
450 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
451 if (!pThread)
452 return;
453
454 pThread->uBirthNanoTs = RTTimeNanoTS();
455 pThread->pPool = pPool;
456 pThread->idLastCpu = NIL_RTCPUID;
457 pThread->hThread = NIL_RTTHREAD;
458 RTListInit(&pThread->IdleNode);
459 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
460 pPool->cCurThreads++;
461 pPool->cThreadsCreated++;
462
463 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
464 pPool->enmThreadType, 0 /*fFlags*/, "%s%02u", pPool->szName, pPool->cThreadsCreated);
465 if (RT_SUCCESS(rc))
466 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
467 else
468 {
469 pPool->cCurThreads--;
470 RTListNodeRemove(&pThread->ListNode);
471 RTMemFree(pThread);
472 }
473}
474
475
476/**
477 * Push back the submitter, giving the worker threads a chance to process the
478 * incoming request.
479 *
480 * @returns Success if a worker picked up the request, failure if not. The
481 * critical section has been left on success, while we'll be inside it
482 * on failure.
483 * @param pPool The pool.
484 * @param pReq The incoming request.
485 */
486static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
487{
488 /*
489 * Lazily create the push back semaphore that we'll be blocking on.
490 */
491 int rc;
492 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
493 if (hEvt == NIL_RTSEMEVENTMULTI)
494 {
495 rc = RTSemEventMultiCreate(&hEvt);
496 if (RT_FAILURE(rc))
497 return rc;
498 pReq->hPushBackEvt = hEvt;
499 }
500
501 /*
502 * Prepare the request and semaphore.
503 */
504 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
505 pReq->fSignalPushBack = true;
506 RTReqRetain(pReq);
507 RTSemEventMultiReset(hEvt);
508
509 RTCritSectLeave(&pPool->CritSect);
510
511 /*
512 * Block.
513 */
514 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
515 if (RT_FAILURE(rc))
516 {
517 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
518 RTCritSectEnter(&pPool->CritSect);
519 }
520 RTReqRelease(pReq);
521 return rc;
522}
523
524
525
526DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
527{
528 RTCritSectEnter(&pPool->CritSect);
529
530 pPool->cReqSubmitted++;
531
532 /*
533 * Try to schedule the request to a thread that's currently idle.
534 */
535 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
536 if (pThread)
537 {
538 /** @todo CPU affinity??? */
539 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
540
541 RTListNodeRemove(&pThread->IdleNode);
542 RTListInit(&pThread->IdleNode);
543 ASMAtomicDecU32(&pPool->cIdleThreads);
544
545 RTThreadUserSignal(pThread->hThread);
546
547 RTCritSectLeave(&pPool->CritSect);
548 return;
549 }
550 Assert(RTListIsEmpty(&pPool->IdleThreads));
551
552 /*
553 * Put the request in the pending queue.
554 */
555 pReq->pNext = NULL;
556 *pPool->ppPendingRequests = pReq;
557 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
558 pPool->cCurPendingRequests++;
559
560 /*
561 * If a worker thread is already idle or about to go idle, or we've reached
562 * the maximum number of worker threads, we're done.
563 */
564 if ( pPool->cIdleThreads > 0
565 || pPool->cCurThreads >= pPool->cMaxThreads)
566 {
567 RTCritSectLeave(&pPool->CritSect);
568 return;
569 }
570
571 /*
572 * Push back before creating a new worker thread.
573 */
574 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
575 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
576 {
577 int rc = rtReqPoolPushBack(pPool, pReq);
578 if (RT_SUCCESS(rc))
579 return;
580 }
581
582 /*
583 * Create a new thread for processing the request.
584 * For simplicity, we don't bother leaving the critical section while doing so.
585 */
586 rtReqPoolCreateNewWorker(pPool);
587
588 RTCritSectLeave(&pPool->CritSect);
589 return;
590}
591
592
593/**
594 * Frees a request, recycling it via the pool's free LIFO when possible.
595 *
596 * @returns true if recycled, false if not.
597 * @param pPool The request thread pool.
598 * @param pReq The request.
599 */
600DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
601{
602 if ( pPool
603 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
604 {
605 RTCritSectEnter(&pPool->CritSect);
606 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
607 {
608 pReq->pNext = pPool->pFreeRequests;
609 pPool->pFreeRequests = pReq;
610 ASMAtomicIncU32(&pPool->cCurFreeRequests);
611
612 RTCritSectLeave(&pPool->CritSect);
613 return true;
614 }
615
616 RTCritSectLeave(&pPool->CritSect);
617 }
618 return false;
619}
620
621
622RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
623 uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
624 const char *pszName, PRTREQPOOL phPool)
625{
626 /*
627 * Validate and massage the config.
628 */
629 if (cMaxThreads == UINT32_MAX)
630 cMaxThreads = RTREQPOOL_MAX_THREADS;
631 AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
632 uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
633
634 if (cThreadsPushBackThreshold == 0)
635 cThreadsPushBackThreshold = cMinThreads;
636 else if (cThreadsPushBackThreshold == UINT32_MAX)
637 cThreadsPushBackThreshold = cMaxThreads;
638 AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
639
640 if (cMsMaxPushBack == UINT32_MAX)
641 cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
642 AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
643 uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
644
645 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
646 size_t cchName = strlen(pszName);
647 AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
648 Assert(cchName <= 10);
649
650 AssertPtrReturn(phPool, VERR_INVALID_POINTER);
651
652 /*
653 * Create and initialize the pool.
654 */
655 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
656 if (!pPool)
657 return VERR_NO_MEMORY;
658
659 pPool->u32Magic = RTREQPOOL_MAGIC;
660 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
661
662 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
663 pPool->cMaxThreads = cMaxThreads;
664 pPool->cMinThreads = cMinThreads;
665 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
666 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
667 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
668 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
669 pPool->cMsMaxPushBack = cMsMaxPushBack;
670 pPool->cMsMinPushBack = cMsMinPushBack;
671 pPool->cMaxFreeRequests = cMaxThreads * 2;
672 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
673 pPool->fDestructing = false;
674 pPool->cMsCurPushBack = 0;
675 pPool->cCurThreads = 0;
676 pPool->cThreadsCreated = 0;
677 pPool->uLastThreadCreateNanoTs = 0;
678 RTListInit(&pPool->WorkerThreads);
679 pPool->cReqProcessed = 0;
680 pPool->cNsTotalReqProcessing= 0;
681 pPool->cNsTotalReqQueued = 0;
682 pPool->cRefs = 1;
683 pPool->cIdleThreads = 0;
684 RTListInit(&pPool->IdleThreads);
685 pPool->pPendingRequests = NULL;
686 pPool->ppPendingRequests = &pPool->pPendingRequests;
687 pPool->cCurPendingRequests = 0;
688 pPool->cCurActiveRequests = 0;
689 pPool->cReqSubmitted = 0;
690 pPool->pFreeRequests = NULL;
691 pPool->cCurFreeRequests = 0;
692
693 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
694 if (RT_SUCCESS(rc))
695 {
696 rc = RTCritSectInit(&pPool->CritSect);
697 if (RT_SUCCESS(rc))
698 {
699 *phPool = pPool;
700 return VINF_SUCCESS;
701 }
702
703 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
704 }
705 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
706 RTMemFree(pPool);
707 return rc;
708}
709
710
711
712RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
713{
714 PRTREQPOOLINT pPool = hPool;
715 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
716 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
717 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
718
719 RTCritSectEnter(&pPool->CritSect);
720
721 bool fWakeUpIdleThreads = false;
722 int rc = VINF_SUCCESS;
723 switch (enmVar)
724 {
725 case RTREQPOOLCFGVAR_THREAD_TYPE:
726 AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
727 ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
728
729 pPool->enmThreadType = (RTTHREADTYPE)uValue;
730 break;
731
732 case RTREQPOOLCFGVAR_MIN_THREADS:
733 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
734 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
735 pPool->cMinThreads = (uint32_t)uValue;
736 if (pPool->cMinThreads > pPool->cMaxThreads)
737 pPool->cMaxThreads = pPool->cMinThreads;
738 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
739 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
740 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
741 rtReqPoolRecalcPushBack(pPool);
742 break;
743
744 case RTREQPOOLCFGVAR_MAX_THREADS:
745 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
746 pPool->cMaxThreads = (uint32_t)uValue;
747 if (pPool->cMaxThreads < pPool->cMinThreads)
748 {
749 pPool->cMinThreads = pPool->cMaxThreads;
750 fWakeUpIdleThreads = true;
751 }
752 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
753 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
754 rtReqPoolRecalcPushBack(pPool);
755 break;
756
757 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
758 AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
759 if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
760 {
761 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
762 pPool->cMsMinIdle = (uint32_t)uValue;
763 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
764 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
765 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
766 }
767 else
768 {
769 pPool->cMsMinIdle = UINT32_MAX;
770 pPool->cNsMinIdle = UINT64_MAX;
771 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
772 }
773 break;
774
775 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
776 AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
777 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
778 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
779 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
780 {
781 pPool->cMsMinIdle = UINT32_MAX;
782 pPool->cNsMinIdle = UINT64_MAX;
783 }
784 break;
785
786 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
787 if (uValue == UINT64_MAX)
788 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
789 else if (uValue == 0)
790 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
791 else
792 {
793 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
794 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
795 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
796 }
797 break;
798
799 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
800 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
801 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
802 else
803 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
804 pPool->cMsMinPushBack = (uint32_t)uValue;
805 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
806 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
807 rtReqPoolRecalcPushBack(pPool);
808 break;
809
810 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
811 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
812 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
813 else
814 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
815 pPool->cMsMaxPushBack = (uint32_t)uValue;
816 if (pPool->cMsMinPushBack > pPool->cMsMaxPushBack)
817 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
818 rtReqPoolRecalcPushBack(pPool);
819 break;
820
821 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
822 if (uValue == UINT64_MAX)
823 {
824 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
825 if (pPool->cMaxFreeRequests < 16)
826 pPool->cMaxFreeRequests = 16;
827 }
828 else
829 {
830 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
831 pPool->cMaxFreeRequests = (uint32_t)uValue;
832 }
833
834 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
835 {
836 PRTREQINT pReq = pPool->pFreeRequests;
837 pPool->pFreeRequests = pReq->pNext;
838 ASMAtomicDecU32(&pPool->cCurFreeRequests);
839 rtReqFreeIt(pReq);
840 }
841 break;
842
843 default:
844 AssertFailed();
845 rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
846 }
847
848 /* Wake up all idle threads if required. */
849 if (fWakeUpIdleThreads)
850 {
851 Assert(rc == VINF_SUCCESS);
852 PRTREQPOOLTHREAD pThread;
853 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
854 {
855 RTThreadUserSignal(pThread->hThread);
856 }
857 }
858
859 RTCritSectLeave(&pPool->CritSect);
860
861 return rc;
862}
863RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
864
865
866RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
867{
868 PRTREQPOOLINT pPool = hPool;
869 AssertPtrReturn(pPool, UINT64_MAX);
870 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
871 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
872
873 RTCritSectEnter(&pPool->CritSect);
874
875 uint64_t u64;
876 switch (enmVar)
877 {
878 case RTREQPOOLCFGVAR_THREAD_TYPE:
879 u64 = pPool->enmThreadType;
880 break;
881
882 case RTREQPOOLCFGVAR_MIN_THREADS:
883 u64 = pPool->cMinThreads;
884 break;
885
886 case RTREQPOOLCFGVAR_MAX_THREADS:
887 u64 = pPool->cMaxThreads;
888 break;
889
890 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
891 u64 = pPool->cMsMinIdle;
892 break;
893
894 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
895 u64 = pPool->cMsIdleSleep;
896 break;
897
898 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
899 u64 = pPool->cThreadsPushBackThreshold;
900 break;
901
902 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
903 u64 = pPool->cMsMinPushBack;
904 break;
905
906 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
907 u64 = pPool->cMsMaxPushBack;
908 break;
909
910 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
911 u64 = pPool->cMaxFreeRequests;
912 break;
913
914 default:
915 AssertFailed();
916 u64 = UINT64_MAX;
917 break;
918 }
919
920 RTCritSectLeave(&pPool->CritSect);
921
922 return u64;
923}
924RT_EXPORT_SYMBOL(RTReqPoolGetCfgVar);
925
926
927RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
928{
929 PRTREQPOOLINT pPool = hPool;
930 AssertPtrReturn(pPool, UINT64_MAX);
931 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
932 AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
933
934 RTCritSectEnter(&pPool->CritSect);
935
936 uint64_t u64;
937 switch (enmStat)
938 {
939 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
940 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
941 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
942 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
943 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
944 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
945 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
946 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
947 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
948 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
949 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
950 default:
951 AssertFailed();
952 u64 = UINT64_MAX;
953 break;
954 }
955
956 RTCritSectLeave(&pPool->CritSect);
957
958 return u64;
959}
960RT_EXPORT_SYMBOL(RTReqPoolGetStat);
961
962
963RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
964{
965 PRTREQPOOLINT pPool = hPool;
966 AssertPtrReturn(pPool, UINT32_MAX);
967 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
968
969 return ASMAtomicIncU32(&pPool->cRefs);
970}
971RT_EXPORT_SYMBOL(RTReqPoolRetain);
972
973
974RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
975{
976 /*
977 * Ignore NULL and validate the request.
978 */
979 if (!hPool)
980 return 0;
981 PRTREQPOOLINT pPool = hPool;
982 AssertPtrReturn(pPool, UINT32_MAX);
983 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
984
985 /*
986 * Drop a reference, free it when it reaches zero.
987 */
988 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
989 if (cRefs == 0)
990 {
991 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
992
993 RTCritSectEnter(&pPool->CritSect);
994#ifdef RT_STRICT
995 RTTHREAD const hSelf = RTThreadSelf();
996#endif
997
998 /* Indicate to the worker threads that we're shutting down. */
999 ASMAtomicWriteBool(&pPool->fDestructing, true);
1000 PRTREQPOOLTHREAD pThread;
1001 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1002 {
1003 Assert(pThread->hThread != hSelf);
1004 RTThreadUserSignal(pThread->hThread);
1005 }
1006
1007 /* Cancel pending requests. */
1008 Assert(!pPool->pPendingRequests);
1009 while (pPool->pPendingRequests)
1010 {
1011 PRTREQINT pReq = pPool->pPendingRequests;
1012 pPool->pPendingRequests = pReq->pNext;
1013 rtReqPoolCancelReq(pReq);
1014 }
1015 pPool->ppPendingRequests = NULL;
1016 pPool->cCurPendingRequests = 0;
1017
1018 /* Wait for the workers to shut down. */
1019 while (!RTListIsEmpty(&pPool->WorkerThreads))
1020 {
1021 RTCritSectLeave(&pPool->CritSect);
1022 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1023 RTCritSectEnter(&pPool->CritSect);
1024 /** @todo should we wait forever here? */
1025 }
1026
1027 /* Free recycled requests. */
1028 for (;;)
1029 {
1030 PRTREQINT pReq = pPool->pFreeRequests;
1031 if (!pReq)
1032 break;
1033 pPool->pFreeRequests = pReq->pNext;
1034 pPool->cCurFreeRequests--;
1035 rtReqFreeIt(pReq);
1036 }
1037
1038 /* Finally, free the critical section and pool instance. */
1039 RTCritSectLeave(&pPool->CritSect);
1040 RTCritSectDelete(&pPool->CritSect);
1041 RTMemFree(pPool);
1042 }
1043
1044 return cRefs;
1045}
1046RT_EXPORT_SYMBOL(RTReqPoolRelease);
1047
1048
1049RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
1050{
1051 PRTREQPOOLINT pPool = hPool;
1052 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1053 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1054
1055 /*
1056 * Try recycle old requests.
1057 */
1058 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1059 {
1060 RTCritSectEnter(&pPool->CritSect);
1061 PRTREQINT pReq = pPool->pFreeRequests;
1062 if (pReq)
1063 {
1064 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1065 pPool->pFreeRequests = pReq->pNext;
1066
1067 RTCritSectLeave(&pPool->CritSect);
1068
1069 Assert(pReq->fPoolOrQueue);
1070 Assert(pReq->uOwner.hPool == pPool);
1071
1072 int rc = rtReqReInit(pReq, enmType);
1073 if (RT_SUCCESS(rc))
1074 {
1075 *phReq = pReq;
1076 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
1077 return rc;
1078 }
1079 }
1080 else
1081 RTCritSectLeave(&pPool->CritSect);
1082 }
1083
1084 /*
1085 * Allocate a new request.
1086 */
1087 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
1088 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
1089 return rc;
1090}
1091RT_EXPORT_SYMBOL(RTReqPoolAlloc);
1092
1093
1094RTDECL(int) RTReqPoolCallEx(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
1095{
1096 va_list va;
1097 va_start(va, cArgs);
1098 int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
1099 va_end(va);
1100 return rc;
1101}
1102RT_EXPORT_SYMBOL(RTReqPoolCallEx);
1103
1104
1105RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
1106{
1107 /*
1108 * Check input.
1109 */
1110 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
1111 AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
1112 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1113 {
1114 AssertPtrReturn(phReq, VERR_INVALID_POINTER);
1115 *phReq = NIL_RTREQ;
1116 }
1117
1118 PRTREQINT pReq = NULL;
1119 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
1120
1121 /*
1122 * Allocate and initialize the request.
1123 */
1124 int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
1125 if (RT_FAILURE(rc))
1126 return rc;
1127 pReq->fFlags = fFlags;
1128 pReq->u.Internal.pfn = pfnFunction;
1129 pReq->u.Internal.cArgs = cArgs;
1130 for (unsigned iArg = 0; iArg < cArgs; iArg++)
1131 pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
1132
1133 /*
1134 * Submit the request.
1135 */
1136 rc = RTReqSubmit(pReq, cMillies);
1137 if ( rc != VINF_SUCCESS
1138 && rc != VERR_TIMEOUT)
1139 {
1140 Assert(rc != VERR_INTERRUPTED);
1141 RTReqRelease(pReq);
1142 pReq = NULL;
1143 }
1144
1145 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1146 {
1147 *phReq = pReq;
1148 LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
1149 }
1150 else
1151 LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
1152 return rc;
1153}
1154RT_EXPORT_SYMBOL(RTReqPoolCallExV);
1155
1156
1157RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1158{
1159 PRTREQINT pReq;
1160 va_list va;
1161 va_start(va, cArgs);
1162 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
1163 pfnFunction, cArgs, va);
1164 va_end(va);
1165 if (RT_SUCCESS(rc))
1166 rc = pReq->iStatusX;
1167 RTReqRelease(pReq);
1168 return rc;
1169}
1170RT_EXPORT_SYMBOL(RTReqPoolCallWait);
1171
1172
1173RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1174{
1175 va_list va;
1176 va_start(va, cArgs);
1177 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
1178 pfnFunction, cArgs, va);
1179 va_end(va);
1180 return rc;
1181}
1182RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);
1183
1184
1185RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1186{
1187 PRTREQINT pReq;
1188 va_list va;
1189 va_start(va, cArgs);
1190 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
1191 pfnFunction, cArgs, va);
1192 va_end(va);
1193 if (RT_SUCCESS(rc))
1194 rc = pReq->iStatusX;
1195 RTReqRelease(pReq);
1196 return rc;
1197}
1198RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);
1199
1200
1201RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1202{
1203 va_list va;
1204 va_start(va, cArgs);
1205 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
1206 pfnFunction, cArgs, va);
1207 va_end(va);
1208 return rc;
1209}
1210RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1211
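
A similarly minimal sketch of runtime tuning and statistics queries against a pool created with RTReqPoolCreate; the enum values are the ones handled by RTReqPoolSetCfgVar and RTReqPoolGetStat above, while the surrounding function and the chosen numbers are illustrative.

#include <iprt/req.h>
#include <iprt/err.h>
#include <iprt/stream.h>

static void exampleTuneAndInspect(RTREQPOOL hPool)
{
    /* Raise the worker cap; the setter re-derives the dependent push back
       values while holding the pool's critical section. */
    int rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MAX_THREADS, 32);
    if (RT_SUCCESS(rc))
    {
        uint64_t cThreads   = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_THREADS);
        uint64_t cProcessed = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_PROCESSED);
        RTPrintf("workers=%RU64 processed=%RU64\n", cThreads, cProcessed);
    }
}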