VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 90780

Last change on this file since 90780 was 90740, checked in by vboxsync, 3 years ago

ValKit: More Python 3.9 API changes needed (array.array.tostring() -> .tobytes()) ​bugref:10079

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 87.3 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 90740 2021-08-19 09:42:27Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2020 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 90740 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 if sys.version_info < (3, 9, 0):
75 # Removed since Python 3.9.
76 sStr = abStr.tostring(); # pylint: disable=no-member
77 else:
78 sStr = abStr.tobytes();
79 return sStr.decode('utf_8');
80 except:
81 reporter.errorXcpt('getSZ(,%u)' % (off));
82 return sDefault;
83
84def getSZLen(abData, off):
85 """
86 Get the length of a zero-terminated string field, in bytes.
87 Returns -1 if off is beyond the data packet or not properly terminated.
88 """
89 cbData = len(abData);
90 if off >= cbData:
91 return -1;
92
93 offCur = off;
94 while abData[offCur] != 0:
95 offCur = offCur + 1;
96 if offCur >= cbData:
97 return -1;
98
99 return offCur - off;
100
101def isValidOpcodeEncoding(sOpcode):
102 """
103 Checks if the specified opcode is valid or not.
104 Returns True on success.
105 Returns False if it is invalid, details in the log.
106 """
107 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
108 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
109 if len(sOpcode) != 8:
110 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
111 return False;
112 for i in range(0, 1):
113 if sSet1.find(sOpcode[i]) < 0:
114 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
115 return False;
116 for i in range(2, 7):
117 if sSet2.find(sOpcode[i]) < 0:
118 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
119 return False;
120 return True;
121
122#
123# Helper for encoding data sent to the TXS.
124#
125
126def u32ToByteArray(u32):
127 """Encodes the u32 value as a little endian byte (B) array."""
128 return array.array('B',
129 ( u32 % 256,
130 (u32 // 256) % 256,
131 (u32 // 65536) % 256,
132 (u32 // 16777216) % 256) );
133
134def escapeString(sString):
135 """
136 Does $ escaping of the string so TXS doesn't try do variable expansion.
137 """
138 return sString.replace('$', '$$');
139
140
141
142class TransportBase(object):
143 """
144 Base class for the transport layer.
145 """
146
147 def __init__(self, sCaller):
148 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
149 self.fDummy = 0;
150 self.abReadAheadHdr = array.array('B');
151
152 def toString(self):
153 """
154 Stringify the instance for logging and debugging.
155 """
156 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
157
158 def __str__(self):
159 return self.toString();
160
161 def cancelConnect(self):
162 """
163 Cancels any pending connect() call.
164 Returns None;
165 """
166 return None;
167
168 def connect(self, cMsTimeout):
169 """
170 Quietly attempts to connect to the TXS.
171
172 Returns True on success.
173 Returns False on retryable errors (no logging).
174 Returns None on fatal errors with details in the log.
175
176 Override this method, don't call super.
177 """
178 _ = cMsTimeout;
179 return False;
180
181 def disconnect(self, fQuiet = False):
182 """
183 Disconnect from the TXS.
184
185 Returns True.
186
187 Override this method, don't call super.
188 """
189 _ = fQuiet;
190 return True;
191
192 def sendBytes(self, abBuf, cMsTimeout):
193 """
194 Sends the bytes in the buffer abBuf to the TXS.
195
196 Returns True on success.
197 Returns False on failure and error details in the log.
198
199 Override this method, don't call super.
200
201 Remarks: len(abBuf) is always a multiple of 16.
202 """
203 _ = abBuf; _ = cMsTimeout;
204 return False;
205
206 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
207 """
208 Receive cb number of bytes from the TXS.
209
210 Returns the bytes (array('B')) on success.
211 Returns None on failure and error details in the log.
212
213 Override this method, don't call super.
214
215 Remarks: cb is always a multiple of 16.
216 """
217 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
218 return None;
219
220 def isConnectionOk(self):
221 """
222 Checks if the connection is OK.
223
224 Returns True if it is.
225 Returns False if it isn't (caller should call diconnect).
226
227 Override this method, don't call super.
228 """
229 return True;
230
231 def isRecvPending(self, cMsTimeout = 0):
232 """
233 Checks if there is incoming bytes, optionally waiting cMsTimeout
234 milliseconds for something to arrive.
235
236 Returns True if there is, False if there isn't.
237
238 Override this method, don't call super.
239 """
240 _ = cMsTimeout;
241 return False;
242
243 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
244 """
245 Sends a message (opcode + encoded payload).
246
247 Returns True on success.
248 Returns False on failure and error details in the log.
249 """
250 # Fix + check the opcode.
251 if len(sOpcode) < 2:
252 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
253 return False;
254 sOpcode = sOpcode.ljust(8);
255 if not isValidOpcodeEncoding(sOpcode):
256 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
257 return False;
258
259 # Start construct the message.
260 cbMsg = 16 + len(abPayload);
261 abMsg = array.array('B');
262 abMsg.extend(u32ToByteArray(cbMsg));
263 abMsg.extend((0, 0, 0, 0)); # uCrc32
264 try:
265 abMsg.extend(array.array('B', \
266 ( ord(sOpcode[0]), \
267 ord(sOpcode[1]), \
268 ord(sOpcode[2]), \
269 ord(sOpcode[3]), \
270 ord(sOpcode[4]), \
271 ord(sOpcode[5]), \
272 ord(sOpcode[6]), \
273 ord(sOpcode[7]) ) ) );
274 if abPayload:
275 abMsg.extend(abPayload);
276 except:
277 reporter.fatalXcpt('sendMsgInt: packing problem...');
278 return False;
279
280 # checksum it, padd it and send it off.
281 uCrc32 = zlib.crc32(abMsg[8:]);
282 abMsg[4:8] = u32ToByteArray(uCrc32);
283
284 while len(abMsg) % 16:
285 abMsg.append(0);
286
287 reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
288 return self.sendBytes(abMsg, cMsTimeout);
289
290 def recvMsg(self, cMsTimeout, fNoDataOk = False):
291 """
292 Receives a message from the TXS.
293
294 Returns the message three-tuple: length, opcode, payload.
295 Returns (None, None, None) on failure and error details in the log.
296 """
297
298 # Read the header.
299 if self.abReadAheadHdr:
300 assert(len(self.abReadAheadHdr) == 16);
301 abHdr = self.abReadAheadHdr;
302 self.abReadAheadHdr = array.array('B');
303 else:
304 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
305 if abHdr is None:
306 return (None, None, None);
307 if len(abHdr) != 16:
308 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
309 return (None, None, None);
310
311 # Unpack and validate the header.
312 cbMsg = getU32(abHdr, 0);
313 uCrc32 = getU32(abHdr, 4);
314
315 if sys.version_info < (3, 9, 0):
316 # Removed since Python 3.9.
317 sOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
318 else:
319 sOpcode = abHdr[8:16].tobytes();
320 sOpcode = sOpcode.decode('ascii');
321
322 if cbMsg < 16:
323 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
324 return (None, None, None);
325 if cbMsg > 1024*1024:
326 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
327 return (None, None, None);
328 if not isValidOpcodeEncoding(sOpcode):
329 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
330 return (None, None, None);
331
332 # Get the payload (if any), dropping the padding.
333 abPayload = array.array('B');
334 if cbMsg > 16:
335 if cbMsg % 16:
336 cbPadding = 16 - (cbMsg % 16);
337 else:
338 cbPadding = 0;
339 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
340 if abPayload is None:
341 self.abReadAheadHdr = abHdr;
342 if not fNoDataOk :
343 reporter.log('recvMsg: failed to recv payload bytes!');
344 return (None, None, None);
345
346 while cbPadding > 0:
347 abPayload.pop();
348 cbPadding = cbPadding - 1;
349
350 # Check the CRC-32.
351 if uCrc32 != 0:
352 uActualCrc32 = zlib.crc32(abHdr[8:]);
353 if cbMsg > 16:
354 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
355 uActualCrc32 = uActualCrc32 & 0xffffffff;
356 if uCrc32 != uActualCrc32:
357 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
358 return (None, None, None);
359
360 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
361 return (cbMsg, sOpcode, abPayload);
362
363 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
364 """
365 Sends a message (opcode + payload tuple).
366
367 Returns True on success.
368 Returns False on failure and error details in the log.
369 Returns None if you pass the incorrectly typed parameters.
370 """
371 # Encode the payload.
372 abPayload = array.array('B');
373 for o in aoPayload:
374 try:
375 if utils.isString(o):
376 if sys.version_info[0] >= 3:
377 abPayload.extend(o.encode('utf_8'));
378 else:
379 # the primitive approach...
380 sUtf8 = o.encode('utf_8');
381 for ch in sUtf8:
382 abPayload.append(ord(ch))
383 abPayload.append(0);
384 elif isinstance(o, (long, int)):
385 if o < 0 or o > 0xffffffff:
386 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
387 return None;
388 abPayload.extend(u32ToByteArray(o));
389 elif isinstance(o, array.array):
390 abPayload.extend(o);
391 else:
392 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
393 return None;
394 except:
395 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
396 return None;
397 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
398
399
400class Session(TdTaskBase):
401 """
402 A Test eXecution Service (TXS) client session.
403 """
404
405 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
406 """
407 Construct a TXS session.
408
409 This starts by connecting to the TXS and will enter the signalled state
410 when connected or the timeout has been reached.
411 """
412 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
413 self.oTransport = oTransport;
414 self.sStatus = "";
415 self.cMsTimeout = 0;
416 self.fErr = True; # Whether to report errors as error.
417 self.msStart = 0;
418 self.oThread = None;
419 self.fnTask = self.taskDummy;
420 self.aTaskArgs = None;
421 self.oTaskRc = None;
422 self.t3oReply = (None, None, None);
423 self.fScrewedUpMsgState = False;
424 self.fTryConnect = fTryConnect;
425
426 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
427 raise base.GenError("startTask failed");
428
429 def __del__(self):
430 """Make sure to cancel the task when deleted."""
431 self.cancelTask();
432
433 def toString(self):
434 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
435 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
436 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
437 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
438
439 def taskDummy(self):
440 """Place holder to catch broken state handling."""
441 raise Exception();
442
443 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
444 """
445 Kicks of a new task.
446
447 cMsTimeout: The task timeout in milliseconds. Values less than
448 500 ms will be adjusted to 500 ms. This means it is
449 OK to use negative value.
450 sStatus: The task status.
451 fnTask: The method that'll execute the task.
452 aArgs: Arguments to pass to fnTask.
453
454 Returns True on success, False + error in log on failure.
455 """
456 if not self.cancelTask():
457 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
458 return False;
459
460 # Change status and make sure we're the
461 self.lockTask();
462 if self.sStatus != "":
463 self.unlockTask();
464 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
465 return False;
466 self.sStatus = "setup";
467 self.oTaskRc = None;
468 self.t3oReply = (None, None, None);
469 self.resetTaskLocked();
470 self.unlockTask();
471
472 self.cMsTimeout = max(cMsTimeout, 500);
473 self.fErr = not fIgnoreErrors;
474 self.fnTask = fnTask;
475 self.aTaskArgs = aArgs;
476 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
477 self.oThread.setDaemon(True);
478 self.msStart = base.timestampMilli();
479
480 self.lockTask();
481 self.sStatus = sStatus;
482 self.unlockTask();
483 self.oThread.start();
484
485 return True;
486
487 def cancelTask(self, fSync = True):
488 """
489 Attempts to cancel any pending tasks.
490 Returns success indicator (True/False).
491 """
492 self.lockTask();
493
494 if self.sStatus == "":
495 self.unlockTask();
496 return True;
497 if self.sStatus == "setup":
498 self.unlockTask();
499 return False;
500 if self.sStatus == "cancelled":
501 self.unlockTask();
502 return False;
503
504 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
505 if self.sStatus == 'connecting':
506 self.oTransport.cancelConnect();
507
508 self.sStatus = "cancelled";
509 oThread = self.oThread;
510 self.unlockTask();
511
512 if not fSync:
513 return False;
514
515 oThread.join(61.0);
516
517 if sys.version_info < (3, 9, 0):
518 # Removed since Python 3.9.
519 return oThread.isAlive(); # pylint: disable=no-member
520 return oThread.is_alive();
521
522 def taskThread(self):
523 """
524 The task thread function.
525 This does some housekeeping activities around the real task method call.
526 """
527 if not self.isCancelled():
528 try:
529 fnTask = self.fnTask;
530 oTaskRc = fnTask(*self.aTaskArgs);
531 except:
532 reporter.fatalXcpt('taskThread', 15);
533 oTaskRc = None;
534 else:
535 reporter.log('taskThread: cancelled already');
536
537 self.lockTask();
538
539 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
540 self.oTaskRc = oTaskRc;
541 self.oThread = None;
542 self.sStatus = '';
543 self.signalTaskLocked();
544
545 self.unlockTask();
546 return None;
547
548 def isCancelled(self):
549 """Internal method for checking if the task has been cancelled."""
550 self.lockTask();
551 sStatus = self.sStatus;
552 self.unlockTask();
553 if sStatus == "cancelled":
554 return True;
555 return False;
556
557 def hasTimedOut(self):
558 """Internal method for checking if the task has timed out or not."""
559 cMsLeft = self.getMsLeft();
560 if cMsLeft <= 0:
561 return True;
562 return False;
563
564 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
565 """Gets the time left until the timeout."""
566 cMsElapsed = base.timestampMilli() - self.msStart;
567 if cMsElapsed < 0:
568 return cMsMin;
569 cMsLeft = self.cMsTimeout - cMsElapsed;
570 if cMsLeft <= cMsMin:
571 return cMsMin;
572 if cMsLeft > cMsMax > 0:
573 return cMsMax
574 return cMsLeft;
575
576 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
577 """
578 Wrapper for TransportBase.recvMsg that stashes the response away
579 so the client can inspect it later on.
580 """
581 if cMsTimeout is None:
582 cMsTimeout = self.getMsLeft(500);
583 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
584 self.lockTask();
585 self.t3oReply = (cbMsg, sOpcode, abPayload);
586 self.unlockTask();
587 return (cbMsg, sOpcode, abPayload);
588
589 def recvAck(self, fNoDataOk = False):
590 """
591 Receives an ACK or error response from the TXS.
592
593 Returns True on success.
594 Returns False on timeout or transport error.
595 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
596 and there are always details of some sort or another.
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
599 if cbMsg is None:
600 return False;
601 sOpcode = sOpcode.strip()
602 if sOpcode == "ACK":
603 return True;
604 return (sOpcode, getSZ(abPayload, 0, sOpcode));
605
606 def recvAckLogged(self, sCommand, fNoDataOk = False):
607 """
608 Wrapper for recvAck and logging.
609 Returns True on success (ACK).
610 Returns False on time, transport error and errors signalled by TXS.
611 """
612 rc = self.recvAck(fNoDataOk);
613 if rc is not True and not fNoDataOk:
614 if rc is False:
615 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
616 else:
617 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
618 rc = False;
619 return rc;
620
621 def recvTrueFalse(self, sCommand):
622 """
623 Receives a TRUE/FALSE response from the TXS.
624 Returns True on TRUE, False on FALSE and None on error/other (logged).
625 """
626 cbMsg, sOpcode, abPayload = self.recvReply();
627 if cbMsg is None:
628 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
629 return None;
630
631 sOpcode = sOpcode.strip()
632 if sOpcode == "TRUE":
633 return True;
634 if sOpcode == "FALSE":
635 return False;
636 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
637 return None;
638
639 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
640 """
641 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
642 """
643 if cMsTimeout is None:
644 cMsTimeout = self.getMsLeft(500);
645 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
646
647 def asyncToSync(self, fnAsync, *aArgs):
648 """
649 Wraps an asynchronous task into a synchronous operation.
650
651 Returns False on failure, task return status on success.
652 """
653 rc = fnAsync(*aArgs);
654 if rc is False:
655 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
656 return rc;
657
658 rc = self.waitForTask(self.cMsTimeout + 5000);
659 if rc is False:
660 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
661 self.cancelTask();
662 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
663 return False;
664
665 rc = self.getResult();
666 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
667 return rc;
668
669 #
670 # Connection tasks.
671 #
672
673 def taskConnect(self, cMsIdleFudge):
674 """Tries to connect to the TXS"""
675 while not self.isCancelled():
676 reporter.log2('taskConnect: connecting ...');
677 rc = self.oTransport.connect(self.getMsLeft(500));
678 if rc is True:
679 reporter.log('taskConnect: succeeded');
680 return self.taskGreet(cMsIdleFudge);
681 if rc is None:
682 reporter.log2('taskConnect: unable to connect');
683 return None;
684 if self.hasTimedOut():
685 reporter.log2('taskConnect: timed out');
686 if not self.fTryConnect:
687 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
688 return False;
689 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
690 if not self.fTryConnect:
691 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
692 return False;
693
694 def taskGreet(self, cMsIdleFudge):
695 """Greets the TXS"""
696 rc = self.sendMsg("HOWDY", ());
697 if rc is True:
698 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
699 if rc is True:
700 while cMsIdleFudge > 0:
701 cMsIdleFudge -= 1000;
702 time.sleep(1);
703 else:
704 self.oTransport.disconnect(self.fTryConnect);
705 return rc;
706
707 def taskBye(self):
708 """Says goodbye to the TXS"""
709 rc = self.sendMsg("BYE");
710 if rc is True:
711 rc = self.recvAckLogged("BYE");
712 self.oTransport.disconnect();
713 return rc;
714
715 def taskVer(self):
716 """Requests version information from TXS"""
717 rc = self.sendMsg("VER");
718 if rc is True:
719 rc = False;
720 cbMsg, sOpcode, abPayload = self.recvReply();
721 if cbMsg is not None:
722 sOpcode = sOpcode.strip();
723 if sOpcode == "ACK VER":
724 sVer = getSZ(abPayload, 0);
725 if sVer is not None:
726 rc = sVer;
727 else:
728 reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
729 else:
730 reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
731 return rc;
732
733 def taskUuid(self):
734 """Gets the TXS UUID"""
735 rc = self.sendMsg("UUID");
736 if rc is True:
737 rc = False;
738 cbMsg, sOpcode, abPayload = self.recvReply();
739 if cbMsg is not None:
740 sOpcode = sOpcode.strip()
741 if sOpcode == "ACK UUID":
742 sUuid = getSZ(abPayload, 0);
743 if sUuid is not None:
744 sUuid = '{%s}' % (sUuid,)
745 try:
746 _ = uuid.UUID(sUuid);
747 rc = sUuid;
748 except:
749 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
750 else:
751 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
752 else:
753 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
754 else:
755 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
756 return rc;
757
758 #
759 # Process task
760 # pylint: disable=missing-docstring
761 #
762
763 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
764 # Construct the payload.
765 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
766 for sArg in asArgs:
767 aoPayload.append('%s' % (sArg));
768 aoPayload.append(long(len(asAddEnv)));
769 for sPutEnv in asAddEnv:
770 aoPayload.append('%s' % (sPutEnv));
771 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
772 if utils.isString(o):
773 aoPayload.append(o);
774 elif o is not None:
775 aoPayload.append('|');
776 o.uTxsClientCrc32 = zlib.crc32(b'');
777 else:
778 aoPayload.append('');
779 aoPayload.append('%s' % (sAsUser));
780 aoPayload.append(long(self.cMsTimeout));
781
782 # Kick of the EXEC command.
783 rc = self.sendMsg('EXEC', aoPayload)
784 if rc is True:
785 rc = self.recvAckLogged('EXEC');
786 if rc is True:
787 # Loop till the process completes, feed input to the TXS and
788 # receive output from it.
789 sFailure = "";
790 msPendingInputReply = None;
791 cbMsg, sOpcode, abPayload = (None, None, None);
792 while True:
793 # Pending input?
794 if msPendingInputReply is None \
795 and oStdIn is not None \
796 and not utils.isString(oStdIn):
797 try:
798 sInput = oStdIn.read(65536);
799 except:
800 reporter.errorXcpt('read standard in');
801 sFailure = 'exception reading stdin';
802 rc = None;
803 break;
804 if sInput:
805 # Convert to a byte array before handing it of to sendMsg or the string
806 # will get some zero termination added breaking the CRC (and injecting
807 # unwanted bytes).
808 abInput = array.array('B', sInput.encode('utf-8'));
809 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
810 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
811 if rc is not True:
812 sFailure = 'sendMsg failure';
813 break;
814 msPendingInputReply = base.timestampMilli();
815 continue;
816
817 rc = self.sendMsg('STDINEOS');
818 oStdIn = None;
819 if rc is not True:
820 sFailure = 'sendMsg failure';
821 break;
822 msPendingInputReply = base.timestampMilli();
823
824 # Wait for input (500 ms timeout).
825 if cbMsg is None:
826 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
827 if cbMsg is None:
828 # Check for time out before restarting the loop.
829 # Note! Only doing timeout checking here does mean that
830 # the TXS may prevent us from timing out by
831 # flooding us with data. This is unlikely though.
832 if self.hasTimedOut() \
833 and ( msPendingInputReply is None \
834 or base.timestampMilli() - msPendingInputReply > 30000):
835 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
836 sFailure = 'timeout';
837 rc = None;
838 break;
839 # Check that the connection is OK.
840 if not self.oTransport.isConnectionOk():
841 self.oTransport.disconnect();
842 sFailure = 'disconnected';
843 rc = False;
844 break;
845 continue;
846
847 # Handle the response.
848 sOpcode = sOpcode.rstrip();
849 if sOpcode == 'STDOUT':
850 oOut = oStdOut;
851 elif sOpcode == 'STDERR':
852 oOut = oStdErr;
853 elif sOpcode == 'TESTPIPE':
854 oOut = oTestPipe;
855 else:
856 oOut = None;
857 if oOut is not None:
858 # Output from the process.
859 if len(abPayload) < 4:
860 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
861 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
862 rc = None;
863 break;
864 uStreamCrc32 = getU32(abPayload, 0);
865 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
866 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
867 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
868 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 try:
873 oOut.write(abPayload[4:]);
874 except:
875 sFailure = 'exception writing %s' % (sOpcode);
876 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
877 rc = None;
878 break;
879 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
880 # Standard input is ignored. Ignore this condition for now.
881 msPendingInputReply = None;
882 reporter.log('taskExecEx: Standard input is ignored... why?');
883 del oStdIn.uTxsClientCrc32;
884 oStdIn = '/dev/null';
885 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
886 and msPendingInputReply is not None:
887 # TXS STDIN error, abort.
888 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
889 msPendingInputReply = None;
890 sFailure = 'TXS is out of memory for std input buffering';
891 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
892 rc = None;
893 break;
894 elif sOpcode == 'ACK' and msPendingInputReply is not None:
895 msPendingInputReply = None;
896 elif sOpcode.startswith('PROC '):
897 # Process status message, handle it outside the loop.
898 rc = True;
899 break;
900 else:
901 sFailure = 'Unexpected opcode %s' % (sOpcode);
902 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
903 rc = None;
904 break;
905 # Clear the message.
906 cbMsg, sOpcode, abPayload = (None, None, None);
907
908 # If we sent an STDIN packet and didn't get a reply yet, we'll give
909 # TXS some 5 seconds to reply to this. If we don't wait here we'll
910 # get screwed later on if we mix it up with the reply to some other
911 # command. Hackish.
912 if msPendingInputReply is not None:
913 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
914 if cbMsg2 is not None:
915 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
916 % (cbMsg2, sOpcode2, abPayload2));
917 msPendingInputReply = None;
918 else:
919 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
920 self.fScrewedUpMsgState = True;
921
922 # Parse the exit status (True), abort (None) or do nothing (False).
923 if rc is True:
924 if sOpcode != 'PROC OK':
925 # Do proper parsing some other day if needed:
926 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
927 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
928 rc = False;
929 else:
930 if rc is None:
931 # Abort it.
932 reporter.log('taskExecEx: sending ABORT...');
933 rc = self.sendMsg('ABORT');
934 while rc is True:
935 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
936 if cbMsg is None:
937 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
938 self.fScrewedUpMsgState = True;
939 break;
940 if sOpcode.startswith('PROC '):
941 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
942 break;
943 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
944 # Check that the connection is OK before looping.
945 if not self.oTransport.isConnectionOk():
946 self.oTransport.disconnect();
947 break;
948
949 # Fake response with the reason why we quit.
950 if sFailure is not None:
951 self.t3oReply = (0, 'EXECFAIL', sFailure);
952 rc = None;
953 else:
954 rc = None;
955
956 # Cleanup.
957 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
958 if o is not None and not utils.isString(o):
959 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
960 # Make sure all files are closed
961 o.close(); # pylint: disable=maybe-no-member
962 reporter.log('taskExecEx: returns %s' % (rc));
963 return rc;
964
965 #
966 # Admin tasks
967 #
968
969 def hlpRebootShutdownWaitForAck(self, sCmd):
970 """Wait for reboot/shutodwn ACK."""
971 rc = self.recvAckLogged(sCmd);
972 if rc is True:
973 # poll a little while for server to disconnect.
974 uMsStart = base.timestampMilli();
975 while self.oTransport.isConnectionOk() \
976 and base.timestampMilli() - uMsStart >= 5000:
977 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
978 break;
979 self.oTransport.disconnect();
980 return rc;
981
982 def taskReboot(self):
983 rc = self.sendMsg('REBOOT');
984 if rc is True:
985 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
986 return rc;
987
988 def taskShutdown(self):
989 rc = self.sendMsg('SHUTDOWN');
990 if rc is True:
991 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
992 return rc;
993
994 #
995 # CD/DVD control tasks.
996 #
997
998 ## TODO
999
1000 #
1001 # File system tasks
1002 #
1003
1004 def taskMkDir(self, sRemoteDir, fMode):
1005 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
1006 if rc is True:
1007 rc = self.recvAckLogged('MKDIR');
1008 return rc;
1009
1010 def taskMkDirPath(self, sRemoteDir, fMode):
1011 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
1012 if rc is True:
1013 rc = self.recvAckLogged('MKDRPATH');
1014 return rc;
1015
1016 def taskMkSymlink(self, sLinkTarget, sLink):
1017 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
1018 if rc is True:
1019 rc = self.recvAckLogged('MKSYMLNK');
1020 return rc;
1021
1022 def taskRmDir(self, sRemoteDir):
1023 rc = self.sendMsg('RMDIR', (sRemoteDir,));
1024 if rc is True:
1025 rc = self.recvAckLogged('RMDIR');
1026 return rc;
1027
1028 def taskRmFile(self, sRemoteFile):
1029 rc = self.sendMsg('RMFILE', (sRemoteFile,));
1030 if rc is True:
1031 rc = self.recvAckLogged('RMFILE');
1032 return rc;
1033
1034 def taskRmSymlink(self, sRemoteSymlink):
1035 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1036 if rc is True:
1037 rc = self.recvAckLogged('RMSYMLNK');
1038 return rc;
1039
1040 def taskRmTree(self, sRemoteTree):
1041 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1042 if rc is True:
1043 rc = self.recvAckLogged('RMTREE');
1044 return rc;
1045
1046 def taskChMod(self, sRemotePath, fMode):
1047 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1048 if rc is True:
1049 rc = self.recvAckLogged('CHMOD');
1050 return rc;
1051
1052 def taskChOwn(self, sRemotePath, idUser, idGroup):
1053 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1054 if rc is True:
1055 rc = self.recvAckLogged('CHOWN');
1056 return rc;
1057
1058 def taskIsDir(self, sRemoteDir):
1059 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1060 if rc is True:
1061 rc = self.recvTrueFalse('ISDIR');
1062 return rc;
1063
1064 def taskIsFile(self, sRemoteFile):
1065 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1066 if rc is True:
1067 rc = self.recvTrueFalse('ISFILE');
1068 return rc;
1069
1070 def taskIsSymlink(self, sRemoteSymlink):
1071 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1072 if rc is True:
1073 rc = self.recvTrueFalse('ISSYMLNK');
1074 return rc;
1075
1076 #def "STAT "
1077 #def "LSTAT "
1078 #def "LIST "
1079
1080 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1081 #
1082 # Open the local file (make sure it exist before bothering TXS) and
1083 # tell TXS that we want to upload a file.
1084 #
1085 try:
1086 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1087 except:
1088 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1089 return False;
1090
1091 # Common cause with taskUploadStr
1092 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1093
1094 # Cleanup.
1095 oLocalFile.close();
1096 return rc;
1097
1098 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1099 # Wrap sContent in a file like class.
1100 class InStringFile(object): # pylint: disable=too-few-public-methods
1101 def __init__(self, sContent):
1102 self.sContent = sContent;
1103 self.off = 0;
1104
1105 def read(self, cbMax):
1106 cbLeft = len(self.sContent) - self.off;
1107 if cbLeft == 0:
1108 return "";
1109 if cbLeft <= cbMax:
1110 sRet = self.sContent[self.off:(self.off + cbLeft)];
1111 else:
1112 sRet = self.sContent[self.off:(self.off + cbMax)];
1113 self.off = self.off + len(sRet);
1114 return sRet;
1115
1116 oLocalString = InStringFile(sContent);
1117 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1118
1119 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1120 """Common worker used by taskUploadFile and taskUploadString."""
1121 #
1122 # Command + ACK.
1123 #
1124 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1125 # Fall back on the old command if the new one is not known by the TXS.
1126 #
1127 if fMode == 0:
1128 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1129 if rc is True:
1130 rc = self.recvAckLogged('PUT FILE');
1131 else:
1132 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1133 if rc is True:
1134 rc = self.recvAck();
1135 if rc is False:
1136 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1137 elif rc is not True:
1138 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1139 # Fallback:
1140 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1141 if rc is True:
1142 rc = self.recvAckLogged('PUT FILE');
1143 else:
1144 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1145 rc = False;
1146 if rc is True:
1147 #
1148 # Push data packets until eof.
1149 #
1150 uMyCrc32 = zlib.crc32(b'');
1151 while True:
1152 # Read up to 64 KB of data.
1153 try:
1154 sRaw = oLocalFile.read(65536);
1155 except:
1156 rc = None;
1157 break;
1158
1159 # Convert to array - this is silly!
1160 abBuf = array.array('B');
1161 if utils.isString(sRaw):
1162 for i, _ in enumerate(sRaw):
1163 abBuf.append(ord(sRaw[i]));
1164 else:
1165 abBuf.extend(sRaw);
1166 sRaw = None;
1167
1168 # Update the file stream CRC and send it off.
1169 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1170 if not abBuf:
1171 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1172 else:
1173 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1174 if rc is False:
1175 break;
1176
1177 # Wait for the reply.
1178 rc = self.recvAck();
1179 if rc is not True:
1180 if rc is False:
1181 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1182 else:
1183 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1184 rc = False;
1185 break;
1186
1187 # EOF?
1188 if not abBuf:
1189 break;
1190
1191 # Send ABORT on ACK and I/O errors.
1192 if rc is None:
1193 rc = self.sendMsg('ABORT');
1194 if rc is True:
1195 self.recvAckLogged('ABORT');
1196 rc = False;
1197 return rc;
1198
1199 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1200 try:
1201 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1202 except:
1203 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1204 return False;
1205
1206 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1207
1208 oLocalFile.close();
1209 if rc is False:
1210 try:
1211 os.remove(sLocalFile);
1212 except:
1213 reporter.errorXcpt();
1214 return rc;
1215
1216 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1217 # Wrap sContent in a file like class.
1218 class OutStringFile(object): # pylint: disable=too-few-public-methods
1219 def __init__(self):
1220 self.asContent = [];
1221
1222 def write(self, sBuf):
1223 self.asContent.append(sBuf);
1224 return None;
1225
1226 oLocalString = OutStringFile();
1227 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1228 if rc is True:
1229 rc = '';
1230 for sBuf in oLocalString.asContent:
1231 if hasattr(sBuf, 'decode'):
1232 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1233 else:
1234 rc += sBuf;
1235 return rc;
1236
1237 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1238 """Common worker for taskDownloadFile and taskDownloadString."""
1239 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1240 if rc is True:
1241 #
1242 # Process data packets until eof.
1243 #
1244 uMyCrc32 = zlib.crc32(b'');
1245 while rc is True:
1246 cbMsg, sOpcode, abPayload = self.recvReply();
1247 if cbMsg is None:
1248 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1249 rc = None;
1250 break;
1251
1252 # Validate.
1253 sOpcode = sOpcode.rstrip();
1254 if sOpcode not in ('DATA', 'DATA EOF',):
1255 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1256 % (sOpcode, getSZ(abPayload, 0, "None")));
1257 rc = False;
1258 break;
1259 if sOpcode == 'DATA' and len(abPayload) < 4:
1260 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1261 rc = None;
1262 break;
1263 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1264 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1265 rc = None;
1266 break;
1267
1268 # Check the CRC (common for both packets).
1269 uCrc32 = getU32(abPayload, 0);
1270 if sOpcode == 'DATA':
1271 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1272 if uCrc32 != (uMyCrc32 & 0xffffffff):
1273 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1274 % (hex(uMyCrc32), hex(uCrc32)));
1275 rc = None;
1276 break;
1277 if sOpcode == 'DATA EOF':
1278 rc = self.sendMsg('ACK');
1279 break;
1280
1281 # Finally, push the data to the file.
1282 try:
1283 if sys.version_info < (3, 9, 0):
1284 # Removed since Python 3.9.
1285 abData = abPayload[4:].tostring();
1286 else:
1287 abData = abPayload[4:].tobytes();
1288 oLocalFile.write(abData);
1289 except:
1290 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1291 rc = None;
1292 break;
1293 rc = self.sendMsg('ACK');
1294
1295 # Send NACK on validation and I/O errors.
1296 if rc is None:
1297 rc = self.sendMsg('NACK');
1298 rc = False;
1299 return rc;
1300
1301 def taskPackFile(self, sRemoteFile, sRemoteSource):
1302 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1303 if rc is True:
1304 rc = self.recvAckLogged('PKFILE');
1305 return rc;
1306
1307 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1308 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1309 if rc is True:
1310 rc = self.recvAckLogged('UNPKFILE');
1311 return rc;
1312
1313 # pylint: enable=missing-docstring
1314
1315
1316 #
1317 # Public methods - generic task queries
1318 #
1319
1320 def isSuccess(self):
1321 """Returns True if the task completed successfully, otherwise False."""
1322 self.lockTask();
1323 sStatus = self.sStatus;
1324 oTaskRc = self.oTaskRc;
1325 self.unlockTask();
1326 if sStatus != "":
1327 return False;
1328 if oTaskRc is False or oTaskRc is None:
1329 return False;
1330 return True;
1331
1332 def getResult(self):
1333 """
1334 Returns the result of a completed task.
1335 Returns None if not completed yet or no previous task.
1336 """
1337 self.lockTask();
1338 sStatus = self.sStatus;
1339 oTaskRc = self.oTaskRc;
1340 self.unlockTask();
1341 if sStatus != "":
1342 return None;
1343 return oTaskRc;
1344
1345 def getLastReply(self):
1346 """
1347 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1348 Returns a None, None, None three-tuple if there was no last reply.
1349 """
1350 self.lockTask();
1351 t3oReply = self.t3oReply;
1352 self.unlockTask();
1353 return t3oReply;
1354
1355 #
1356 # Public methods - connection.
1357 #
1358
1359 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1360 """
1361 Initiates a disconnect task.
1362
1363 Returns True on success, False on failure (logged).
1364
1365 The task returns True on success and False on failure.
1366 """
1367 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1368
1369 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1370 """Synchronous version."""
1371 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1372
1373 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1374 """
1375 Initiates a task for getting the TXS version information.
1376
1377 Returns True on success, False on failure (logged).
1378
1379 The task returns the version string on success and False on failure.
1380 """
1381 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1382
1383 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1384 """Synchronous version."""
1385 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1386
1387 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1388 """
1389 Initiates a task for getting the TXS UUID.
1390
1391 Returns True on success, False on failure (logged).
1392
1393 The task returns UUID string (in {}) on success and False on failure.
1394 """
1395 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1396
1397 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1398 """Synchronous version."""
1399 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1400
1401 #
1402 # Public methods - execution.
1403 #
1404
1405 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1406 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1407 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1408 """
1409 Initiates a exec process task.
1410
1411 Returns True on success, False on failure (logged).
1412
1413 The task returns True if the process exited normally with status code 0.
1414 The task returns None if on failure prior to executing the process, and
1415 False if the process exited with a different status or in an abnormal
1416 manner. Both None and False are logged of course and further info can
1417 also be obtained by getLastReply().
1418
1419 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1420 these streams. If None, no special action is taken and the output goes
1421 to where ever the TXS sends its output, and ditto for input.
1422 - To send to / read from the bitbucket, pass '/dev/null'.
1423 - To redirect to/from a file, just specify the remote filename.
1424 - To append to a file use '>>' followed by the remote filename.
1425 - To pipe the stream to/from the TXS, specify a file like
1426 object. For StdIn a non-blocking read() method is required. For
1427 the other a write() method is required. Watch out for deadlock
1428 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1429 """
1430 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1431 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1432 oStdOut, oStdErr, oTestPipe, sAsUser));
1433
1434 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1435 oStdIn = '/dev/null', oStdOut = '/dev/null',
1436 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1437 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1438 """Synchronous version."""
1439 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1440 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1441
1442 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1443 cMsTimeout = 3600000, fIgnoreErrors = False):
1444 """
1445 Initiates a exec process test task.
1446
1447 Returns True on success, False on failure (logged).
1448
1449 The task returns True if the process exited normally with status code 0.
1450 The task returns None if on failure prior to executing the process, and
1451 False if the process exited with a different status or in an abnormal
1452 manner. Both None and False are logged of course and further info can
1453 also be obtained by getLastReply().
1454
1455 Standard in is taken from /dev/null. While both standard output and
1456 standard error goes directly to reporter.log(). The testpipe is piped
1457 to reporter.xxxx.
1458 """
1459
1460 sStdIn = '/dev/null';
1461 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1462 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1463 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1464 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1465
1466 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1467 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1468
1469 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1470 cMsTimeout = 3600000, fIgnoreErrors = False):
1471 """Synchronous version."""
1472 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1473 cMsTimeout, fIgnoreErrors);
1474
1475 #
1476 # Public methods - system
1477 #
1478
1479 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1480 """
1481 Initiates a reboot task.
1482
1483 Returns True on success, False on failure (logged).
1484
1485 The task returns True on success, False on failure (logged). The
1486 session will be disconnected on successful task completion.
1487 """
1488 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1489
1490 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1491 """Synchronous version."""
1492 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1493
1494 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1495 """
1496 Initiates a shutdown task.
1497
1498 Returns True on success, False on failure (logged).
1499
1500 The task returns True on success, False on failure (logged).
1501 """
1502 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1503
1504 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1505 """Synchronous version."""
1506 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1507
1508
1509 #
1510 # Public methods - file system
1511 #
1512
1513 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1514 """
1515 Initiates a mkdir task.
1516
1517 Returns True on success, False on failure (logged).
1518
1519 The task returns True on success, False on failure (logged).
1520 """
1521 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1522
1523 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1524 """Synchronous version."""
1525 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1526
1527 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1528 """
1529 Initiates a mkdir -p task.
1530
1531 Returns True on success, False on failure (logged).
1532
1533 The task returns True on success, False on failure (logged).
1534 """
1535 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1536
1537 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1538 """Synchronous version."""
1539 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1540
1541 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1542 """
1543 Initiates a symlink task.
1544
1545 Returns True on success, False on failure (logged).
1546
1547 The task returns True on success, False on failure (logged).
1548 """
1549 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1550
1551 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1552 """Synchronous version."""
1553 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1554
1555 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1556 """
1557 Initiates a rmdir task.
1558
1559 Returns True on success, False on failure (logged).
1560
1561 The task returns True on success, False on failure (logged).
1562 """
1563 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1564
1565 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1566 """Synchronous version."""
1567 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1568
1569 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1570 """
1571 Initiates a rmfile task.
1572
1573 Returns True on success, False on failure (logged).
1574
1575 The task returns True on success, False on failure (logged).
1576 """
1577 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1578
1579 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1580 """Synchronous version."""
1581 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1582
1583 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1584 """
1585 Initiates a rmsymlink task.
1586
1587 Returns True on success, False on failure (logged).
1588
1589 The task returns True on success, False on failure (logged).
1590 """
1591 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1592
1593 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1594 """Synchronous version."""
1595 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1596
1597 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1598 """
1599 Initiates a rmtree task.
1600
1601 Returns True on success, False on failure (logged).
1602
1603 The task returns True on success, False on failure (logged).
1604 """
1605 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1606
1607 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1608 """Synchronous version."""
1609 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1610
1611 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1612 """
1613 Initiates a chmod task.
1614
1615 Returns True on success, False on failure (logged).
1616
1617 The task returns True on success, False on failure (logged).
1618 """
1619 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1620
1621 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1622 """Synchronous version."""
1623 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1624
1625 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1626 """
1627 Initiates a chown task.
1628
1629 Returns True on success, False on failure (logged).
1630
1631 The task returns True on success, False on failure (logged).
1632 """
1633 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1634
1635 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1636 """Synchronous version."""
1637 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1638
1639 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1640 """
1641 Initiates a is-dir query task.
1642
1643 Returns True on success, False on failure (logged).
1644
1645 The task returns True if it's a directory, False if it isn't, and
1646 None on error (logged).
1647 """
1648 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1649
1650 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1651 """Synchronous version."""
1652 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1653
1654 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1655 """
1656 Initiates a is-file query task.
1657
1658 Returns True on success, False on failure (logged).
1659
1660 The task returns True if it's a file, False if it isn't, and None on
1661 error (logged).
1662 """
1663 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1664
1665 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1666 """Synchronous version."""
1667 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1668
1669 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1670 """
1671 Initiates a is-symbolic-link query task.
1672
1673 Returns True on success, False on failure (logged).
1674
1675 The task returns True if it's a symbolic linke, False if it isn't, and
1676 None on error (logged).
1677 """
1678 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1679
1680 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1681 """Synchronous version."""
1682 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1683
1684 #def "STAT "
1685 #def "LSTAT "
1686 #def "LIST "
1687
1688 @staticmethod
1689 def calcFileXferTimeout(cbFile):
1690 """
1691 Calculates a reasonable timeout for an upload/download given the file size.
1692
1693 Returns timeout in milliseconds.
1694 """
1695 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1696
1697 @staticmethod
1698 def calcUploadTimeout(sLocalFile):
1699 """
1700 Calculates a reasonable timeout for an upload given the file (will stat it).
1701
1702 Returns timeout in milliseconds.
1703 """
1704 try: cbFile = os.path.getsize(sLocalFile);
1705 except: cbFile = 1024*1024;
1706 return Session.calcFileXferTimeout(cbFile);
1707
1708 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1709 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1710 """
1711 Initiates a download query task.
1712
1713 Returns True on success, False on failure (logged).
1714
1715 The task returns True on success, False on failure (logged).
1716 """
1717 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1718 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1719
1720 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1721 """Synchronous version."""
1722 if cMsTimeout <= 0:
1723 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1724 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1725
1726 def asyncUploadString(self, sContent, sRemoteFile,
1727 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1728 """
1729 Initiates a upload string task.
1730
1731 Returns True on success, False on failure (logged).
1732
1733 The task returns True on success, False on failure (logged).
1734 """
1735 if cMsTimeout <= 0:
1736 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1737 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1738 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1739
1740 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1741 """Synchronous version."""
1742 if cMsTimeout <= 0:
1743 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1744 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1745
1746 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1747 """
1748 Initiates a download file task.
1749
1750 Returns True on success, False on failure (logged).
1751
1752 The task returns True on success, False on failure (logged).
1753 """
1754 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1755
1756 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1757 """Synchronous version."""
1758 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1759
1760 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1761 cMsTimeout = 30000, fIgnoreErrors = False):
1762 """
1763 Initiates a download string task.
1764
1765 Returns True on success, False on failure (logged).
1766
1767 The task returns a byte string on success, False on failure (logged).
1768 """
1769 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1770 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1771
1772 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1773 cMsTimeout = 30000, fIgnoreErrors = False):
1774 """Synchronous version."""
1775 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1776 cMsTimeout, fIgnoreErrors);
1777
1778 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1779 """
1780 Initiates a packing file/directory task.
1781
1782 Returns True on success, False on failure (logged).
1783
1784 The task returns True on success, False on failure (logged).
1785 """
1786 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1787 (sRemoteFile, sRemoteSource));
1788
1789 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1790 """Synchronous version."""
1791 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1792
1793 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1794 """
1795 Initiates a unpack file task.
1796
1797 Returns True on success, False on failure (logged).
1798
1799 The task returns True on success, False on failure (logged).
1800 """
1801 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1802 (sRemoteFile, sRemoteDir));
1803
1804 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1805 """Synchronous version."""
1806 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1807
1808
1809class TransportTcp(TransportBase):
1810 """
1811 TCP transport layer for the TXS client session class.
1812 """
1813
1814 def __init__(self, sHostname, uPort, fReversedSetup):
1815 """
1816 Save the parameters. The session will call us back to make the
1817 connection later on its worker thread.
1818 """
1819 TransportBase.__init__(self, utils.getCallerName());
1820 self.sHostname = sHostname;
1821 self.fReversedSetup = fReversedSetup;
1822 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1823 self.oSocket = None;
1824 self.oWakeupW = None;
1825 self.oWakeupR = None;
1826 self.fConnectCanceled = False;
1827 self.fIsConnecting = False;
1828 self.oCv = threading.Condition();
1829 self.abReadAhead = array.array('B');
1830
1831 def toString(self):
1832 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1833 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1834 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1835 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1836
1837 def __isInProgressXcpt(self, oXcpt):
1838 """ In progress exception? """
1839 try:
1840 if isinstance(oXcpt, socket.error):
1841 try:
1842 if oXcpt.errno == errno.EINPROGRESS:
1843 return True;
1844 except: pass;
1845 # Windows?
1846 try:
1847 if oXcpt.errno == errno.EWOULDBLOCK:
1848 return True;
1849 except: pass;
1850 except:
1851 pass;
1852 return False;
1853
1854 def __isWouldBlockXcpt(self, oXcpt):
1855 """ Would block exception? """
1856 try:
1857 if isinstance(oXcpt, socket.error):
1858 try:
1859 if oXcpt.errno == errno.EWOULDBLOCK:
1860 return True;
1861 except: pass;
1862 try:
1863 if oXcpt.errno == errno.EAGAIN:
1864 return True;
1865 except: pass;
1866 except:
1867 pass;
1868 return False;
1869
1870 def __isConnectionReset(self, oXcpt):
1871 """ Connection reset by Peer or others. """
1872 try:
1873 if isinstance(oXcpt, socket.error):
1874 try:
1875 if oXcpt.errno == errno.ECONNRESET:
1876 return True;
1877 except: pass;
1878 try:
1879 if oXcpt.errno == errno.ENETRESET:
1880 return True;
1881 except: pass;
1882 except:
1883 pass;
1884 return False;
1885
1886 def _closeWakeupSockets(self):
1887 """ Closes the wakup sockets. Caller should own the CV. """
1888 oWakeupR = self.oWakeupR;
1889 self.oWakeupR = None;
1890 if oWakeupR is not None:
1891 oWakeupR.close();
1892
1893 oWakeupW = self.oWakeupW;
1894 self.oWakeupW = None;
1895 if oWakeupW is not None:
1896 oWakeupW.close();
1897
1898 return None;
1899
1900 def cancelConnect(self):
1901 # This is bad stuff.
1902 self.oCv.acquire();
1903 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1904 self.fConnectCanceled = True;
1905 if self.fIsConnecting:
1906 oSocket = self.oSocket;
1907 self.oSocket = None;
1908 if oSocket is not None:
1909 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1910 oSocket.close();
1911
1912 oWakeupW = self.oWakeupW;
1913 self.oWakeupW = None;
1914 if oWakeupW is not None:
1915 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1916 try: oWakeupW.send('cancelled!\n');
1917 except: reporter.logXcpt();
1918 try: oWakeupW.shutdown(socket.SHUT_WR);
1919 except: reporter.logXcpt();
1920 oWakeupW.close();
1921 self.oCv.release();
1922
1923 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1924 """ Connects to the TXS server as server, i.e. the reversed setup. """
1925 assert(self.fReversedSetup);
1926
1927 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1928
1929 # Workaround for bind() failure...
1930 try:
1931 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1932 except:
1933 reporter.errorXcpt('socket.listen(1) failed');
1934 return None;
1935
1936 # Bind the socket and make it listen.
1937 try:
1938 oSocket.bind((self.sHostname, self.uPort));
1939 except:
1940 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1941 return None;
1942 try:
1943 oSocket.listen(1);
1944 except:
1945 reporter.errorXcpt('socket.listen(1) failed');
1946 return None;
1947
1948 # Accept connections.
1949 oClientSocket = None;
1950 tClientAddr = None;
1951 try:
1952 (oClientSocket, tClientAddr) = oSocket.accept();
1953 except socket.error as e:
1954 if not self.__isInProgressXcpt(e):
1955 raise;
1956
1957 # Do the actual waiting.
1958 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1959 try:
1960 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1961 except socket.error as oXctp:
1962 if oXctp.errno != errno.EBADF or not self.fConnectCanceled:
1963 raise;
1964 reporter.log('socket.select() on accept was canceled');
1965 return None;
1966 except:
1967 reporter.logXcpt('socket.select() on accept');
1968
1969 # Try accept again.
1970 try:
1971 (oClientSocket, tClientAddr) = oSocket.accept();
1972 except socket.error as oXcpt:
1973 if not self.__isInProgressXcpt(e):
1974 if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
1975 raise;
1976 reporter.log('socket.accept() was canceled');
1977 return None;
1978 reporter.log('socket.accept() timed out');
1979 return False;
1980 except:
1981 reporter.errorXcpt('socket.accept() failed');
1982 return None;
1983 except:
1984 reporter.errorXcpt('socket.accept() failed');
1985 return None;
1986
1987 # Store the connected socket and throw away the server socket.
1988 self.oCv.acquire();
1989 if not self.fConnectCanceled:
1990 self.oSocket.close();
1991 self.oSocket = oClientSocket;
1992 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1993 self.oCv.release();
1994 return True;
1995
1996 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1997 """ Connects to the TXS server as client. """
1998 assert(not self.fReversedSetup);
1999
2000 # Connect w/ timeouts.
2001 rc = None;
2002 try:
2003 oSocket.connect((self.sHostname, self.uPort));
2004 rc = True;
2005 except socket.error as oXcpt:
2006 iRc = oXcpt.errno;
2007 if self.__isInProgressXcpt(oXcpt):
2008 # Do the actual waiting.
2009 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
2010 try:
2011 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
2012 if len(ttRc[1]) + len(ttRc[2]) == 0:
2013 raise socket.error(errno.ETIMEDOUT, 'select timed out');
2014 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
2015 rc = iRc == 0;
2016 except socket.error as oXcpt2:
2017 iRc = oXcpt2.errno;
2018 except:
2019 iRc = -42;
2020 reporter.fatalXcpt('socket.select() on connect failed');
2021
2022 if rc is True:
2023 pass;
2024 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
2025 rc = False; # try again.
2026 else:
2027 if iRc != errno.EBADF or not self.fConnectCanceled:
2028 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
2029 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
2030 except:
2031 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
2032 return rc;
2033
2034
2035 def connect(self, cMsTimeout):
2036 # Create a non-blocking socket.
2037 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
2038 try:
2039 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
2040 except:
2041 reporter.fatalXcpt('socket.socket() failed');
2042 return None;
2043 try:
2044 oSocket.setblocking(0);
2045 except:
2046 oSocket.close();
2047 reporter.fatalXcpt('socket.socket() failed');
2048 return None;
2049
2050 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
2051 oWakeupR = None;
2052 oWakeupW = None;
2053 if hasattr(socket, 'socketpair'):
2054 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
2055 except: reporter.logXcpt('socket.socketpair() failed');
2056
2057 # Update the state.
2058 self.oCv.acquire();
2059 rc = None;
2060 if not self.fConnectCanceled:
2061 self.oSocket = oSocket;
2062 self.oWakeupW = oWakeupW;
2063 self.oWakeupR = oWakeupR;
2064 self.fIsConnecting = True;
2065 self.oCv.release();
2066
2067 # Try connect.
2068 if oWakeupR is None:
2069 oWakeupR = oSocket; # Avoid select failure.
2070 if self.fReversedSetup:
2071 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
2072 else:
2073 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2074 oSocket = None;
2075
2076 # Update the state and cleanup on failure/cancel.
2077 self.oCv.acquire();
2078 if rc is True and self.fConnectCanceled:
2079 rc = False;
2080 self.fIsConnecting = False;
2081
2082 if rc is not True:
2083 if self.oSocket is not None:
2084 self.oSocket.close();
2085 self.oSocket = None;
2086 self._closeWakeupSockets();
2087 self.oCv.release();
2088
2089 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2090 return rc;
2091
2092 def disconnect(self, fQuiet = False):
2093 if self.oSocket is not None:
2094 self.abReadAhead = array.array('B');
2095
2096 # Try a shutting down the socket gracefully (draining it).
2097 try:
2098 self.oSocket.shutdown(socket.SHUT_WR);
2099 except:
2100 if not fQuiet:
2101 reporter.error('shutdown(SHUT_WR)');
2102 try:
2103 self.oSocket.setblocking(0); # just in case it's not set.
2104 sData = "1";
2105 while sData:
2106 sData = self.oSocket.recv(16384);
2107 except:
2108 pass;
2109
2110 # Close it.
2111 self.oCv.acquire();
2112 try: self.oSocket.setblocking(1);
2113 except: pass;
2114 self.oSocket.close();
2115 self.oSocket = None;
2116 else:
2117 self.oCv.acquire();
2118 self._closeWakeupSockets();
2119 self.oCv.release();
2120
2121 def sendBytes(self, abBuf, cMsTimeout):
2122 if self.oSocket is None:
2123 reporter.error('TransportTcp.sendBytes: No connection.');
2124 return False;
2125
2126 # Try send it all.
2127 try:
2128 cbSent = self.oSocket.send(abBuf);
2129 if cbSent == len(abBuf):
2130 return True;
2131 except Exception as oXcpt:
2132 if not self.__isWouldBlockXcpt(oXcpt):
2133 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2134 return False;
2135 cbSent = 0;
2136
2137 # Do a timed send.
2138 msStart = base.timestampMilli();
2139 while True:
2140 cMsElapsed = base.timestampMilli() - msStart;
2141 if cMsElapsed > cMsTimeout:
2142 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2143 break;
2144
2145 # wait.
2146 try:
2147 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2148 if ttRc[2] and not ttRc[1]:
2149 reporter.error('TranportTcp.sendBytes: select returned with exception');
2150 break;
2151 if not ttRc[1]:
2152 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2153 break;
2154 except:
2155 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2156 break;
2157
2158 # Try send more.
2159 try:
2160 cbSent += self.oSocket.send(abBuf[cbSent:]);
2161 if cbSent == len(abBuf):
2162 return True;
2163 except Exception as oXcpt:
2164 if not self.__isWouldBlockXcpt(oXcpt):
2165 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2166 break;
2167
2168 return False;
2169
2170 def __returnReadAheadBytes(self, cb):
2171 """ Internal worker for recvBytes. """
2172 assert(len(self.abReadAhead) >= cb);
2173 abRet = self.abReadAhead[:cb];
2174 self.abReadAhead = self.abReadAhead[cb:];
2175 return abRet;
2176
2177 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2178 if self.oSocket is None:
2179 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2180 return None;
2181
2182 # Try read in some more data without bothering with timeout handling first.
2183 if len(self.abReadAhead) < cb:
2184 try:
2185 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2186 if abBuf:
2187 self.abReadAhead.extend(array.array('B', abBuf));
2188 except Exception as oXcpt:
2189 if not self.__isWouldBlockXcpt(oXcpt):
2190 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2191 return None;
2192
2193 if len(self.abReadAhead) >= cb:
2194 return self.__returnReadAheadBytes(cb);
2195
2196 # Timeout loop.
2197 msStart = base.timestampMilli();
2198 while True:
2199 cMsElapsed = base.timestampMilli() - msStart;
2200 if cMsElapsed > cMsTimeout:
2201 if not fNoDataOk or self.abReadAhead:
2202 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2203 break;
2204
2205 # Wait.
2206 try:
2207 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2208 if ttRc[2] and not ttRc[0]:
2209 reporter.error('TranportTcp.recvBytes: select returned with exception');
2210 break;
2211 if not ttRc[0]:
2212 if not fNoDataOk or self.abReadAhead:
2213 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2214 % (len(self.abReadAhead), cb, fNoDataOk));
2215 break;
2216 except:
2217 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2218 break;
2219
2220 # Try read more.
2221 try:
2222 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2223 if not abBuf:
2224 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2225 % (len(self.abReadAhead), cb, fNoDataOk));
2226 self.disconnect();
2227 return None;
2228
2229 self.abReadAhead.extend(array.array('B', abBuf));
2230
2231 except Exception as oXcpt:
2232 reporter.log('recv => exception %s' % (oXcpt,));
2233 if not self.__isWouldBlockXcpt(oXcpt):
2234 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2235 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2236 break;
2237
2238 # Done?
2239 if len(self.abReadAhead) >= cb:
2240 return self.__returnReadAheadBytes(cb);
2241
2242 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2243 return None;
2244
2245 def isConnectionOk(self):
2246 if self.oSocket is None:
2247 return False;
2248 try:
2249 ttRc = select.select([], [], [self.oSocket], 0.0);
2250 if ttRc[2]:
2251 return False;
2252
2253 self.oSocket.send(array.array('B')); # send zero bytes.
2254 except:
2255 return False;
2256 return True;
2257
2258 def isRecvPending(self, cMsTimeout = 0):
2259 try:
2260 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2261 if not ttRc[0]:
2262 return False;
2263 except:
2264 pass;
2265 return True;
2266
2267
2268def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2269 """
2270 Opens a connection to a Test Execution Service via TCP, given its name.
2271
2272 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2273 or similar.
2274 """
2275 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2276 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2277 try:
2278 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2279 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2280 except:
2281 reporter.errorXcpt(None, 15);
2282 return None;
2283 return oSession;
2284
2285
2286def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2287 """
2288 Tries to open a connection to a Test Execution Service via TCP, given its name.
2289
2290 This differs from openTcpSession in that it won't log a connection failure
2291 as an error.
2292 """
2293 try:
2294 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2295 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2296 except:
2297 reporter.errorXcpt(None, 15);
2298 return None;
2299 return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette