VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 93151

Last change on this file since 93151 was 93151, checked in by vboxsync, 3 years ago

ValKit/txsclient.py: log the PROC DOO detail text

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 87.4 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 93151 2022-01-09 01:49:42Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2022 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 93151 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used by both the Session and Transport classes.
56#
57
def getU32(abData, off):
    """
    Get a little endian U32 field.

    Reads the four items abData[off] .. abData[off + 3] and combines them
    with the first item as the least significant byte.
    """
    u32 = 0;
    for iByte in range(4):
        u32 += abData[off + iByte] << (8 * iByte);
    return u32;
64
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    The string length is determined by getSZLen() and the bytes are decoded
    as UTF-8.

    Returns the decoded string on success.
    Returns sDefault if the string is invalid (not terminated, offset beyond
    the data, or the decoding failed - the latter is logged).
    """
    cchStr = getSZLen(abData, off);
    if cchStr < 0:
        return sDefault;

    abStr = abData[off:(off + cchStr)];
    try:
        if sys.version_info >= (3, 9, 0):
            abRaw = abStr.tobytes();
        else:
            # array.tostring() was removed in Python 3.9.
            abRaw = abStr.tostring(); # pylint: disable=no-member
        return abRaw.decode('utf_8');
    except:
        reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;
83
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    Scans forward from off until a zero byte is found.

    Returns the number of bytes before the terminator.
    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    for offCur in range(off, len(abData)):
        if abData[offCur] == 0:
            return offCur - off;
    return -1;
100
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long, starts with two upper case
    letters and continues with upper case letters, digits, dash, underscore
    or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # The ranges previously stopped one short (range(0, 1) and range(2, 7)),
    # leaving characters #1 and #7 unchecked.
    for i in range(0, 2):
        if sOpcode[i] not in sSet1:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    for i in range(2, 8):
        if sOpcode[i] not in sSet2:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
121
122#
123# Helper for encoding data sent to the TXS.
124#
125
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Returns an array.array('B') with four items, least significant byte first.
    """
    abRet = array.array('B');
    for cShift in (0, 8, 16, 24):
        abRet.append((u32 >> cShift) & 0xff);
    return abRet;
133
def escapeString(sString):
    """
    Doubles every '$' in the string so TXS doesn't try to do variable
    expansion on it.

    Returns the escaped string.
    """
    return '$$'.join(sString.split('$'));
139
140
141
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the raw primitives (connect, disconnect, sendBytes,
    recvBytes, isConnectionOk, isRecvPending).  The message framing done by
    sendMsgInt, recvMsg and sendMsg is implemented here on top of them.

    Message layout as produced/consumed by this class:
        - u32 message length (header + payload, excluding padding), little endian
        - u32 CRC-32 of the opcode and payload
        - 8 character opcode, space padded
        - payload, zero padded so the whole message is a multiple of 16 bytes
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header of a message whose payload could not be received; recvMsg
        # picks it up again on the next call instead of reading a new header.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, at least two characters.  It is space padded
                    to eight characters before being validated.
        cMsTimeout: Timeout passed on to sendBytes.
        abPayload:  The already encoded payload (array('B')), None or empty
                    for no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # None replaces the former shared array.array('B') default object.
        if abPayload is None:
            abPayload = array.array('B');

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload, i.e. everything after the first
        # eight bytes), pad it and send it off.  The mask keeps the value
        # unsigned, matching what recvMsg compares against.
        uCrc32 = zlib.crc32(abMsg[8:]) & 0xffffffff;
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: Timeout passed on to recvBytes (for the header and again
                    for the payload).
        fNoDataOk:  Passed on to recvBytes when reading the header, and
                    suppresses the log entry when the payload read fails.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, reusing the one stashed by a previous call that
        # failed to get the payload.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg  = getU32(abHdr, 0);
        uCrc32 = getU32(abHdr, 4);

        if sys.version_info < (3, 9, 0):
            # Removed since Python 3.9.
            abOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
        else:
            abOpcode = abHdr[8:16].tobytes();
        try:
            sOpcode = abOpcode.decode('ascii');
        except UnicodeError:
            # Garbage on the wire must yield the documented failure tuple
            # rather than an exception escaping from here.
            reporter.fatal('recvMsg: non-ASCII opcode bytes: %s' % (abHdr[8:16].tolist(),));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call retries the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32 (a zero CRC in the header skips the check).
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        The payload items are encoded as follows:
            - strings:      UTF-8 bytes followed by a zero terminator
            - integers:     little endian uint32_t (must be in 0..0xffffffff)
            - array.array:  appended as-is

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'));
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8');
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0);
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
398
399
400class Session(TdTaskBase):
401 """
402 A Test eXecution Service (TXS) client session.
403 """
404
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    This starts by connecting to the TXS and will enter the signalled state
    when connected or the timeout has been reached.

    oTransport:      The transport object used for talking to the TXS.
    cMsTimeout:      Timeout for the initial connect task (milliseconds).
    cMsIdleFudge:    Passed to taskConnect as its only argument.
    fTryConnect:     Stored as self.fTryConnect; consulted by the connect and
                     greet tasks when deciding what to report.
    fnProcessEvents: Passed on to the TdTaskBase constructor.

    Raises base.GenError if the connect task could not be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);

    # Connection related state.
    self.oTransport         = oTransport;
    self.fTryConnect        = fTryConnect;
    self.fScrewedUpMsgState = False;

    # State of the current task.
    self.sStatus    = "";
    self.cMsTimeout = 0;
    self.msStart    = 0;
    self.fErr       = True; # Whether to report errors as error.
    self.oThread    = None;
    self.fnTask     = self.taskDummy;
    self.aTaskArgs  = None;
    self.oTaskRc    = None;
    self.t3oReply   = (None, None, None);

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
428
def __del__(self):
    """Cancels any task still pending when the session object is deleted."""
    _ = self.cancelTask();
432
def toString(self):
    """
    Stringify the session (base task info plus all the session state) for
    logging and debugging.
    """
    sFormat = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
              ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    aoValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    );
    return sFormat % aoValues;
438
def taskDummy(self):
    """
    Place holder task, installed as fnTask by the constructor.
    Always raises, so broken state handling gets caught.
    """
    raise Exception();
442
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks of a new task.

    cMsTimeout:     The task timeout in milliseconds.  Values less than
                    500 ms will be adjusted to 500 ms.  This means it is
                    OK to use negative value.
    fIgnoreErrors:  When set, failures are not reported as errors (here and,
                    via self.fErr, by the task itself).
    sStatus:        The task status.
    fnTask:         The method that'll execute the task.
    aArgs:          Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Change status to "setup" and make sure nobody else got in first.
    self.lockTask();
    if self.sStatus != "":
        self.unlockTask();
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;
    self.sStatus  = "setup";
    self.oTaskRc  = None;
    self.t3oReply = (None, None, None);
    self.resetTaskLocked();
    self.unlockTask();

    self.cMsTimeout = max(cMsTimeout, 500);
    self.fErr       = not fIgnoreErrors;
    self.fnTask     = fnTask;
    self.aTaskArgs  = aArgs;
    self.oThread    = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
    # Thread.setDaemon() is deprecated (Python 3.10+); the attribute does the same.
    self.oThread.daemon = True;
    self.msStart    = base.timestampMilli();

    # Publish the real status before letting the thread loose.
    self.lockTask();
    self.sStatus = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
486
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate.

    Returns success indicator (True/False):
        - True if there was no task to cancel, or if the task thread has
          terminated after being cancelled.
        - False if a task is being set up, is already being cancelled, if
          fSync is False, or if the thread is still running after the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus in ("setup", "cancelled"):
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);

    # Success means the thread is gone.  This used to return the is-alive
    # state directly, i.e. False for a successfully cancelled task, which
    # made startTask() fail after every successful cancellation.
    # Thread.is_alive() is available on both Python 2.6+ and Python 3.
    return not oThread.is_alive();
521
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call:
    it runs self.fnTask with self.aTaskArgs unless the task was cancelled
    already, then stores the result, resets the status and signals the task.
    """
    # Must be initialized here: when the task was cancelled before it got
    # going, nothing below assigns it and the logging further down would
    # raise while holding the task lock, so the task was never signalled.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
547
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.
    Reads the status under the task lock; returns True/False.
    """
    self.lockTask();
    fCancelled = self.sStatus == "cancelled";
    self.unlockTask();
    return fCancelled;
556
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.
    Returns True when getMsLeft() reports no time left, otherwise False.
    """
    return self.getMsLeft() <= 0;
563
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout.

    cMsMin: The smallest value to return.  Also returned when the elapsed
            time comes out negative.
    cMsMax: The largest value to return; only applied when greater than zero.

    Returns the number of milliseconds left, clamped as described above.
    """
    cMsElapsed = base.timestampMilli() - self.msStart;
    cMsLeft    = self.cMsTimeout - cMsElapsed;
    if cMsElapsed < 0 or cMsLeft <= cMsMin:
        return cMsMin;
    if 0 < cMsMax < cMsLeft:
        return cMsMax;
    return cMsLeft;
575
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    (in self.t3oReply, under the task lock) so the client can inspect it
    later on.

    cMsTimeout: Receive timeout; None means the time left for the task,
                but at least 500 ms.
    fNoDataOk:  Passed on to recvMsg.

    Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    t3oReply = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
    cbMsg, sOpcode, abPayload = t3oReply;

    self.lockTask();
    self.t3oReply = (cbMsg, sOpcode, abPayload);
    self.unlockTask();

    return (cbMsg, sOpcode, abPayload);
588
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode itself
    is used when the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
    if cbMsg is None:
        return False;

    sStripped = sOpcode.strip();
    if sStripped != "ACK":
        return (sStripped, getSZ(abPayload, 0, sStripped));
    return True;
605
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    Returns True on success (ACK).
    Returns False on timeout, transport error and errors signalled by TXS.

    Note: when fNoDataOk is set nothing is logged and the recvAck result is
    returned unmodified.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is True or fNoDataOk:
        return rc;

    if rc is False:
        sMsg = 'recvAckLogged: %s transport error' % (sCommand);
    else:
        sMsg = 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]);
    reporter.maybeErr(self.fErr, sMsg);
    return False;
620
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command name, used for logging only.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        # The log prefix used to be a copy & paste leftover ('recvAckLogged').
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;
638
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    When cMsTimeout is None, the time left for the current task is used,
    but never less than 500 ms.

    Returns whatever the transport's sendMsg returns.
    """
    cMsEffective = cMsTimeout if cMsTimeout is not None else self.getMsLeft(500);
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload);
646
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    Calls fnAsync with aArgs to start the task, waits for it to complete
    (task timeout plus five seconds) and fetches the result.  The task is
    cancelled if the wait fails.

    Returns False on failure, task return status on success.
    """
    fStarted = fnAsync(*aArgs);
    if fStarted is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return fStarted;

    fWaited = self.waitForTask(self.cMsTimeout + 5000);
    if fWaited is not False:
        return self.getResult();

    reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
    self.cancelTask();
    return False;
668
669 #
670 # Connection tasks.
671 #
672
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS, retrying until it works, the task is
    cancelled or the task timeout expires.

    cMsIdleFudge:   Passed on to taskGreet once connected.

    Returns the taskGreet result when the connection was established.
    Returns None if the transport reported a fatal connect error.
    Returns False on timeout or cancellation.
    """
    while True:
        if self.isCancelled():
            break;

        reporter.log2('taskConnect: connecting ...');
        fConnected = self.oTransport.connect(self.getMsLeft(500));
        if fConnected is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if fConnected is None:
            reporter.log2('taskConnect: unable to connect');
            return None;

        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;

        # Retryable failure: nap for up to a second before the next attempt.
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);

    if not self.fTryConnect:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
    return False;
693
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS by sending HOWDY and waiting for the ACK.

    On success this sleeps in one second steps until cMsIdleFudge
    milliseconds have been used up.  On failure the transport is
    disconnected.

    Returns True on success, otherwise the failing send/receive status.
    """
    rc = self.sendMsg("HOWDY", ());
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return rc;

    cMsLeftToIdle = cMsIdleFudge;
    while cMsLeftToIdle > 0:
        cMsLeftToIdle -= 1000;
        time.sleep(1);
    return rc;
706
def taskBye(self):
    """
    Says goodbye to the TXS (BYE + ACK) and disconnects the transport.

    Returns True on success, otherwise the failing send/receive status.
    """
    fSent = self.sendMsg("BYE");
    rc = self.recvAckLogged("BYE") if fSent is True else fSent;
    self.oTransport.disconnect();
    return rc;
714
def taskVer(self):
    """
    Requests version information from TXS.

    Returns the version string from the "ACK VER" reply on success.
    Returns False if the reply was missing, unexpected or carried no valid
    string; the sendMsg status if the request could not be sent.
    """
    rc = self.sendMsg("VER");
    if rc is not True:
        return rc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "ACK VER":
        reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
        return False;

    sVer = getSZ(abPayload, 0);
    if sVer is None:
        return False;
    return sVer;
732
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply was missing, unexpected or did not contain a
    valid UUID string; the sendMsg status if the request could not be sent.
    """
    rc = self.sendMsg("UUID");
    if rc is not True:
        return rc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Let the uuid module validate the string before handing it back.
    sUuid = '{%s}' % (sUuid,)
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
757
758 #
759 # Process task
760 # pylint: disable=missing-docstring
761 #
762
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program on the TXS side, feeding it standard input and
    collecting its output.

    sExecName:  The program to execute.
    fFlags:     Flags sent along with the EXEC request.
    asArgs:     The argument strings.
    asAddEnv:   The environment change strings.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either a string (sent to TXS as-is), None (an empty
                string is sent) or an object (a '|' is sent and the object is
                read from / written to here).  Such objects temporarily get
                an uTxsClientCrc32 attribute for the running stream CRC and
                are closed during cleanup.
    sAsUser:    The user name sent along with the EXEC request.

    Returns True if the process status reply was 'PROC OK'.
    Returns False for any other 'PROC xxx' status.
    Returns None on all other failures; when the failure happened after the
    EXEC was acknowledged, self.t3oReply is replaced by a fake
    (0, 'EXECFAIL', <reason>) reply.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure            = "";
        msPendingInputReply = None;
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?
            if      msPendingInputReply is None \
                and oStdIn is not None \
                and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).
                    abInput = array.array('B', sInput.encode('utf-8'));
                    oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # Nothing more to read: signal end of stream and stop feeding.
                # NOTE(review): from here on the stdin object is no longer seen
                # by the cleanup loop below (not closed, attribute not removed)
                # - confirm whether that is intended.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if      self.hasTimedOut() \
                    and (   msPendingInputReply is None \
                         or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process: u32 running stream CRC + data.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                # oStdIn is None already if this is the reply to STDINEOS;
                # touching the attribute unconditionally raised AttributeError.
                if oStdIn is not None and not utils.isString(oStdIn):
                    del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif  sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
              and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode == 'PROC OK':
                pass;
            else:
                rc = False;
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                if sOpcode == 'PROC DOO':
                    reporter.log('taskExecEx: PROC DOO[fus]: %s' % (abPayload,));

        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32;      # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close();                  # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
969
970 #
971 # Admin tasks
972 #
973
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command name, used for logging by recvAckLogged.

    After a successful ACK this polls for up to five seconds for the server
    to disconnect (or for incoming data to show up) and then disconnects
    the transport.

    Returns the recvAckLogged status (True on success).
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.
        # The elapsed time check was inverted (>= 5000), so the loop body
        # was never entered and no polling took place.
        uMsStart = base.timestampMilli();
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
986
def taskReboot(self):
    """
    Sends REBOOT to the TXS and waits for the acknowledgement.
    Returns True on success, otherwise the failing status.
    """
    fSent = self.sendMsg('REBOOT');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
992
def taskShutdown(self):
    """
    Sends SHUTDOWN to the TXS and waits for the acknowledgement.
    Returns True on success, otherwise the failing status.
    """
    fSent = self.sendMsg('SHUTDOWN');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
998
999 #
1000 # CD/DVD control tasks.
1001 #
1002
1003 ## TODO
1004
1005 #
1006 # File system tasks
1007 #
1008
def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR (mode + directory) to the TXS and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    fSent = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDIR');
1014
def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH (mode + directory) to the TXS and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    fSent = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDRPATH');
1020
def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends 'MKSYMLNK' with (sLinkTarget, sLink) and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKSYMLNK');
1026
def taskRmDir(self, sRemoteDir):
    """
    Sends 'RMDIR' for sRemoteDir and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('RMDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMDIR');
1032
def taskRmFile(self, sRemoteFile):
    """
    Sends 'RMFILE' for sRemoteFile and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('RMFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMFILE');
1038
def taskRmSymlink(self, sRemoteSymlink):
    """
    Sends 'RMSYMLNK' for sRemoteSymlink and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMSYMLNK');
1044
def taskRmTree(self, sRemoteTree):
    """
    Sends 'RMTREE' for sRemoteTree and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('RMTREE', (sRemoteTree,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMTREE');
1050
def taskChMod(self, sRemotePath, fMode):
    """
    Sends 'CHMOD' with (int(fMode), sRemotePath) and waits for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    tPayload = (int(fMode), sRemotePath,);
    fSent = self.sendMsg('CHMOD', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHMOD');
1056
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """
    Sends 'CHOWN' with (int(idUser), int(idGroup), sRemotePath) and waits
    for the logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    tPayload = (int(idUser), int(idGroup), sRemotePath,);
    fSent = self.sendMsg('CHOWN', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHOWN');
1062
def taskIsDir(self, sRemoteDir):
    """
    Sends 'ISDIR' for sRemoteDir and reads the true/false reply.

    Returns the sendMsg result if it is not True, otherwise the
    recvTrueFalse result.
    """
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
1068
def taskIsFile(self, sRemoteFile):
    """
    Sends 'ISFILE' for sRemoteFile and reads the true/false reply.

    Returns the sendMsg result if it is not True, otherwise the
    recvTrueFalse result.
    """
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1074
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends 'ISSYMLNK' for sRemoteSymlink and reads the true/false reply.

    Returns the sendMsg result if it is not True, otherwise the
    recvTrueFalse result.
    """
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1080
1081 #def "STAT "
1082 #def "LSTAT "
1083 #def "LIST "
1084
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the local file sLocalFile to sRemoteFile.

    Returns False (error logged) if the local file cannot be opened,
    otherwise whatever taskUploadCommon returns.
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr.  The finally clause makes sure the
    # local file is closed even if the common worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        oLocalFile.close();
1102
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads sContent as the content of sRemoteFile.

    The content is wrapped in a small read-only file like object and handed
    to taskUploadCommon, whose result is returned.
    """
    class InStringFile(object): # pylint: disable=too-few-public-methods
        """File like wrapper offering read() on top of the content."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """Returns the next chunk of at most cbMax items, "" at the end."""
            offStart = self.off;
            cbAvail  = len(self.sContent) - offStart;
            if cbAvail == 0:
                return "";
            sChunk = self.sContent[offStart:(offStart + min(cbAvail, cbMax))];
            self.off = offStart + len(sChunk);
            return sChunk;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile, fMode, fFallbackOkay);
1123
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method.  Returns True on success;
    any other outcome of the command/ACK phase is returned as is, and
    failures in the data phase yield False.
    """
    #
    # Command + ACK.
    #
    # Only used the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode != 0:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                if rc[0] != 'UNKNOWN' or not fFallbackOkay:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
                else:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    if rc is not True:
        return rc;

    #
    # Push data packets until eof.
    #
    uCrcRunning = zlib.crc32(b'');
    while True:
        # Read up to 64 KB of data.
        try:
            sChunk = oLocalFile.read(65536);
        except:
            rc = None;
            break;

        # Convert to a byte array (strings go via ord() per character).
        abChunk = array.array('B');
        if utils.isString(sChunk):
            abChunk.extend(ord(ch) for ch in sChunk);
        else:
            abChunk.extend(sChunk);
        sChunk = None;

        # Update the file stream CRC and send it off.  An empty chunk
        # means end of file and is sent as 'DATA EOF'.
        uCrcRunning = zlib.crc32(abChunk, uCrcRunning);
        uCrcWire    = long(uCrcRunning & 0xffffffff);
        if abChunk:
            rc = self.sendMsg('DATA ', (uCrcWire, abChunk));
        else:
            rc = self.sendMsg('DATA EOF', (uCrcWire, ));
        if rc is False:
            break;

        # Wait for the reply.
        rc = self.recvAck();
        if rc is not True:
            if rc is False:
                reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
            else:
                reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
            rc = False;
            break;

        # EOF?
        if not abChunk:
            break;

    # Send ABORT on ACK and I/O errors (rc is None only after a failed read).
    if rc is None:
        if self.sendMsg('ABORT') is True:
            self.recvAckLogged('ABORT');
        return False;
    return rc;
1203
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile.

    Returns False (error logged) if the local file cannot be opened for
    writing, otherwise the taskDownloadCommon result.  The local file is
    removed again when that result is False.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Make sure the local file gets closed even if the worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1220
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Downloads sRemoteFile and returns its content as a decoded string.

    Returns the decoded content when taskDownloadCommon returns True,
    otherwise the taskDownloadCommon result is passed on unchanged.

    Encoding errors are ignored when fIgnoreEncodingErrors is True and
    raise (strict decoding) otherwise.
    """
    # Collect the written chunks in a file like class.
    class OutStringFile(object): # pylint: disable=too-few-public-methods
        """File like wrapper collecting everything passed to write()."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            """Appends the chunk to the list."""
            self.asContent.append(sBuf);
            return None;

    oLocalString = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
    if rc is True:
        sErrors  = 'ignore' if fIgnoreEncodingErrors else 'strict';
        asParts  = [];
        aoRawRun = [];  # Adjacent chunks still waiting to be decoded.
        for sBuf in oLocalString.asContent:
            if hasattr(sBuf, 'decode'):
                aoRawRun.append(sBuf);
                continue;
            if aoRawRun:
                asParts.append(b''.join(aoRawRun).decode(sEncoding, sErrors));
                aoRawRun = [];
            asParts.append(sBuf);
        # Decode adjacent byte chunks in one go so that a multi-byte
        # character split across two chunks is not dropped or rejected.
        if aoRawRun:
            asParts.append(b''.join(aoRawRun).decode(sEncoding, sErrors));
        rc = ''.join(asParts);
    return rc;
1241
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    oLocalFile only needs a write() method.  Returns the result of the
    final 'ACK' send on success, False on failure, and the 'GET FILE' send
    result if that was not True.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is not True:
        return rc;

    #
    # Process data packets until eof.
    #
    uCrcRunning = zlib.crc32(b'');
    while rc is True:
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is None:
            reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
            rc = None;
            break;

        # Validate.
        sOpcode = sOpcode.rstrip();
        if sOpcode not in ('DATA', 'DATA EOF',):
            reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                              % (sOpcode, getSZ(abPayload, 0, "None")));
            rc = False;
            break;
        fEof = sOpcode == 'DATA EOF';
        if not fEof and len(abPayload) < 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
            rc = None;
            break;
        if fEof and len(abPayload) != 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
            rc = None;
            break;

        # Check the CRC (common for both packets).  Only DATA packets carry
        # bytes that get folded into our running CRC.
        uCrcRemote = getU32(abPayload, 0);
        if not fEof:
            uCrcRunning = zlib.crc32(abPayload[4:], uCrcRunning);
        if uCrcRemote != (uCrcRunning & 0xffffffff):
            reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                              % (hex(uCrcRunning), hex(uCrcRemote)));
            rc = None;
            break;
        if fEof:
            rc = self.sendMsg('ACK');
            break;

        # Finally, push the data to the file.
        try:
            abBody = abPayload[4:];
            if sys.version_info < (3, 9, 0):
                # tostring() was removed in Python 3.9.
                abData = abBody.tostring();
            else:
                abData = abBody.tobytes();
            oLocalFile.write(abData);
        except:
            reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
            rc = None;
            break;
        rc = self.sendMsg('ACK');

    # Send NACK on validation and I/O errors.
    if rc is None:
        self.sendMsg('NACK');
        rc = False;
    return rc;
1305
def taskPackFile(self, sRemoteFile, sRemoteSource):
    """
    Sends 'PKFILE' with (sRemoteFile, sRemoteSource) and waits for the
    logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('PKFILE');
1311
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends 'UNPKFILE' with (sRemoteFile, sRemoteDir) and waits for the
    logged ACK.

    Returns the sendMsg result if it is not True, otherwise the
    recvAckLogged result.
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1317
1318 # pylint: enable=missing-docstring
1319
1320
1321 #
1322 # Public methods - generic task queries
1323 #
1324
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    That is: the status string is empty and the task result is neither
    False nor None.  Both values are sampled while holding the task lock.
    """
    self.lockTask();
    sState, oRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return sState == "" and oRc is not False and oRc is not None;
1336
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None if not completed yet (non-empty status string) or if
    there is no previous task.
    """
    self.lockTask();
    sState, oRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return oRc if sState == "" else None;
1349
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.

    The tuple is read while holding the task lock.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1359
1360 #
1361 # Public methods - connection.
1362 #
1363
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a disconnect task (task name "bye", worker taskBye).

    Returns True on success, False on failure (logged).

    The task returns True on success and False on failure.
    """
    fnWorker = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnWorker);
1373
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect (goes via asyncToSync)."""
    fnAsync = self.asyncDisconnect;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1377
def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS version information (task name
    "ver", worker taskVer).

    Returns True on success, False on failure (logged).

    The task returns the version string on success and False on failure.
    """
    fnWorker = self.taskVer;
    return self.startTask(cMsTimeout, fIgnoreErrors, "ver", fnWorker);
1387
def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncVer (goes via asyncToSync)."""
    fnAsync = self.asyncVer;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1391
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID (task name "uuid", worker
    taskUuid).

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    fnWorker = self.taskUuid;
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", fnWorker);
1401
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid (goes via asyncToSync)."""
    fnAsync = self.asyncUuid;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1405
1406 #
1407 # Public methods - execution.
1408 #
1409
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates a exec process task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None if on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe specify how to deal with
    these streams.  If None, no special action is taken and the output goes
    to where ever the TXS sends its output, and ditto for input.
        - To send to / read from the bitbucket, pass '/dev/null'.
        - To redirect to/from a file, just specify the remote filename.
        - To append to a file use '>>' followed by the remote filename.
        - To pipe the stream to/from the TXS, specify a file like
          object.  For StdIn a non-blocking read() method is required. For
          the other a write() method is required.  Watch out for deadlock
          conditions between StdIn and StdOut/StdErr/TestPipe piping.
    """
    # The second worker argument is always passed as long(0) here.
    tWorkerArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                   oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tWorkerArgs);
1438
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx (goes via asyncToSync).

    Note that all four streams default to '/dev/null' here.
    """
    fnAsync = self.asyncExecEx;
    return self.asyncToSync(fnAsync, sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
                            oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1446
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates a exec process test task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None if on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    Standard in is taken from /dev/null.  Standard output and standard
    error are handed to reporter.FileWrapper objects named after sPrefix.
    The testpipe goes to a reporter.FileWrapperTestPipe object when
    fWithTestPipe is set, otherwise to /dev/null.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    tWorkerArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tWorkerArgs);
1473
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec (goes via asyncToSync)."""
    fnAsync = self.asyncExec;
    return self.asyncToSync(fnAsync, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix,
                            cMsTimeout, fIgnoreErrors);
1479
1480 #
1481 # Public methods - system
1482 #
1483
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a reboot task (task name "reboot", worker taskReboot).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, tNoArgs);
1494
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot (goes via asyncToSync)."""
    fnAsync = self.asyncReboot;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1498
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a shutdown task (task name "shutdown", worker taskShutdown).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, tNoArgs);
1508
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown (goes via asyncToSync)."""
    fnAsync = self.asyncShutdown;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1512
1513
1514 #
1515 # Public methods - file system
1516 #
1517
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir task (task name "mkDir", worker taskMkDir).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, tWorkerArgs);
1527
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDir (goes via asyncToSync)."""
    fnAsync = self.asyncMkDir;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1531
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir -p task (task name "mkDirPath", worker taskMkDirPath).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, tWorkerArgs);
1541
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDirPath (goes via asyncToSync)."""
    fnAsync = self.asyncMkDirPath;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1545
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a symlink task (task name "mkSymlink", worker taskMkSymlink).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, tWorkerArgs);
1555
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink (goes via asyncToSync)."""
    fnAsync = self.asyncMkSymlink;
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1559
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmdir task (task name "rmDir", worker taskRmDir).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, tWorkerArgs);
1569
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir (goes via asyncToSync)."""
    fnAsync = self.asyncRmDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1573
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmfile task (task name "rmFile", worker taskRmFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, tWorkerArgs);
1583
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile (goes via asyncToSync)."""
    fnAsync = self.asyncRmFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1587
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmsymlink task (task name "rmSymlink", worker taskRmSymlink).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, tWorkerArgs);
1597
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink (goes via asyncToSync)."""
    fnAsync = self.asyncRmSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1601
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmtree task (task name "rmTree", worker taskRmTree).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, tWorkerArgs);
1611
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree (goes via asyncToSync)."""
    fnAsync = self.asyncRmTree;
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors);
1615
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a chmod task (task name "chMod", worker taskChMod).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemotePath, fMode);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, tWorkerArgs);
1625
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncChMod (goes via asyncToSync)."""
    fnAsync = self.asyncChMod;
    return self.asyncToSync(fnAsync, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1629
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a chown task (task name "chOwn", worker taskChOwn).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemotePath, idUser, idGroup);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, tWorkerArgs);
1639
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn (goes via asyncToSync).

    Note: this used to hand asyncChMod to asyncToSync, which would have
          invoked the chmod task with the chown argument list.
    """
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1643
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a is-dir query task (task name "isDir", worker taskIsDir).

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    tWorkerArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, tWorkerArgs);
1654
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir (goes via asyncToSync)."""
    fnAsync = self.asyncIsDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1658
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a is-file query task (task name "isFile", worker taskIsFile).

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    tWorkerArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, tWorkerArgs);
1669
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile (goes via asyncToSync)."""
    fnAsync = self.asyncIsFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1673
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a is-symbolic-link query task (task name "isSymlink", worker
    taskIsSymlink).

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    tWorkerArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, tWorkerArgs);
1684
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink (goes via asyncToSync)."""
    fnAsync = self.asyncIsSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1688
1689 #def "STAT "
1690 #def "LSTAT "
1691 #def "LIST "
1692
@staticmethod
def calcFileXferTimeout(cbFile):
    """
    Calculates a reasonable timeout for an upload/download given the file size.

    Returns timeout in milliseconds: a 30 second base plus one millisecond
    for every 32 bytes.  Floor division keeps the result an integer on
    Python 3 as well (plain '/' would yield a float there).
    """
    return 30000 + cbFile // 32; # 32 KiB/s (picked out of thin air)
1701
@staticmethod
def calcUploadTimeout(sLocalFile):
    """
    Calculates a reasonable timeout for an upload given the file (will stat it).

    If the size cannot be determined, 1 MiB is assumed.

    Returns timeout in milliseconds.
    """
    try:
        cbLocal = os.path.getsize(sLocalFile);
    except:
        cbLocal = 1024*1024;
    return Session.calcFileXferTimeout(cbLocal);
1712
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an upload file task (task name "upload", worker taskUploadFile).

    Unlike syncUploadFile, the timeout is not derived from the file size
    here; the fixed default applies unless the caller passes one.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, tWorkerArgs);
1724
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadFile (goes via asyncToSync).

    A zero or negative cMsTimeout is replaced by calcUploadTimeout(sLocalFile).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcUploadTimeout(sLocalFile);
    return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay,
                            cMsEffective, fIgnoreErrors);
1730
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Initiates a upload string task (task name "uploadString", worker
    taskUploadString).

    A zero or negative cMsTimeout is replaced by
    calcFileXferTimeout(len(sContent)).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcFileXferTimeout(len(sContent));
    tWorkerArgs = (sContent, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsEffective, fIgnoreErrors, "uploadString", self.taskUploadString, tWorkerArgs);
1744
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadString (goes via asyncToSync).

    A zero or negative cMsTimeout is replaced by
    calcFileXferTimeout(len(sContent)).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcFileXferTimeout(len(sContent));
    return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay,
                            cMsEffective, fIgnoreErrors);
1750
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates a download file task (task name "downloadFile", worker
    taskDownloadFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, tWorkerArgs);
1760
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile (goes via asyncToSync)."""
    fnAsync = self.asyncDownloadFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1764
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a download string task (task name "downloadString", worker
    taskDownloadString).

    Returns True on success, False on failure (logged).

    The task returns the file content decoded with sEncoding as a string
    on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, tWorkerArgs);
1776
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString (goes via asyncToSync)."""
    fnAsync = self.asyncDownloadString;
    return self.asyncToSync(fnAsync, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
                            cMsTimeout, fIgnoreErrors);
1782
def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates a packing file/directory task (task name "packFile", worker
    taskPackFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteFile, sRemoteSource);
    return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile, tWorkerArgs);
1793
def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncPackFile (goes via asyncToSync)."""
    fnAsync = self.asyncPackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1797
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates a unpack file task (task name "unpackFile", worker
    taskUnpackFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tWorkerArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, tWorkerArgs);
1808
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile (goes via asyncToSync)."""
    fnAsync = self.asyncUnpackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1812
1813
1814class TransportTcp(TransportBase):
1815 """
1816 TCP transport layer for the TXS client session class.
1817 """
1818
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Save the parameters. The session will call us back to make the
    connection later on its worker thread.

    When uPort is None the port defaults to 5042 if fReversedSetup is
    False and to 5048 otherwise.
    """
    TransportBase.__init__(self, utils.getCallerName());
    if uPort is None:
        uPort = 5042 if fReversedSetup is False else 5048;

    # Connection parameters.
    self.sHostname        = sHostname;
    self.fReversedSetup   = fReversedSetup;
    self.uPort            = uPort;

    # Connection state; nothing is connected yet.
    self.oSocket          = None;
    self.oWakeupW         = None;
    self.oWakeupR         = None;
    self.fConnectCanceled = False;
    self.fIsConnecting    = False;
    self.oCv              = threading.Condition();
    self.abReadAhead      = array.array('B');
1835
def toString(self):
    """Returns a string describing the transport state (for logging/debugging)."""
    sFmt = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
           ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFmt % tValues;
1841
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    True for a socket.error whose errno is EINPROGRESS or EWOULDBLOCK (the
    latter for Windows).  Any problem while checking counts as 'no'.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;
    for sErrConst in ('EINPROGRESS', 'EWOULDBLOCK'):
        try:
            if oXcpt.errno == getattr(errno, sErrConst):
                return True;
        except:
            pass;
    return False;
1858
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    True for a socket.error whose errno is EWOULDBLOCK or EAGAIN.  Any
    problem while checking counts as 'no'.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;
    for sErrConst in ('EWOULDBLOCK', 'EAGAIN'):
        try:
            if oXcpt.errno == getattr(errno, sErrConst):
                return True;
        except:
            pass;
    return False;
1874
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    True for a socket.error whose errno is ECONNRESET or ENETRESET.  Any
    problem while checking counts as 'no'.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;
    for sErrConst in ('ECONNRESET', 'ENETRESET'):
        try:
            if oXcpt.errno == getattr(errno, sErrConst):
                return True;
        except:
            pass;
    return False;
1890
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets (read end first, then write end) and resets
    the attributes to None.  Caller should own the CV.
    """
    for sAttr in ('oWakeupR', 'oWakeupW'):
        # Clear the attribute before closing, like for a hand-over.
        oWakeup = getattr(self, sAttr);
        setattr(self, sAttr, None);
        if oWakeup is not None:
            oWakeup.close();
    return None;
1904
def cancelConnect(self):
    """
    Cancels a connect() call, typically from another thread.

    Always sets fConnectCanceled.  If a connect is in progress, the
    connecting socket is detached from the object and closed, and the write
    end of the wakeup socket pair (if any) is used to poke the select() call
    of the connecting thread before it is shut down and closed.
    """
    # This is bad stuff.
    # The CV is held via 'with' so it is released even if closing one of the
    # sockets raises; otherwise connect() would block on the CV forever.
    with self.oCv:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # Best effort: failures to send/shutdown are only logged.
                try:    oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try:    oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
1927
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (sHostname, uPort), listens on it and accepts a single
    client.  If accept() reports that the operation is in progress, select()
    is used on oSocket and oWakeupR to wait up to cMsTimeout milliseconds
    before accept() is tried once more.

    Returns True if a client connection was accepted, False if the wait
    timed out, and None on failure or if the connect was cancelled.
    Unexpected socket errors from accept() / select() are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr   = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcptFirst:
        if not self.__isInProgressXcpt(oXcptFirst):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcptFirst,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            # EBADF after a cancel means cancelConnect() closed the socket under us.
            if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptRetry:
            # Classify the exception of *this* accept() call.  Testing the
            # first (in-progress) exception here would make the cancel and
            # re-raise paths unreachable and report everything as a timeout.
            if not self.__isInProgressXcpt(oXcptRetry):
                if oXcptRetry.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    with self.oCv:
        if not self.fConnectCanceled:
            self.oSocket.close();
            self.oSocket = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
        else:
            # Cancelled while accepting: nothing takes ownership of the
            # accepted socket, so close it here rather than leaking it.
            try:    oClientSocket.close();
            except: reporter.logXcpt();
    return True;
2000
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Issues connect() on oSocket and, if that reports the operation as being
    in progress, waits up to cMsTimeout milliseconds in select() for the
    socket to become writable or fail (oWakeupR is included in the wait).

    Returns True when connected.  Returns False when SO_ERROR reports a
    failure or the errno is one of the "try again" codes listed below.
    Otherwise the result is None.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        iErr = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                (_, aoWritable, aoFailed) = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR],
                                                          cMsTimeout / 1000.0);
                if not aoWritable and not aoFailed:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                # The socket status tells whether the connect succeeded.
                iErr = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = (iErr == 0);
            except socket.error as oXcptWait:
                iErr = oXcptWait.errno;
            except:
                iErr = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        if rc is not True:
            if iErr in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR,
                        errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
                rc = False; # try again.
            elif iErr != errno.EBADF or not self.fConnectCanceled:
                # EBADF after a cancel is expected and therefore not reported.
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iErr));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iErr));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
2038
2039
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket and establishes the connection, either
    by connecting to the server or, in the reversed setup, by accepting a
    connection.  The operation can be aborted via cancelConnect().

    Returns True on success.  Returns None if the socket could not be
    created or set up.  Otherwise returns the result of the connect worker
    (False or None), with a successful but cancelled connect reported as
    False.  Exceptions raised by the connect worker propagate to the caller
    after the connection state has been cleaned up.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        # Report first (while the exception is current), then clean up.
        reporter.fatalXcpt('socket.setblocking(0) failed');
        oSocket.close();
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:    (oWakeupR, oWakeupW) = socket.socketpair();         # pylint: disable=no-member
        except: reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    with self.oCv:
        if not self.fConnectCanceled:
            self.oSocket       = oSocket;
            self.oWakeupW      = oWakeupW;
            self.oWakeupR      = oWakeupR;
            self.fIsConnecting = True;

    # Try connect.
    rc = None;
    try:
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
    finally:
        # Update the state and cleanup on failure/cancel.  Done in a finally
        # clause so that an exception from the worker doesn't leave
        # fIsConnecting set and the sockets open.
        with self.oCv:
            if rc is True and self.fConnectCanceled:
                rc = False;
            self.fIsConnecting = False;

            if rc is not True:
                if self.oSocket is not None:
                    self.oSocket.close();
                    self.oSocket = None;
                self._closeWakeupSockets();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
2096
def disconnect(self, fQuiet = False):
    """
    Disconnects from the peer and releases the sockets.

    If connected, the read-ahead buffer is reset and a graceful shutdown is
    attempted: the sending side is shut down and pending input is drained
    before the socket is closed.  The wakeup sockets are closed in any case.

    fQuiet suppresses the error report when the shutdown() call fails.
    """
    if self.oSocket is not None:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0);    # just in case it's not set.
            while self.oSocket.recv(16384):
                pass;
        except:
            pass; # Would-block or any other error simply ends the draining.

    # Close it.  The CV is held via 'with' and the cleanup is chained with
    # finally clauses so that a failing close() can neither leave the CV
    # locked nor skip closing the wakeup sockets.
    with self.oCv:
        try:
            if self.oSocket is not None: # Re-checked now that we own the CV.
                try:    self.oSocket.setblocking(1);
                except: pass;
                try:
                    self.oSocket.close();
                finally:
                    self.oSocket = None;
        finally:
            self._closeWakeupSockets();
2125
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf, waiting at most cMsTimeout milliseconds.

    A plain send is attempted first.  Only if that is incomplete (or would
    block) does the method enter a select()-driven loop that keeps sending
    the remainder until everything is out or the timeout expires.

    Returns True if every byte was sent, False on missing connection,
    error or timeout.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbDone = self.oSocket.send(abBuf);
        if cbDone == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbDone = 0;

    # Do a timed send.
    msBegin = base.timestampMilli();
    while True:
        cMsSpent = base.timestampMilli() - msBegin;
        if cMsSpent > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            return False;

        # Wait for the socket to become writable.
        try:
            (_, aoWritable, aoFailed) = select.select([], [self.oSocket], [self.oSocket],
                                                      (cMsTimeout - cMsSpent) / 1000.0);
            if aoFailed and not aoWritable:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                return False;
            if not aoWritable:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Try send more.
        try:
            cbDone += self.oSocket.send(abBuf[cbDone:]);
            if cbDone == len(abBuf):
                return True;
        except Exception as oXcpt:
            # A would-block condition just means another round of waiting.
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;
2174
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from the read-ahead buffer and returns them;
    the remainder stays buffered.  The caller must ensure at least cb bytes
    are available.
    """
    abPending = self.abReadAhead;
    assert(len(abPending) >= cb);
    self.abReadAhead = abPending[cb:];
    return abPending[:cb];
2181
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, waiting at most cMsTimeout milliseconds.

    Incoming data is collected in self.abReadAhead, so a partial read
    stays buffered when the call times out.  When fNoDataOk is set, a
    timeout or a connection reset with nothing buffered is not reported as
    an error.

    Returns an array with cb bytes on success, None on missing connection,
    error, timeout or when the peer has shut the connection down (in which
    case disconnect() is called as well).
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msBegin = base.timestampMilli();
    while True:
        cMsSpent = base.timestampMilli() - msBegin;
        if cMsSpent > cMsTimeout:
            if self.abReadAhead or not fNoDataOk:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            return None;

        # Wait for the socket to become readable.
        try:
            (aoReadable, _, aoFailed) = select.select([self.oSocket], [], [self.oSocket],
                                                      (cMsTimeout - cMsSpent) / 1000.0);
            if aoFailed and not aoReadable:
                reporter.error('TranportTcp.recvBytes: select returned with exception');
                return None;
            if not aoReadable:
                if self.abReadAhead or not fNoDataOk:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Try read more.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                # Readable but no data: the peer has closed the connection.
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            # A would-block condition just means another round of waiting.
            if not self.__isWouldBlockXcpt(oXcpt):
                if self.abReadAhead or not fNoDataOk or not self.__isConnectionReset(oXcpt):
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);
2249
def isConnectionOk(self):
    """
    Checks whether the connection still looks usable.

    Returns False if there is no socket, if select() flags the socket as
    being in an exceptional state, or if polling it / sending zero bytes
    raises.  Returns True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        (_, _, aoInError) = select.select([], [], [self.oSocket], 0.0);
        fOk = not aoInError;
        if fOk:
            self.oSocket.send(array.array('B')); # send zero bytes.
    except:
        fOk = False;
    return fOk;
2262
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket is readable, waiting up to cMsTimeout
    milliseconds for that to happen.

    Returns False only if select() completes without reporting the socket
    as readable.  If select() raises, True is returned.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return bool(aoReadable);
2271
2272
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    The optional fnProcessEvents callback should be set to vbox.processPendingEvents
    or similar.

    Returns the new Session object, or None if creating the transport or
    the session raised an exception (which is reported as an error).
    """
    tLogArgs = (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge);
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % tLogArgs);
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                       fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2289
2290
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    This differs from openTcpSession in that it won't log a connection failure
    as an error; the session is created with fTryConnect = True for that purpose.

    Returns the new Session object, or None if creating the transport or
    the session raised an exception (which is still reported as an error).
    """
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                       fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette