VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 90594

Last change on this file since 90594 was 90594, checked in by vboxsync, 3 years ago

ValKit: More Python 3.9 API changes needed (Thread.isAlive() -> is_alive()) bugref:10079

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 86.6 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 90594 2021-08-10 12:15:27Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2020 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 90594 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 return abStr.tostring().decode('utf_8');
75 except:
76 reporter.errorXcpt('getSZ(,%u)' % (off));
77 return sDefault;
78
79def getSZLen(abData, off):
80 """
81 Get the length of a zero-terminated string field, in bytes.
82 Returns -1 if off is beyond the data packet or not properly terminated.
83 """
84 cbData = len(abData);
85 if off >= cbData:
86 return -1;
87
88 offCur = off;
89 while abData[offCur] != 0:
90 offCur = offCur + 1;
91 if offCur >= cbData:
92 return -1;
93
94 return offCur - off;
95
96def isValidOpcodeEncoding(sOpcode):
97 """
98 Checks if the specified opcode is valid or not.
99 Returns True on success.
100 Returns False if it is invalid, details in the log.
101 """
102 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
103 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
104 if len(sOpcode) != 8:
105 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
106 return False;
107 for i in range(0, 1):
108 if sSet1.find(sOpcode[i]) < 0:
109 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
110 return False;
111 for i in range(2, 7):
112 if sSet2.find(sOpcode[i]) < 0:
113 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
114 return False;
115 return True;
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
121def u32ToByteArray(u32):
122 """Encodes the u32 value as a little endian byte (B) array."""
123 return array.array('B',
124 ( u32 % 256,
125 (u32 // 256) % 256,
126 (u32 // 65536) % 256,
127 (u32 // 16777216) % 256) );
128
129def escapeString(sString):
130 """
131 Does $ escaping of the string so TXS doesn't try do variable expansion.
132 """
133 return sString.replace('$', '$$');
134
135
136
137class TransportBase(object):
138 """
139 Base class for the transport layer.
140 """
141
142 def __init__(self, sCaller):
143 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
144 self.fDummy = 0;
145 self.abReadAheadHdr = array.array('B');
146
147 def toString(self):
148 """
149 Stringify the instance for logging and debugging.
150 """
151 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
152
153 def __str__(self):
154 return self.toString();
155
156 def cancelConnect(self):
157 """
158 Cancels any pending connect() call.
159 Returns None;
160 """
161 return None;
162
163 def connect(self, cMsTimeout):
164 """
165 Quietly attempts to connect to the TXS.
166
167 Returns True on success.
168 Returns False on retryable errors (no logging).
169 Returns None on fatal errors with details in the log.
170
171 Override this method, don't call super.
172 """
173 _ = cMsTimeout;
174 return False;
175
176 def disconnect(self, fQuiet = False):
177 """
178 Disconnect from the TXS.
179
180 Returns True.
181
182 Override this method, don't call super.
183 """
184 _ = fQuiet;
185 return True;
186
187 def sendBytes(self, abBuf, cMsTimeout):
188 """
189 Sends the bytes in the buffer abBuf to the TXS.
190
191 Returns True on success.
192 Returns False on failure and error details in the log.
193
194 Override this method, don't call super.
195
196 Remarks: len(abBuf) is always a multiple of 16.
197 """
198 _ = abBuf; _ = cMsTimeout;
199 return False;
200
201 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
202 """
203 Receive cb number of bytes from the TXS.
204
205 Returns the bytes (array('B')) on success.
206 Returns None on failure and error details in the log.
207
208 Override this method, don't call super.
209
210 Remarks: cb is always a multiple of 16.
211 """
212 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
213 return None;
214
215 def isConnectionOk(self):
216 """
217 Checks if the connection is OK.
218
219 Returns True if it is.
220 Returns False if it isn't (caller should call diconnect).
221
222 Override this method, don't call super.
223 """
224 return True;
225
226 def isRecvPending(self, cMsTimeout = 0):
227 """
228 Checks if there is incoming bytes, optionally waiting cMsTimeout
229 milliseconds for something to arrive.
230
231 Returns True if there is, False if there isn't.
232
233 Override this method, don't call super.
234 """
235 _ = cMsTimeout;
236 return False;
237
238 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
239 """
240 Sends a message (opcode + encoded payload).
241
242 Returns True on success.
243 Returns False on failure and error details in the log.
244 """
245 # Fix + check the opcode.
246 if len(sOpcode) < 2:
247 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
248 return False;
249 sOpcode = sOpcode.ljust(8);
250 if not isValidOpcodeEncoding(sOpcode):
251 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
252 return False;
253
254 # Start construct the message.
255 cbMsg = 16 + len(abPayload);
256 abMsg = array.array('B');
257 abMsg.extend(u32ToByteArray(cbMsg));
258 abMsg.extend((0, 0, 0, 0)); # uCrc32
259 try:
260 abMsg.extend(array.array('B', \
261 ( ord(sOpcode[0]), \
262 ord(sOpcode[1]), \
263 ord(sOpcode[2]), \
264 ord(sOpcode[3]), \
265 ord(sOpcode[4]), \
266 ord(sOpcode[5]), \
267 ord(sOpcode[6]), \
268 ord(sOpcode[7]) ) ) );
269 if abPayload:
270 abMsg.extend(abPayload);
271 except:
272 reporter.fatalXcpt('sendMsgInt: packing problem...');
273 return False;
274
275 # checksum it, padd it and send it off.
276 uCrc32 = zlib.crc32(abMsg[8:]);
277 abMsg[4:8] = u32ToByteArray(uCrc32);
278
279 while len(abMsg) % 16:
280 abMsg.append(0);
281
282 reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
283 return self.sendBytes(abMsg, cMsTimeout);
284
285 def recvMsg(self, cMsTimeout, fNoDataOk = False):
286 """
287 Receives a message from the TXS.
288
289 Returns the message three-tuple: length, opcode, payload.
290 Returns (None, None, None) on failure and error details in the log.
291 """
292
293 # Read the header.
294 if self.abReadAheadHdr:
295 assert(len(self.abReadAheadHdr) == 16);
296 abHdr = self.abReadAheadHdr;
297 self.abReadAheadHdr = array.array('B');
298 else:
299 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
300 if abHdr is None:
301 return (None, None, None);
302 if len(abHdr) != 16:
303 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
304 return (None, None, None);
305
306 # Unpack and validate the header.
307 cbMsg = getU32(abHdr, 0);
308 uCrc32 = getU32(abHdr, 4);
309 sOpcode = abHdr[8:16].tostring().decode('ascii');
310
311 if cbMsg < 16:
312 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
313 return (None, None, None);
314 if cbMsg > 1024*1024:
315 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
316 return (None, None, None);
317 if not isValidOpcodeEncoding(sOpcode):
318 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
319 return (None, None, None);
320
321 # Get the payload (if any), dropping the padding.
322 abPayload = array.array('B');
323 if cbMsg > 16:
324 if cbMsg % 16:
325 cbPadding = 16 - (cbMsg % 16);
326 else:
327 cbPadding = 0;
328 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
329 if abPayload is None:
330 self.abReadAheadHdr = abHdr;
331 if not fNoDataOk :
332 reporter.log('recvMsg: failed to recv payload bytes!');
333 return (None, None, None);
334
335 while cbPadding > 0:
336 abPayload.pop();
337 cbPadding = cbPadding - 1;
338
339 # Check the CRC-32.
340 if uCrc32 != 0:
341 uActualCrc32 = zlib.crc32(abHdr[8:]);
342 if cbMsg > 16:
343 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
344 uActualCrc32 = uActualCrc32 & 0xffffffff;
345 if uCrc32 != uActualCrc32:
346 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
347 return (None, None, None);
348
349 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
350 return (cbMsg, sOpcode, abPayload);
351
352 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
353 """
354 Sends a message (opcode + payload tuple).
355
356 Returns True on success.
357 Returns False on failure and error details in the log.
358 Returns None if you pass the incorrectly typed parameters.
359 """
360 # Encode the payload.
361 abPayload = array.array('B');
362 for o in aoPayload:
363 try:
364 if utils.isString(o):
365 if sys.version_info[0] >= 3:
366 abPayload.extend(o.encode('utf_8'));
367 else:
368 # the primitive approach...
369 sUtf8 = o.encode('utf_8');
370 for ch in sUtf8:
371 abPayload.append(ord(ch))
372 abPayload.append(0);
373 elif isinstance(o, (long, int)):
374 if o < 0 or o > 0xffffffff:
375 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
376 return None;
377 abPayload.extend(u32ToByteArray(o));
378 elif isinstance(o, array.array):
379 abPayload.extend(o);
380 else:
381 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
382 return None;
383 except:
384 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
385 return None;
386 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
387
388
389class Session(TdTaskBase):
390 """
391 A Test eXecution Service (TXS) client session.
392 """
393
394 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
395 """
396 Construct a TXS session.
397
398 This starts by connecting to the TXS and will enter the signalled state
399 when connected or the timeout has been reached.
400 """
401 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
402 self.oTransport = oTransport;
403 self.sStatus = "";
404 self.cMsTimeout = 0;
405 self.fErr = True; # Whether to report errors as error.
406 self.msStart = 0;
407 self.oThread = None;
408 self.fnTask = self.taskDummy;
409 self.aTaskArgs = None;
410 self.oTaskRc = None;
411 self.t3oReply = (None, None, None);
412 self.fScrewedUpMsgState = False;
413 self.fTryConnect = fTryConnect;
414
415 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
416 raise base.GenError("startTask failed");
417
418 def __del__(self):
419 """Make sure to cancel the task when deleted."""
420 self.cancelTask();
421
422 def toString(self):
423 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
424 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
425 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
426 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
427
428 def taskDummy(self):
429 """Place holder to catch broken state handling."""
430 raise Exception();
431
432 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
433 """
434 Kicks of a new task.
435
436 cMsTimeout: The task timeout in milliseconds. Values less than
437 500 ms will be adjusted to 500 ms. This means it is
438 OK to use negative value.
439 sStatus: The task status.
440 fnTask: The method that'll execute the task.
441 aArgs: Arguments to pass to fnTask.
442
443 Returns True on success, False + error in log on failure.
444 """
445 if not self.cancelTask():
446 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
447 return False;
448
449 # Change status and make sure we're the
450 self.lockTask();
451 if self.sStatus != "":
452 self.unlockTask();
453 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
454 return False;
455 self.sStatus = "setup";
456 self.oTaskRc = None;
457 self.t3oReply = (None, None, None);
458 self.resetTaskLocked();
459 self.unlockTask();
460
461 self.cMsTimeout = max(cMsTimeout, 500);
462 self.fErr = not fIgnoreErrors;
463 self.fnTask = fnTask;
464 self.aTaskArgs = aArgs;
465 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
466 self.oThread.setDaemon(True);
467 self.msStart = base.timestampMilli();
468
469 self.lockTask();
470 self.sStatus = sStatus;
471 self.unlockTask();
472 self.oThread.start();
473
474 return True;
475
476 def cancelTask(self, fSync = True):
477 """
478 Attempts to cancel any pending tasks.
479 Returns success indicator (True/False).
480 """
481 self.lockTask();
482
483 if self.sStatus == "":
484 self.unlockTask();
485 return True;
486 if self.sStatus == "setup":
487 self.unlockTask();
488 return False;
489 if self.sStatus == "cancelled":
490 self.unlockTask();
491 return False;
492
493 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
494 if self.sStatus == 'connecting':
495 self.oTransport.cancelConnect();
496
497 self.sStatus = "cancelled";
498 oThread = self.oThread;
499 self.unlockTask();
500
501 if not fSync:
502 return False;
503
504 oThread.join(61.0);
505
506 if sys.version_info < (3, 9, 0):
507 # Removed since Python 3.9.
508 return oThread.isAlive(); # pylint: disable=no-member
509 return oThread.is_alive();
510
511 def taskThread(self):
512 """
513 The task thread function.
514 This does some housekeeping activities around the real task method call.
515 """
516 if not self.isCancelled():
517 try:
518 fnTask = self.fnTask;
519 oTaskRc = fnTask(*self.aTaskArgs);
520 except:
521 reporter.fatalXcpt('taskThread', 15);
522 oTaskRc = None;
523 else:
524 reporter.log('taskThread: cancelled already');
525
526 self.lockTask();
527
528 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
529 self.oTaskRc = oTaskRc;
530 self.oThread = None;
531 self.sStatus = '';
532 self.signalTaskLocked();
533
534 self.unlockTask();
535 return None;
536
537 def isCancelled(self):
538 """Internal method for checking if the task has been cancelled."""
539 self.lockTask();
540 sStatus = self.sStatus;
541 self.unlockTask();
542 if sStatus == "cancelled":
543 return True;
544 return False;
545
546 def hasTimedOut(self):
547 """Internal method for checking if the task has timed out or not."""
548 cMsLeft = self.getMsLeft();
549 if cMsLeft <= 0:
550 return True;
551 return False;
552
553 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
554 """Gets the time left until the timeout."""
555 cMsElapsed = base.timestampMilli() - self.msStart;
556 if cMsElapsed < 0:
557 return cMsMin;
558 cMsLeft = self.cMsTimeout - cMsElapsed;
559 if cMsLeft <= cMsMin:
560 return cMsMin;
561 if cMsLeft > cMsMax > 0:
562 return cMsMax
563 return cMsLeft;
564
565 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
566 """
567 Wrapper for TransportBase.recvMsg that stashes the response away
568 so the client can inspect it later on.
569 """
570 if cMsTimeout is None:
571 cMsTimeout = self.getMsLeft(500);
572 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
573 self.lockTask();
574 self.t3oReply = (cbMsg, sOpcode, abPayload);
575 self.unlockTask();
576 return (cbMsg, sOpcode, abPayload);
577
578 def recvAck(self, fNoDataOk = False):
579 """
580 Receives an ACK or error response from the TXS.
581
582 Returns True on success.
583 Returns False on timeout or transport error.
584 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
585 and there are always details of some sort or another.
586 """
587 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
588 if cbMsg is None:
589 return False;
590 sOpcode = sOpcode.strip()
591 if sOpcode == "ACK":
592 return True;
593 return (sOpcode, getSZ(abPayload, 0, sOpcode));
594
595 def recvAckLogged(self, sCommand, fNoDataOk = False):
596 """
597 Wrapper for recvAck and logging.
598 Returns True on success (ACK).
599 Returns False on time, transport error and errors signalled by TXS.
600 """
601 rc = self.recvAck(fNoDataOk);
602 if rc is not True and not fNoDataOk:
603 if rc is False:
604 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
605 else:
606 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
607 rc = False;
608 return rc;
609
610 def recvTrueFalse(self, sCommand):
611 """
612 Receives a TRUE/FALSE response from the TXS.
613 Returns True on TRUE, False on FALSE and None on error/other (logged).
614 """
615 cbMsg, sOpcode, abPayload = self.recvReply();
616 if cbMsg is None:
617 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
618 return None;
619
620 sOpcode = sOpcode.strip()
621 if sOpcode == "TRUE":
622 return True;
623 if sOpcode == "FALSE":
624 return False;
625 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
626 return None;
627
628 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
629 """
630 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
631 """
632 if cMsTimeout is None:
633 cMsTimeout = self.getMsLeft(500);
634 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
635
636 def asyncToSync(self, fnAsync, *aArgs):
637 """
638 Wraps an asynchronous task into a synchronous operation.
639
640 Returns False on failure, task return status on success.
641 """
642 rc = fnAsync(*aArgs);
643 if rc is False:
644 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
645 return rc;
646
647 rc = self.waitForTask(self.cMsTimeout + 5000);
648 if rc is False:
649 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
650 self.cancelTask();
651 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
652 return False;
653
654 rc = self.getResult();
655 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
656 return rc;
657
658 #
659 # Connection tasks.
660 #
661
662 def taskConnect(self, cMsIdleFudge):
663 """Tries to connect to the TXS"""
664 while not self.isCancelled():
665 reporter.log2('taskConnect: connecting ...');
666 rc = self.oTransport.connect(self.getMsLeft(500));
667 if rc is True:
668 reporter.log('taskConnect: succeeded');
669 return self.taskGreet(cMsIdleFudge);
670 if rc is None:
671 reporter.log2('taskConnect: unable to connect');
672 return None;
673 if self.hasTimedOut():
674 reporter.log2('taskConnect: timed out');
675 if not self.fTryConnect:
676 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
677 return False;
678 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
679 if not self.fTryConnect:
680 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
681 return False;
682
683 def taskGreet(self, cMsIdleFudge):
684 """Greets the TXS"""
685 rc = self.sendMsg("HOWDY", ());
686 if rc is True:
687 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
688 if rc is True:
689 while cMsIdleFudge > 0:
690 cMsIdleFudge -= 1000;
691 time.sleep(1);
692 else:
693 self.oTransport.disconnect(self.fTryConnect);
694 return rc;
695
696 def taskBye(self):
697 """Says goodbye to the TXS"""
698 rc = self.sendMsg("BYE");
699 if rc is True:
700 rc = self.recvAckLogged("BYE");
701 self.oTransport.disconnect();
702 return rc;
703
704 def taskVer(self):
705 """Requests version information from TXS"""
706 rc = self.sendMsg("VER");
707 if rc is True:
708 rc = False;
709 cbMsg, sOpcode, abPayload = self.recvReply();
710 if cbMsg is not None:
711 sOpcode = sOpcode.strip();
712 if sOpcode == "ACK VER":
713 sVer = getSZ(abPayload, 0);
714 if sVer is not None:
715 rc = sVer;
716 else:
717 reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
718 else:
719 reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
720 return rc;
721
722 def taskUuid(self):
723 """Gets the TXS UUID"""
724 rc = self.sendMsg("UUID");
725 if rc is True:
726 rc = False;
727 cbMsg, sOpcode, abPayload = self.recvReply();
728 if cbMsg is not None:
729 sOpcode = sOpcode.strip()
730 if sOpcode == "ACK UUID":
731 sUuid = getSZ(abPayload, 0);
732 if sUuid is not None:
733 sUuid = '{%s}' % (sUuid,)
734 try:
735 _ = uuid.UUID(sUuid);
736 rc = sUuid;
737 except:
738 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
739 else:
740 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
741 else:
742 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
743 else:
744 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
745 return rc;
746
747 #
748 # Process task
749 # pylint: disable=missing-docstring
750 #
751
752 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
753 # Construct the payload.
754 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
755 for sArg in asArgs:
756 aoPayload.append('%s' % (sArg));
757 aoPayload.append(long(len(asAddEnv)));
758 for sPutEnv in asAddEnv:
759 aoPayload.append('%s' % (sPutEnv));
760 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
761 if utils.isString(o):
762 aoPayload.append(o);
763 elif o is not None:
764 aoPayload.append('|');
765 o.uTxsClientCrc32 = zlib.crc32(b'');
766 else:
767 aoPayload.append('');
768 aoPayload.append('%s' % (sAsUser));
769 aoPayload.append(long(self.cMsTimeout));
770
771 # Kick of the EXEC command.
772 rc = self.sendMsg('EXEC', aoPayload)
773 if rc is True:
774 rc = self.recvAckLogged('EXEC');
775 if rc is True:
776 # Loop till the process completes, feed input to the TXS and
777 # receive output from it.
778 sFailure = "";
779 msPendingInputReply = None;
780 cbMsg, sOpcode, abPayload = (None, None, None);
781 while True:
782 # Pending input?
783 if msPendingInputReply is None \
784 and oStdIn is not None \
785 and not utils.isString(oStdIn):
786 try:
787 sInput = oStdIn.read(65536);
788 except:
789 reporter.errorXcpt('read standard in');
790 sFailure = 'exception reading stdin';
791 rc = None;
792 break;
793 if sInput:
794 # Convert to a byte array before handing it of to sendMsg or the string
795 # will get some zero termination added breaking the CRC (and injecting
796 # unwanted bytes).
797 abInput = array.array('B', sInput.encode('utf-8'));
798 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
799 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
800 if rc is not True:
801 sFailure = 'sendMsg failure';
802 break;
803 msPendingInputReply = base.timestampMilli();
804 continue;
805
806 rc = self.sendMsg('STDINEOS');
807 oStdIn = None;
808 if rc is not True:
809 sFailure = 'sendMsg failure';
810 break;
811 msPendingInputReply = base.timestampMilli();
812
813 # Wait for input (500 ms timeout).
814 if cbMsg is None:
815 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
816 if cbMsg is None:
817 # Check for time out before restarting the loop.
818 # Note! Only doing timeout checking here does mean that
819 # the TXS may prevent us from timing out by
820 # flooding us with data. This is unlikely though.
821 if self.hasTimedOut() \
822 and ( msPendingInputReply is None \
823 or base.timestampMilli() - msPendingInputReply > 30000):
824 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
825 sFailure = 'timeout';
826 rc = None;
827 break;
828 # Check that the connection is OK.
829 if not self.oTransport.isConnectionOk():
830 self.oTransport.disconnect();
831 sFailure = 'disconnected';
832 rc = False;
833 break;
834 continue;
835
836 # Handle the response.
837 sOpcode = sOpcode.rstrip();
838 if sOpcode == 'STDOUT':
839 oOut = oStdOut;
840 elif sOpcode == 'STDERR':
841 oOut = oStdErr;
842 elif sOpcode == 'TESTPIPE':
843 oOut = oTestPipe;
844 else:
845 oOut = None;
846 if oOut is not None:
847 # Output from the process.
848 if len(abPayload) < 4:
849 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
850 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
851 rc = None;
852 break;
853 uStreamCrc32 = getU32(abPayload, 0);
854 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
855 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
856 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
857 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
858 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
859 rc = None;
860 break;
861 try:
862 oOut.write(abPayload[4:]);
863 except:
864 sFailure = 'exception writing %s' % (sOpcode);
865 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
866 rc = None;
867 break;
868 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
869 # Standard input is ignored. Ignore this condition for now.
870 msPendingInputReply = None;
871 reporter.log('taskExecEx: Standard input is ignored... why?');
872 del oStdIn.uTxsClientCrc32;
873 oStdIn = '/dev/null';
874 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
875 and msPendingInputReply is not None:
876 # TXS STDIN error, abort.
877 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
878 msPendingInputReply = None;
879 sFailure = 'TXS is out of memory for std input buffering';
880 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
881 rc = None;
882 break;
883 elif sOpcode == 'ACK' and msPendingInputReply is not None:
884 msPendingInputReply = None;
885 elif sOpcode.startswith('PROC '):
886 # Process status message, handle it outside the loop.
887 rc = True;
888 break;
889 else:
890 sFailure = 'Unexpected opcode %s' % (sOpcode);
891 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
892 rc = None;
893 break;
894 # Clear the message.
895 cbMsg, sOpcode, abPayload = (None, None, None);
896
897 # If we sent an STDIN packet and didn't get a reply yet, we'll give
898 # TXS some 5 seconds to reply to this. If we don't wait here we'll
899 # get screwed later on if we mix it up with the reply to some other
900 # command. Hackish.
901 if msPendingInputReply is not None:
902 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
903 if cbMsg2 is not None:
904 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
905 % (cbMsg2, sOpcode2, abPayload2));
906 msPendingInputReply = None;
907 else:
908 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
909 self.fScrewedUpMsgState = True;
910
911 # Parse the exit status (True), abort (None) or do nothing (False).
912 if rc is True:
913 if sOpcode != 'PROC OK':
914 # Do proper parsing some other day if needed:
915 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
916 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
917 rc = False;
918 else:
919 if rc is None:
920 # Abort it.
921 reporter.log('taskExecEx: sending ABORT...');
922 rc = self.sendMsg('ABORT');
923 while rc is True:
924 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
925 if cbMsg is None:
926 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
927 self.fScrewedUpMsgState = True;
928 break;
929 if sOpcode.startswith('PROC '):
930 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
931 break;
932 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
933 # Check that the connection is OK before looping.
934 if not self.oTransport.isConnectionOk():
935 self.oTransport.disconnect();
936 break;
937
938 # Fake response with the reason why we quit.
939 if sFailure is not None:
940 self.t3oReply = (0, 'EXECFAIL', sFailure);
941 rc = None;
942 else:
943 rc = None;
944
945 # Cleanup.
946 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
947 if o is not None and not utils.isString(o):
948 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
949 # Make sure all files are closed
950 o.close(); # pylint: disable=maybe-no-member
951 reporter.log('taskExecEx: returns %s' % (rc));
952 return rc;
953
954 #
955 # Admin tasks
956 #
957
958 def hlpRebootShutdownWaitForAck(self, sCmd):
959 """Wait for reboot/shutodwn ACK."""
960 rc = self.recvAckLogged(sCmd);
961 if rc is True:
962 # poll a little while for server to disconnect.
963 uMsStart = base.timestampMilli();
964 while self.oTransport.isConnectionOk() \
965 and base.timestampMilli() - uMsStart >= 5000:
966 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
967 break;
968 self.oTransport.disconnect();
969 return rc;
970
971 def taskReboot(self):
972 rc = self.sendMsg('REBOOT');
973 if rc is True:
974 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
975 return rc;
976
977 def taskShutdown(self):
978 rc = self.sendMsg('SHUTDOWN');
979 if rc is True:
980 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
981 return rc;
982
983 #
984 # CD/DVD control tasks.
985 #
986
987 ## TODO
988
989 #
990 # File system tasks
991 #
992
993 def taskMkDir(self, sRemoteDir, fMode):
994 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
995 if rc is True:
996 rc = self.recvAckLogged('MKDIR');
997 return rc;
998
999 def taskMkDirPath(self, sRemoteDir, fMode):
1000 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
1001 if rc is True:
1002 rc = self.recvAckLogged('MKDRPATH');
1003 return rc;
1004
1005 def taskMkSymlink(self, sLinkTarget, sLink):
1006 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
1007 if rc is True:
1008 rc = self.recvAckLogged('MKSYMLNK');
1009 return rc;
1010
1011 def taskRmDir(self, sRemoteDir):
1012 rc = self.sendMsg('RMDIR', (sRemoteDir,));
1013 if rc is True:
1014 rc = self.recvAckLogged('RMDIR');
1015 return rc;
1016
1017 def taskRmFile(self, sRemoteFile):
1018 rc = self.sendMsg('RMFILE', (sRemoteFile,));
1019 if rc is True:
1020 rc = self.recvAckLogged('RMFILE');
1021 return rc;
1022
1023 def taskRmSymlink(self, sRemoteSymlink):
1024 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1025 if rc is True:
1026 rc = self.recvAckLogged('RMSYMLNK');
1027 return rc;
1028
1029 def taskRmTree(self, sRemoteTree):
1030 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1031 if rc is True:
1032 rc = self.recvAckLogged('RMTREE');
1033 return rc;
1034
1035 def taskChMod(self, sRemotePath, fMode):
1036 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1037 if rc is True:
1038 rc = self.recvAckLogged('CHMOD');
1039 return rc;
1040
1041 def taskChOwn(self, sRemotePath, idUser, idGroup):
1042 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1043 if rc is True:
1044 rc = self.recvAckLogged('CHOWN');
1045 return rc;
1046
1047 def taskIsDir(self, sRemoteDir):
1048 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1049 if rc is True:
1050 rc = self.recvTrueFalse('ISDIR');
1051 return rc;
1052
1053 def taskIsFile(self, sRemoteFile):
1054 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1055 if rc is True:
1056 rc = self.recvTrueFalse('ISFILE');
1057 return rc;
1058
1059 def taskIsSymlink(self, sRemoteSymlink):
1060 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1061 if rc is True:
1062 rc = self.recvTrueFalse('ISSYMLNK');
1063 return rc;
1064
1065 #def "STAT "
1066 #def "LSTAT "
1067 #def "LIST "
1068
1069 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1070 #
1071 # Open the local file (make sure it exist before bothering TXS) and
1072 # tell TXS that we want to upload a file.
1073 #
1074 try:
1075 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1076 except:
1077 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1078 return False;
1079
1080 # Common cause with taskUploadStr
1081 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1082
1083 # Cleanup.
1084 oLocalFile.close();
1085 return rc;
1086
1087 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1088 # Wrap sContent in a file like class.
1089 class InStringFile(object): # pylint: disable=too-few-public-methods
1090 def __init__(self, sContent):
1091 self.sContent = sContent;
1092 self.off = 0;
1093
1094 def read(self, cbMax):
1095 cbLeft = len(self.sContent) - self.off;
1096 if cbLeft == 0:
1097 return "";
1098 if cbLeft <= cbMax:
1099 sRet = self.sContent[self.off:(self.off + cbLeft)];
1100 else:
1101 sRet = self.sContent[self.off:(self.off + cbMax)];
1102 self.off = self.off + len(sRet);
1103 return sRet;
1104
1105 oLocalString = InStringFile(sContent);
1106 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1107
1108 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1109 """Common worker used by taskUploadFile and taskUploadString."""
1110 #
1111 # Command + ACK.
1112 #
1113 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1114 # Fall back on the old command if the new one is not known by the TXS.
1115 #
1116 if fMode == 0:
1117 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1118 if rc is True:
1119 rc = self.recvAckLogged('PUT FILE');
1120 else:
1121 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1122 if rc is True:
1123 rc = self.recvAck();
1124 if rc is False:
1125 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1126 elif rc is not True:
1127 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1128 # Fallback:
1129 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1130 if rc is True:
1131 rc = self.recvAckLogged('PUT FILE');
1132 else:
1133 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1134 rc = False;
1135 if rc is True:
1136 #
1137 # Push data packets until eof.
1138 #
1139 uMyCrc32 = zlib.crc32(b'');
1140 while True:
1141 # Read up to 64 KB of data.
1142 try:
1143 sRaw = oLocalFile.read(65536);
1144 except:
1145 rc = None;
1146 break;
1147
1148 # Convert to array - this is silly!
1149 abBuf = array.array('B');
1150 if utils.isString(sRaw):
1151 for i, _ in enumerate(sRaw):
1152 abBuf.append(ord(sRaw[i]));
1153 else:
1154 abBuf.extend(sRaw);
1155 sRaw = None;
1156
1157 # Update the file stream CRC and send it off.
1158 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1159 if not abBuf:
1160 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1161 else:
1162 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1163 if rc is False:
1164 break;
1165
1166 # Wait for the reply.
1167 rc = self.recvAck();
1168 if rc is not True:
1169 if rc is False:
1170 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1171 else:
1172 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1173 rc = False;
1174 break;
1175
1176 # EOF?
1177 if not abBuf:
1178 break;
1179
1180 # Send ABORT on ACK and I/O errors.
1181 if rc is None:
1182 rc = self.sendMsg('ABORT');
1183 if rc is True:
1184 self.recvAckLogged('ABORT');
1185 rc = False;
1186 return rc;
1187
1188 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1189 try:
1190 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1191 except:
1192 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1193 return False;
1194
1195 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1196
1197 oLocalFile.close();
1198 if rc is False:
1199 try:
1200 os.remove(sLocalFile);
1201 except:
1202 reporter.errorXcpt();
1203 return rc;
1204
1205 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1206 # Wrap sContent in a file like class.
1207 class OutStringFile(object): # pylint: disable=too-few-public-methods
1208 def __init__(self):
1209 self.asContent = [];
1210
1211 def write(self, sBuf):
1212 self.asContent.append(sBuf);
1213 return None;
1214
1215 oLocalString = OutStringFile();
1216 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1217 if rc is True:
1218 rc = '';
1219 for sBuf in oLocalString.asContent:
1220 if hasattr(sBuf, 'decode'):
1221 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1222 else:
1223 rc += sBuf;
1224 return rc;
1225
1226 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1227 """Common worker for taskDownloadFile and taskDownloadString."""
1228 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1229 if rc is True:
1230 #
1231 # Process data packets until eof.
1232 #
1233 uMyCrc32 = zlib.crc32(b'');
1234 while rc is True:
1235 cbMsg, sOpcode, abPayload = self.recvReply();
1236 if cbMsg is None:
1237 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1238 rc = None;
1239 break;
1240
1241 # Validate.
1242 sOpcode = sOpcode.rstrip();
1243 if sOpcode not in ('DATA', 'DATA EOF',):
1244 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1245 % (sOpcode, getSZ(abPayload, 0, "None")));
1246 rc = False;
1247 break;
1248 if sOpcode == 'DATA' and len(abPayload) < 4:
1249 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1250 rc = None;
1251 break;
1252 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1253 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1254 rc = None;
1255 break;
1256
1257 # Check the CRC (common for both packets).
1258 uCrc32 = getU32(abPayload, 0);
1259 if sOpcode == 'DATA':
1260 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1261 if uCrc32 != (uMyCrc32 & 0xffffffff):
1262 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1263 % (hex(uMyCrc32), hex(uCrc32)));
1264 rc = None;
1265 break;
1266 if sOpcode == 'DATA EOF':
1267 rc = self.sendMsg('ACK');
1268 break;
1269
1270 # Finally, push the data to the file.
1271 try:
1272 oLocalFile.write(abPayload[4:].tostring());
1273 except:
1274 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1275 rc = None;
1276 break;
1277 rc = self.sendMsg('ACK');
1278
1279 # Send NACK on validation and I/O errors.
1280 if rc is None:
1281 rc = self.sendMsg('NACK');
1282 rc = False;
1283 return rc;
1284
1285 def taskPackFile(self, sRemoteFile, sRemoteSource):
1286 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1287 if rc is True:
1288 rc = self.recvAckLogged('PKFILE');
1289 return rc;
1290
1291 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1292 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1293 if rc is True:
1294 rc = self.recvAckLogged('UNPKFILE');
1295 return rc;
1296
1297 # pylint: enable=missing-docstring
1298
1299
1300 #
1301 # Public methods - generic task queries
1302 #
1303
1304 def isSuccess(self):
1305 """Returns True if the task completed successfully, otherwise False."""
1306 self.lockTask();
1307 sStatus = self.sStatus;
1308 oTaskRc = self.oTaskRc;
1309 self.unlockTask();
1310 if sStatus != "":
1311 return False;
1312 if oTaskRc is False or oTaskRc is None:
1313 return False;
1314 return True;
1315
1316 def getResult(self):
1317 """
1318 Returns the result of a completed task.
1319 Returns None if not completed yet or no previous task.
1320 """
1321 self.lockTask();
1322 sStatus = self.sStatus;
1323 oTaskRc = self.oTaskRc;
1324 self.unlockTask();
1325 if sStatus != "":
1326 return None;
1327 return oTaskRc;
1328
1329 def getLastReply(self):
1330 """
1331 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1332 Returns a None, None, None three-tuple if there was no last reply.
1333 """
1334 self.lockTask();
1335 t3oReply = self.t3oReply;
1336 self.unlockTask();
1337 return t3oReply;
1338
1339 #
1340 # Public methods - connection.
1341 #
1342
1343 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1344 """
1345 Initiates a disconnect task.
1346
1347 Returns True on success, False on failure (logged).
1348
1349 The task returns True on success and False on failure.
1350 """
1351 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1352
1353 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1354 """Synchronous version."""
1355 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1356
1357 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1358 """
1359 Initiates a task for getting the TXS version information.
1360
1361 Returns True on success, False on failure (logged).
1362
1363 The task returns the version string on success and False on failure.
1364 """
1365 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1366
1367 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1368 """Synchronous version."""
1369 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1370
1371 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1372 """
1373 Initiates a task for getting the TXS UUID.
1374
1375 Returns True on success, False on failure (logged).
1376
1377 The task returns UUID string (in {}) on success and False on failure.
1378 """
1379 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1380
1381 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1382 """Synchronous version."""
1383 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1384
1385 #
1386 # Public methods - execution.
1387 #
1388
1389 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1390 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1391 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1392 """
1393 Initiates a exec process task.
1394
1395 Returns True on success, False on failure (logged).
1396
1397 The task returns True if the process exited normally with status code 0.
1398 The task returns None if on failure prior to executing the process, and
1399 False if the process exited with a different status or in an abnormal
1400 manner. Both None and False are logged of course and further info can
1401 also be obtained by getLastReply().
1402
1403 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1404 these streams. If None, no special action is taken and the output goes
1405 to where ever the TXS sends its output, and ditto for input.
1406 - To send to / read from the bitbucket, pass '/dev/null'.
1407 - To redirect to/from a file, just specify the remote filename.
1408 - To append to a file use '>>' followed by the remote filename.
1409 - To pipe the stream to/from the TXS, specify a file like
1410 object. For StdIn a non-blocking read() method is required. For
1411 the other a write() method is required. Watch out for deadlock
1412 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1413 """
1414 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1415 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1416 oStdOut, oStdErr, oTestPipe, sAsUser));
1417
1418 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1419 oStdIn = '/dev/null', oStdOut = '/dev/null',
1420 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1421 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1422 """Synchronous version."""
1423 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1424 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1425
1426 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1427 cMsTimeout = 3600000, fIgnoreErrors = False):
1428 """
1429 Initiates a exec process test task.
1430
1431 Returns True on success, False on failure (logged).
1432
1433 The task returns True if the process exited normally with status code 0.
1434 The task returns None if on failure prior to executing the process, and
1435 False if the process exited with a different status or in an abnormal
1436 manner. Both None and False are logged of course and further info can
1437 also be obtained by getLastReply().
1438
1439 Standard in is taken from /dev/null. While both standard output and
1440 standard error goes directly to reporter.log(). The testpipe is piped
1441 to reporter.xxxx.
1442 """
1443
1444 sStdIn = '/dev/null';
1445 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1446 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1447 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1448 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1449
1450 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1451 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1452
1453 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1454 cMsTimeout = 3600000, fIgnoreErrors = False):
1455 """Synchronous version."""
1456 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1457 cMsTimeout, fIgnoreErrors);
1458
1459 #
1460 # Public methods - system
1461 #
1462
1463 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1464 """
1465 Initiates a reboot task.
1466
1467 Returns True on success, False on failure (logged).
1468
1469 The task returns True on success, False on failure (logged). The
1470 session will be disconnected on successful task completion.
1471 """
1472 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1473
1474 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1475 """Synchronous version."""
1476 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1477
1478 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1479 """
1480 Initiates a shutdown task.
1481
1482 Returns True on success, False on failure (logged).
1483
1484 The task returns True on success, False on failure (logged).
1485 """
1486 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1487
1488 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1489 """Synchronous version."""
1490 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1491
1492
1493 #
1494 # Public methods - file system
1495 #
1496
1497 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1498 """
1499 Initiates a mkdir task.
1500
1501 Returns True on success, False on failure (logged).
1502
1503 The task returns True on success, False on failure (logged).
1504 """
1505 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1506
1507 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1508 """Synchronous version."""
1509 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1510
1511 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1512 """
1513 Initiates a mkdir -p task.
1514
1515 Returns True on success, False on failure (logged).
1516
1517 The task returns True on success, False on failure (logged).
1518 """
1519 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1520
1521 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1522 """Synchronous version."""
1523 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1524
1525 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1526 """
1527 Initiates a symlink task.
1528
1529 Returns True on success, False on failure (logged).
1530
1531 The task returns True on success, False on failure (logged).
1532 """
1533 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1534
1535 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1536 """Synchronous version."""
1537 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1538
1539 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1540 """
1541 Initiates a rmdir task.
1542
1543 Returns True on success, False on failure (logged).
1544
1545 The task returns True on success, False on failure (logged).
1546 """
1547 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1548
1549 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1550 """Synchronous version."""
1551 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1552
1553 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1554 """
1555 Initiates a rmfile task.
1556
1557 Returns True on success, False on failure (logged).
1558
1559 The task returns True on success, False on failure (logged).
1560 """
1561 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1562
1563 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1564 """Synchronous version."""
1565 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1566
1567 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1568 """
1569 Initiates a rmsymlink task.
1570
1571 Returns True on success, False on failure (logged).
1572
1573 The task returns True on success, False on failure (logged).
1574 """
1575 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1576
1577 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1578 """Synchronous version."""
1579 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1580
1581 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1582 """
1583 Initiates a rmtree task.
1584
1585 Returns True on success, False on failure (logged).
1586
1587 The task returns True on success, False on failure (logged).
1588 """
1589 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1590
1591 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1592 """Synchronous version."""
1593 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1594
1595 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1596 """
1597 Initiates a chmod task.
1598
1599 Returns True on success, False on failure (logged).
1600
1601 The task returns True on success, False on failure (logged).
1602 """
1603 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1604
1605 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1606 """Synchronous version."""
1607 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1608
1609 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1610 """
1611 Initiates a chown task.
1612
1613 Returns True on success, False on failure (logged).
1614
1615 The task returns True on success, False on failure (logged).
1616 """
1617 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1618
1619 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1620 """Synchronous version."""
1621 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1622
1623 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1624 """
1625 Initiates a is-dir query task.
1626
1627 Returns True on success, False on failure (logged).
1628
1629 The task returns True if it's a directory, False if it isn't, and
1630 None on error (logged).
1631 """
1632 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1633
1634 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1635 """Synchronous version."""
1636 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1637
1638 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1639 """
1640 Initiates a is-file query task.
1641
1642 Returns True on success, False on failure (logged).
1643
1644 The task returns True if it's a file, False if it isn't, and None on
1645 error (logged).
1646 """
1647 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1648
1649 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1650 """Synchronous version."""
1651 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1652
1653 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1654 """
1655 Initiates a is-symbolic-link query task.
1656
1657 Returns True on success, False on failure (logged).
1658
1659 The task returns True if it's a symbolic linke, False if it isn't, and
1660 None on error (logged).
1661 """
1662 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1663
1664 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1665 """Synchronous version."""
1666 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1667
1668 #def "STAT "
1669 #def "LSTAT "
1670 #def "LIST "
1671
1672 @staticmethod
1673 def calcFileXferTimeout(cbFile):
1674 """
1675 Calculates a reasonable timeout for an upload/download given the file size.
1676
1677 Returns timeout in milliseconds.
1678 """
1679 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1680
1681 @staticmethod
1682 def calcUploadTimeout(sLocalFile):
1683 """
1684 Calculates a reasonable timeout for an upload given the file (will stat it).
1685
1686 Returns timeout in milliseconds.
1687 """
1688 try: cbFile = os.path.getsize(sLocalFile);
1689 except: cbFile = 1024*1024;
1690 return Session.calcFileXferTimeout(cbFile);
1691
1692 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1693 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1694 """
1695 Initiates a download query task.
1696
1697 Returns True on success, False on failure (logged).
1698
1699 The task returns True on success, False on failure (logged).
1700 """
1701 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1702 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1703
1704 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1705 """Synchronous version."""
1706 if cMsTimeout <= 0:
1707 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1708 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1709
1710 def asyncUploadString(self, sContent, sRemoteFile,
1711 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1712 """
1713 Initiates a upload string task.
1714
1715 Returns True on success, False on failure (logged).
1716
1717 The task returns True on success, False on failure (logged).
1718 """
1719 if cMsTimeout <= 0:
1720 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1721 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1722 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1723
1724 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1725 """Synchronous version."""
1726 if cMsTimeout <= 0:
1727 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1728 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1729
1730 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1731 """
1732 Initiates a download file task.
1733
1734 Returns True on success, False on failure (logged).
1735
1736 The task returns True on success, False on failure (logged).
1737 """
1738 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1739
1740 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1741 """Synchronous version."""
1742 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1743
1744 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1745 cMsTimeout = 30000, fIgnoreErrors = False):
1746 """
1747 Initiates a download string task.
1748
1749 Returns True on success, False on failure (logged).
1750
1751 The task returns a byte string on success, False on failure (logged).
1752 """
1753 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1754 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1755
1756 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1757 cMsTimeout = 30000, fIgnoreErrors = False):
1758 """Synchronous version."""
1759 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1760 cMsTimeout, fIgnoreErrors);
1761
1762 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1763 """
1764 Initiates a packing file/directory task.
1765
1766 Returns True on success, False on failure (logged).
1767
1768 The task returns True on success, False on failure (logged).
1769 """
1770 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1771 (sRemoteFile, sRemoteSource));
1772
1773 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1774 """Synchronous version."""
1775 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1776
1777 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1778 """
1779 Initiates a unpack file task.
1780
1781 Returns True on success, False on failure (logged).
1782
1783 The task returns True on success, False on failure (logged).
1784 """
1785 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1786 (sRemoteFile, sRemoteDir));
1787
1788 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1789 """Synchronous version."""
1790 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1791
1792
1793class TransportTcp(TransportBase):
1794 """
1795 TCP transport layer for the TXS client session class.
1796 """
1797
1798 def __init__(self, sHostname, uPort, fReversedSetup):
1799 """
1800 Save the parameters. The session will call us back to make the
1801 connection later on its worker thread.
1802 """
1803 TransportBase.__init__(self, utils.getCallerName());
1804 self.sHostname = sHostname;
1805 self.fReversedSetup = fReversedSetup;
1806 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1807 self.oSocket = None;
1808 self.oWakeupW = None;
1809 self.oWakeupR = None;
1810 self.fConnectCanceled = False;
1811 self.fIsConnecting = False;
1812 self.oCv = threading.Condition();
1813 self.abReadAhead = array.array('B');
1814
1815 def toString(self):
1816 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1817 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1818 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1819 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1820
1821 def __isInProgressXcpt(self, oXcpt):
1822 """ In progress exception? """
1823 try:
1824 if isinstance(oXcpt, socket.error):
1825 try:
1826 if oXcpt.errno == errno.EINPROGRESS:
1827 return True;
1828 except: pass;
1829 # Windows?
1830 try:
1831 if oXcpt.errno == errno.EWOULDBLOCK:
1832 return True;
1833 except: pass;
1834 except:
1835 pass;
1836 return False;
1837
1838 def __isWouldBlockXcpt(self, oXcpt):
1839 """ Would block exception? """
1840 try:
1841 if isinstance(oXcpt, socket.error):
1842 try:
1843 if oXcpt.errno == errno.EWOULDBLOCK:
1844 return True;
1845 except: pass;
1846 try:
1847 if oXcpt.errno == errno.EAGAIN:
1848 return True;
1849 except: pass;
1850 except:
1851 pass;
1852 return False;
1853
1854 def __isConnectionReset(self, oXcpt):
1855 """ Connection reset by Peer or others. """
1856 try:
1857 if isinstance(oXcpt, socket.error):
1858 try:
1859 if oXcpt.errno == errno.ECONNRESET:
1860 return True;
1861 except: pass;
1862 try:
1863 if oXcpt.errno == errno.ENETRESET:
1864 return True;
1865 except: pass;
1866 except:
1867 pass;
1868 return False;
1869
1870 def _closeWakeupSockets(self):
1871 """ Closes the wakup sockets. Caller should own the CV. """
1872 oWakeupR = self.oWakeupR;
1873 self.oWakeupR = None;
1874 if oWakeupR is not None:
1875 oWakeupR.close();
1876
1877 oWakeupW = self.oWakeupW;
1878 self.oWakeupW = None;
1879 if oWakeupW is not None:
1880 oWakeupW.close();
1881
1882 return None;
1883
1884 def cancelConnect(self):
1885 # This is bad stuff.
1886 self.oCv.acquire();
1887 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1888 self.fConnectCanceled = True;
1889 if self.fIsConnecting:
1890 oSocket = self.oSocket;
1891 self.oSocket = None;
1892 if oSocket is not None:
1893 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1894 oSocket.close();
1895
1896 oWakeupW = self.oWakeupW;
1897 self.oWakeupW = None;
1898 if oWakeupW is not None:
1899 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1900 try: oWakeupW.send('cancelled!\n');
1901 except: reporter.logXcpt();
1902 try: oWakeupW.shutdown(socket.SHUT_WR);
1903 except: reporter.logXcpt();
1904 oWakeupW.close();
1905 self.oCv.release();
1906
1907 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1908 """ Connects to the TXS server as server, i.e. the reversed setup. """
1909 assert(self.fReversedSetup);
1910
1911 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1912
1913 # Workaround for bind() failure...
1914 try:
1915 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1916 except:
1917 reporter.errorXcpt('socket.listen(1) failed');
1918 return None;
1919
1920 # Bind the socket and make it listen.
1921 try:
1922 oSocket.bind((self.sHostname, self.uPort));
1923 except:
1924 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1925 return None;
1926 try:
1927 oSocket.listen(1);
1928 except:
1929 reporter.errorXcpt('socket.listen(1) failed');
1930 return None;
1931
1932 # Accept connections.
1933 oClientSocket = None;
1934 tClientAddr = None;
1935 try:
1936 (oClientSocket, tClientAddr) = oSocket.accept();
1937 except socket.error as e:
1938 if not self.__isInProgressXcpt(e):
1939 raise;
1940
1941 # Do the actual waiting.
1942 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1943 try:
1944 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1945 except socket.error as oXctp:
1946 if oXctp.errno != errno.EBADF or not self.fConnectCanceled:
1947 raise;
1948 reporter.log('socket.select() on accept was canceled');
1949 return None;
1950 except:
1951 reporter.logXcpt('socket.select() on accept');
1952
1953 # Try accept again.
1954 try:
1955 (oClientSocket, tClientAddr) = oSocket.accept();
1956 except socket.error as oXcpt:
1957 if not self.__isInProgressXcpt(e):
1958 if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
1959 raise;
1960 reporter.log('socket.accept() was canceled');
1961 return None;
1962 reporter.log('socket.accept() timed out');
1963 return False;
1964 except:
1965 reporter.errorXcpt('socket.accept() failed');
1966 return None;
1967 except:
1968 reporter.errorXcpt('socket.accept() failed');
1969 return None;
1970
1971 # Store the connected socket and throw away the server socket.
1972 self.oCv.acquire();
1973 if not self.fConnectCanceled:
1974 self.oSocket.close();
1975 self.oSocket = oClientSocket;
1976 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1977 self.oCv.release();
1978 return True;
1979
1980 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1981 """ Connects to the TXS server as client. """
1982 assert(not self.fReversedSetup);
1983
1984 # Connect w/ timeouts.
1985 rc = None;
1986 try:
1987 oSocket.connect((self.sHostname, self.uPort));
1988 rc = True;
1989 except socket.error as oXcpt:
1990 iRc = oXcpt.errno;
1991 if self.__isInProgressXcpt(oXcpt):
1992 # Do the actual waiting.
1993 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
1994 try:
1995 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1996 if len(ttRc[1]) + len(ttRc[2]) == 0:
1997 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1998 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1999 rc = iRc == 0;
2000 except socket.error as oXcpt2:
2001 iRc = oXcpt2.errno;
2002 except:
2003 iRc = -42;
2004 reporter.fatalXcpt('socket.select() on connect failed');
2005
2006 if rc is True:
2007 pass;
2008 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
2009 rc = False; # try again.
2010 else:
2011 if iRc != errno.EBADF or not self.fConnectCanceled:
2012 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
2013 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
2014 except:
2015 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
2016 return rc;
2017
2018
2019 def connect(self, cMsTimeout):
2020 # Create a non-blocking socket.
2021 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
2022 try:
2023 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
2024 except:
2025 reporter.fatalXcpt('socket.socket() failed');
2026 return None;
2027 try:
2028 oSocket.setblocking(0);
2029 except:
2030 oSocket.close();
2031 reporter.fatalXcpt('socket.socket() failed');
2032 return None;
2033
2034 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
2035 oWakeupR = None;
2036 oWakeupW = None;
2037 if hasattr(socket, 'socketpair'):
2038 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
2039 except: reporter.logXcpt('socket.socketpair() failed');
2040
2041 # Update the state.
2042 self.oCv.acquire();
2043 rc = None;
2044 if not self.fConnectCanceled:
2045 self.oSocket = oSocket;
2046 self.oWakeupW = oWakeupW;
2047 self.oWakeupR = oWakeupR;
2048 self.fIsConnecting = True;
2049 self.oCv.release();
2050
2051 # Try connect.
2052 if oWakeupR is None:
2053 oWakeupR = oSocket; # Avoid select failure.
2054 if self.fReversedSetup:
2055 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
2056 else:
2057 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2058 oSocket = None;
2059
2060 # Update the state and cleanup on failure/cancel.
2061 self.oCv.acquire();
2062 if rc is True and self.fConnectCanceled:
2063 rc = False;
2064 self.fIsConnecting = False;
2065
2066 if rc is not True:
2067 if self.oSocket is not None:
2068 self.oSocket.close();
2069 self.oSocket = None;
2070 self._closeWakeupSockets();
2071 self.oCv.release();
2072
2073 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2074 return rc;
2075
2076 def disconnect(self, fQuiet = False):
2077 if self.oSocket is not None:
2078 self.abReadAhead = array.array('B');
2079
2080 # Try a shutting down the socket gracefully (draining it).
2081 try:
2082 self.oSocket.shutdown(socket.SHUT_WR);
2083 except:
2084 if not fQuiet:
2085 reporter.error('shutdown(SHUT_WR)');
2086 try:
2087 self.oSocket.setblocking(0); # just in case it's not set.
2088 sData = "1";
2089 while sData:
2090 sData = self.oSocket.recv(16384);
2091 except:
2092 pass;
2093
2094 # Close it.
2095 self.oCv.acquire();
2096 try: self.oSocket.setblocking(1);
2097 except: pass;
2098 self.oSocket.close();
2099 self.oSocket = None;
2100 else:
2101 self.oCv.acquire();
2102 self._closeWakeupSockets();
2103 self.oCv.release();
2104
2105 def sendBytes(self, abBuf, cMsTimeout):
2106 if self.oSocket is None:
2107 reporter.error('TransportTcp.sendBytes: No connection.');
2108 return False;
2109
2110 # Try send it all.
2111 try:
2112 cbSent = self.oSocket.send(abBuf);
2113 if cbSent == len(abBuf):
2114 return True;
2115 except Exception as oXcpt:
2116 if not self.__isWouldBlockXcpt(oXcpt):
2117 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2118 return False;
2119 cbSent = 0;
2120
2121 # Do a timed send.
2122 msStart = base.timestampMilli();
2123 while True:
2124 cMsElapsed = base.timestampMilli() - msStart;
2125 if cMsElapsed > cMsTimeout:
2126 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2127 break;
2128
2129 # wait.
2130 try:
2131 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2132 if ttRc[2] and not ttRc[1]:
2133 reporter.error('TranportTcp.sendBytes: select returned with exception');
2134 break;
2135 if not ttRc[1]:
2136 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2137 break;
2138 except:
2139 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2140 break;
2141
2142 # Try send more.
2143 try:
2144 cbSent += self.oSocket.send(abBuf[cbSent:]);
2145 if cbSent == len(abBuf):
2146 return True;
2147 except Exception as oXcpt:
2148 if not self.__isWouldBlockXcpt(oXcpt):
2149 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2150 break;
2151
2152 return False;
2153
2154 def __returnReadAheadBytes(self, cb):
2155 """ Internal worker for recvBytes. """
2156 assert(len(self.abReadAhead) >= cb);
2157 abRet = self.abReadAhead[:cb];
2158 self.abReadAhead = self.abReadAhead[cb:];
2159 return abRet;
2160
2161 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2162 if self.oSocket is None:
2163 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2164 return None;
2165
2166 # Try read in some more data without bothering with timeout handling first.
2167 if len(self.abReadAhead) < cb:
2168 try:
2169 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2170 if abBuf:
2171 self.abReadAhead.extend(array.array('B', abBuf));
2172 except Exception as oXcpt:
2173 if not self.__isWouldBlockXcpt(oXcpt):
2174 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2175 return None;
2176
2177 if len(self.abReadAhead) >= cb:
2178 return self.__returnReadAheadBytes(cb);
2179
2180 # Timeout loop.
2181 msStart = base.timestampMilli();
2182 while True:
2183 cMsElapsed = base.timestampMilli() - msStart;
2184 if cMsElapsed > cMsTimeout:
2185 if not fNoDataOk or self.abReadAhead:
2186 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2187 break;
2188
2189 # Wait.
2190 try:
2191 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2192 if ttRc[2] and not ttRc[0]:
2193 reporter.error('TranportTcp.recvBytes: select returned with exception');
2194 break;
2195 if not ttRc[0]:
2196 if not fNoDataOk or self.abReadAhead:
2197 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2198 % (len(self.abReadAhead), cb, fNoDataOk));
2199 break;
2200 except:
2201 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2202 break;
2203
2204 # Try read more.
2205 try:
2206 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2207 if not abBuf:
2208 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2209 % (len(self.abReadAhead), cb, fNoDataOk));
2210 self.disconnect();
2211 return None;
2212
2213 self.abReadAhead.extend(array.array('B', abBuf));
2214
2215 except Exception as oXcpt:
2216 reporter.log('recv => exception %s' % (oXcpt,));
2217 if not self.__isWouldBlockXcpt(oXcpt):
2218 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2219 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2220 break;
2221
2222 # Done?
2223 if len(self.abReadAhead) >= cb:
2224 return self.__returnReadAheadBytes(cb);
2225
2226 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2227 return None;
2228
2229 def isConnectionOk(self):
2230 if self.oSocket is None:
2231 return False;
2232 try:
2233 ttRc = select.select([], [], [self.oSocket], 0.0);
2234 if ttRc[2]:
2235 return False;
2236
2237 self.oSocket.send(array.array('B')); # send zero bytes.
2238 except:
2239 return False;
2240 return True;
2241
2242 def isRecvPending(self, cMsTimeout = 0):
2243 try:
2244 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2245 if not ttRc[0]:
2246 return False;
2247 except:
2248 pass;
2249 return True;
2250
2251
2252def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2253 """
2254 Opens a connection to a Test Execution Service via TCP, given its name.
2255
2256 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2257 or similar.
2258 """
2259 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2260 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2261 try:
2262 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2263 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2264 except:
2265 reporter.errorXcpt(None, 15);
2266 return None;
2267 return oSession;
2268
2269
2270def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2271 """
2272 Tries to open a connection to a Test Execution Service via TCP, given its name.
2273
2274 This differs from openTcpSession in that it won't log a connection failure
2275 as an error.
2276 """
2277 try:
2278 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2279 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2280 except:
2281 reporter.errorXcpt(None, 15);
2282 return None;
2283 return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette