VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 82968

Last change on this file since 82968 was 82968, checked in by vboxsync, 5 years ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 67.5 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 82968 2020-02-04 10:35:17Z vboxsync $
3# pylint: disable=too-many-lines
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2020 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.virtualbox.org. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 82968 $"
32
33
34# Standard python imports.
35import sys;
36import unittest;
37
38# Validation Kit imports.
39from common import utils, constants;
40from testmanager import config;
41from testmanager.core.build import BuildDataEx, BuildLogic;
42from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
43from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
44from testmanager.core.globalresource import GlobalResourceLogic;
45from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
46from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
47from testmanager.core.testbox import TestBoxData, TestBoxDataEx;
48from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
49from testmanager.core.testcase import TestCaseLogic;
50from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
51from testmanager.core.testset import TestSetData, TestSetLogic;
52
# Python 3 hacks:
# Alias xrange to range when running under Python 3 or later.
if sys.version_info[0] >= 3:
    xrange = range; # pylint: disable=redefined-builtin,invalid-name
56
57
58
class ReCreateQueueData(object):
    """
    Data object for recreating a scheduling queue.

    It's mostly a storage object, but has a few data checking operations
    associated with it.
    """

    def __init__(self, oDb, idSchedGroup):
        """
        Loads the test groups, testcases and testcase argument variations of
        the scheduling group idSchedGroup via oDb and cross links them.
        """
        #
        # Load data from the database.
        #

        # Will extend the entries with aoTestCases and dTestCases members
        # further down.  checkForGroupDepCycles will add aidTestGroupPreReqs.
        self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);

        # aoTestCases entries are TestCaseData instances with iSchedPriority
        # and idTestGroup added for our purposes.
        # We will add oTestGroup and aoArgsVariations members to each further down.
        self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);

        # Load dependencies.
        oTestCaseLogic = TestCaseLogic(oDb)
        for oTestCase in self.aoTestCases:
            oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);

        # aoArgsVariations entries are TestCaseArgsData instances with
        # iSchedPriority and idTestGroup added for our purposes.
        # We will add oTestGroup and oTestCase members to each further down.
        self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);

        #
        # Generate global lookups.
        #

        # Generate a testcase lookup dictionary for use when working on
        # argument variations.
        self.dTestCases = dict();
        for oTestCase in self.aoTestCases:
            self.dTestCases[oTestCase.idTestCase] = oTestCase;
        assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!

        # Generate a testgroup lookup dictionary.
        self.dTestGroups = dict();
        for oTestGroup in self.aoTestGroups:
            self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
        assert len(self.dTestGroups) == len(self.aoTestGroups);

        #
        # Associate extra members with the base data.
        #
        if self.aoTestGroups:
            # Prep the test groups.
            for oTestGroup in self.aoTestGroups:
                oTestGroup.aoTestCases = list();
                oTestGroup.dTestCases  = dict();

            # Link testcases to their group, both directions.  Prep testcases for
            # argument variation association.
            oTestGroup = self.aoTestGroups[0];
            for oTestCase in self.aoTestCases:
                if oTestGroup.idTestGroup != oTestCase.idTestGroup:
                    oTestGroup = self.dTestGroups[oTestCase.idTestGroup];

                assert oTestCase.idTestCase not in oTestGroup.dTestCases;
                oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
                oTestGroup.aoTestCases.append(oTestCase);
                oTestCase.oTestGroup       = oTestGroup;
                oTestCase.aoArgsVariations = list();

            # Associate testcase argument variations with their testcases (group)
            # in both directions.
            oTestGroup = self.aoTestGroups[0];
            oTestCase  = self.aoTestCases[0] if self.aoTestCases else None;
            for oArgVariation in self.aoArgsVariations:
                if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
                    oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
                if oTestCase.idTestCase != oArgVariation.idTestCase or oTestCase.idTestGroup != oArgVariation.idTestGroup:
                    oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];

                oTestCase.aoArgsVariations.append(oArgVariation);
                oArgVariation.oTestCase  = oTestCase;
                oArgVariation.oTestGroup = oTestGroup;

        else:
            assert not self.aoTestCases;
            assert not self.aoArgsVariations;
        # done.

    @staticmethod
    def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
        """
        Appends a [message, related object] entry to aoErrors, where the
        message is sMsg with the dependency chain of IDs tacked on.
        Returns aoErrors.
        """
        sMsg += ' Dependency chain: %s' % (' -> '.join([str(idCur) for idCur in aidChain]),);
        aoErrors.append([sMsg, oObj]);
        return aoErrors;

    def checkForGroupDepCycles(self):
        """
        Checks for testgroup dependency cycles and any missing testgroup
        dependencies.  As a side effect each testgroup gets an
        aidTestGroupPreReqs member listing its prerequisite chain.

        A cycle is not detected directly, it shows up as a prerequisite chain
        that is too long.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            idPreReq = oTestGroup.idTestGroupPreReq;
            if idPreReq is None:
                oTestGroup.aidTestGroupPreReqs = list();
                continue;

            aidChain = [oTestGroup.idTestGroup,];
            while idPreReq is not None:
                aidChain.append(idPreReq);
                if len(aidChain) >= 10:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite chain is too long!'
                                         % (oTestGroup.idTestGroup,));
                    break;

                oDep = self.dTestGroups.get(idPreReq, None);
                if oDep is None:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite #%s is not in the scheduling group!'
                                         % (oTestGroup.idTestGroup, idPreReq,));
                    break;

                idPreReq = oDep.idTestGroupPreReq;
            # The chain minus the group itself (set even when an error was recorded).
            oTestGroup.aidTestGroupPreReqs = aidChain[1:];

        return aoErrors;


    def checkForMissingTestCaseDeps(self):
        """
        Checks that testcase dependencies stay within bounds.  We do not allow
        dependencies outside a testgroup, no dependency cycles or even remotely
        long dependency chains.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            for oTestCase in oTestGroup.aoTestCases:
                if not oTestCase.aidPreReqs:
                    continue;

                # Depth first walk of the prerequisite tree using explicit
                # stacks: aiIndexes holds [testcase, next prereq index] pairs
                # and aidChain the testcase IDs on the path from the root.
                aiIndexes = [[oTestCase, 0], ];
                aidChain  = [oTestCase.idTestCase,];
                while aiIndexes:
                    (oCur, i) = aiIndexes[-1];
                    if i >= len(oCur.aidPreReqs):
                        aiIndexes.pop();
                        aidChain.pop();
                    else:
                        aiIndexes[-1][1] = i + 1; # whatever happens, we'll advance on the current level.

                        # Must look at the prerequisites of the node we're
                        # currently visiting, not those of the root testcase.
                        idPreReq = oCur.aidPreReqs[i];
                        oDep = oTestGroup.dTestCases.get(idPreReq, None);
                        if oDep is None:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s is not in the scheduling group!'
                                                 % (oCur.idTestCase, idPreReq));
                        elif idPreReq in aidChain:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s creates a cycle!'
                                                 % (oCur.idTestCase, idPreReq));
                        elif not oDep.aidPreReqs:
                            pass;
                        elif len(aidChain) >= 10:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
                        else:
                            aiIndexes.append([oDep, 0]);
                            aidChain.append(idPreReq);

        return aoErrors;

    def deepTestGroupSort(self):
        """
        Sorts the testgroups and their testcases by priority and dependencies.
        Note! Don't call this before checking for dependency cycles!

        Raises TMExceptionBase if the database did not return the testgroups
        or testcases sorted by priority.
        """
        if not self.aoTestGroups:
            return;

        #
        # ASSUMES groups as well as testcases are sorted by priority by the
        # database.  So we only have to concern ourselves with the dependency
        # sorting.
        #
        iGrpPrio = self.aoTestGroups[0].iSchedPriority;
        for iTestGroup, oTestGroup in enumerate(self.aoTestGroups):
            if oTestGroup.iSchedPriority > iGrpPrio:
                raise TMExceptionBase('Incorrectly sorted testgroups returned by database: iTestGroup=%s prio=%s %s'
                                      % ( iTestGroup, iGrpPrio,
                                          ', '.join(['(%s: %s)' % (oCur.idTestGroup, oCur.iSchedPriority)
                                                     for oCur in self.aoTestGroups]), ) );
            iGrpPrio = oTestGroup.iSchedPriority;

            if oTestGroup.aoTestCases:
                iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
                for iTestCase, oTestCase in enumerate(oTestGroup.aoTestCases):
                    if oTestCase.iSchedPriority > iTstPrio:
                        raise TMExceptionBase('Incorrectly sorted testcases returned by database: i=%s prio=%s idGrp=%s %s'
                                              % ( iTestCase, iTstPrio, oTestGroup.idTestGroup,
                                                  ', '.join(['(%s: %s)' % (oCur.idTestCase, oCur.iSchedPriority)
                                                             for oCur in oTestGroup.aoTestCases]),));
                    iTstPrio = oTestCase.iSchedPriority;

        #
        # Sort the testgroups by dependencies.
        #
        i = 0;
        while i < len(self.aoTestGroups):
            oTestGroup = self.aoTestGroups[i];
            if oTestGroup.idTestGroupPreReq is not None:
                iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
                if iPreReq > i:
                    # The prerequisite is after the current entry.  Move the
                    # current entry so that it's following its prereq entry.
                    # (Inserting behind index i first keeps index i valid for the pop.)
                    self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
                    self.aoTestGroups.pop(i);
                    continue; # Re-examine whatever slid into slot i.
                assert iPreReq < i;
            i += 1; # Advance.

        #
        # Sort the testcases by dependencies.
        # Same algorithm as above, just more prerequisites.
        #
        for oTestGroup in self.aoTestGroups:
            i = 0;
            while i < len(oTestGroup.aoTestCases):
                oTestCase = oTestGroup.aoTestCases[i];
                if oTestCase.aidPreReqs:
                    for idPreReq in oTestCase.aidPreReqs:
                        iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
                        if iPreReq > i:
                            # The prerequisite is after the current entry.  Move the
                            # current entry so that it's following its prereq entry.
                            oTestGroup.aoTestCases.insert(iPreReq + 1, oTestCase);
                            oTestGroup.aoTestCases.pop(i);
                            i -= 1; # Don't advance.
                            break;
                        assert iPreReq < i;
                i += 1; # Advance.
309
310
311
class SchedQueueData(ModelDataBase):
    """
    Scheduling queue data item (one entry in the SchedQueues table).
    """

    ksIdAttr = 'idSchedGroup';

    ksParam_idSchedGroup        = 'SchedQueueData_idSchedGroup';
    ksParam_idItem              = 'SchedQueueData_idItem';
    ksParam_offQueue            = 'SchedQueueData_offQueue';
    ksParam_idGenTestCaseArgs   = 'SchedQueueData_idGenTestCaseArgs';
    ksParam_idTestGroup         = 'SchedQueueData_idTestGroup';
    ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
    ksParam_bmHourlySchedule    = 'SchedQueueData_bmHourlySchedule';
    ksParam_tsConfig            = 'SchedQueueData_tsConfig';
    ksParam_tsLastScheduled     = 'SchedQueueData_tsLastScheduled';
    ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
    ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';

    kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
                               'tsConfig', 'tsLastScheduled' ];


    def __init__(self):
        """ Creates an item with every field at its default value. """
        ModelDataBase.__init__(self);

        #
        # Initialize with defaults.
        # See the database for explanations of each of these fields.
        #
        self.idSchedGroup        = None;
        self.idItem              = None;
        self.offQueue            = None;
        self.idGenTestCaseArgs   = None;
        self.idTestGroup         = None;
        self.aidTestGroupPreReqs = None;
        self.bmHourlySchedule    = None;
        self.tsConfig            = None;
        self.tsLastScheduled     = None;
        self.idTestSetGangLeader = None;
        self.cMissingGangMembers = 1;

    def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=too-many-arguments
                       bmHourlySchedule, cMissingGangMembers,
                       idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
        """
        Reinitialize with all attributes potentially given as inputs.
        Returns self.
        """
        # The mandatory ones.
        self.idSchedGroup        = idSchedGroup;
        self.idGenTestCaseArgs   = idGenTestCaseArgs;
        self.idTestGroup         = idTestGroup;
        self.aidTestGroupPreReqs = aidTestGroupPreReqs;
        self.bmHourlySchedule    = bmHourlySchedule;
        self.cMissingGangMembers = cMissingGangMembers;

        # The optional ones.
        self.idItem              = idItem;
        self.offQueue            = offQueue;
        self.tsConfig            = tsConfig;
        self.tsLastScheduled     = tsLastScheduled;
        self.idTestSetGangLeader = idTestSetGangLeader;
        return self;

    def initFromDbRow(self, aoRow):
        """
        Initialize from database row (SELECT * FROM SchedQueues).
        Returns self.
        Raises exception if no row is specified.
        """
        if aoRow is None:
            raise TMExceptionBase('SchedQueueData not found.');

        # Attribute names in the order the columns appear in the row.
        asColumns = ( 'idSchedGroup', 'idItem', 'offQueue', 'idGenTestCaseArgs', 'idTestGroup',
                      'aidTestGroupPreReqs', 'bmHourlySchedule', 'tsConfig', 'tsLastScheduled',
                      'idTestSetGangLeader', 'cMissingGangMembers', );
        for iColumn, sAttr in enumerate(asColumns):
            setattr(self, sAttr, aoRow[iColumn]);
        return self;
395
396
397
398
399
400
401class SchedulerBase(object):
402 """
403 The scheduler base class.
404
405 The scheduler classes have two functions:
406 1. Recreate the scheduling queue.
407 2. Pick the next task from the queue.
408
409 The first is scheduler specific, the latter isn't.
410 """
411
class BuildCache(object):
    """
    Build cache.

    Keeps the builds fetched from a build source cursor in aoEntries so that
    repeated iterations reuse them; further builds are pulled from the cursor
    on demand when an iterator runs past the cached entries.
    """

    class BuildCacheIterator(object):
        """ Build cache iterator. """
        def __init__(self, oCache):
            self.oCache = oCache;
            self.iCur   = 0;

        def __iter__(self):
            """Returns self, required by the language."""
            return self;

        def __next__(self):
            """Returns the next build, raises StopIteration when the end has been reached."""
            while True:
                if self.iCur >= len(self.oCache.aoEntries):
                    # Ran out of cached entries, try pull another one from the cursor.
                    oEntry = self.oCache.fetchFromCursor();
                    if oEntry is None:
                        raise StopIteration;
                else:
                    oEntry = self.oCache.aoEntries[self.iCur];
                self.iCur += 1;
                if not oEntry.fRemoved: # Skip entries marked as removed.
                    return oEntry;
            return None; # not reached, but make pylint happy (for now).

        def next(self):
            """ For python 2.x. """
            return self.__next__();

    class BuildCacheEntry(object):
        """ Build cache entry. """

        def __init__(self, oBuild, fMaybeBlacklisted):
            self.oBuild            = oBuild;
            # None means 'unknown, ask the database on first use' (see isBlacklisted).
            self._fBlacklisted     = None if fMaybeBlacklisted is True else False;
            self.fRemoved          = False;
            self._dPreReqDecisions = dict();

        def remove(self):
            """
            Marks the cache entry as removed.
            This doesn't actually remove it from the cache array, only marks
            it as removed.  It has no effect on open iterators.
            """
            self.fRemoved = True;

        def getPreReqDecision(self, sPreReqSet):
            """
            Retrieves a cached prerequisite decision.
            Returns boolean if found, None if not.
            """
            return self._dPreReqDecisions.get(sPreReqSet);

        def setPreReqDecision(self, sPreReqSet, fDecision):
            """
            Caches a prerequisite decision.
            Returns fDecision.
            """
            self._dPreReqDecisions[sPreReqSet] = fDecision;
            return fDecision;

        def isBlacklisted(self, oDb):
            """ Checks if the build is blacklisted, caching the answer. """
            if self._fBlacklisted is None:
                self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
            return self._fBlacklisted;


    def __init__(self):
        self.aoEntries = [];
        self.oCursor   = None;

    def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
        """
        Configures the build cursor for the cache.
        Does nothing if the cache already has entries or a cursor.
        Returns True.
        """
        if not self.aoEntries and self.oCursor is None:
            oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
            self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
        return True;

    def __iter__(self):
        """Return an iterator."""
        return self.BuildCacheIterator(self);

    def fetchFromCursor(self):
        """
        Fetches a build from the cursor and adds it to the cache.
        Returns the new cache entry, None if there is no cursor, the fetch
        failed or there are no more rows.
        """
        if self.oCursor is None:
            return None;

        # Fetch failures are deliberately treated as 'no more builds'.  Only
        # catch Exception so KeyboardInterrupt and SystemExit still propagate.
        try:
            aoRow = self.oCursor.fetchOne();
        except Exception: # pylint: disable=broad-except
            return None;
        if aoRow is None:
            return None;

        oBuild = BuildDataEx().initFromDbRow(aoRow);
        # The last column of the row is passed on as the maybe-blacklisted indicator.
        oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
        self.aoEntries.append(oEntry);
        return oEntry;
512
513 def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
514 self._oDb = oDb;
515 self._oSchedGrpData = oSchedGrpData;
516 self._iVerbosity = iVerbosity;
517 self._asMessages = [];
518 self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
519 self.oBuildCache = self.BuildCache();
520 self.dTestGroupMembers = dict();
521
@staticmethod
def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Instantiate the scheduler specified by the scheduling group.
    Returns scheduler child class instance.  Raises the exception produced
    by oDb.integrityException if the scheduler type is not recognized.
    """
    if oSchedGrpData.enmScheduler != SchedGroupData.ksScheduler_BestEffortContinuousIntegration:
        raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
                                     % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));

    # Imported here rather than at the top of the file.
    from testmanager.core.schedulerbeci import SchdulerBeci;
    return SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
536
537
538 #
539 # Misc.
540 #
541
def msgDebug(self, sText):
    """
    Debug printing: queues sText with a 'debug:' prefix in the message
    list when the verbosity level is above 1.  Returns None.
    """
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('debug:' + sText);
    return None;
547
def msgInfo(self, sText):
    """
    Info printing: queues sText with an 'info: ' prefix in the message
    list when the verbosity level is above 1.  Returns None.
    """
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('info: ' + sText);
    return None;
553
def dprint(self, sMsg):
    """
    Passes sMsg to the database connection's dprint method when
    config.g_kfSrvGlueDebugScheduler is set (see config.py).  Returns None.
    """
    if not config.g_kfSrvGlueDebugScheduler:
        return None;
    self._oDb.dprint(sMsg);
    return None;
559
def getElapsedSecs(self):
    """ Returns the number of seconds this scheduling task has been running. """
    tsSecNow = utils.timestampSecond();
    # Paranoia: should the clock be behind the start time, move the start
    # time back so the result never goes negative.
    self._tsSecStart = min(self._tsSecStart, tsSecNow);
    return tsSecNow - self._tsSecStart;
566
567
568 #
569 # Create schedule.
570 #
571
def _recreateQueueCancelGatherings(self):
    """
    Cancels all pending gang gatherings on the current queue by setting
    the gang status of each queued gang leader to 'gang gathering timed
    out' (without committing).
    Returns True.
    """
    idSchedGroup = self._oSchedGrpData.idSchedGroup;
    self._oDb.execute('SELECT idTestSetGangLeader\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND idTestSetGangLeader is not NULL\n'
                      , (idSchedGroup,));
    if self._oDb.getRowCount() <= 0:
        return True;

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    for aoRow in self._oDb.fetchAll():
        oTBStatusLogic.updateGangStatus(aoRow[0],
                                        TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                        fCommit = False);
    return True;
589
590 def _recreateQueueItems(self, oData):
591 """
592 Returns an array of queue items (SchedQueueData).
593 Child classes must override this.
594 """
595 _ = oData;
596 return [];
597
def recreateQueueWorker(self):
    """
    Worker for recreateQueue.

    Loads and validates the scheduling group data and, when the data is
    valid, replaces the SchedQueues rows of the group with freshly
    generated items.

    Returns (aoErrors, asMessages).
    """

    #
    # Collect the necessary data and validate it.
    #
    oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
    aoErrors = oData.checkForGroupDepCycles();
    aoErrors.extend(oData.checkForMissingTestCaseDeps());
    if not aoErrors:
        oData.deepTestGroupSort();

        #
        # The creation of the scheduling queue is done by the child class.
        #
        # We will try guess where in queue we're currently at and rotate
        # the items such that we will resume execution in the approximately
        # same position.  The goal of the scheduler is to provide a 100%
        # deterministic result so that if we regenerate the queue when there
        # are no changes to the testcases, testgroups or scheduling groups
        # involved, test execution will be unchanged (save for maybe just a
        # little for gang gathering).
        #
        aoItems = list();
        if oData.aoArgsVariations:
            aoItems = self._recreateQueueItems(oData);
            self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
            #for i in range(len(aoItems)):
            #    self.msgDebug('aoItems[%2d]=%s' % (i, aoItems[i]));
        if aoItems:
            self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
                              , (self._oSchedGrpData.idSchedGroup,));
            if self._oDb.getRowCount() > 0:
                offQueue = self._oDb.fetchOne()[0];
                self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
                                  , (self._oSchedGrpData.idSchedGroup,));
                cItems = self._oDb.fetchOne()[0];
                # Must be floor division: the result is used as a slice index
                # and '/' yields a float on Python 3.
                # NOTE(review): the scaling looks inverted (old count over new
                # count) - confirm whether that is intended.
                offQueueNew = (offQueue * cItems) // len(aoItems);
                if offQueueNew != 0:
                    aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];

        #
        # Replace the scheduling queue.
        # Care needs to be taken to first timeout/abort any gangs in the
        # gathering state since these use the queue to set up the data.
        #
        self._recreateQueueCancelGatherings();
        self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
        self._oDb.insertList('INSERT INTO SchedQueues (\n'
                             ' idSchedGroup,\n'
                             ' offQueue,\n'
                             ' idGenTestCaseArgs,\n'
                             ' idTestGroup,\n'
                             ' aidTestGroupPreReqs,\n'
                             ' bmHourlySchedule,\n'
                             ' cMissingGangMembers )\n',
                             aoItems, self._formatItemForInsert);
    return (aoErrors, self._asMessages);
658
659 def _formatItemForInsert(self, oItem):
660 """
661 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
662 """
663 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
664 , ( oItem.idSchedGroup,
665 oItem.offQueue,
666 oItem.idGenTestCaseArgs,
667 oItem.idTestGroup,
668 oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
669 oItem.bmHourlySchedule,
670 oItem.cMissingGangMembers
671 ));
672
@staticmethod
def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
    """
    (Re-)creates the scheduling queue for the given group.

    Returns (aoErrors, asMessages).  On success the array with the errors
    will be empty, on failure it will contain (sError, oRelatedObject)
    entries.  The messages are for debugging and are simple strings.

    Raises exception on database error.
    """

    asExtraMsgs = [];
    if oDb.debugIsExplainEnabled():
        asExtraMsgs.append('Warning! Disabling SQL explain to avoid deadlocking against locked tables.');
        oDb.debugDisableExplain();

    aoErrors   = [];
    asMessages = [];
    try:
        #
        # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
        # we lock quite a few tables while doing this work.  We access more
        # data than scheduleNewTask so we lock some additional tables.
        #
        oDb.rollback();
        oDb.begin();
        for sLockStmt in ( 'LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE',
                           'LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE',
                           'LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE', ):
            oDb.execute(sLockStmt);

        #
        # Instantiate the scheduler and call the worker function.
        #
        oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
        oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);

        (aoErrors, asMessages) = oScheduler.recreateQueueWorker();
        if aoErrors:
            oDb.rollback();
        else:
            SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
                                         'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
            oDb.commit();

    except:
        # Undo whatever was done and release the table locks before passing it on.
        oDb.rollback();
        raise;

    return (aoErrors, asExtraMsgs + asMessages);
723
724
725
726 #
727 # Schedule Task.
728 #
729
def _composeGangArguments(self, idTestSet):
    """
    Composes the gang specific testdriver arguments: the member number of
    the given test set, the gang size and one --gang-ipv4-<n> option per
    gang member.
    Returns command line string, including a leading space.
    """

    oTestSet      = TestSetData().initFromDbWithId(self._oDb, idTestSet);
    aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);

    sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
    for i, oGangMember in enumerate(aoGangMembers):
        # Append (not assign), or the options composed so far are lost.
        sArgs += ' --gang-ipv4-%s %s' % (i, oGangMember.ip); ## @todo IPv6

    return sArgs;
744
745
def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
    """
    Given all the bits of data, compose an EXEC command response to the testbox.

    Returns a dictionary with the command, the test set ID, the script
    zips, the script command line and the timeout in seconds.
    """
    # Resolve the script zips, falling back on the validation kit build
    # binaries when the testcase doesn't specify any.
    sScriptZips = oTestEx.oTestCase.sValidationKitZips;
    if sScriptZips is None or sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
        assert oValidationKitBuild;
        if sScriptZips is None:
            sScriptZips = oValidationKitBuild.sBinaries;
        else:
            sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
    sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);

    sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
    sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
    sCmdLine = sCmdLine.strip();
    if oTestEx.cGangMembers > 1:
        sCmdLine += ' ' + self._composeGangArguments(idTestSet);

    # The argument variation timeout takes precedence over the testcase one.
    cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
    # Scale by the testbox percentage.  Floor division keeps the timeout an
    # integer on Python 3 as well ('/' would yield a float there).
    cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout // 100;

    dResponse = \
    {
        constants.tbresp.ALL_PARAM_RESULT:           constants.tbresp.CMD_EXEC,
        constants.tbresp.EXEC_PARAM_RESULT_ID:       idTestSet,
        constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS:     sScriptZips,
        constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
        constants.tbresp.EXEC_PARAM_TIMEOUT:         cSecTimeout,
    };
    return dResponse;
777
@staticmethod
def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
    """
    Composes an EXEC response for a gang member (other than the last).
    Returns an EXEC response or raises an exception (DB/input error).
    """
    #
    # Gather the necessary data.
    #
    oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
    oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
    oTestEx  = TestCaseArgsDataEx().initFromDbWithGenIdEx(oDb, oTestSet.idGenTestCaseArgs,
                                                          tsConfigEff = oTestSet.tsConfig,
                                                          tsRsrcEff = oTestSet.tsConfig);
    oBuild   = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
    # The validation kit build is optional.
    oValidationKitBuild = (BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite)
                           if oTestSet.idBuildTestSuite is not None else None);

    #
    # Instantiate the specified scheduler and let it do the rest.
    #
    oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestSet.idSchedGroup, oTestSet.tsCreated);
    assert oSchedGrpData.fEnabled is True;
    assert oSchedGrpData.idBuildSrc is not None;
    oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);

    dResponse = oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
    return dResponse;
806
807
808 def _updateTask(self, oTask, tsNow):
809 """
810 Updates a gang schedule task.
811 """
812 assert oTask.cMissingGangMembers >= 1;
813 assert oTask.idTestSetGangLeader is not None;
814 assert oTask.idTestSetGangLeader >= 1;
815 if tsNow is not None:
816 self._oDb.execute('UPDATE SchedQueues\n'
817 ' SET idTestSetGangLeader = %s,\n'
818 ' cMissingGangMembers = %s,\n'
819 ' tsLastScheduled = %s\n'
820 'WHERE idItem = %s\n'
821 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
822 else:
823 self._oDb.execute('UPDATE SchedQueues\n'
824 ' SET cMissingGangMembers = %s\n'
825 'WHERE idItem = %s\n'
826 , (oTask.cMissingGangMembers, oTask.idItem,) );
827 return True;
828
829 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
830 """
831 The task has been scheduled successfully, reset it's data move it to
832 the end of the queue.
833 """
834 if cGangMembers > 1:
835 self._oDb.execute('UPDATE SchedQueues\n'
836 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
837 ' idTestSetGangLeader = NULL,\n'
838 ' cMissingGangMembers = %s\n'
839 'WHERE idItem = %s\n'
840 , (cGangMembers, oTask.idItem,) );
841 else:
842 self._oDb.execute('UPDATE SchedQueues\n'
843 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
844 ' idTestSetGangLeader = NULL,\n'
845 ' cMissingGangMembers = 1,\n'
846 ' tsLastScheduled = %s\n'
847 'WHERE idItem = %s\n'
848 , (tsNow, oTask.idItem,) );
849 return True;
850
851
852
853
854 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
855 # type: (SchedQueueData, TestCaseArgsDataEx, TestBoxData, BuildDataEx, BuildDataEx, datetime.datetime) -> int
856 """
857 Creates a test set for using the given data.
858 Will not commit, someone up the callstack will that later on.
859
860 Returns the test set ID, may raise an exception on database error.
861 """
862 # Lazy bird doesn't want to write testset.py and does it all here.
863
864 #
865 # We're getting the TestSet ID first in order to include it in the base
866 # file name (that way we can directly relate files on the disk to the
867 # test set when doing batch work), and also for idTesetSetGangLeader.
868 #
869 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
870 idTestSet = self._oDb.fetchOne()[0];
871
872 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
873 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
874
875 #
876 # Gang scheduling parameters. Changes the oTask data for updating by caller.
877 #
878 iGangMemberNo = 0;
879
880 if oTestEx.cGangMembers <= 1:
881 assert oTask.idTestSetGangLeader is None;
882 assert oTask.cMissingGangMembers <= 1;
883 elif oTask.idTestSetGangLeader is None:
884 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
885 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
886 oTask.idTestSetGangLeader = idTestSet;
887 else:
888 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
889 oTask.cMissingGangMembers -= 1;
890
891 #
892 # Do the database stuff.
893 #
894 self._oDb.execute('INSERT INTO TestSets (\n'
895 ' idTestSet,\n'
896 ' tsConfig,\n'
897 ' tsCreated,\n'
898 ' idBuild,\n'
899 ' idBuildCategory,\n'
900 ' idBuildTestSuite,\n'
901 ' idGenTestBox,\n'
902 ' idTestBox,\n'
903 ' idSchedGroup,\n'
904 ' idTestGroup,\n'
905 ' idGenTestCase,\n'
906 ' idTestCase,\n'
907 ' idGenTestCaseArgs,\n'
908 ' idTestCaseArgs,\n'
909 ' sBaseFilename,\n'
910 ' iGangMemberNo,\n'
911 ' idTestSetGangLeader )\n'
912 'VALUES ( %s,\n' # idTestSet
913 ' %s,\n' # tsConfig
914 ' %s,\n' # tsCreated
915 ' %s,\n' # idBuild
916 ' %s,\n' # idBuildCategory
917 ' %s,\n' # idBuildTestSuite
918 ' %s,\n' # idGenTestBox
919 ' %s,\n' # idTestBox
920 ' %s,\n' # idSchedGroup
921 ' %s,\n' # idTestGroup
922 ' %s,\n' # idGenTestCase
923 ' %s,\n' # idTestCase
924 ' %s,\n' # idGenTestCaseArgs
925 ' %s,\n' # idTestCaseArgs
926 ' %s,\n' # sBaseFilename
927 ' %s,\n' # iGangMemberNo
928 ' %s)\n' # idTestSetGangLeader
929 , ( idTestSet,
930 oTask.tsConfig,
931 tsNow,
932 oBuild.idBuild,
933 oBuild.idBuildCategory,
934 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
935 oTestBoxData.idGenTestBox,
936 oTestBoxData.idTestBox,
937 oTask.idSchedGroup,
938 oTask.idTestGroup,
939 oTestEx.oTestCase.idGenTestCase,
940 oTestEx.oTestCase.idTestCase,
941 oTestEx.idGenTestCaseArgs,
942 oTestEx.idTestCaseArgs,
943 sBaseFilename,
944 iGangMemberNo,
945 oTask.idTestSetGangLeader,
946 ));
947
948 self._oDb.execute('INSERT INTO TestResults (\n'
949 ' idTestResultParent,\n'
950 ' idTestSet,\n'
951 ' tsCreated,\n'
952 ' idStrName,\n'
953 ' cErrors,\n'
954 ' enmStatus,\n'
955 ' iNestingDepth)\n'
956 'VALUES ( NULL,\n' # idTestResultParent
957 ' %s,\n' # idTestSet
958 ' %s,\n' # tsCreated
959 ' 0,\n' # idStrName
960 ' 0,\n' # cErrors
961 ' \'running\'::TestStatus_T,\n'
962 ' 0)\n' # iNestingDepth
963 'RETURNING idTestResult'
964 , ( idTestSet, tsNow, ));
965 idTestResult = self._oDb.fetchOne()[0];
966
967 self._oDb.execute('UPDATE TestSets\n'
968 ' SET idTestResult = %s\n'
969 'WHERE idTestSet = %s\n'
970 , (idTestResult, idTestSet, ));
971
972 return idTestSet;
973
def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
    """
    Looks for a validation kit build the given testbox can use.

    The build source is the scheduling group's idBuildSrcTestSuite and the
    candidates are filtered by the testbox OS and CPU architecture.  The
    first candidate that isn't blacklisted wins.

    Returns BuildDataEx or None.  Raises exception on database error.

    Child classes can override this to change the default build requirements.
    """
    oBlacklistChecker = BuildLogic(self._oDb);
    oSrcData = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
    oBuildRows = BuildSourceLogic(self._oDb).openBuildCursor(oSrcData, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);

    # Walk the cursor row by row, stopping at the first acceptable build.
    cRowsLeft = oBuildRows.getRowCount();
    while cRowsLeft > 0:
        cRowsLeft -= 1;
        oCandidate = BuildDataEx().initFromDbRow(oBuildRows.fetchOne());
        if oBlacklistChecker.isBuildBlacklisted(oCandidate):
            continue;
        return oCandidate;

    # Ran out of candidates.
    return None;
989
def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
    """
    Tries to find a fitting build.

    A build fits when it matches the build properties required by the test,
    isn't blacklisted, has all prerequisite test cases executed with a
    success or skipped status, has no failed, timed out or rebooted
    prerequisite runs, and (if that can be checked) its files still exist.

    Returns BuildDataEx or None.  Raises exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """

    #
    # Gather the set of prerequisites we have and turn them into a value
    # set for use in the loop below.
    #
    # Note! We're scheduling on testcase level and ignoring argument variation
    #       selections in TestGroupMembers is intentional.
    #

    # Direct prerequisites.  We assume they're all enabled as this can be
    # checked at queue creation time.
    oPreReqIds = set(oPreReq.idTestCase for oPreReq in oTestEx.aoTestCasePreReqs);

    # Testgroup dependencies from the scheduling group config.
    if oTask.aidTestGroupPreReqs is not None:
        for iTestGroup in oTask.aidTestGroupPreReqs:
            # Make sure the _active_ test group members are in the cache.
            if iTestGroup not in self.dTestGroupMembers:
                self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
                                  'FROM TestGroupMembers, TestCases\n'
                                  'WHERE TestGroupMembers.idTestGroup = %s\n'
                                  ' AND TestGroupMembers.tsExpire > %s\n'
                                  ' AND TestGroupMembers.tsEffective <= %s\n'
                                  ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
                                  ' AND TestCases.tsExpire > %s\n'
                                  ' AND TestCases.tsEffective <= %s\n'
                                  ' AND TestCases.fEnabled is TRUE\n'
                                  , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
                self.dTestGroupMembers[iTestGroup] = [aoRow[0] for aoRow in self._oDb.fetchAll()];

            # Add the testgroup members to the prerequisites.
            oPreReqIds.update(self.dTestGroupMembers[iTestGroup]);

    # Create a SQL values table out of them ('(1), (2), ...'); empty string if
    # there are no prerequisites.  The string also serves as the key for the
    # per build prerequisite decision cache.
    sPreReqSet = ', '.join(['(' + str(idPreReq) + ')' for idPreReq in sorted(oPreReqIds)]);

    #
    # Try the builds.
    #
    self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
    for oEntry in self.oBuildCache:
        #
        # Check build requirements set by the test.
        #
        if not oTestEx.matchesBuildProps(oEntry.oBuild):
            continue;

        if oEntry.isBlacklisted(self._oDb):
            oEntry.remove();
            continue;

        #
        # Check prerequisites.  The default scheduler is satisfied if one
        # argument variation has been executed successfully.  It is not
        # satisfied if there are any failure runs.
        #
        if sPreReqSet:
            fDecision = oEntry.getPreReqDecision(sPreReqSet);
            if fDecision is None:
                # Check for missing prereqs.
                # Note! The sub-select must return idTestCase too, as the
                #       join condition refers to it.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
                                  'LEFT OUTER JOIN (SELECT idTestSet, idTestCase\n'
                                  ' FROM TestSets\n'
                                  ' WHERE enmStatus IN (%s, %s)\n'
                                  ' AND idBuild = %s\n'
                                  ' ) AS TestSets\n'
                                  ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
                                  'WHERE TestSets.idTestSet is NULL\n'
                                  , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
                                      oEntry.oBuild.idBuild, ));
                cMissingPreReqs = self._oDb.fetchOne()[0];
                if cMissingPreReqs > 0:
                    self.dprint('build %s is missing %u prerequisites (out of %s)'
                                % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # Check for failed prereq runs.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
                                  ' TestSets\n'
                                  'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
                                  ' AND TestSets.idBuild = %s\n'
                                  ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
                                  , ( oEntry.oBuild.idBuild,
                                      TestSetData.ksTestStatus_Failure,
                                      TestSetData.ksTestStatus_TimedOut,
                                      TestSetData.ksTestStatus_Rebooted,
                                    )
                                 );
                cFailedPreReqs = self._oDb.fetchOne()[0];
                if cFailedPreReqs > 0:
                    self.dprint('build %s is has %u prerequisite failures (out of %s)'
                                % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                oEntry.setPreReqDecision(sPreReqSet, True);
            elif not fDecision:
                # Cached negative decision for this prerequisite set.
                continue;

        #
        # If we can, check if the build files still exist.
        # Note! Only an explicit False rejects the build; any other answer
        #       lets it pass.
        #
        if oEntry.oBuild.areFilesStillThere() is False:
            self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
            oEntry.remove();
            continue;

        self.dprint('found oBuild=%s' % (oEntry.oBuild,));
        return oEntry.oBuild;
    return None;
1120
def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
    """
    Gang scheduling: looks up the counterpart of the gang leader's build for
    the OS and CPU architecture of the given testbox.

    The idBuildSrc parameter is not used by this implementation.

    Returns BuildDataEx or None.  Raises exception on database error.

    Child classes can override this to change the default build requirements.
    """
    #
    # Note! Should probably check build prerequisites if we get a different
    #       build back, so that we don't use a build which hasn't passed
    #       the smoke test.
    #
    _ = idBuildSrc;
    oBuildLogic = BuildLogic(self._oDb);
    sOs = oTestBoxData.sOs;
    sCpuArch = oTestBoxData.sCpuArch;
    return oBuildLogic.tryFindSameBuildForOsArch(oLeaderBuild, sOs, sCpuArch);
1135
1136
def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang leader (can be a gang of one).

    Returns the response dictionary on success.  Returns None if the task
    cannot be scheduled on this testbox right now (global resources busy or
    not allocatable, no build, no validation kit build), in which case the
    caller is expected to try the next task.

    May raise exception on DB error.
    """

    # We don't wait for busy resources, we just try the next test.
    oTestArgsLogic = TestCaseArgsLogic(self._oDb);
    if not oTestArgsLogic.areResourcesFree(oTestEx):
        self.dprint('Cannot get global test resources!');
        return None;

    #
    # Find a matching build (this is the difficult bit).
    #
    oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
    if oBuild is None:
        self.dprint('No build!');
        return None;
    if oTestEx.oTestCase.needValidationKitBit():
        oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
        if oValidationKitBuild is None:
            self.dprint('No validation kit build!');
            return None;
    else:
        oValidationKitBuild = None;

    #
    # Create a testset, allocate the resources and update the state.
    # Note! Since resource allocation may still fail, we create a nested
    #       transaction so we can roll back. (Heed lock warning in docs!)
    #
    self._oDb.execute('SAVEPOINT tryAsLeader');
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
       is not True:
        self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
        self.dprint('Failed to allocate global resources!');
        # Must be None and not False: the caller treats everything that
        # isn't None as a response to send to the testbox.
        return None;

    if oTestEx.cGangMembers <= 1:
        # We're alone, put the task back at the end of the queue and issue EXEC cmd.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_Testing;
    else:
        # We're missing gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
    return dResponse;
1192
def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang member.

    The gang leader's test set is identified by oTask.idTestSetGangLeader,
    which the caller must have checked is not None.

    Returns the response dictionary, or None if no build matching the
    leader's build could be found for this testbox.
    May raise exception on DB error.
    """

    #
    # The leader has chosen a build, we need to find a matching one for our platform.
    # (It's up to the scheduler decide upon how strict dependencies are to be enforced
    # upon subordinate group members.)
    #
    # Note! The leader test set ID comes from the queue item (oTask), which is
    #       where _createTestSet stores it.
    #
    oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTask.idTestSetGangLeader);

    oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
    oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
    if oBuild is None:
        return None;

    # NOTE(review): a failed validation kit lookup (None) is not treated as
    #               an error here, unlike in _tryAsLeader - confirm intended.
    oValidationKitBuild = None;
    if oLeaderTestSet.idBuildTestSuite is not None:
        oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
        oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
                                                         self._oSchedGrpData.idBuildSrcTestSuite);

    #
    # Create a testset and update the state(s).
    # (This decrements oTask.cMissingGangMembers.)
    #
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    if oTask.cMissingGangMembers < 1:
        # The whole gang is there, move the task to the end of the queue
        # and update the status on the other gang members.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
        oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
    else:
        # We're still missing some gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    return dResponse;
1238
1239
def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
    """
    Worker for scheduling a new task.

    Walks the scheduling queue of this scheduling group in idItem order and
    tries each item until one can be scheduled on the given testbox or
    config.g_kcSecMaxNewTask seconds have elapsed.

    Returns the response from _tryAsLeader or _tryAsGangMember, or None if
    no suitable task was found.
    """

    #
    # Iterate the scheduler queue (fetch all to avoid having to concurrent
    # queries), trying out each task to see if the testbox can execute it.
    #
    dRejected = {}; # variations we've already checked out and rejected.
    self._oDb.execute('SELECT *\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND ( bmHourlySchedule IS NULL\n'
                      ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
                      'ORDER BY idItem ASC\n'
                      , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
    aaoRows = self._oDb.fetchAll();
    for aoRow in aaoRows:
        # Don't loop forever.
        if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
            break;

        # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
        oTask = SchedQueueData().initFromDbRow(aoRow);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
                        % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
                            oTask.tsLastScheduled, oTask.tsConfig,));

        sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
        if sRejectNm in dRejected:
            self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
            continue;
        # Marked up front; if the task gets scheduled we return and the
        # dictionary is discarded.
        dRejected[sRejectNm] = 1;

        # Fetch all the test case info (too much, but who cares right now).
        oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
                                                             tsConfigEff = oTask.tsConfig,
                                                             tsRsrcEff = oTask.tsConfig);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));

        # This shouldn't happen, but just in case it does...
        if oTestEx.oTestCase.fEnabled is not True:
            self.dprint('Testcase is not enabled!!');
            continue;

        # Check if the testbox properties matches the test.
        if not oTestEx.matchesTestBoxProps(oTestBoxData):
            self.dprint('Testbox mismatch!');
            continue;

        # Try schedule it.
        if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
            dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        elif oTask.cMissingGangMembers >= 1:
            # A gathering gang with at least one free seat.  The count is one
            # when only the last member is missing, so that must be accepted
            # here or the gang would never become complete.
            dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        else:
            dResponse = None; # Shouldn't happen!
        if dResponse is not None:
            self.dprint('Found a task! dResponse=%s' % (dResponse,));
            return dResponse;

    # Found no suitable task.
    return None;
1306
1307 @staticmethod
1308 def _pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds):
1309 """
1310 Picks the next scheduling group for the given testbox.
1311 """
1312 if len(oTestBoxDataEx.aoInSchedGroups) == 1:
1313 oSchedGroup = oTestBoxDataEx.aoInSchedGroups[0].oSchedGroup;
1314 if oSchedGroup.fEnabled \
1315 and oSchedGroup.idBuildSrc is not None \
1316 and oSchedGroup.idSchedGroup not in dIgnoreSchedGroupIds:
1317 return (oSchedGroup, 0);
1318 iWorkItem = 0;
1319
1320 elif oTestBoxDataEx.aoInSchedGroups:
1321 # Construct priority table of currently enabled scheduling groups.
1322 aaoList1 = [];
1323 for oInGroup in oTestBoxDataEx.aoInSchedGroups:
1324 oSchedGroup = oInGroup.oSchedGroup;
1325 if oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None:
1326 iSchedPriority = oInGroup.iSchedPriority;
1327 if iSchedPriority > 31: # paranoia
1328 iSchedPriority = 31;
1329 elif iSchedPriority < 0: # paranoia
1330 iSchedPriority = 0;
1331
1332 for iSchedPriority in xrange(min(iSchedPriority, len(aaoList1))):
1333 aaoList1[iSchedPriority].append(oSchedGroup);
1334 while len(aaoList1) <= iSchedPriority:
1335 aaoList1.append([oSchedGroup,]);
1336
1337 # Flatten it into a single list, mixing the priorities a little so it doesn't
1338 # take forever before low priority stuff is executed.
1339 aoFlat = [];
1340 iLo = 0;
1341 iHi = len(aaoList1) - 1;
1342 while iHi >= iLo:
1343 aoFlat += aaoList1[iHi];
1344 if iLo < iHi:
1345 aoFlat += aaoList1[iLo];
1346 iLo += 1;
1347 iHi -= 1;
1348
1349 # Pick the next one.
1350 cLeft = len(aoFlat);
1351 while cLeft > 0:
1352 cLeft -= 1;
1353 iWorkItem += 1;
1354 if iWorkItem >= len(aoFlat) or iWorkItem < 0:
1355 iWorkItem = 0;
1356 if aoFlat[iWorkItem].idSchedGroup not in dIgnoreSchedGroupIds:
1357 return (aoFlat[iWorkItem], iWorkItem);
1358 else:
1359 iWorkItem = 0;
1360
1361 # No active group.
1362 return (None, iWorkItem);
1363
@staticmethod
def scheduleNewTask(oDb, oTestBoxData, iWorkItem, sBaseUrl, iVerbosity = 0):
    # type: (TMDatabaseConnection, TestBoxData, int, str, int) -> Optional[dict]
    """
    Schedules a new task for a testbox.

    Starts a fresh transaction, locks the testbox status row and the
    scheduling queue rows of the testbox's scheduling groups, and then asks
    the scheduler of each usable scheduling group in turn for a task.

    Returns the response from the scheduler if a task was found (committed),
    otherwise None.  Exceptions cause a rollback and are re-raised.
    """
    oTBStatusLogic = TestBoxStatusLogic(oDb);

    try:
        #
        # To avoid concurrency issues in SchedQueues we lock all the rows
        # related to our scheduling queue.  Also, since this is a very
        # expensive operation we lock the testbox status row to fend of
        # repeated retries by faulty testbox scripts.
        #
        # Note! The IDs are passed as query parameters like everywhere else
        #       in this file, rather than being formatted into the SQL text.
        #
        tsSecStart = utils.timestampSecond();
        oDb.rollback();
        oDb.begin();
        oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
                    , (oTestBoxData.idTestBox,));
        oDb.execute('SELECT SchedQueues.idSchedGroup\n'
                    ' FROM SchedQueues, TestBoxesInSchedGroups\n'
                    'WHERE TestBoxesInSchedGroups.idTestBox = %s\n'
                    ' AND TestBoxesInSchedGroups.tsExpire = \'infinity\'::TIMESTAMP\n'
                    ' AND TestBoxesInSchedGroups.idSchedGroup = SchedQueues.idSchedGroup\n'
                    ' FOR UPDATE'
                    , (oTestBoxData.idTestBox,));

        # We need the current timestamp.
        tsNow = oDb.getCurrentTimestamp();

        # Re-read the testbox data with scheduling group relations.  Nothing
        # is scheduled if the testbox got disabled or its generation ID
        # differs from the caller's data.
        oTestBoxDataEx = TestBoxDataEx().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
        if     oTestBoxDataEx.fEnabled \
           and oTestBoxDataEx.idGenTestBox == oTestBoxData.idGenTestBox:

            # We may have to skip scheduling groups that are out of work (e.g. 'No build').
            iInitialWorkItem = iWorkItem;
            dIgnoreSchedGroupIds = {};
            while True:
                # Now, pick the scheduling group.
                (oSchedGroup, iWorkItem) = SchedulerBase._pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds);
                if oSchedGroup is None:
                    break;
                assert oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None;

                # Instantiate the specified scheduler and let it do the rest.
                oScheduler = SchedulerBase._instantiate(oDb, oSchedGroup, iVerbosity, tsSecStart);
                dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataEx, tsNow, sBaseUrl);
                if dResponse is not None:
                    oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
                    oDb.commit();
                    return dResponse;

                # Check out the next work item?
                if oScheduler.getElapsedSecs() > config.g_kcSecMaxNewTask:
                    break;
                dIgnoreSchedGroupIds[oSchedGroup.idSchedGroup] = oSchedGroup;

            # No luck, but best if we update the work item if we've made progress.
            # Note! In case of a config.g_kcSecMaxNewTask timeout, this may accidentally skip
            #       a work item with actually work to do.  But that's a small price to pay.
            if iWorkItem != iInitialWorkItem:
                oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
                oDb.commit();
                return None;
    except:
        # Bare except is intentional: roll back on anything and re-raise.
        oDb.rollback();
        raise;

    # Not enabled (or nothing to update), rollback and return no task.
    oDb.rollback();
    return None;
1437
@staticmethod
def tryCancelGangGathering(oDb, oStatusData):
    """
    Attempts to cancel the gang gathering the given testbox takes part in.

    On success the whole gang is put in the gang-gathering-timed-out state
    and the scheduling queue item is reset and moved to the end of the queue.

    Returns True if cancelled, either by us or already by someone else.
    Returns False if not (someone raced us to the SchedQueue table).

    Note! oStatusData is re-initialized from the database.
    """
    assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
    try:
        #
        # Fresh transaction with both tables locked, as we're racing
        # scheduleNewTask as well as other callers of this method.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Get the current state now that we hold the locks.
        #
        oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
        sCurState = oStatusData.enmState;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
            # Already timed out, nothing left for us to do.
            oDb.rollback();
            return True;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGathering:
            #
            # The test set leads us to the gang leader, which identifies
            # the gang when changing the state of all its members.
            #
            oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
            idLeader = oTestSetData.idTestSetGangLeader;

            TestBoxStatusLogic(oDb).updateGangStatus(idLeader,
                                                     TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                                     fCommit = False);

            #
            # Requeue the scheduling queue item at the end, with no leader
            # and the full gang missing again.
            #
            oDb.execute('SELECT *\n'
                        'FROM SchedQueues\n'
                        'WHERE idTestSetGangLeader = %s\n'
                        , (idLeader,) );
            oTask = SchedQueueData().initFromDbRow(oDb.fetchOne());
            oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(oDb, oTask.idGenTestCaseArgs,
                                                                 tsConfigEff = oTask.tsConfig,
                                                                 tsRsrcEff = oTask.tsConfig);
            oDb.execute('UPDATE SchedQueues\n'
                        ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                        ' idTestSetGangLeader = NULL,\n'
                        ' cMissingGangMembers = %s\n'
                        'WHERE idItem = %s\n'
                        , (oTestEx.cGangMembers, oTask.idItem,) );

            oDb.commit();
            return True;
    except:
        oDb.rollback();
        raise;

    # The testbox is in some other state by now, so there is nothing to cancel.
    oDb.rollback();
    return False;
1505
1506
1507#
1508# Unit testing.
1509#
1510
1511# pylint: disable=missing-docstring
class SchedQueueDataTestCase(ModelDataBaseTestCase):
    """
    ModelDataBaseTestCase specialization using a default constructed
    SchedQueueData instance as the only sample.
    """
    def setUp(self):
        oSample = SchedQueueData();
        self.aoSamples = [oSample,];
1515
if __name__ == '__main__':
    # Run the unit tests in this module when executed as a script.
    unittest.main();
    # not reached (unittest.main() exits the process by default).
1519
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette