VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 83623

Last change on this file since 83623 was 82968, checked in by vboxsync, 5 years ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 84.3 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 82968 2020-02-04 10:35:17Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2020 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 82968 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 return abStr.tostring().decode('utf_8');
75 except:
76 reporter.errorXcpt('getSZ(,%u)' % (off));
77 return sDefault;
78
79def getSZLen(abData, off):
80 """
81 Get the length of a zero-terminated string field, in bytes.
82 Returns -1 if off is beyond the data packet or not properly terminated.
83 """
84 cbData = len(abData);
85 if off >= cbData:
86 return -1;
87
88 offCur = off;
89 while abData[offCur] != 0:
90 offCur = offCur + 1;
91 if offCur >= cbData:
92 return -1;
93
94 return offCur - off;
95
96def isValidOpcodeEncoding(sOpcode):
97 """
98 Checks if the specified opcode is valid or not.
99 Returns True on success.
100 Returns False if it is invalid, details in the log.
101 """
102 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
103 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
104 if len(sOpcode) != 8:
105 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
106 return False;
107 for i in range(0, 1):
108 if sSet1.find(sOpcode[i]) < 0:
109 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
110 return False;
111 for i in range(2, 7):
112 if sSet2.find(sOpcode[i]) < 0:
113 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
114 return False;
115 return True;
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
121def u32ToByteArray(u32):
122 """Encodes the u32 value as a little endian byte (B) array."""
123 return array.array('B',
124 ( u32 % 256,
125 (u32 // 256) % 256,
126 (u32 // 65536) % 256,
127 (u32 // 16777216) % 256) );
128
129def escapeString(sString):
130 """
131 Does $ escaping of the string so TXS doesn't try do variable expansion.
132 """
133 return sString.replace('$', '$$');
134
135
136
137class TransportBase(object):
138 """
139 Base class for the transport layer.
140 """
141
142 def __init__(self, sCaller):
143 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
144 self.fDummy = 0;
145 self.abReadAheadHdr = array.array('B');
146
147 def toString(self):
148 """
149 Stringify the instance for logging and debugging.
150 """
151 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
152
153 def __str__(self):
154 return self.toString();
155
156 def cancelConnect(self):
157 """
158 Cancels any pending connect() call.
159 Returns None;
160 """
161 return None;
162
163 def connect(self, cMsTimeout):
164 """
165 Quietly attempts to connect to the TXS.
166
167 Returns True on success.
168 Returns False on retryable errors (no logging).
169 Returns None on fatal errors with details in the log.
170
171 Override this method, don't call super.
172 """
173 _ = cMsTimeout;
174 return False;
175
176 def disconnect(self, fQuiet = False):
177 """
178 Disconnect from the TXS.
179
180 Returns True.
181
182 Override this method, don't call super.
183 """
184 _ = fQuiet;
185 return True;
186
187 def sendBytes(self, abBuf, cMsTimeout):
188 """
189 Sends the bytes in the buffer abBuf to the TXS.
190
191 Returns True on success.
192 Returns False on failure and error details in the log.
193
194 Override this method, don't call super.
195
196 Remarks: len(abBuf) is always a multiple of 16.
197 """
198 _ = abBuf; _ = cMsTimeout;
199 return False;
200
201 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
202 """
203 Receive cb number of bytes from the TXS.
204
205 Returns the bytes (array('B')) on success.
206 Returns None on failure and error details in the log.
207
208 Override this method, don't call super.
209
210 Remarks: cb is always a multiple of 16.
211 """
212 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
213 return None;
214
215 def isConnectionOk(self):
216 """
217 Checks if the connection is OK.
218
219 Returns True if it is.
220 Returns False if it isn't (caller should call diconnect).
221
222 Override this method, don't call super.
223 """
224 return True;
225
226 def isRecvPending(self, cMsTimeout = 0):
227 """
228 Checks if there is incoming bytes, optionally waiting cMsTimeout
229 milliseconds for something to arrive.
230
231 Returns True if there is, False if there isn't.
232
233 Override this method, don't call super.
234 """
235 _ = cMsTimeout;
236 return False;
237
238 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
239 """
240 Sends a message (opcode + encoded payload).
241
242 Returns True on success.
243 Returns False on failure and error details in the log.
244 """
245 # Fix + check the opcode.
246 if len(sOpcode) < 2:
247 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
248 return False;
249 sOpcode = sOpcode.ljust(8);
250 if not isValidOpcodeEncoding(sOpcode):
251 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
252 return False;
253
254 # Start construct the message.
255 cbMsg = 16 + len(abPayload);
256 abMsg = array.array('B');
257 abMsg.extend(u32ToByteArray(cbMsg));
258 abMsg.extend((0, 0, 0, 0)); # uCrc32
259 try:
260 abMsg.extend(array.array('B', \
261 ( ord(sOpcode[0]), \
262 ord(sOpcode[1]), \
263 ord(sOpcode[2]), \
264 ord(sOpcode[3]), \
265 ord(sOpcode[4]), \
266 ord(sOpcode[5]), \
267 ord(sOpcode[6]), \
268 ord(sOpcode[7]) ) ) );
269 if abPayload:
270 abMsg.extend(abPayload);
271 except:
272 reporter.fatalXcpt('sendMsgInt: packing problem...');
273 return False;
274
275 # checksum it, padd it and send it off.
276 uCrc32 = zlib.crc32(abMsg[8:]);
277 abMsg[4:8] = u32ToByteArray(uCrc32);
278
279 while len(abMsg) % 16:
280 abMsg.append(0);
281
282 reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
283 return self.sendBytes(abMsg, cMsTimeout);
284
285 def recvMsg(self, cMsTimeout, fNoDataOk = False):
286 """
287 Receives a message from the TXS.
288
289 Returns the message three-tuple: length, opcode, payload.
290 Returns (None, None, None) on failure and error details in the log.
291 """
292
293 # Read the header.
294 if self.abReadAheadHdr:
295 assert(len(self.abReadAheadHdr) == 16);
296 abHdr = self.abReadAheadHdr;
297 self.abReadAheadHdr = array.array('B');
298 else:
299 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
300 if abHdr is None:
301 return (None, None, None);
302 if len(abHdr) != 16:
303 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
304 return (None, None, None);
305
306 # Unpack and validate the header.
307 cbMsg = getU32(abHdr, 0);
308 uCrc32 = getU32(abHdr, 4);
309 sOpcode = abHdr[8:16].tostring().decode('ascii');
310
311 if cbMsg < 16:
312 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
313 return (None, None, None);
314 if cbMsg > 1024*1024:
315 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
316 return (None, None, None);
317 if not isValidOpcodeEncoding(sOpcode):
318 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
319 return (None, None, None);
320
321 # Get the payload (if any), dropping the padding.
322 abPayload = array.array('B');
323 if cbMsg > 16:
324 if cbMsg % 16:
325 cbPadding = 16 - (cbMsg % 16);
326 else:
327 cbPadding = 0;
328 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
329 if abPayload is None:
330 self.abReadAheadHdr = abHdr;
331 if not fNoDataOk :
332 reporter.log('recvMsg: failed to recv payload bytes!');
333 return (None, None, None);
334
335 while cbPadding > 0:
336 abPayload.pop();
337 cbPadding = cbPadding - 1;
338
339 # Check the CRC-32.
340 if uCrc32 != 0:
341 uActualCrc32 = zlib.crc32(abHdr[8:]);
342 if cbMsg > 16:
343 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
344 uActualCrc32 = uActualCrc32 & 0xffffffff;
345 if uCrc32 != uActualCrc32:
346 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
347 return (None, None, None);
348
349 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
350 return (cbMsg, sOpcode, abPayload);
351
352 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
353 """
354 Sends a message (opcode + payload tuple).
355
356 Returns True on success.
357 Returns False on failure and error details in the log.
358 Returns None if you pass the incorrectly typed parameters.
359 """
360 # Encode the payload.
361 abPayload = array.array('B');
362 for o in aoPayload:
363 try:
364 if utils.isString(o):
365 if sys.version_info[0] >= 3:
366 abPayload.extend(o.encode('utf_8'));
367 else:
368 # the primitive approach...
369 sUtf8 = o.encode('utf_8');
370 for ch in sUtf8:
371 abPayload.append(ord(ch))
372 abPayload.append(0);
373 elif isinstance(o, (long, int)):
374 if o < 0 or o > 0xffffffff:
375 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
376 return None;
377 abPayload.extend(u32ToByteArray(o));
378 elif isinstance(o, array.array):
379 abPayload.extend(o);
380 else:
381 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
382 return None;
383 except:
384 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
385 return None;
386 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
387
388
389class Session(TdTaskBase):
390 """
391 A Test eXecution Service (TXS) client session.
392 """
393
394 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
395 """
396 Construct a TXS session.
397
398 This starts by connecting to the TXS and will enter the signalled state
399 when connected or the timeout has been reached.
400 """
401 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
402 self.oTransport = oTransport;
403 self.sStatus = "";
404 self.cMsTimeout = 0;
405 self.fErr = True; # Whether to report errors as error.
406 self.msStart = 0;
407 self.oThread = None;
408 self.fnTask = self.taskDummy;
409 self.aTaskArgs = None;
410 self.oTaskRc = None;
411 self.t3oReply = (None, None, None);
412 self.fScrewedUpMsgState = False;
413 self.fTryConnect = fTryConnect;
414
415 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
416 raise base.GenError("startTask failed");
417
418 def __del__(self):
419 """Make sure to cancel the task when deleted."""
420 self.cancelTask();
421
422 def toString(self):
423 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
424 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
425 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
426 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
427
428 def taskDummy(self):
429 """Place holder to catch broken state handling."""
430 raise Exception();
431
432 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
433 """
434 Kicks of a new task.
435
436 cMsTimeout: The task timeout in milliseconds. Values less than
437 500 ms will be adjusted to 500 ms. This means it is
438 OK to use negative value.
439 sStatus: The task status.
440 fnTask: The method that'll execute the task.
441 aArgs: Arguments to pass to fnTask.
442
443 Returns True on success, False + error in log on failure.
444 """
445 if not self.cancelTask():
446 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
447 return False;
448
449 # Change status and make sure we're the
450 self.lockTask();
451 if self.sStatus != "":
452 self.unlockTask();
453 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
454 return False;
455 self.sStatus = "setup";
456 self.oTaskRc = None;
457 self.t3oReply = (None, None, None);
458 self.resetTaskLocked();
459 self.unlockTask();
460
461 self.cMsTimeout = max(cMsTimeout, 500);
462 self.fErr = not fIgnoreErrors;
463 self.fnTask = fnTask;
464 self.aTaskArgs = aArgs;
465 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
466 self.oThread.setDaemon(True);
467 self.msStart = base.timestampMilli();
468
469 self.lockTask();
470 self.sStatus = sStatus;
471 self.unlockTask();
472 self.oThread.start();
473
474 return True;
475
476 def cancelTask(self, fSync = True):
477 """
478 Attempts to cancel any pending tasks.
479 Returns success indicator (True/False).
480 """
481 self.lockTask();
482
483 if self.sStatus == "":
484 self.unlockTask();
485 return True;
486 if self.sStatus == "setup":
487 self.unlockTask();
488 return False;
489 if self.sStatus == "cancelled":
490 self.unlockTask();
491 return False;
492
493 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
494 if self.sStatus == 'connecting':
495 self.oTransport.cancelConnect();
496
497 self.sStatus = "cancelled";
498 oThread = self.oThread;
499 self.unlockTask();
500
501 if not fSync:
502 return False;
503
504 oThread.join(61.0);
505 return oThread.isAlive();
506
507 def taskThread(self):
508 """
509 The task thread function.
510 This does some housekeeping activities around the real task method call.
511 """
512 if not self.isCancelled():
513 try:
514 fnTask = self.fnTask;
515 oTaskRc = fnTask(*self.aTaskArgs);
516 except:
517 reporter.fatalXcpt('taskThread', 15);
518 oTaskRc = None;
519 else:
520 reporter.log('taskThread: cancelled already');
521
522 self.lockTask();
523
524 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
525 self.oTaskRc = oTaskRc;
526 self.oThread = None;
527 self.sStatus = '';
528 self.signalTaskLocked();
529
530 self.unlockTask();
531 return None;
532
533 def isCancelled(self):
534 """Internal method for checking if the task has been cancelled."""
535 self.lockTask();
536 sStatus = self.sStatus;
537 self.unlockTask();
538 if sStatus == "cancelled":
539 return True;
540 return False;
541
542 def hasTimedOut(self):
543 """Internal method for checking if the task has timed out or not."""
544 cMsLeft = self.getMsLeft();
545 if cMsLeft <= 0:
546 return True;
547 return False;
548
549 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
550 """Gets the time left until the timeout."""
551 cMsElapsed = base.timestampMilli() - self.msStart;
552 if cMsElapsed < 0:
553 return cMsMin;
554 cMsLeft = self.cMsTimeout - cMsElapsed;
555 if cMsLeft <= cMsMin:
556 return cMsMin;
557 if cMsLeft > cMsMax > 0:
558 return cMsMax
559 return cMsLeft;
560
561 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
562 """
563 Wrapper for TransportBase.recvMsg that stashes the response away
564 so the client can inspect it later on.
565 """
566 if cMsTimeout is None:
567 cMsTimeout = self.getMsLeft(500);
568 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
569 self.lockTask();
570 self.t3oReply = (cbMsg, sOpcode, abPayload);
571 self.unlockTask();
572 return (cbMsg, sOpcode, abPayload);
573
574 def recvAck(self, fNoDataOk = False):
575 """
576 Receives an ACK or error response from the TXS.
577
578 Returns True on success.
579 Returns False on timeout or transport error.
580 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
581 and there are always details of some sort or another.
582 """
583 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
584 if cbMsg is None:
585 return False;
586 sOpcode = sOpcode.strip()
587 if sOpcode == "ACK":
588 return True;
589 return (sOpcode, getSZ(abPayload, 0, sOpcode));
590
591 def recvAckLogged(self, sCommand, fNoDataOk = False):
592 """
593 Wrapper for recvAck and logging.
594 Returns True on success (ACK).
595 Returns False on time, transport error and errors signalled by TXS.
596 """
597 rc = self.recvAck(fNoDataOk);
598 if rc is not True and not fNoDataOk:
599 if rc is False:
600 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
601 else:
602 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
603 rc = False;
604 return rc;
605
606 def recvTrueFalse(self, sCommand):
607 """
608 Receives a TRUE/FALSE response from the TXS.
609 Returns True on TRUE, False on FALSE and None on error/other (logged).
610 """
611 cbMsg, sOpcode, abPayload = self.recvReply();
612 if cbMsg is None:
613 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
614 return None;
615
616 sOpcode = sOpcode.strip()
617 if sOpcode == "TRUE":
618 return True;
619 if sOpcode == "FALSE":
620 return False;
621 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
622 return None;
623
624 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
625 """
626 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
627 """
628 if cMsTimeout is None:
629 cMsTimeout = self.getMsLeft(500);
630 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
631
632 def asyncToSync(self, fnAsync, *aArgs):
633 """
634 Wraps an asynchronous task into a synchronous operation.
635
636 Returns False on failure, task return status on success.
637 """
638 rc = fnAsync(*aArgs);
639 if rc is False:
640 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
641 return rc;
642
643 rc = self.waitForTask(self.cMsTimeout + 5000);
644 if rc is False:
645 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask failed...');
646 self.cancelTask();
647 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
648 return False;
649
650 rc = self.getResult();
651 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
652 return rc;
653
654 #
655 # Connection tasks.
656 #
657
658 def taskConnect(self, cMsIdleFudge):
659 """Tries to connect to the TXS"""
660 while not self.isCancelled():
661 reporter.log2('taskConnect: connecting ...');
662 rc = self.oTransport.connect(self.getMsLeft(500));
663 if rc is True:
664 reporter.log('taskConnect: succeeded');
665 return self.taskGreet(cMsIdleFudge);
666 if rc is None:
667 reporter.log2('taskConnect: unable to connect');
668 return None;
669 if self.hasTimedOut():
670 reporter.log2('taskConnect: timed out');
671 if not self.fTryConnect:
672 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
673 return False;
674 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
675 if not self.fTryConnect:
676 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
677 return False;
678
679 def taskGreet(self, cMsIdleFudge):
680 """Greets the TXS"""
681 rc = self.sendMsg("HOWDY", ());
682 if rc is True:
683 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
684 if rc is True:
685 while cMsIdleFudge > 0:
686 cMsIdleFudge -= 1000;
687 time.sleep(1);
688 else:
689 self.oTransport.disconnect(self.fTryConnect);
690 return rc;
691
692 def taskBye(self):
693 """Says goodbye to the TXS"""
694 rc = self.sendMsg("BYE");
695 if rc is True:
696 rc = self.recvAckLogged("BYE");
697 self.oTransport.disconnect();
698 return rc;
699
700 def taskUuid(self):
701 """Gets the TXS UUID"""
702 rc = self.sendMsg("UUID");
703 if rc is True:
704 rc = False;
705 cbMsg, sOpcode, abPayload = self.recvReply();
706 if cbMsg is not None:
707 sOpcode = sOpcode.strip()
708 if sOpcode == "ACK UUID":
709 sUuid = getSZ(abPayload, 0);
710 if sUuid is not None:
711 sUuid = '{%s}' % (sUuid,)
712 try:
713 _ = uuid.UUID(sUuid);
714 rc = sUuid;
715 except:
716 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
717 else:
718 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
719 else:
720 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
721 else:
722 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
723 return rc;
724
725 #
726 # Process task
727 # pylint: disable=missing-docstring
728 #
729
730 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
731 # Construct the payload.
732 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
733 for sArg in asArgs:
734 aoPayload.append('%s' % (sArg));
735 aoPayload.append(long(len(asAddEnv)));
736 for sPutEnv in asAddEnv:
737 aoPayload.append('%s' % (sPutEnv));
738 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
739 if utils.isString(o):
740 aoPayload.append(o);
741 elif o is not None:
742 aoPayload.append('|');
743 o.uTxsClientCrc32 = zlib.crc32(b'');
744 else:
745 aoPayload.append('');
746 aoPayload.append('%s' % (sAsUser));
747 aoPayload.append(long(self.cMsTimeout));
748
749 # Kick of the EXEC command.
750 rc = self.sendMsg('EXEC', aoPayload)
751 if rc is True:
752 rc = self.recvAckLogged('EXEC');
753 if rc is True:
754 # Loop till the process completes, feed input to the TXS and
755 # receive output from it.
756 sFailure = "";
757 msPendingInputReply = None;
758 cbMsg, sOpcode, abPayload = (None, None, None);
759 while True:
760 # Pending input?
761 if msPendingInputReply is None \
762 and oStdIn is not None \
763 and not utils.isString(oStdIn):
764 try:
765 sInput = oStdIn.read(65536);
766 except:
767 reporter.errorXcpt('read standard in');
768 sFailure = 'exception reading stdin';
769 rc = None;
770 break;
771 if sInput:
772 # Convert to a byte array before handing it of to sendMsg or the string
773 # will get some zero termination added breaking the CRC (and injecting
774 # unwanted bytes).
775 abInput = array.array('B', sInput.encode('utf-8'));
776 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
777 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
778 if rc is not True:
779 sFailure = 'sendMsg failure';
780 break;
781 msPendingInputReply = base.timestampMilli();
782 continue;
783
784 rc = self.sendMsg('STDINEOS');
785 oStdIn = None;
786 if rc is not True:
787 sFailure = 'sendMsg failure';
788 break;
789 msPendingInputReply = base.timestampMilli();
790
791 # Wait for input (500 ms timeout).
792 if cbMsg is None:
793 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
794 if cbMsg is None:
795 # Check for time out before restarting the loop.
796 # Note! Only doing timeout checking here does mean that
797 # the TXS may prevent us from timing out by
798 # flooding us with data. This is unlikely though.
799 if self.hasTimedOut() \
800 and ( msPendingInputReply is None \
801 or base.timestampMilli() - msPendingInputReply > 30000):
802 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
803 sFailure = 'timeout';
804 rc = None;
805 break;
806 # Check that the connection is OK.
807 if not self.oTransport.isConnectionOk():
808 self.oTransport.disconnect();
809 sFailure = 'disconnected';
810 rc = False;
811 break;
812 continue;
813
814 # Handle the response.
815 sOpcode = sOpcode.rstrip();
816 if sOpcode == 'STDOUT':
817 oOut = oStdOut;
818 elif sOpcode == 'STDERR':
819 oOut = oStdErr;
820 elif sOpcode == 'TESTPIPE':
821 oOut = oTestPipe;
822 else:
823 oOut = None;
824 if oOut is not None:
825 # Output from the process.
826 if len(abPayload) < 4:
827 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
828 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
829 rc = None;
830 break;
831 uStreamCrc32 = getU32(abPayload, 0);
832 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
833 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
834 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
835 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
836 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
837 rc = None;
838 break;
839 try:
840 oOut.write(abPayload[4:]);
841 except:
842 sFailure = 'exception writing %s' % (sOpcode);
843 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
844 rc = None;
845 break;
846 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
847 # Standard input is ignored. Ignore this condition for now.
848 msPendingInputReply = None;
849 reporter.log('taskExecEx: Standard input is ignored... why?');
850 del oStdIn.uTxsClientCrc32;
851 oStdIn = '/dev/null';
852 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
853 and msPendingInputReply is not None:
854 # TXS STDIN error, abort.
855 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
856 msPendingInputReply = None;
857 sFailure = 'TXS is out of memory for std input buffering';
858 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
859 rc = None;
860 break;
861 elif sOpcode == 'ACK' and msPendingInputReply is not None:
862 msPendingInputReply = None;
863 elif sOpcode.startswith('PROC '):
864 # Process status message, handle it outside the loop.
865 rc = True;
866 break;
867 else:
868 sFailure = 'Unexpected opcode %s' % (sOpcode);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 # Clear the message.
873 cbMsg, sOpcode, abPayload = (None, None, None);
874
875 # If we sent an STDIN packet and didn't get a reply yet, we'll give
876 # TXS some 5 seconds to reply to this. If we don't wait here we'll
877 # get screwed later on if we mix it up with the reply to some other
878 # command. Hackish.
879 if msPendingInputReply is not None:
880 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
881 if cbMsg2 is not None:
882 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
883 % (cbMsg2, sOpcode2, abPayload2));
884 msPendingInputReply = None;
885 else:
886 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
887 self.fScrewedUpMsgState = True;
888
889 # Parse the exit status (True), abort (None) or do nothing (False).
890 if rc is True:
891 if sOpcode != 'PROC OK':
892 # Do proper parsing some other day if needed:
893 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
894 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
895 rc = False;
896 else:
897 if rc is None:
898 # Abort it.
899 reporter.log('taskExecEx: sending ABORT...');
900 rc = self.sendMsg('ABORT');
901 while rc is True:
902 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
903 if cbMsg is None:
904 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
905 self.fScrewedUpMsgState = True;
906 break;
907 if sOpcode.startswith('PROC '):
908 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
909 break;
910 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
911 # Check that the connection is OK before looping.
912 if not self.oTransport.isConnectionOk():
913 self.oTransport.disconnect();
914 break;
915
916 # Fake response with the reason why we quit.
917 if sFailure is not None:
918 self.t3oReply = (0, 'EXECFAIL', sFailure);
919 rc = None;
920 else:
921 rc = None;
922
923 # Cleanup.
924 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
925 if o is not None and not utils.isString(o):
926 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
927 # Make sure all files are closed
928 o.close(); # pylint: disable=maybe-no-member
929 reporter.log('taskExecEx: returns %s' % (rc));
930 return rc;
931
932 #
933 # Admin tasks
934 #
935
936 def hlpRebootShutdownWaitForAck(self, sCmd):
937 """Wait for reboot/shutodwn ACK."""
938 rc = self.recvAckLogged(sCmd);
939 if rc is True:
940 # poll a little while for server to disconnect.
941 uMsStart = base.timestampMilli();
942 while self.oTransport.isConnectionOk() \
943 and base.timestampMilli() - uMsStart >= 5000:
944 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
945 break;
946 self.oTransport.disconnect();
947 return rc;
948
949 def taskReboot(self):
950 rc = self.sendMsg('REBOOT');
951 if rc is True:
952 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
953 return rc;
954
955 def taskShutdown(self):
956 rc = self.sendMsg('SHUTDOWN');
957 if rc is True:
958 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
959 return rc;
960
961 #
962 # CD/DVD control tasks.
963 #
964
965 ## TODO
966
967 #
968 # File system tasks
969 #
970
971 def taskMkDir(self, sRemoteDir, fMode):
972 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
973 if rc is True:
974 rc = self.recvAckLogged('MKDIR');
975 return rc;
976
977 def taskMkDirPath(self, sRemoteDir, fMode):
978 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
979 if rc is True:
980 rc = self.recvAckLogged('MKDRPATH');
981 return rc;
982
983 def taskMkSymlink(self, sLinkTarget, sLink):
984 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
985 if rc is True:
986 rc = self.recvAckLogged('MKSYMLNK');
987 return rc;
988
989 def taskRmDir(self, sRemoteDir):
990 rc = self.sendMsg('RMDIR', (sRemoteDir,));
991 if rc is True:
992 rc = self.recvAckLogged('RMDIR');
993 return rc;
994
995 def taskRmFile(self, sRemoteFile):
996 rc = self.sendMsg('RMFILE', (sRemoteFile,));
997 if rc is True:
998 rc = self.recvAckLogged('RMFILE');
999 return rc;
1000
1001 def taskRmSymlink(self, sRemoteSymlink):
1002 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1003 if rc is True:
1004 rc = self.recvAckLogged('RMSYMLNK');
1005 return rc;
1006
1007 def taskRmTree(self, sRemoteTree):
1008 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1009 if rc is True:
1010 rc = self.recvAckLogged('RMTREE');
1011 return rc;
1012
1013 def taskChMod(self, sRemotePath, fMode):
1014 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1015 if rc is True:
1016 rc = self.recvAckLogged('CHMOD');
1017 return rc;
1018
1019 def taskChOwn(self, sRemotePath, idUser, idGroup):
1020 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1021 if rc is True:
1022 rc = self.recvAckLogged('CHOWN');
1023 return rc;
1024
1025 def taskIsDir(self, sRemoteDir):
1026 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1027 if rc is True:
1028 rc = self.recvTrueFalse('ISDIR');
1029 return rc;
1030
1031 def taskIsFile(self, sRemoteFile):
1032 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1033 if rc is True:
1034 rc = self.recvTrueFalse('ISFILE');
1035 return rc;
1036
1037 def taskIsSymlink(self, sRemoteSymlink):
1038 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1039 if rc is True:
1040 rc = self.recvTrueFalse('ISSYMLNK');
1041 return rc;
1042
1043 #def "STAT "
1044 #def "LSTAT "
1045 #def "LIST "
1046
1047 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1048 #
1049 # Open the local file (make sure it exist before bothering TXS) and
1050 # tell TXS that we want to upload a file.
1051 #
1052 try:
1053 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1054 except:
1055 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1056 return False;
1057
1058 # Common cause with taskUploadStr
1059 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1060
1061 # Cleanup.
1062 oLocalFile.close();
1063 return rc;
1064
1065 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1066 # Wrap sContent in a file like class.
1067 class InStringFile(object): # pylint: disable=too-few-public-methods
1068 def __init__(self, sContent):
1069 self.sContent = sContent;
1070 self.off = 0;
1071
1072 def read(self, cbMax):
1073 cbLeft = len(self.sContent) - self.off;
1074 if cbLeft == 0:
1075 return "";
1076 if cbLeft <= cbMax:
1077 sRet = self.sContent[self.off:(self.off + cbLeft)];
1078 else:
1079 sRet = self.sContent[self.off:(self.off + cbMax)];
1080 self.off = self.off + len(sRet);
1081 return sRet;
1082
1083 oLocalString = InStringFile(sContent);
1084 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1085
1086 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1087 """Common worker used by taskUploadFile and taskUploadString."""
1088 #
1089 # Command + ACK.
1090 #
1091 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1092 # Fall back on the old command if the new one is not known by the TXS.
1093 #
1094 if fMode == 0:
1095 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1096 if rc is True:
1097 rc = self.recvAckLogged('PUT FILE');
1098 else:
1099 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1100 if rc is True:
1101 rc = self.recvAck();
1102 if rc is False:
1103 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1104 elif rc is not True:
1105 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1106 # Fallback:
1107 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1108 if rc is True:
1109 rc = self.recvAckLogged('PUT FILE');
1110 else:
1111 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1112 rc = False;
1113 if rc is True:
1114 #
1115 # Push data packets until eof.
1116 #
1117 uMyCrc32 = zlib.crc32(b'');
1118 while True:
1119 # Read up to 64 KB of data.
1120 try:
1121 sRaw = oLocalFile.read(65536);
1122 except:
1123 rc = None;
1124 break;
1125
1126 # Convert to array - this is silly!
1127 abBuf = array.array('B');
1128 if utils.isString(sRaw):
1129 for i, _ in enumerate(sRaw):
1130 abBuf.append(ord(sRaw[i]));
1131 else:
1132 abBuf.extend(sRaw);
1133 sRaw = None;
1134
1135 # Update the file stream CRC and send it off.
1136 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1137 if not abBuf:
1138 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1139 else:
1140 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1141 if rc is False:
1142 break;
1143
1144 # Wait for the reply.
1145 rc = self.recvAck();
1146 if rc is not True:
1147 if rc is False:
1148 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1149 else:
1150 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1151 rc = False;
1152 break;
1153
1154 # EOF?
1155 if not abBuf:
1156 break;
1157
1158 # Send ABORT on ACK and I/O errors.
1159 if rc is None:
1160 rc = self.sendMsg('ABORT');
1161 if rc is True:
1162 self.recvAckLogged('ABORT');
1163 rc = False;
1164 return rc;
1165
1166 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1167 try:
1168 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1169 except:
1170 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1171 return False;
1172
1173 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1174
1175 oLocalFile.close();
1176 if rc is False:
1177 try:
1178 os.remove(sLocalFile);
1179 except:
1180 reporter.errorXcpt();
1181 return rc;
1182
1183 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1184 # Wrap sContent in a file like class.
1185 class OutStringFile(object): # pylint: disable=too-few-public-methods
1186 def __init__(self):
1187 self.asContent = [];
1188
1189 def write(self, sBuf):
1190 self.asContent.append(sBuf);
1191 return None;
1192
1193 oLocalString = OutStringFile();
1194 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1195 if rc is True:
1196 rc = '';
1197 for sBuf in oLocalString.asContent:
1198 if hasattr(sBuf, 'decode'):
1199 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1200 else:
1201 rc += sBuf;
1202 return rc;
1203
1204 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1205 """Common worker for taskDownloadFile and taskDownloadString."""
1206 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1207 if rc is True:
1208 #
1209 # Process data packets until eof.
1210 #
1211 uMyCrc32 = zlib.crc32(b'');
1212 while rc is True:
1213 cbMsg, sOpcode, abPayload = self.recvReply();
1214 if cbMsg is None:
1215 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1216 rc = None;
1217 break;
1218
1219 # Validate.
1220 sOpcode = sOpcode.rstrip();
1221 if sOpcode not in ('DATA', 'DATA EOF',):
1222 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1223 % (sOpcode, getSZ(abPayload, 0, "None")));
1224 rc = False;
1225 break;
1226 if sOpcode == 'DATA' and len(abPayload) < 4:
1227 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1228 rc = None;
1229 break;
1230 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1231 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1232 rc = None;
1233 break;
1234
1235 # Check the CRC (common for both packets).
1236 uCrc32 = getU32(abPayload, 0);
1237 if sOpcode == 'DATA':
1238 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1239 if uCrc32 != (uMyCrc32 & 0xffffffff):
1240 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1241 % (hex(uMyCrc32), hex(uCrc32)));
1242 rc = None;
1243 break;
1244 if sOpcode == 'DATA EOF':
1245 rc = self.sendMsg('ACK');
1246 break;
1247
1248 # Finally, push the data to the file.
1249 try:
1250 oLocalFile.write(abPayload[4:].tostring());
1251 except:
1252 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1253 rc = None;
1254 break;
1255 rc = self.sendMsg('ACK');
1256
1257 # Send NACK on validation and I/O errors.
1258 if rc is None:
1259 rc = self.sendMsg('NACK');
1260 rc = False;
1261 return rc;
1262
1263 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1264 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1265 if rc is True:
1266 rc = self.recvAckLogged('UNPKFILE');
1267 return rc;
1268
1269 # pylint: enable=missing-docstring
1270
1271
1272 #
1273 # Public methods - generic task queries
1274 #
1275
1276 def isSuccess(self):
1277 """Returns True if the task completed successfully, otherwise False."""
1278 self.lockTask();
1279 sStatus = self.sStatus;
1280 oTaskRc = self.oTaskRc;
1281 self.unlockTask();
1282 if sStatus != "":
1283 return False;
1284 if oTaskRc is False or oTaskRc is None:
1285 return False;
1286 return True;
1287
1288 def getResult(self):
1289 """
1290 Returns the result of a completed task.
1291 Returns None if not completed yet or no previous task.
1292 """
1293 self.lockTask();
1294 sStatus = self.sStatus;
1295 oTaskRc = self.oTaskRc;
1296 self.unlockTask();
1297 if sStatus != "":
1298 return None;
1299 return oTaskRc;
1300
1301 def getLastReply(self):
1302 """
1303 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1304 Returns a None, None, None three-tuple if there was no last reply.
1305 """
1306 self.lockTask();
1307 t3oReply = self.t3oReply;
1308 self.unlockTask();
1309 return t3oReply;
1310
1311 #
1312 # Public methods - connection.
1313 #
1314
1315 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1316 """
1317 Initiates a disconnect task.
1318
1319 Returns True on success, False on failure (logged).
1320
1321 The task returns True on success and False on failure.
1322 """
1323 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1324
1325 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1326 """Synchronous version."""
1327 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1328
1329 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1330 """
1331 Initiates a task for getting the TXS UUID.
1332
1333 Returns True on success, False on failure (logged).
1334
1335 The task returns UUID string (in {}) on success and False on failure.
1336 """
1337 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskUuid);
1338
1339 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1340 """Synchronous version."""
1341 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1342
1343 #
1344 # Public methods - execution.
1345 #
1346
1347 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1348 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1349 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1350 """
1351 Initiates a exec process task.
1352
1353 Returns True on success, False on failure (logged).
1354
1355 The task returns True if the process exited normally with status code 0.
1356 The task returns None if on failure prior to executing the process, and
1357 False if the process exited with a different status or in an abnormal
1358 manner. Both None and False are logged of course and further info can
1359 also be obtained by getLastReply().
1360
1361 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1362 these streams. If None, no special action is taken and the output goes
1363 to where ever the TXS sends its output, and ditto for input.
1364 - To send to / read from the bitbucket, pass '/dev/null'.
1365 - To redirect to/from a file, just specify the remote filename.
1366 - To append to a file use '>>' followed by the remote filename.
1367 - To pipe the stream to/from the TXS, specify a file like
1368 object. For StdIn a non-blocking read() method is required. For
1369 the other a write() method is required. Watch out for deadlock
1370 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1371 """
1372 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1373 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1374 oStdOut, oStdErr, oTestPipe, sAsUser));
1375
1376 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1377 oStdIn = '/dev/null', oStdOut = '/dev/null',
1378 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1379 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1380 """Synchronous version."""
1381 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1382 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1383
1384 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1385 cMsTimeout = 3600000, fIgnoreErrors = False):
1386 """
1387 Initiates a exec process test task.
1388
1389 Returns True on success, False on failure (logged).
1390
1391 The task returns True if the process exited normally with status code 0.
1392 The task returns None if on failure prior to executing the process, and
1393 False if the process exited with a different status or in an abnormal
1394 manner. Both None and False are logged of course and further info can
1395 also be obtained by getLastReply().
1396
1397 Standard in is taken from /dev/null. While both standard output and
1398 standard error goes directly to reporter.log(). The testpipe is piped
1399 to reporter.xxxx.
1400 """
1401
1402 sStdIn = '/dev/null';
1403 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1404 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1405 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1406 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1407
1408 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1409 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1410
1411 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1412 cMsTimeout = 3600000, fIgnoreErrors = False):
1413 """Synchronous version."""
1414 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1415 cMsTimeout, fIgnoreErrors);
1416
1417 #
1418 # Public methods - system
1419 #
1420
1421 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1422 """
1423 Initiates a reboot task.
1424
1425 Returns True on success, False on failure (logged).
1426
1427 The task returns True on success, False on failure (logged). The
1428 session will be disconnected on successful task completion.
1429 """
1430 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1431
1432 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1433 """Synchronous version."""
1434 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1435
1436 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1437 """
1438 Initiates a shutdown task.
1439
1440 Returns True on success, False on failure (logged).
1441
1442 The task returns True on success, False on failure (logged).
1443 """
1444 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1445
1446 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1447 """Synchronous version."""
1448 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1449
1450
1451 #
1452 # Public methods - file system
1453 #
1454
1455 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1456 """
1457 Initiates a mkdir task.
1458
1459 Returns True on success, False on failure (logged).
1460
1461 The task returns True on success, False on failure (logged).
1462 """
1463 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1464
1465 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1466 """Synchronous version."""
1467 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1468
1469 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1470 """
1471 Initiates a mkdir -p task.
1472
1473 Returns True on success, False on failure (logged).
1474
1475 The task returns True on success, False on failure (logged).
1476 """
1477 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1478
1479 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1480 """Synchronous version."""
1481 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1482
1483 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1484 """
1485 Initiates a symlink task.
1486
1487 Returns True on success, False on failure (logged).
1488
1489 The task returns True on success, False on failure (logged).
1490 """
1491 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1492
1493 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1494 """Synchronous version."""
1495 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1496
1497 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1498 """
1499 Initiates a rmdir task.
1500
1501 Returns True on success, False on failure (logged).
1502
1503 The task returns True on success, False on failure (logged).
1504 """
1505 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1506
1507 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1508 """Synchronous version."""
1509 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1510
1511 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1512 """
1513 Initiates a rmfile task.
1514
1515 Returns True on success, False on failure (logged).
1516
1517 The task returns True on success, False on failure (logged).
1518 """
1519 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1520
1521 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1522 """Synchronous version."""
1523 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1524
1525 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1526 """
1527 Initiates a rmsymlink task.
1528
1529 Returns True on success, False on failure (logged).
1530
1531 The task returns True on success, False on failure (logged).
1532 """
1533 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1534
1535 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1536 """Synchronous version."""
1537 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1538
1539 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1540 """
1541 Initiates a rmtree task.
1542
1543 Returns True on success, False on failure (logged).
1544
1545 The task returns True on success, False on failure (logged).
1546 """
1547 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1548
1549 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1550 """Synchronous version."""
1551 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1552
1553 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1554 """
1555 Initiates a chmod task.
1556
1557 Returns True on success, False on failure (logged).
1558
1559 The task returns True on success, False on failure (logged).
1560 """
1561 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1562
1563 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1564 """Synchronous version."""
1565 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1566
1567 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1568 """
1569 Initiates a chown task.
1570
1571 Returns True on success, False on failure (logged).
1572
1573 The task returns True on success, False on failure (logged).
1574 """
1575 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1576
1577 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1578 """Synchronous version."""
1579 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1580
1581 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1582 """
1583 Initiates a is-dir query task.
1584
1585 Returns True on success, False on failure (logged).
1586
1587 The task returns True if it's a directory, False if it isn't, and
1588 None on error (logged).
1589 """
1590 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1591
1592 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1593 """Synchronous version."""
1594 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1595
1596 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1597 """
1598 Initiates a is-file query task.
1599
1600 Returns True on success, False on failure (logged).
1601
1602 The task returns True if it's a file, False if it isn't, and None on
1603 error (logged).
1604 """
1605 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1606
1607 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1608 """Synchronous version."""
1609 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1610
1611 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1612 """
1613 Initiates a is-symbolic-link query task.
1614
1615 Returns True on success, False on failure (logged).
1616
1617 The task returns True if it's a symbolic linke, False if it isn't, and
1618 None on error (logged).
1619 """
1620 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1621
1622 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1623 """Synchronous version."""
1624 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1625
1626 #def "STAT "
1627 #def "LSTAT "
1628 #def "LIST "
1629
1630 @staticmethod
1631 def calcFileXferTimeout(cbFile):
1632 """
1633 Calculates a reasonable timeout for an upload/download given the file size.
1634
1635 Returns timeout in milliseconds.
1636 """
1637 return 30000 + cbFile / 256; # 256 KiB/s (picked out of thin air)
1638
1639 @staticmethod
1640 def calcUploadTimeout(sLocalFile):
1641 """
1642 Calculates a reasonable timeout for an upload given the file (will stat it).
1643
1644 Returns timeout in milliseconds.
1645 """
1646 try: cbFile = os.path.getsize(sLocalFile);
1647 except: cbFile = 1024*1024;
1648 return Session.calcFileXferTimeout(cbFile);
1649
1650 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1651 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1652 """
1653 Initiates a download query task.
1654
1655 Returns True on success, False on failure (logged).
1656
1657 The task returns True on success, False on failure (logged).
1658 """
1659 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1660 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1661
1662 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1663 """Synchronous version."""
1664 if cMsTimeout <= 0:
1665 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1666 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1667
1668 def asyncUploadString(self, sContent, sRemoteFile,
1669 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1670 """
1671 Initiates a upload string task.
1672
1673 Returns True on success, False on failure (logged).
1674
1675 The task returns True on success, False on failure (logged).
1676 """
1677 if cMsTimeout <= 0:
1678 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1679 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1680 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1681
1682 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1683 """Synchronous version."""
1684 if cMsTimeout <= 0:
1685 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1686 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1687
1688 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1689 """
1690 Initiates a download file task.
1691
1692 Returns True on success, False on failure (logged).
1693
1694 The task returns True on success, False on failure (logged).
1695 """
1696 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1697
1698 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1699 """Synchronous version."""
1700 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1701
1702 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1703 cMsTimeout = 30000, fIgnoreErrors = False):
1704 """
1705 Initiates a download string task.
1706
1707 Returns True on success, False on failure (logged).
1708
1709 The task returns a byte string on success, False on failure (logged).
1710 """
1711 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1712 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1713
1714 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1715 cMsTimeout = 30000, fIgnoreErrors = False):
1716 """Synchronous version."""
1717 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1718 cMsTimeout, fIgnoreErrors);
1719
1720 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1721 """
1722 Initiates a unpack file task.
1723
1724 Returns True on success, False on failure (logged).
1725
1726 The task returns True on success, False on failure (logged).
1727 """
1728 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1729 (sRemoteFile, sRemoteDir));
1730
1731 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1732 """Synchronous version."""
1733 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1734
1735
1736class TransportTcp(TransportBase):
1737 """
1738 TCP transport layer for the TXS client session class.
1739 """
1740
1741 def __init__(self, sHostname, uPort, fReversedSetup):
1742 """
1743 Save the parameters. The session will call us back to make the
1744 connection later on its worker thread.
1745 """
1746 TransportBase.__init__(self, utils.getCallerName());
1747 self.sHostname = sHostname;
1748 self.fReversedSetup = fReversedSetup;
1749 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1750 self.oSocket = None;
1751 self.oWakeupW = None;
1752 self.oWakeupR = None;
1753 self.fConnectCanceled = False;
1754 self.fIsConnecting = False;
1755 self.oCv = threading.Condition();
1756 self.abReadAhead = array.array('B');
1757
1758 def toString(self):
1759 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1760 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1761 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1762 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1763
1764 def __isInProgressXcpt(self, oXcpt):
1765 """ In progress exception? """
1766 try:
1767 if isinstance(oXcpt, socket.error):
1768 try:
1769 if oXcpt.errno == errno.EINPROGRESS:
1770 return True;
1771 except: pass;
1772 # Windows?
1773 try:
1774 if oXcpt.errno == errno.EWOULDBLOCK:
1775 return True;
1776 except: pass;
1777 except:
1778 pass;
1779 return False;
1780
1781 def __isWouldBlockXcpt(self, oXcpt):
1782 """ Would block exception? """
1783 try:
1784 if isinstance(oXcpt, socket.error):
1785 try:
1786 if oXcpt.errno == errno.EWOULDBLOCK:
1787 return True;
1788 except: pass;
1789 try:
1790 if oXcpt.errno == errno.EAGAIN:
1791 return True;
1792 except: pass;
1793 except:
1794 pass;
1795 return False;
1796
1797 def __isConnectionReset(self, oXcpt):
1798 """ Connection reset by Peer or others. """
1799 try:
1800 if isinstance(oXcpt, socket.error):
1801 try:
1802 if oXcpt.errno == errno.ECONNRESET:
1803 return True;
1804 except: pass;
1805 try:
1806 if oXcpt.errno == errno.ENETRESET:
1807 return True;
1808 except: pass;
1809 except:
1810 pass;
1811 return False;
1812
1813 def _closeWakeupSockets(self):
1814 """ Closes the wakup sockets. Caller should own the CV. """
1815 oWakeupR = self.oWakeupR;
1816 self.oWakeupR = None;
1817 if oWakeupR is not None:
1818 oWakeupR.close();
1819
1820 oWakeupW = self.oWakeupW;
1821 self.oWakeupW = None;
1822 if oWakeupW is not None:
1823 oWakeupW.close();
1824
1825 return None;
1826
1827 def cancelConnect(self):
1828 # This is bad stuff.
1829 self.oCv.acquire();
1830 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1831 self.fConnectCanceled = True;
1832 if self.fIsConnecting:
1833 oSocket = self.oSocket;
1834 self.oSocket = None;
1835 if oSocket is not None:
1836 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1837 oSocket.close();
1838
1839 oWakeupW = self.oWakeupW;
1840 self.oWakeupW = None;
1841 if oWakeupW is not None:
1842 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1843 try: oWakeupW.send('cancelled!\n');
1844 except: reporter.logXcpt();
1845 try: oWakeupW.shutdown(socket.SHUT_WR);
1846 except: reporter.logXcpt();
1847 oWakeupW.close();
1848 self.oCv.release();
1849
1850 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1851 """ Connects to the TXS server as server, i.e. the reversed setup. """
1852 assert(self.fReversedSetup);
1853
1854 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1855
1856 # Workaround for bind() failure...
1857 try:
1858 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1859 except:
1860 reporter.errorXcpt('socket.listen(1) failed');
1861 return None;
1862
1863 # Bind the socket and make it listen.
1864 try:
1865 oSocket.bind((self.sHostname, self.uPort));
1866 except:
1867 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1868 return None;
1869 try:
1870 oSocket.listen(1);
1871 except:
1872 reporter.errorXcpt('socket.listen(1) failed');
1873 return None;
1874
1875 # Accept connections.
1876 oClientSocket = None;
1877 tClientAddr = None;
1878 try:
1879 (oClientSocket, tClientAddr) = oSocket.accept();
1880 except socket.error as e:
1881 if not self.__isInProgressXcpt(e):
1882 raise;
1883
1884 # Do the actual waiting.
1885 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1886 try:
1887 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1888 except socket.error as e:
1889 if e[0] != errno.EBADF or not self.fConnectCanceled:
1890 raise;
1891 reporter.log('socket.select() on accept was canceled');
1892 return None;
1893 except:
1894 reporter.logXcpt('socket.select() on accept');
1895
1896 # Try accept again.
1897 try:
1898 (oClientSocket, tClientAddr) = oSocket.accept();
1899 except socket.error as e:
1900 if not self.__isInProgressXcpt(e):
1901 if e[0] != errno.EBADF or not self.fConnectCanceled:
1902 raise;
1903 reporter.log('socket.accept() was canceled');
1904 return None;
1905 reporter.log('socket.accept() timed out');
1906 return False;
1907 except:
1908 reporter.errorXcpt('socket.accept() failed');
1909 return None;
1910 except:
1911 reporter.errorXcpt('socket.accept() failed');
1912 return None;
1913
1914 # Store the connected socket and throw away the server socket.
1915 self.oCv.acquire();
1916 if not self.fConnectCanceled:
1917 self.oSocket.close();
1918 self.oSocket = oClientSocket;
1919 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1920 self.oCv.release();
1921 return True;
1922
1923 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1924 """ Connects to the TXS server as client. """
1925 assert(not self.fReversedSetup);
1926
1927 # Connect w/ timeouts.
1928 rc = None;
1929 try:
1930 oSocket.connect((self.sHostname, self.uPort));
1931 rc = True;
1932 except socket.error as oXcpt:
1933 iRc = oXcpt.errno;
1934 if self.__isInProgressXcpt(oXcpt):
1935 # Do the actual waiting.
1936 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
1937 try:
1938 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1939 if len(ttRc[1]) + len(ttRc[2]) == 0:
1940 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1941 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1942 rc = iRc == 0;
1943 except socket.error as oXcpt2:
1944 iRc = oXcpt2.errno;
1945 except:
1946 iRc = -42;
1947 reporter.fatalXcpt('socket.select() on connect failed');
1948
1949 if rc is True:
1950 pass;
1951 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
1952 rc = False; # try again.
1953 else:
1954 if iRc != errno.EBADF or not self.fConnectCanceled:
1955 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
1956 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
1957 except:
1958 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
1959 return rc;
1960
1961
1962 def connect(self, cMsTimeout):
1963 # Create a non-blocking socket.
1964 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
1965 try:
1966 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
1967 except:
1968 reporter.fatalXcpt('socket.socket() failed');
1969 return None;
1970 try:
1971 oSocket.setblocking(0);
1972 except:
1973 oSocket.close();
1974 reporter.fatalXcpt('socket.socket() failed');
1975 return None;
1976
1977 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
1978 oWakeupR = None;
1979 oWakeupW = None;
1980 if hasattr(socket, 'socketpair'):
1981 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
1982 except: reporter.logXcpt('socket.socketpair() failed');
1983
1984 # Update the state.
1985 self.oCv.acquire();
1986 rc = None;
1987 if not self.fConnectCanceled:
1988 self.oSocket = oSocket;
1989 self.oWakeupW = oWakeupW;
1990 self.oWakeupR = oWakeupR;
1991 self.fIsConnecting = True;
1992 self.oCv.release();
1993
1994 # Try connect.
1995 if oWakeupR is None:
1996 oWakeupR = oSocket; # Avoid select failure.
1997 if self.fReversedSetup:
1998 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
1999 else:
2000 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2001 oSocket = None;
2002
2003 # Update the state and cleanup on failure/cancel.
2004 self.oCv.acquire();
2005 if rc is True and self.fConnectCanceled:
2006 rc = False;
2007 self.fIsConnecting = False;
2008
2009 if rc is not True:
2010 if self.oSocket is not None:
2011 self.oSocket.close();
2012 self.oSocket = None;
2013 self._closeWakeupSockets();
2014 self.oCv.release();
2015
2016 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2017 return rc;
2018
2019 def disconnect(self, fQuiet = False):
2020 if self.oSocket is not None:
2021 self.abReadAhead = array.array('B');
2022
2023 # Try a shutting down the socket gracefully (draining it).
2024 try:
2025 self.oSocket.shutdown(socket.SHUT_WR);
2026 except:
2027 if not fQuiet:
2028 reporter.error('shutdown(SHUT_WR)');
2029 try:
2030 self.oSocket.setblocking(0); # just in case it's not set.
2031 sData = "1";
2032 while sData:
2033 sData = self.oSocket.recv(16384);
2034 except:
2035 pass;
2036
2037 # Close it.
2038 self.oCv.acquire();
2039 try: self.oSocket.setblocking(1);
2040 except: pass;
2041 self.oSocket.close();
2042 self.oSocket = None;
2043 else:
2044 self.oCv.acquire();
2045 self._closeWakeupSockets();
2046 self.oCv.release();
2047
2048 def sendBytes(self, abBuf, cMsTimeout):
2049 if self.oSocket is None:
2050 reporter.error('TransportTcp.sendBytes: No connection.');
2051 return False;
2052
2053 # Try send it all.
2054 try:
2055 cbSent = self.oSocket.send(abBuf);
2056 if cbSent == len(abBuf):
2057 return True;
2058 except Exception as oXcpt:
2059 if not self.__isWouldBlockXcpt(oXcpt):
2060 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2061 return False;
2062 cbSent = 0;
2063
2064 # Do a timed send.
2065 msStart = base.timestampMilli();
2066 while True:
2067 cMsElapsed = base.timestampMilli() - msStart;
2068 if cMsElapsed > cMsTimeout:
2069 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2070 break;
2071
2072 # wait.
2073 try:
2074 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2075 if ttRc[2] and not ttRc[1]:
2076 reporter.error('TranportTcp.sendBytes: select returned with exception');
2077 break;
2078 if not ttRc[1]:
2079 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2080 break;
2081 except:
2082 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2083 break;
2084
2085 # Try send more.
2086 try:
2087 cbSent += self.oSocket.send(abBuf[cbSent:]);
2088 if cbSent == len(abBuf):
2089 return True;
2090 except Exception as oXcpt:
2091 if not self.__isWouldBlockXcpt(oXcpt):
2092 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2093 break;
2094
2095 return False;
2096
2097 def __returnReadAheadBytes(self, cb):
2098 """ Internal worker for recvBytes. """
2099 assert(len(self.abReadAhead) >= cb);
2100 abRet = self.abReadAhead[:cb];
2101 self.abReadAhead = self.abReadAhead[cb:];
2102 return abRet;
2103
2104 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2105 if self.oSocket is None:
2106 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2107 return None;
2108
2109 # Try read in some more data without bothering with timeout handling first.
2110 if len(self.abReadAhead) < cb:
2111 try:
2112 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2113 if abBuf:
2114 self.abReadAhead.extend(array.array('B', abBuf));
2115 except Exception as oXcpt:
2116 if not self.__isWouldBlockXcpt(oXcpt):
2117 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2118 return None;
2119
2120 if len(self.abReadAhead) >= cb:
2121 return self.__returnReadAheadBytes(cb);
2122
2123 # Timeout loop.
2124 msStart = base.timestampMilli();
2125 while True:
2126 cMsElapsed = base.timestampMilli() - msStart;
2127 if cMsElapsed > cMsTimeout:
2128 if not fNoDataOk or self.abReadAhead:
2129 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2130 break;
2131
2132 # Wait.
2133 try:
2134 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2135 if ttRc[2] and not ttRc[0]:
2136 reporter.error('TranportTcp.recvBytes: select returned with exception');
2137 break;
2138 if not ttRc[0]:
2139 if not fNoDataOk or self.abReadAhead:
2140 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2141 % (len(self.abReadAhead), cb, fNoDataOk));
2142 break;
2143 except:
2144 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2145 break;
2146
2147 # Try read more.
2148 try:
2149 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2150 if not abBuf:
2151 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2152 % (len(self.abReadAhead), cb, fNoDataOk));
2153 self.disconnect();
2154 return None;
2155
2156 self.abReadAhead.extend(array.array('B', abBuf));
2157
2158 except Exception as oXcpt:
2159 reporter.log('recv => exception %s' % (oXcpt,));
2160 if not self.__isWouldBlockXcpt(oXcpt):
2161 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2162 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2163 break;
2164
2165 # Done?
2166 if len(self.abReadAhead) >= cb:
2167 return self.__returnReadAheadBytes(cb);
2168
2169 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2170 return None;
2171
2172 def isConnectionOk(self):
2173 if self.oSocket is None:
2174 return False;
2175 try:
2176 ttRc = select.select([], [], [self.oSocket], 0.0);
2177 if ttRc[2]:
2178 return False;
2179
2180 self.oSocket.send(array.array('B')); # send zero bytes.
2181 except:
2182 return False;
2183 return True;
2184
2185 def isRecvPending(self, cMsTimeout = 0):
2186 try:
2187 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2188 if not ttRc[0]:
2189 return False;
2190 except:
2191 pass;
2192 return True;
2193
2194
2195def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2196 """
2197 Opens a connection to a Test Execution Service via TCP, given its name.
2198
2199 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2200 or similar.
2201 """
2202 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2203 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2204 try:
2205 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2206 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2207 except:
2208 reporter.errorXcpt(None, 15);
2209 return None;
2210 return oSession;
2211
2212
2213def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2214 """
2215 Tries to open a connection to a Test Execution Service via TCP, given its name.
2216
2217 This differs from openTcpSession in that it won't log a connection failure
2218 as an error.
2219 """
2220 try:
2221 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2222 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2223 except:
2224 reporter.errorXcpt(None, 15);
2225 return None;
2226 return oSession;
2227
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette