VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 60308

Last change on this file since 60308 was 56802, checked in by vboxsync, 10 years ago

schedulerbase.py: Disable SQL explain, it will otherwise deadlock because of the table locking in recreateQueue. Use the new insertList() method on the database interface to speed up scheduling queue creation (45000 database single row inserts is slow). Disabled some of the debug output.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 62.2 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 56802 2015-07-03 23:14:09Z vboxsync $
3# pylint: disable=C0302
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2015 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.virtualbox.org. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 56802 $"
32
33
34# Standard python imports.
35import unittest;
36
37# Validation Kit imports.
38from common import utils, constants;
39from testmanager import config;
40from testmanager.core.build import BuildDataEx, BuildLogic;
41from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43from testmanager.core.globalresource import GlobalResourceLogic;
44from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46from testmanager.core.testbox import TestBoxData;
47from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48from testmanager.core.testcase import TestCaseLogic;
49from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50from testmanager.core.testset import TestSetData, TestSetLogic;
51
52
53class ReCreateQueueData(object):
54 """
55 Data object for recreating a scheduling queue.
56
57 It's mostly a storage object, but has a few data checking operation
58 associated with it.
59 """
60
61 def __init__(self, oDb, idSchedGroup):
62 #
63 # Load data from the database.
64 #
65
66 # Will extend the entries with aoTestCases and dTestCases members
67 # further down. checkForGroupDepCycles will add aidTestGroupPreReqs.
68 self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);
69
70 # aoTestCases entries are TestCaseData instance with iSchedPriority
71 # and idTestGroup added for our purposes.
72 # We will add oTestGroup and aoArgsVariations members to each further down.
73 self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);
74
75 # Load dependencies.
76 oTestCaseLogic = TestCaseLogic(oDb)
77 for oTestCase in self.aoTestCases:
78 oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);
79
80 # aoTestCases entries are TestCaseArgsData instance with iSchedPriority
81 # and idTestGroup added for our purposes.
82 # We will add oTestGroup and oTestCase members to each further down.
83 self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);
84
85 #
86 # Generate global lookups.
87 #
88
89 # Generate a testcase lookup dictionary for use when working on
90 # argument variations.
91 self.dTestCases = dict();
92 for oTestCase in self.aoTestCases:
93 self.dTestCases[oTestCase.idTestCase] = oTestCase;
94 assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!
95
96 # Generate a testgroup lookup dictionary.
97 self.dTestGroups = dict();
98 for oTestGroup in self.aoTestGroups:
99 self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
100 assert len(self.dTestGroups) == len(self.aoTestGroups);
101
102 #
103 # Associate extra members with the base data.
104 #
105 if len(self.aoTestGroups) > 0:
106 # Prep the test groups.
107 for oTestGroup in self.aoTestGroups:
108 oTestGroup.aoTestCases = list();
109 oTestGroup.dTestCases = dict();
110
111 # Link testcases to their group, both directions. Prep testcases for
112 # argument varation association.
113 oTestGroup = self.aoTestGroups[0];
114 for oTestCase in self.aoTestCases:
115 if oTestGroup.idTestGroup != oTestCase.idTestGroup:
116 oTestGroup = self.dTestGroups[oTestCase.idTestGroup];
117
118 assert oTestCase.idTestCase not in oTestGroup.dTestCases;
119 oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
120 oTestGroup.aoTestCases.append(oTestCase);
121 oTestCase.oTestGroup = oTestGroup;
122 oTestCase.aoArgsVariations = list();
123
124 # Associate testcase argument variations with their testcases (group)
125 # in both directions.
126 oTestGroup = self.aoTestGroups[0];
127 oTestCase = self.aoTestCases[0] if len(self.aoTestCases) > 0 else None;
128 for oArgVariation in self.aoArgsVariations:
129 if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
130 oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
131 if oTestCase.idTestCase != oArgVariation.idTestCase or oTestCase.idTestGroup != oArgVariation.idTestGroup:
132 oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];
133
134 oTestCase.aoArgsVariations.append(oArgVariation);
135 oArgVariation.oTestCase = oTestCase;
136 oArgVariation.oTestGroup = oTestGroup;
137
138 else:
139 assert len(self.aoTestCases) == 0;
140 assert len(self.aoArgsVariations) == 0;
141 # done.
142
143 @staticmethod
144 def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
145 """ Returns a chain of IDs error entry. """
146
147 sMsg += ' Dependency chain: %s' % (aidChain[0],);
148 for i in range(1, len(aidChain)):
149 sMsg += ' -> %s' % (aidChain[i],);
150
151 aoErrors.append([sMsg, oObj]);
152 return aoErrors;
153
154 def checkForGroupDepCycles(self):
155 """
156 Checks for testgroup depencency cycles and any missing testgroup
157 dependencies.
158 Returns array of errors (see SchedulderBase.recreateQueue()).
159 """
160 aoErrors = list();
161 for oTestGroup in self.aoTestGroups:
162 idPreReq = oTestGroup.idTestGroupPreReq;
163 if idPreReq is None:
164 oTestGroup.aidTestGroupPreReqs = list();
165 continue;
166
167 aidChain = [oTestGroup.idTestGroup,];
168 while idPreReq is not None:
169 aidChain.append(idPreReq);
170 if len(aidChain) >= 10:
171 self._addPreReqError(aoErrors, aidChain, oTestGroup,
172 'TestGroup #%s prerequisite chain is too long!'
173 % (oTestGroup.idTestGroup,));
174 break;
175
176 oDep = self.dTestGroups.get(idPreReq, None);
177 if oDep is None:
178 self._addPreReqError(aoErrors, aidChain, oTestGroup,
179 'TestGroup #%s prerequisite #%s is not in the scheduling group!'
180 % (oTestGroup.idTestGroup, idPreReq,));
181 break;
182
183 idPreReq = oDep.idTestGroupPreReq;
184 oTestGroup.aidTestGroupPreReqs = aidChain[1:];
185
186 return aoErrors;
187
188
189 def checkForMissingTestCaseDeps(self):
190 """
191 Checks that testcase dependencies stays within bounds. We do not allow
192 dependencies outside a testgroup, no dependency cycles or even remotely
193 long dependency chains.
194
195 Returns array of errors (see SchedulderBase.recreateQueue()).
196 """
197 aoErrors = list();
198 for oTestGroup in self.aoTestGroups:
199 for oTestCase in oTestGroup.aoTestCases:
200 if len(oTestCase.aidPreReqs) == 0:
201 continue;
202
203 # Stupid recursion code using special stack(s).
204 aiIndexes = [(oTestCase, 0), ];
205 aidChain = [oTestCase.idTestGroup,];
206 while len(aiIndexes) > 0:
207 (oCur, i) = aiIndexes[-1];
208 if i >= len(oCur.aidPreReqs):
209 aiIndexes.pop();
210 aidChain.pop();
211 else:
212 aiIndexes[-1][1] = i + 1; # whatever happens, we'll advance on the current level.
213
214 idPreReq = oTestCase.aidPreReqs[i];
215 oDep = oTestGroup.dTestCases.get(idPreReq, None);
216 if oDep is None:
217 self._addPreReqError(aoErrors, aidChain, oTestCase,
218 'TestCase #%s prerequisite #%s is not in the scheduling group!'
219 % (oTestCase.idTestCase, idPreReq));
220 elif idPreReq in aidChain:
221 self._addPreReqError(aoErrors, aidChain, oTestCase,
222 'TestCase #%s prerequisite #%s creates a cycle!'
223 % (oTestCase.idTestCase, idPreReq));
224 elif len(oDep.aiPreReqs) == 0:
225 pass;
226 elif len(aidChain) >= 10:
227 self._addPreReqError(aoErrors, aidChain, oTestCase,
228 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
229 else:
230 aiIndexes.append((oDep, 0));
231 aidChain.append(idPreReq);
232
233 return aoErrors;
234
235 def deepTestGroupSort(self):
236 """
237 Sorts the testgroups and their testcases by priority and dependencies.
238 Note! Don't call this before checking for dependency cycles!
239 """
240 if len(self.aoTestGroups) == 0:
241 return;
242
243 #
244 # ASSUMES groups as well as testcases are sorted by priority by the
245 # database. So we only have to concern ourselves with the dependency
246 # sorting.
247 #
248 iGrpPrio = self.aoTestGroups[0].iSchedPriority;
249 for oTestGroup in self.aoTestGroups:
250 if oTestGroup.iSchedPriority > iGrpPrio:
251 raise TMExceptionBase('Incorrectly sorted testgroups returned by database.');
252 iGrpPrio = oTestGroup.iSchedPriority;
253
254 if len(oTestGroup.aoTestCases) > 0:
255 iTstPrio = oTestGroup.aoTestCases[0];
256 for oTestCase in oTestGroup.aoTestCases:
257 if oTestCase.iSchedPriority > iTstPrio:
258 raise TMExceptionBase('Incorrectly sorted testcases returned by database.');
259
260 #
261 # Sort the testgroups by dependencies.
262 #
263 i = 0;
264 while i < len(self.aoTestGroups):
265 oTestGroup = self.aoTestGroups[i];
266 if oTestGroup.idTestGroupPreReq is not None:
267 iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
268 if iPreReq > i:
269 # The prerequisite is after the current entry. Move the
270 # current entry so that it's following it's prereq entry.
271 self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
272 self.aoTestGroups.pop(i);
273 continue;
274 assert iPreReq < i;
275 i += 1; # Advance.
276
277 #
278 # Sort the testcases by dependencies.
279 # Same algorithm as above, just more prerequisites.
280 #
281 for oTestGroup in self.aoTestGroups:
282 i = 0;
283 while i < len(oTestGroup.aoTestCases):
284 oTestCase = oTestGroup.aoTestCases[i];
285 if len(oTestCase.aidPreReqs) > 0:
286 for idPreReq in oTestCase.aidPreReqs:
287 iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
288 if iPreReq > i:
289 # The prerequisite is after the current entry. Move the
290 # current entry so that it's following it's prereq entry.
291 oTestGroup.aoTestGroups.insert(iPreReq + 1, oTestCase);
292 oTestGroup.aoTestGroups.pop(i);
293 i -= 1; # Don't advance.
294 break;
295 assert iPreReq < i;
296 i += 1; # Advance.
297
298
299 return True;
300
301
302
303class SchedQueueData(ModelDataBase):
304 """
305 Scheduling queue data item.
306 """
307
308 ksIdAttr = 'idSchedGroup';
309
310 ksParam_idSchedGroup = 'SchedQueueData_idSchedGroup';
311 ksParam_idItem = 'SchedQueueData_idItem';
312 ksParam_offQueue = 'SchedQueueData_offQueue';
313 ksParam_idGenTestCaseArgs = 'SchedQueueData_idGenTestCaseArgs';
314 ksParam_idTestGroup = 'SchedQueueData_idTestGroup';
315 ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
316 ksParam_bmHourlySchedule = 'SchedQueueData_bmHourlySchedule';
317 ksParam_tsConfig = 'SchedQueueData_tsConfig';
318 ksParam_tsLastScheduled = 'SchedQueueData_tsLastScheduled';
319 ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
320 ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';
321
322 kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
323 'tsConfig', 'tsLastScheduled' ];
324
325
326 def __init__(self):
327 ModelDataBase.__init__(self);
328
329 #
330 # Initialize with defaults.
331 # See the database for explanations of each of these fields.
332 #
333 self.idSchedGroup = None;
334 self.idItem = None;
335 self.offQueue = None;
336 self.idGenTestCaseArgs = None;
337 self.idTestGroup = None;
338 self.aidTestGroupPreReqs = None;
339 self.bmHourlySchedule = None;
340 self.tsConfig = None;
341 self.tsLastScheduled = None;
342 self.idTestSetGangLeader = None;
343 self.cMissingGangMembers = 1;
344
345 def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=R0913
346 bmHourlySchedule, cMissingGangMembers,
347 idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
348 """
349 Reinitialize with all attributes potentially given as inputs.
350 Return self.
351 """
352 self.idSchedGroup = idSchedGroup;
353 self.idItem = idItem;
354 self.offQueue = offQueue;
355 self.idGenTestCaseArgs = idGenTestCaseArgs;
356 self.idTestGroup = idTestGroup;
357 self.aidTestGroupPreReqs = aidTestGroupPreReqs;
358 self.bmHourlySchedule = bmHourlySchedule;
359 self.tsConfig = tsConfig;
360 self.tsLastScheduled = tsLastScheduled;
361 self.idTestSetGangLeader = idTestSetGangLeader;
362 self.cMissingGangMembers = cMissingGangMembers;
363 return self;
364
365 def initFromDbRow(self, aoRow):
366 """
367 Initialize from database row (SELECT * FROM SchedQueues).
368 Returns self.
369 Raises exception if no row is specfied.
370 """
371 if aoRow is None:
372 raise TMExceptionBase('SchedQueueData not found.');
373
374 self.idSchedGroup = aoRow[0];
375 self.idItem = aoRow[1];
376 self.offQueue = aoRow[2];
377 self.idGenTestCaseArgs = aoRow[3];
378 self.idTestGroup = aoRow[4];
379 self.aidTestGroupPreReqs = aoRow[5];
380 self.bmHourlySchedule = aoRow[6];
381 self.tsConfig = aoRow[7];
382 self.tsLastScheduled = aoRow[8];
383 self.idTestSetGangLeader = aoRow[9];
384 self.cMissingGangMembers = aoRow[10];
385 return self;
386
387
388
389
390
391
392class SchedulerBase(object):
393 """
394 The scheduler base class.
395
396 The scheduler classes have two functions:
397 1. Recreate the scheduling queue.
398 2. Pick the next task from the queue.
399
400 The first is scheduler specific, the latter isn't.
401 """
402
403 class BuildCache(object):
404 """ Build cache. """
405
406 class BuildCacheIterator(object):
407 """ Build class iterator. """
408 def __init__(self, oCache):
409 self.oCache = oCache;
410 self.iCur = 0;
411
412 def __iter__(self):
413 """Returns self, required by the language."""
414 return self;
415
416 def next(self):
417 """Returns the next build, raises StopIteration when the end has been reached."""
418 while True:
419 if self.iCur >= len(self.oCache.aoEntries):
420 oEntry = self.oCache.fetchFromCursor();
421 if oEntry is None:
422 raise StopIteration;
423 else:
424 oEntry = self.oCache.aoEntries[self.iCur];
425 self.iCur += 1;
426 if not oEntry.fRemoved:
427 return oEntry;
428 # end
429
430 class BuildCacheEntry(object):
431 """ Build cache entry. """
432
433 def __init__(self, oBuild, fMaybeBlacklisted):
434 self.oBuild = oBuild;
435 self._fBlacklisted = None if fMaybeBlacklisted is True else False;
436 self.fRemoved = False;
437 self._dPreReqDecisions = dict();
438
439 def remove(self):
440 """
441 Marks the cache entry as removed.
442 This doesn't actually remove it from the cache array, only marks
443 it as removed. It has no effect on open iterators.
444 """
445 self.fRemoved = True;
446
447 def getPreReqDecision(self, sPreReqSet):
448 """
449 Retrieves a cached prerequisite decision.
450 Returns boolean if found, None if not.
451 """
452 return self._dPreReqDecisions.get(sPreReqSet);
453
454 def setPreReqDecision(self, sPreReqSet, fDecision):
455 """
456 Caches a prerequistie decision.
457 """
458 self._dPreReqDecisions[sPreReqSet] = fDecision;
459 return fDecision;
460
461 def isBlacklisted(self, oDb):
462 """ Checks if the build is blacklisted. """
463 if self._fBlacklisted is None:
464 self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
465 return self._fBlacklisted;
466
467
468 def __init__(self):
469 self.aoEntries = [];
470 self.oCursor = None;
471
472 def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
473 """ Configures the build cursor for the cache. """
474 if len(self.aoEntries) == 0 and self.oCursor is None:
475 oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
476 self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
477 return True;
478
479 def __iter__(self):
480 """Return an iterator."""
481 return self.BuildCacheIterator(self);
482
483 def fetchFromCursor(self):
484 """ Fetches a build from the cursor and adds it to the cache."""
485 if self.oCursor is None:
486 return None;
487
488 try:
489 aoRow = self.oCursor.fetchOne();
490 except:
491 return None;
492 if aoRow is None:
493 return None;
494
495 oBuild = BuildDataEx().initFromDbRow(aoRow);
496 oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
497 self.aoEntries.append(oEntry);
498 return oEntry;
499
500 def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
501 self._oDb = oDb;
502 self._oSchedGrpData = oSchedGrpData;
503 self._iVerbosity = iVerbosity;
504 self._asMessages = [];
505 self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
506 self.oBuildCache = self.BuildCache();
507 self.dTestGroupMembers = dict();
508
509 @staticmethod
510 def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
511 """
512 Instantiate the scheduler specified by the scheduling group.
513 Returns scheduler child class instance. May raise exception if
514 the input is invalid.
515 """
516 if oSchedGrpData.enmScheduler == SchedGroupData.ksScheduler_BestEffortContinousItegration:
517 from testmanager.core.schedulerbeci import SchdulerBeci;
518 oScheduler = SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
519 else:
520 raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
521 % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));
522 return oScheduler;
523
524
525 #
526 # Misc.
527 #
528
529 def msgDebug(self, sText):
530 """Debug printing."""
531 if self._iVerbosity > 1:
532 self._asMessages.append('debug:' + sText);
533 return None;
534
535 def msgInfo(self, sText):
536 """Info printing."""
537 if self._iVerbosity > 1:
538 self._asMessages.append('info: ' + sText);
539 return None;
540
541 def dprint(self, sMsg):
542 """Prints a debug message to the srv glue log (see config.py). """
543 if config.g_kfSrvGlueDebugScheduler:
544 self._oDb.dprint(sMsg);
545 return None;
546
547 def getElapsedSecs(self):
548 """ Returns the number of seconds this scheduling task has been running. """
549 tsSecNow = utils.timestampSecond();
550 if tsSecNow < self._tsSecStart: # paranoia
551 self._tsSecStart = tsSecNow;
552 return tsSecNow - self._tsSecStart;
553
554
555 #
556 # Create schedule.
557 #
558
559 def _recreateQueueCancelGatherings(self):
560 """
561 Cancels all pending gang gatherings on the current queue.
562 """
563 self._oDb.execute('SELECT idTestSetGangLeader\n'
564 'FROM SchedQueues\n'
565 'WHERE idSchedGroup = %s\n'
566 ' AND idTestSetGangLeader is not NULL\n'
567 , (self._oSchedGrpData.idSchedGroup,));
568 if self._oDb.getRowCount() > 0:
569 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
570 for aoRow in self._oDb.fetchAll():
571 idTestSetGangLeader = aoRow[0];
572 oTBStatusLogic.updateGangStatus(idTestSetGangLeader,
573 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
574 fCommit = False);
575 return True;
576
577 def _recreateQueueItems(self, oData):
578 """
579 Returns an array of queue items (SchedQueueData).
580 Child classes must override this.
581 """
582 _ = oData;
583 return [];
584
585 def recreateQueueWorker(self):
586 """
587 Worker for recreateQueue.
588 """
589
590 #
591 # Collect the necessary data and validate it.
592 #
593 oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
594 aoErrors = oData.checkForGroupDepCycles();
595 aoErrors.extend(oData.checkForMissingTestCaseDeps());
596 if len(aoErrors) == 0:
597 oData.deepTestGroupSort();
598
599 #
600 # The creation of the scheduling queue is done by the child class.
601 #
602 # We will try guess where in queue we're currently at and rotate
603 # the items such that we will resume execution in the approximately
604 # same position. The goal of the scheduler is to provide a 100%
605 # deterministic result so that if we regenerate the queue when there
606 # are no changes to the testcases, testgroups or scheduling groups
607 # involved, test execution will be unchanged (save for maybe just a
608 # little for gang gathering).
609 #
610 aoItems = list();
611 if len(oData.aoArgsVariations) > 0:
612 aoItems = self._recreateQueueItems(oData);
613 self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
614 #for i in range(len(aoItems)):
615 # self.msgDebug('aoItems[%2d]=%s' % (i, aoItems[i]));
616 if len(aoItems) > 0:
617 self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
618 , (self._oSchedGrpData.idSchedGroup,));
619 if self._oDb.getRowCount() > 0:
620 offQueue = self._oDb.fetchOne()[0];
621 self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
622 , (self._oSchedGrpData.idSchedGroup,));
623 cItems = self._oDb.fetchOne()[0];
624 offQueueNew = (offQueue * cItems) / len(aoItems);
625 if offQueueNew != 0:
626 aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];
627
628 #
629 # Replace the scheduling queue.
630 # Care need to be take to first timeout/abort any gangs in the
631 # gathering state since these use the queue to set up the date.
632 #
633 self._recreateQueueCancelGatherings();
634 self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
635 self._oDb.insertList('INSERT INTO SchedQueues (\n'
636 ' idSchedGroup,\n'
637 ' offQueue,\n'
638 ' idGenTestCaseArgs,\n'
639 ' idTestGroup,\n'
640 ' aidTestGroupPreReqs,\n'
641 ' bmHourlySchedule,\n'
642 ' cMissingGangMembers )\n',
643 aoItems, self._formatItemForInsert);
644 return (aoErrors, self._asMessages);
645
646 def _formatItemForInsert(self, oItem):
647 """
648 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
649 """
650 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
651 , ( oItem.idSchedGroup,
652 oItem.offQueue,
653 oItem.idGenTestCaseArgs,
654 oItem.idTestGroup,
655 oItem.aidTestGroupPreReqs if len(oItem.aidTestGroupPreReqs) > 0 else None,
656 oItem.bmHourlySchedule,
657 oItem.cMissingGangMembers
658 ));
659
660 @staticmethod
661 def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
662 """
663 (Re-)creates the scheduling queue for the given group.
664
665 Returns (asMessages, asMessages). On success the array with the error
666 will be empty, on failure it will contain (sError, oRelatedObject)
667 entries. The messages is for debugging and are simple strings.
668
669 Raises exception database error.
670 """
671
672 aoExtraMsgs = [];
673 if oDb.debugIsExplainEnabled():
674 aoExtraMsgs += ['Warning! Disabling SQL explain to avoid deadlocking against locked tables.'];
675 oDb.debugDisableExplain();
676
677 aoErrors = [];
678 asMessages = [];
679 try:
680 #
681 # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
682 # we lock quite a few tables while doing this work. We access more
683 # data than scheduleNewTask so we lock some additional tables.
684 #
685 oDb.rollback();
686 oDb.begin();
687 oDb.execute('LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE');
688 oDb.execute('LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE');
689 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
690
691 #
692 # Instantiate the scheduler and call the worker function.
693 #
694 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
695 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
696
697 (aoErrors, asMessages) = oScheduler.recreateQueueWorker();
698 if len(aoErrors) == 0:
699 SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
700 'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
701 oDb.commit();
702 else:
703 oDb.rollback();
704
705 except:
706 oDb.rollback();
707 raise;
708
709 return (aoErrors, aoExtraMsgs + asMessages);
710
711
712
713 #
714 # Schedule Task.
715 #
716
717 def _composeGangArguments(self, idTestSet):
718 """
719 Composes the gang specific testdriver arguments.
720 Returns command line string, including a leading space.
721 """
722
723 oTestSet = TestSetData().initFromDbWithId(self._oDb, idTestSet);
724 aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);
725
726 sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
727 for i in range(len(aoGangMembers)):
728 sArgs = ' --gang-ipv4-%s %s' % (i, aoGangMembers[i].ip); ## @todo IPv6
729
730 return sArgs;
731
732
733 def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
734 """
735 Given all the bits of data, compose an EXEC command response to the testbox.
736 """
737 sScriptZips = oTestEx.oTestCase.sValidationKitZips;
738 if sScriptZips is None or sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
739 assert oValidationKitBuild;
740 if sScriptZips is None:
741 sScriptZips = oValidationKitBuild.sBinaries;
742 else:
743 sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
744 sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);
745
746 sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
747 sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
748 sCmdLine = sCmdLine.strip();
749 if oTestEx.cGangMembers > 1:
750 sCmdLine += ' ' + self._composeGangArguments(idTestSet);
751
752 cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
753 cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout / 100;
754
755 dResponse = \
756 {
757 constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_EXEC,
758 constants.tbresp.EXEC_PARAM_RESULT_ID: idTestSet,
759 constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS: sScriptZips,
760 constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
761 constants.tbresp.EXEC_PARAM_TIMEOUT: cSecTimeout,
762 };
763 return dResponse;
764
765 @staticmethod
766 def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
767 """
768 Composes an EXEC response for a gang member (other than the last).
769 Returns a EXEC response or raises an exception (DB/input error).
770 """
771 #
772 # Gather the necessary data.
773 #
774 oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
775 oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
776 oTestEx = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTestSet.idGenTestCaseArgs);
777 oBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
778 oValidationKitBuild = None;
779 if oTestSet.idBuildTestSuite is not None:
780 oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);
781
782 #
783 # Instantiate the specified scheduler and let it do the rest.
784 #
785 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBox.idSchedGroup, oTestSet.tsCreated);
786 assert oSchedGrpData.fEnabled is True;
787 assert oSchedGrpData.idBuildSrc is not None;
788 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
789
790 return oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
791
792
793 def _updateTask(self, oTask, tsNow):
794 """
795 Updates a gang schedule task.
796 """
797 assert oTask.cMissingGangMembers >= 1;
798 assert oTask.idTestSetGangLeader is not None;
799 assert oTask.idTestSetGangLeader >= 1;
800 if tsNow is not None:
801 self._oDb.execute('UPDATE SchedQueues\n'
802 ' SET idTestSetGangLeader = %s,\n'
803 ' cMissingGangMembers = %s,\n'
804 ' tsLastScheduled = %s\n'
805 'WHERE idItem = %s\n'
806 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
807 else:
808 self._oDb.execute('UPDATE SchedQueues\n'
809 ' SET cMissingGangMembers = %s\n'
810 'WHERE idItem = %s\n'
811 , (oTask.cMissingGangMembers, oTask.idItem,) );
812 return True;
813
814 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
815 """
816 The task has been scheduled successfully, reset it's data move it to
817 the end of the queue.
818 """
819 if cGangMembers > 1:
820 self._oDb.execute('UPDATE SchedQueues\n'
821 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
822 ' idTestSetGangLeader = NULL,\n'
823 ' cMissingGangMembers = %s\n'
824 'WHERE idItem = %s\n'
825 , (cGangMembers, oTask.idItem,) );
826 else:
827 self._oDb.execute('UPDATE SchedQueues\n'
828 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
829 ' idTestSetGangLeader = NULL,\n'
830 ' cMissingGangMembers = 1,\n'
831 ' tsLastScheduled = %s\n'
832 'WHERE idItem = %s\n'
833 , (tsNow, oTask.idItem,) );
834 return True;
835
836
837
838
839 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
840 """
841 Creates a test set for using the given data.
842 Will not commit, someone up the callstack will that later on.
843 Returns the test set ID, may raise an exception on database error.
844 """
845 # Lazy bird doesn't want to write testset.py and does it all here.
846
847 #
848 # We're getting the TestSet ID first in order to include it in the base
849 # file name (that way we can directly relate files on the disk to the
850 # test set when doing batch work), and also for idTesetSetGangLeader.
851 #
852 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
853 idTestSet = self._oDb.fetchOne()[0];
854
855 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
856 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
857
858 #
859 # Gang scheduling parameters. Changes the oTask data for updating by caller.
860 #
861 iGangMemberNo = 0;
862
863 if oTestEx.cGangMembers <= 1:
864 assert oTask.idTestSetGangLeader is None;
865 assert oTask.cMissingGangMembers <= 1;
866 elif oTask.idTestSetGangLeader is None:
867 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
868 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
869 oTask.idTestSetGangLeader = idTestSet;
870 else:
871 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
872 oTask.cMissingGangMembers -= 1;
873
874 #
875 # Do the database stuff.
876 #
877 self._oDb.execute('INSERT INTO TestSets (\n'
878 ' idTestSet,\n'
879 ' tsConfig,\n'
880 ' tsCreated,\n'
881 ' idBuild,\n'
882 ' idBuildCategory,\n'
883 ' idBuildTestSuite,\n'
884 ' idGenTestBox,\n'
885 ' idTestBox,\n'
886 ' idTestGroup,\n'
887 ' idGenTestCase,\n'
888 ' idTestCase,\n'
889 ' idGenTestCaseArgs,\n'
890 ' idTestCaseArgs,\n'
891 ' sBaseFilename,\n'
892 ' iGangMemberNo,\n'
893 ' idTestSetGangLeader )\n'
894 'VALUES ( %s,\n' # idTestSet
895 ' %s,\n' # tsConfig
896 ' %s,\n' # tsCreated
897 ' %s,\n' # idBuild
898 ' %s,\n' # idBuildCategory
899 ' %s,\n' # idBuildTestSuite
900 ' %s,\n' # idGenTestBox
901 ' %s,\n' # idTestBox
902 ' %s,\n' # idTestGroup
903 ' %s,\n' # idGenTestCase
904 ' %s,\n' # idTestCase
905 ' %s,\n' # idGenTestCaseArgs
906 ' %s,\n' # idTestCaseArgs
907 ' %s,\n' # sBaseFilename
908 ' %s,\n' # iGangMemberNo
909 ' %s)\n' # idTestSetGangLeader
910 , ( idTestSet,
911 oTask.tsConfig,
912 tsNow,
913 oBuild.idBuild,
914 oBuild.idBuildCategory,
915 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
916 oTestBoxData.idGenTestBox,
917 oTestBoxData.idTestBox,
918 oTask.idTestGroup,
919 oTestEx.oTestCase.idGenTestCase,
920 oTestEx.oTestCase.idTestCase,
921 oTestEx.idGenTestCaseArgs,
922 oTestEx.idTestCaseArgs,
923 sBaseFilename,
924 iGangMemberNo,
925 oTask.idTestSetGangLeader,
926 ));
927
928 self._oDb.execute('INSERT INTO TestResults (\n'
929 ' idTestResultParent,\n'
930 ' idTestSet,\n'
931 ' tsCreated,\n'
932 ' idStrName,\n'
933 ' cErrors,\n'
934 ' enmStatus,\n'
935 ' iNestingDepth)\n'
936 'VALUES ( NULL,\n' # idTestResultParent
937 ' %s,\n' # idTestSet
938 ' %s,\n' # tsCreated
939 ' 0,\n' # idStrName
940 ' 0,\n' # cErrors
941 ' \'running\'::TestStatus_T,\n'
942 ' 0)\n' # iNestingDepth
943 'RETURNING idTestResult'
944 , ( idTestSet, tsNow, ));
945 idTestResult = self._oDb.fetchOne()[0];
946
947 self._oDb.execute('UPDATE TestSets\n'
948 ' SET idTestResult = %s\n'
949 'WHERE idTestSet = %s\n'
950 , (idTestResult, idTestSet, ));
951
952 return idTestSet;
953
954 def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
955 """
956 Tries to find the most recent validation kit build suitable for the given testbox.
957 Returns BuildDataEx or None. Raise exception on database error.
958
959 Can be overridden by child classes to change the default build requirements.
960 """
961 oBuildLogic = BuildLogic(self._oDb);
962 oBuildSource = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
963 oCursor = BuildSourceLogic(self._oDb).openBuildCursor(oBuildSource, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
964 for _ in range(oCursor.getRowCount()):
965 oBuild = BuildDataEx().initFromDbRow(oCursor.fetchOne());
966 if not oBuildLogic.isBuildBlacklisted(oBuild):
967 return oBuild;
968 return None;
969
970 def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
971 """
972 Tries to find a fitting build.
973 Returns BuildDataEx or None. Raise exception on database error.
974
975 Can be overridden by child classes to change the default build requirements.
976 """
977
978 #
979 # Gather the set of prerequisites we have and turn them into a value
980 # set for use in the loop below.
981 #
982 # Note! We're scheduling on testcase level and ignoring argument variation
983 # selections in TestGroupMembers is intentional.
984 #
985 dPreReqs = {};
986
987 # Direct prerequisites. We assume they're all enabled as this can be
988 # checked at queue creation time.
989 for oPreReq in oTestEx.aoTestCasePreReqs:
990 dPreReqs[oPreReq.idTestCase] = 1;
991
992 # Testgroup dependencies from the scheduling group config.
993 if oTask.aidTestGroupPreReqs is not None:
994 for iTestGroup in oTask.aidTestGroupPreReqs:
995 # Make sure the _active_ test group members are in the cache.
996 if iTestGroup not in self.dTestGroupMembers:
997 self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
998 'FROM TestGroupMembers, TestCases\n'
999 'WHERE TestGroupMembers.idTestGroup = %s\n'
1000 ' AND TestGroupMembers.tsExpire > %s\n'
1001 ' AND TestGroupMembers.tsEffective <= %s\n'
1002 ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
1003 ' AND TestCases.tsExpire > %s\n'
1004 ' AND TestCases.tsEffective <= %s\n'
1005 ' AND TestCases.fEnabled is TRUE\n'
1006 , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
1007 aidTestCases = [];
1008 for aoRow in self._oDb.fetchAll():
1009 aidTestCases.append(aoRow[0]);
1010 self.dTestGroupMembers[iTestGroup] = aidTestCases;
1011
1012 # Add the testgroup members to the prerequisites.
1013 for idTestCase in self.dTestGroupMembers[iTestGroup]:
1014 dPreReqs[idTestCase] = 1;
1015
1016 # Create a SQL values table out of them.
1017 sPreReqSet = ''
1018 if len(dPreReqs) > 0:
1019 for idPreReq in sorted(dPreReqs.keys()):
1020 sPreReqSet += ', (' + str(idPreReq) + ')';
1021 sPreReqSet = sPreReqSet[2:]; # drop the leading ', '.
1022
1023 #
1024 # Try the builds.
1025 #
1026 self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
1027 for oEntry in self.oBuildCache:
1028 #
1029 # Check build requirements set by the test.
1030 #
1031 if not oTestEx.matchesBuildProps(oEntry.oBuild):
1032 continue;
1033
1034 if oEntry.isBlacklisted(self._oDb):
1035 oEntry.remove();
1036 continue;
1037
1038 #
1039 # Check prerequisites. The default scheduler is satisfied if one
1040 # argument variation has been executed successfully. It is not
1041 # satisfied if there are any failure runs.
1042 #
1043 if len(sPreReqSet) > 0:
1044 fDecision = oEntry.getPreReqDecision(sPreReqSet);
1045 if fDecision is None:
1046 ## @todo DB Tuning
1047 # Check for missing prereqs.
1048 self._oDb.execute('SELECT COUNT(*)\n'
1049 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
1050 'LEFT OUTER JOIN (SELECT *\n'
1051 ' FROM TestSets\n'
1052 ' WHERE enmStatus IN (%s, %s)\n'
1053 ' AND idBuild = %s\n'
1054 ' ) AS TestSets\n'
1055 ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
1056 'WHERE TestSets.idTestSet is NULL\n'
1057 , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
1058 oEntry.oBuild.idBuild, ));
1059 cMissingPreReqs = self._oDb.fetchOne()[0];
1060 if cMissingPreReqs > 0:
1061 self.dprint('build %s is missing %u prerequisites (out of %s)'
1062 % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
1063 oEntry.setPreReqDecision(sPreReqSet, False);
1064 continue;
1065
1066 # Check for failed prereq runs.
1067 self._oDb.execute('SELECT COUNT(*)\n'
1068 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
1069 ' TestSets\n'
1070 'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
1071 ' AND TestSets.idBuild = %s\n'
1072 ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
1073 , ( oEntry.oBuild.idBuild,
1074 TestSetData.ksTestStatus_Failure,
1075 TestSetData.ksTestStatus_TimedOut,
1076 TestSetData.ksTestStatus_Rebooted,
1077 )
1078 );
1079 cFailedPreReqs = self._oDb.fetchOne()[0];
1080 if cFailedPreReqs > 0:
1081 self.dprint('build %s is has %u prerequisite failures (out of %s)'
1082 % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
1083 oEntry.setPreReqDecision(sPreReqSet, False);
1084 continue;
1085
1086 oEntry.setPreReqDecision(sPreReqSet, True);
1087 elif not fDecision:
1088 continue;
1089
1090 #
1091 # If we can, check if the build files still exist.
1092 #
1093 if oEntry.oBuild.areFilesStillThere() is False:
1094 self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
1095 oEntry.remove();
1096 continue;
1097
1098 self.dprint('found oBuild=%s' % (oEntry.oBuild,));
1099 return oEntry.oBuild;
1100 return None;
1101
1102 def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
1103 """
1104 Tries to find a matching build for gang scheduling.
1105 Returns BuildDataEx or None. Raise exception on database error.
1106
1107 Can be overridden by child classes to change the default build requirements.
1108 """
1109 #
1110 # Note! Should probably check build prerequisites if we get a different
1111 # build back, so that we don't use a build which hasn't passed
1112 # the smoke test.
1113 #
1114 _ = idBuildSrc;
1115 return BuildLogic(self._oDb).tryFindSameBuildForOsArch(oLeaderBuild, oTestBoxData.sOs, oTestBoxData.sCpuArch);
1116
1117
1118 def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1119 """
1120 Try schedule the task as a gang leader (can be a gang of one).
1121 Returns response or None. May raise exception on DB error.
1122 """
1123
1124 # We don't wait for busy resources, we just try the next test.
1125 oTestArgsLogic = TestCaseArgsLogic(self._oDb);
1126 if not oTestArgsLogic.areResourcesFree(oTestEx):
1127 self.dprint('Cannot get global test resources!');
1128 return None;
1129
1130 #
1131 # Find a matching build (this is the difficult bit).
1132 #
1133 oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
1134 if oBuild is None:
1135 self.dprint('No build!');
1136 return None;
1137 if oTestEx.oTestCase.needValidationKitBit():
1138 oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
1139 if oValidationKitBuild is None:
1140 self.dprint('No validation kit build!');
1141 return None;
1142 else:
1143 oValidationKitBuild = None;
1144
1145 #
1146 # Create a testset, allocate the resources and update the state.
1147 # Note! Since resource allocation may still fail, we create a nested
1148 # transaction so we can roll back. (Heed lock warning in docs!)
1149 #
1150 self._oDb.execute('SAVEPOINT tryAsLeader');
1151 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1152
1153 if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
1154 is not True:
1155 self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
1156 self.dprint('Failed to allocate global resources!');
1157 return False;
1158
1159 if oTestEx.cGangMembers <= 1:
1160 # We're alone, put the task back at the end of the queue and issue EXEC cmd.
1161 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1162 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1163 sTBState = TestBoxStatusData.ksTestBoxState_Testing;
1164 else:
1165 # We're missing gang members, issue WAIT cmd.
1166 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1167 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1168 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1169
1170 TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1171 self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
1172 return dResponse;
1173
1174 def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1175 """
1176 Try schedule the task as a gang member.
1177 Returns response or None. May raise exception on DB error.
1178 """
1179
1180 #
1181 # The leader has choosen a build, we need to find a matching one for our platform.
1182 # (It's up to the scheduler decide upon how strict dependencies are to be enforced
1183 # upon subordinate group members.)
1184 #
1185 oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTestBoxData.idTestSetGangLeader);
1186
1187 oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
1188 oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
1189 if oBuild is None:
1190 return None;
1191
1192 oValidationKitBuild = None;
1193 if oLeaderTestSet.idBuildTestSuite is not None:
1194 oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
1195 oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
1196 self._oSchedGrpData.idBuildSrcTestSuite);
1197
1198 #
1199 # Create a testset and update the state(s).
1200 #
1201 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1202
1203 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
1204 if oTask.cMissingGangMembers < 1:
1205 # The whole gang is there, move the task to the end of the queue
1206 # and update the status on the other gang members.
1207 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1208 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1209 sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
1210 oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
1211 else:
1212 # We're still missing some gang members, issue WAIT cmd.
1213 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1214 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1215 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1216
1217 oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1218 return dResponse;
1219
1220
1221 def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
1222 """
1223 Worker for schduling a new task.
1224 """
1225
1226 #
1227 # Iterate the scheduler queue (fetch all to avoid having to concurrent
1228 # queries), trying out each task to see if the testbox can execute it.
1229 #
1230 dRejected = {}; # variations we've already checked out and rejected.
1231 self._oDb.execute('SELECT *\n'
1232 'FROM SchedQueues\n'
1233 'WHERE idSchedGroup = %s\n'
1234 ' AND ( bmHourlySchedule IS NULL\n'
1235 ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
1236 'ORDER BY idItem ASC\n'
1237 , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
1238 aaoRows = self._oDb.fetchAll();
1239 for aoRow in aaoRows:
1240 # Don't loop forever.
1241 if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
1242 break;
1243
1244 # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
1245 oTask = SchedQueueData().initFromDbRow(aoRow);
1246 if config.g_kfSrvGlueDebugScheduler:
1247 self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
1248 % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
1249 oTask.tsLastScheduled, oTask.tsConfig,));
1250
1251 sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
1252 if sRejectNm in dRejected:
1253 self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
1254 continue;
1255 dRejected[sRejectNm] = 1;
1256
1257 # Fetch all the test case info (too much, but who cares right now).
1258 oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
1259 tsConfigEff = oTask.tsConfig,
1260 tsRsrcEff = oTask.tsConfig);
1261 if config.g_kfSrvGlueDebugScheduler:
1262 self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));
1263
1264 # This shouldn't happen, but just in case it does...
1265 if oTestEx.oTestCase.fEnabled is not True:
1266 self.dprint('Testcase is not enabled!!');
1267 continue;
1268
1269 # Check if the testbox properties matches the test.
1270 if not oTestEx.matchesTestBoxProps(oTestBoxData):
1271 self.dprint('Testbox mismatch!');
1272 continue;
1273
1274 # Try schedule it.
1275 if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
1276 dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1277 elif oTask.cMissingGangMembers > 1:
1278 dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1279 else:
1280 dResponse = None; # Shouldn't happen!
1281 if dResponse is not None:
1282 self.dprint('Found a task! dResponse=%s' % (dResponse,));
1283 return dResponse;
1284
1285 # Found no suitable task.
1286 return None;
1287
1288 @staticmethod
1289 def scheduleNewTask(oDb, oTestBoxData, sBaseUrl, iVerbosity = 0):
1290 """
1291 Schedules a new task.
1292 """
1293 try:
1294 #
1295 # To avoid concurrency issues in SchedQueues we lock all the rows
1296 # related to our scheduling queue. Also, since this is a very
1297 # expensive operation we lock the testbox status row to fend of
1298 # repeated retires by fault testbox script.
1299 #
1300 tsSecStart = utils.timestampSecond();
1301 oDb.rollback();
1302 oDb.begin();
1303 oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
1304 % (oTestBoxData.idTestBox,));
1305 oDb.execute('SELECT idSchedGroup FROM SchedQueues WHERE idSchedGroup = %s FOR UPDATE'
1306 % (oTestBoxData.idSchedGroup,));
1307
1308 # We need the current timestamp.
1309 tsNow = oDb.getCurrentTimestamp();
1310
1311 # Re-read the testbox data ...
1312 oTestBoxDataCur = TestBoxData().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
1313 if oTestBoxDataCur.fEnabled \
1314 and oTestBoxDataCur.idGenTestBox == oTestBoxData.idGenTestBox \
1315 and oTestBoxDataCur.idSchedGroup == oTestBoxData.idSchedGroup: # (paranoia wrt idSchedGroup)
1316
1317 # ... and schedule group data.
1318 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBoxDataCur.idSchedGroup, tsNow);
1319 if oSchedGrpData.fEnabled and oSchedGrpData.idBuildSrc is not None:
1320
1321 #
1322 # Instantiate the specified scheduler and let it do the rest.
1323 #
1324 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity, tsSecStart);
1325 dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataCur, tsNow, sBaseUrl);
1326 if dResponse is not None:
1327 oDb.commit();
1328 return dResponse;
1329 except:
1330 oDb.rollback();
1331 raise;
1332
1333 # Not enabled, rollback and return no task.
1334 oDb.rollback();
1335 return None;
1336
1337 @staticmethod
1338 def tryCancelGangGathering(oDb, oStatusData):
1339 """
1340 Try canceling a gang gathering.
1341
1342 Returns True if successfully cancelled.
1343 Returns False if not (someone raced us to the SchedQueue table).
1344
1345 Note! oStatusData is re-initialized.
1346 """
1347 assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
1348 try:
1349 #
1350 # Lock the tables we're updating so we don't run into concurrency
1351 # issues (we're racing both scheduleNewTask and other callers of
1352 # this method).
1353 #
1354 oDb.rollback();
1355 oDb.begin();
1356 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
1357
1358 #
1359 # Re-read the testbox data and check that we're still in the same state.
1360 #
1361 oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
1362 if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering:
1363 #
1364 # Get the leader thru the test set and change the state of the whole gang.
1365 #
1366 oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
1367
1368 oTBStatusLogic = TestBoxStatusLogic(oDb);
1369 oTBStatusLogic.updateGangStatus(oTestSetData.idTestSetGangLeader,
1370 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
1371 fCommit = False);
1372
1373 #
1374 # Move the scheduling queue item to the end.
1375 #
1376 oDb.execute('SELECT *\n'
1377 'FROM SchedQueues\n'
1378 'WHERE idTestSetGangLeader = %s\n'
1379 , (oTestSetData.idTestSetGangLeader,) );
1380 oTask = SchedQueueData().initFromDbRow(oDb.fetchOne());
1381 oTestEx = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTask.idGenTestCaseArgs);
1382
1383 oDb.execute('UPDATE SchedQueues\n'
1384 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
1385 ' idTestSetGangLeader = NULL,\n'
1386 ' cMissingGangMembers = %s\n'
1387 'WHERE idItem = %s\n'
1388 , (oTestEx.cGangMembers, oTask.idItem,) );
1389
1390 oDb.commit();
1391 return True;
1392
1393 elif oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
1394 oDb.rollback();
1395 return True;
1396 except:
1397 oDb.rollback();
1398 raise;
1399
1400 # Not enabled, rollback and return no task.
1401 oDb.rollback();
1402 return False;
1403
1404
1405#
1406# Unit testing.
1407#
1408
1409# pylint: disable=C0111
1410class SchedQueueDataTestCase(ModelDataBaseTestCase):
1411 def setUp(self):
1412 self.aoSamples = [SchedQueueData(),];
1413
1414if __name__ == '__main__':
1415 unittest.main();
1416 # not reached.
1417
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette