VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqqueue.cpp@ 44417

Last change on this file since 44417 was 39550, checked in by vboxsync, 13 years ago

Request thread pool hacking. Some RTReq refactoring as always...

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 12.6 KB
Line 
1/* $Id: reqqueue.cpp 39550 2011-12-07 20:28:23Z vboxsync $ */
2/** @file
3 * IPRT - Request Queue.
4 */
5
6/*
7 * Copyright (C) 2006-2011 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*******************************************************************************
29* Header Files *
30*******************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/string.h>
37#include <iprt/time.h>
38#include <iprt/semaphore.h>
39#include <iprt/thread.h>
40#include <iprt/log.h>
41#include <iprt/mem.h>
42
43#include "internal/req.h"
44#include "internal/magics.h"
45
46
47
48RTDECL(int) RTReqQueueCreate(RTREQQUEUE *phQueue)
49{
50 PRTREQQUEUEINT pQueue = (PRTREQQUEUEINT)RTMemAllocZ(sizeof(RTREQQUEUEINT));
51 if (!pQueue)
52 return VERR_NO_MEMORY;
53 int rc = RTSemEventCreate(&pQueue->EventSem);
54 if (RT_SUCCESS(rc))
55 {
56 pQueue->u32Magic = RTREQQUEUE_MAGIC;
57
58 *phQueue = pQueue;
59 return VINF_SUCCESS;
60 }
61
62 RTMemFree(pQueue);
63 return rc;
64}
65RT_EXPORT_SYMBOL(RTReqQueueCreate);
66
67
68RTDECL(int) RTReqQueueDestroy(RTREQQUEUE hQueue)
69{
70 /*
71 * Check input.
72 */
73 if (hQueue == NIL_RTREQQUEUE)
74 return VINF_SUCCESS;
75 PRTREQQUEUEINT pQueue = hQueue;
76 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
77 AssertReturn(ASMAtomicCmpXchgU32(&pQueue->u32Magic, RTREQQUEUE_MAGIC_DEAD, RTREQQUEUE_MAGIC), VERR_INVALID_HANDLE);
78
79 RTSemEventDestroy(pQueue->EventSem);
80 pQueue->EventSem = NIL_RTSEMEVENT;
81
82 for (unsigned i = 0; i < RT_ELEMENTS(pQueue->apReqFree); i++)
83 {
84 PRTREQ pReq = (PRTREQ)ASMAtomicXchgPtr((void **)&pQueue->apReqFree[i], NULL);
85 while (pReq)
86 {
87 PRTREQ pNext = pReq->pNext;
88 rtReqFreeIt(pReq);
89 pReq = pNext;
90 }
91 }
92
93 RTMemFree(pQueue);
94 return VINF_SUCCESS;
95}
96RT_EXPORT_SYMBOL(RTReqQueueDestroy);
97
98
99RTDECL(int) RTReqQueueProcess(RTREQQUEUE hQueue, RTMSINTERVAL cMillies)
100{
101 LogFlow(("RTReqProcess %x\n", hQueue));
102
103 /*
104 * Check input.
105 */
106 PRTREQQUEUEINT pQueue = hQueue;
107 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
108 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
109
110 /*
111 * Process loop.
112 *
113 * We do not repeat the outer loop if we've got an informational status code
114 * since that code needs processing by our caller.
115 */
116 int rc = VINF_SUCCESS;
117 while (rc <= VINF_SUCCESS)
118 {
119 /*
120 * Get pending requests.
121 */
122 PRTREQ pReqs = ASMAtomicXchgPtrT(&pQueue->pReqs, NULL, PRTREQ);
123 if (!pReqs)
124 {
125 ASMAtomicWriteBool(&pQueue->fBusy, false); /* this aint 100% perfect, but it's good enough for now... */
126 /** @todo We currently don't care if the entire time wasted here is larger than
127 * cMillies */
128 rc = RTSemEventWait(pQueue->EventSem, cMillies);
129 if (rc != VINF_SUCCESS)
130 break;
131 continue;
132 }
133 ASMAtomicWriteBool(&pQueue->fBusy, true);
134
135 /*
136 * Reverse the list to process it in FIFO order.
137 */
138 PRTREQ pReq = pReqs;
139 if (pReq->pNext)
140 Log2(("RTReqProcess: 2+ requests: %p %p %p\n", pReq, pReq->pNext, pReq->pNext->pNext));
141 pReqs = NULL;
142 while (pReq)
143 {
144 Assert(pReq->enmState == RTREQSTATE_QUEUED);
145 Assert(pReq->uOwner.hQueue == pQueue);
146 PRTREQ pCur = pReq;
147 pReq = pReq->pNext;
148 pCur->pNext = pReqs;
149 pReqs = pCur;
150 }
151
152
153 /*
154 * Process the requests.
155 */
156 while (pReqs)
157 {
158 /* Unchain the first request and advance the list. */
159 pReq = pReqs;
160 pReqs = pReqs->pNext;
161 pReq->pNext = NULL;
162
163 /* Process the request */
164 rc = rtReqProcessOne(pReq);
165 AssertRC(rc);
166 if (rc != VINF_SUCCESS)
167 break; /** @todo r=bird: we're dropping requests here! Add 2nd queue that can hold them. (will fix when writing a testcase) */
168 }
169 }
170
171 LogFlow(("RTReqProcess: returns %Rrc\n", rc));
172 return rc;
173}
174RT_EXPORT_SYMBOL(RTReqQueueProcess);
175
176
177RTDECL(int) RTReqQueueCall(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
178{
179 va_list va;
180 va_start(va, cArgs);
181 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_IPRT_STATUS, pfnFunction, cArgs, va);
182 va_end(va);
183 return rc;
184}
185RT_EXPORT_SYMBOL(RTReqQueueCall);
186
187
188RTDECL(int) RTReqQueueCallVoid(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
189{
190 va_list va;
191 va_start(va, cArgs);
192 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_VOID, pfnFunction, cArgs, va);
193 va_end(va);
194 return rc;
195}
196RT_EXPORT_SYMBOL(RTReqQueueCallVoid);
197
198
199RTDECL(int) RTReqQueueCallEx(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
200{
201 va_list va;
202 va_start(va, cArgs);
203 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, fFlags, pfnFunction, cArgs, va);
204 va_end(va);
205 return rc;
206}
207RT_EXPORT_SYMBOL(RTReqQueueCallEx);
208
209
210RTDECL(int) RTReqQueueCallV(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, va_list Args)
211{
212 LogFlow(("RTReqCallV: cMillies=%d fFlags=%#x pfnFunction=%p cArgs=%d\n", cMillies, fFlags, pfnFunction, cArgs));
213
214 /*
215 * Check input.
216 */
217 PRTREQQUEUEINT pQueue = hQueue;
218 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
219 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
220 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
221 AssertReturn(!(fFlags & ~(RTREQFLAGS_RETURN_MASK | RTREQFLAGS_NO_WAIT)), VERR_INVALID_PARAMETER);
222
223 if (!(fFlags & RTREQFLAGS_NO_WAIT) || ppReq)
224 {
225 AssertPtrReturn(ppReq, VERR_INVALID_POINTER);
226 *ppReq = NULL;
227 }
228
229 PRTREQ pReq = NULL;
230 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
231
232 /*
233 * Allocate request
234 */
235 int rc = RTReqQueueAlloc(pQueue, RTREQTYPE_INTERNAL, &pReq);
236 if (rc != VINF_SUCCESS)
237 return rc;
238
239 /*
240 * Initialize the request data.
241 */
242 pReq->fFlags = fFlags;
243 pReq->u.Internal.pfn = pfnFunction;
244 pReq->u.Internal.cArgs = cArgs;
245 for (unsigned iArg = 0; iArg < cArgs; iArg++)
246 pReq->u.Internal.aArgs[iArg] = va_arg(Args, uintptr_t);
247
248 /*
249 * Queue the request and return.
250 */
251 rc = RTReqSubmit(pReq, cMillies);
252 if ( rc != VINF_SUCCESS
253 && rc != VERR_TIMEOUT)
254 {
255 RTReqRelease(pReq);
256 pReq = NULL;
257 }
258 if (!(fFlags & RTREQFLAGS_NO_WAIT))
259 {
260 *ppReq = pReq;
261 LogFlow(("RTReqCallV: returns %Rrc *ppReq=%p\n", rc, pReq));
262 }
263 else
264 LogFlow(("RTReqCallV: returns %Rrc\n", rc));
265 Assert(rc != VERR_INTERRUPTED);
266 return rc;
267}
268RT_EXPORT_SYMBOL(RTReqQueueCallV);
269
270
271RTDECL(bool) RTReqQueueIsBusy(RTREQQUEUE hQueue)
272{
273 PRTREQQUEUEINT pQueue = hQueue;
274 AssertPtrReturn(pQueue, false);
275
276 if (ASMAtomicReadBool(&pQueue->fBusy))
277 return true;
278 if (ASMAtomicReadPtrT(&pQueue->pReqs, PRTREQ) != NULL)
279 return true;
280 if (ASMAtomicReadBool(&pQueue->fBusy))
281 return true;
282 return false;
283}
284RT_EXPORT_SYMBOL(RTReqQueueIsBusy);
285
286
287/**
288 * Joins the list pList with whatever is linked up at *pHead.
289 */
290static void vmr3ReqJoinFreeSub(volatile PRTREQ *ppHead, PRTREQ pList)
291{
292 for (unsigned cIterations = 0;; cIterations++)
293 {
294 PRTREQ pHead = ASMAtomicXchgPtrT(ppHead, pList, PRTREQ);
295 if (!pHead)
296 return;
297 PRTREQ pTail = pHead;
298 while (pTail->pNext)
299 pTail = pTail->pNext;
300 pTail->pNext = pList;
301 if (ASMAtomicCmpXchgPtr(ppHead, pHead, pList))
302 return;
303 pTail->pNext = NULL;
304 if (ASMAtomicCmpXchgPtr(ppHead, pHead, NULL))
305 return;
306 pList = pHead;
307 Assert(cIterations != 32);
308 Assert(cIterations != 64);
309 }
310}
311
312
313/**
314 * Joins the list pList with whatever is linked up at *pHead.
315 */
316static void vmr3ReqJoinFree(PRTREQQUEUEINT pQueue, PRTREQ pList)
317{
318 /*
319 * Split the list if it's too long.
320 */
321 unsigned cReqs = 1;
322 PRTREQ pTail = pList;
323 while (pTail->pNext)
324 {
325 if (cReqs++ > 25)
326 {
327 const uint32_t i = pQueue->iReqFree;
328 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
329
330 pTail->pNext = NULL;
331 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2 + (i == pQueue->iReqFree)) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
332 return;
333 }
334 pTail = pTail->pNext;
335 }
336 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(pQueue->iReqFree + 2) % RT_ELEMENTS(pQueue->apReqFree)], pList);
337}
338
339
340RTDECL(int) RTReqQueueAlloc(RTREQQUEUE hQueue, RTREQTYPE enmType, PRTREQ *phReq)
341{
342 /*
343 * Validate input.
344 */
345 PRTREQQUEUEINT pQueue = hQueue;
346 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
347 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
348 AssertMsgReturn(enmType > RTREQTYPE_INVALID && enmType < RTREQTYPE_MAX, ("%d\n", enmType), VERR_RT_REQUEST_INVALID_TYPE);
349
350 /*
351 * Try get a recycled packet.
352 *
353 * While this could all be solved with a single list with a lock, it's a sport
354 * of mine to avoid locks.
355 */
356 int cTries = RT_ELEMENTS(pQueue->apReqFree) * 2;
357 while (--cTries >= 0)
358 {
359 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
360 PRTREQ pReq = ASMAtomicXchgPtrT(ppHead, NULL, PRTREQ);
361 if (pReq)
362 {
363 PRTREQ pNext = pReq->pNext;
364 if ( pNext
365 && !ASMAtomicCmpXchgPtr(ppHead, pNext, NULL))
366 vmr3ReqJoinFree(pQueue, pReq->pNext);
367 ASMAtomicDecU32(&pQueue->cReqFree);
368
369 Assert(pReq->uOwner.hQueue == pQueue);
370 Assert(!pReq->fPoolOrQueue);
371
372 int rc = rtReqReInit(pReq, enmType);
373 if (RT_SUCCESS(rc))
374 {
375 *phReq = pReq;
376 LogFlow(("RTReqQueueAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
377 return VINF_SUCCESS;
378 }
379 }
380 }
381
382 /*
383 * Ok, allocate a new one.
384 */
385 int rc = rtReqAlloc(enmType, false /*fPoolOrQueue*/, pQueue, phReq);
386 LogFlow(("RTReqQueueAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
387 return rc;
388}
389RT_EXPORT_SYMBOL(RTReqQueueAlloc);
390
391
392/**
393 * Recycles a requst.
394 *
395 * @returns true if recycled, false if it should be freed.
396 * @param pQueue The queue.
397 * @param pReq The request.
398 */
399DECLHIDDEN(bool) rtReqQueueRecycle(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
400{
401 if ( !pQueue
402 || pQueue->cReqFree >= 128)
403 return false;
404
405 ASMAtomicIncU32(&pQueue->cReqFree);
406 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
407 PRTREQ pNext;
408 do
409 {
410 pNext = *ppHead;
411 ASMAtomicWritePtr(&pReq->pNext, pNext);
412 } while (!ASMAtomicCmpXchgPtr(ppHead, pReq, pNext));
413
414 return true;
415}
416
417
418/**
419 * Submits a request to the queue.
420 *
421 * @param pQueue The queue.
422 * @param pReq The request.
423 */
424DECLHIDDEN(void) rtReqQueueSubmit(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
425{
426 PRTREQ pNext;
427 do
428 {
429 pNext = pQueue->pReqs;
430 pReq->pNext = pNext;
431 ASMAtomicWriteBool(&pQueue->fBusy, true);
432 } while (!ASMAtomicCmpXchgPtr(&pQueue->pReqs, pReq, pNext));
433
434 /*
435 * Notify queue thread.
436 */
437 RTSemEventSignal(pQueue->EventSem);
438}
439
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette