VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 93895

Last change on this file since 93895 was 93895, checked in by vboxsync, 3 years ago

Validation Kit/TxS: Implemented (local) copy file support for TxS. Useful for copying stuff around on guests. bugref:10195

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 88.7 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 93895 2022-02-23 12:48:02Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2022 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 93895 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 if sys.version_info < (3, 9, 0):
75 # Removed since Python 3.9.
76 sStr = abStr.tostring(); # pylint: disable=no-member
77 else:
78 sStr = abStr.tobytes();
79 return sStr.decode('utf_8');
80 except:
81 reporter.errorXcpt('getSZ(,%u)' % (off));
82 return sDefault;
83
84def getSZLen(abData, off):
85 """
86 Get the length of a zero-terminated string field, in bytes.
87 Returns -1 if off is beyond the data packet or not properly terminated.
88 """
89 cbData = len(abData);
90 if off >= cbData:
91 return -1;
92
93 offCur = off;
94 while abData[offCur] != 0:
95 offCur = offCur + 1;
96 if offCur >= cbData:
97 return -1;
98
99 return offCur - off;
100
101def isValidOpcodeEncoding(sOpcode):
102 """
103 Checks if the specified opcode is valid or not.
104 Returns True on success.
105 Returns False if it is invalid, details in the log.
106 """
107 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
108 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
109 if len(sOpcode) != 8:
110 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
111 return False;
112 for i in range(0, 1):
113 if sSet1.find(sOpcode[i]) < 0:
114 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
115 return False;
116 for i in range(2, 7):
117 if sSet2.find(sOpcode[i]) < 0:
118 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
119 return False;
120 return True;
121
122#
123# Helper for encoding data sent to the TXS.
124#
125
126def u32ToByteArray(u32):
127 """Encodes the u32 value as a little endian byte (B) array."""
128 return array.array('B',
129 ( u32 % 256,
130 (u32 // 256) % 256,
131 (u32 // 65536) % 256,
132 (u32 // 16777216) % 256) );
133
134def escapeString(sString):
135 """
136 Does $ escaping of the string so TXS doesn't try do variable expansion.
137 """
138 return sString.replace('$', '$$');
139
140
141
142class TransportBase(object):
143 """
144 Base class for the transport layer.
145 """
146
147 def __init__(self, sCaller):
148 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
149 self.fDummy = 0;
150 self.abReadAheadHdr = array.array('B');
151
152 def toString(self):
153 """
154 Stringify the instance for logging and debugging.
155 """
156 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
157
158 def __str__(self):
159 return self.toString();
160
161 def cancelConnect(self):
162 """
163 Cancels any pending connect() call.
164 Returns None;
165 """
166 return None;
167
168 def connect(self, cMsTimeout):
169 """
170 Quietly attempts to connect to the TXS.
171
172 Returns True on success.
173 Returns False on retryable errors (no logging).
174 Returns None on fatal errors with details in the log.
175
176 Override this method, don't call super.
177 """
178 _ = cMsTimeout;
179 return False;
180
181 def disconnect(self, fQuiet = False):
182 """
183 Disconnect from the TXS.
184
185 Returns True.
186
187 Override this method, don't call super.
188 """
189 _ = fQuiet;
190 return True;
191
192 def sendBytes(self, abBuf, cMsTimeout):
193 """
194 Sends the bytes in the buffer abBuf to the TXS.
195
196 Returns True on success.
197 Returns False on failure and error details in the log.
198
199 Override this method, don't call super.
200
201 Remarks: len(abBuf) is always a multiple of 16.
202 """
203 _ = abBuf; _ = cMsTimeout;
204 return False;
205
206 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
207 """
208 Receive cb number of bytes from the TXS.
209
210 Returns the bytes (array('B')) on success.
211 Returns None on failure and error details in the log.
212
213 Override this method, don't call super.
214
215 Remarks: cb is always a multiple of 16.
216 """
217 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
218 return None;
219
220 def isConnectionOk(self):
221 """
222 Checks if the connection is OK.
223
224 Returns True if it is.
225 Returns False if it isn't (caller should call diconnect).
226
227 Override this method, don't call super.
228 """
229 return True;
230
231 def isRecvPending(self, cMsTimeout = 0):
232 """
233 Checks if there is incoming bytes, optionally waiting cMsTimeout
234 milliseconds for something to arrive.
235
236 Returns True if there is, False if there isn't.
237
238 Override this method, don't call super.
239 """
240 _ = cMsTimeout;
241 return False;
242
243 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
244 """
245 Sends a message (opcode + encoded payload).
246
247 Returns True on success.
248 Returns False on failure and error details in the log.
249 """
250 # Fix + check the opcode.
251 if len(sOpcode) < 2:
252 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
253 return False;
254 sOpcode = sOpcode.ljust(8);
255 if not isValidOpcodeEncoding(sOpcode):
256 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
257 return False;
258
259 # Start construct the message.
260 cbMsg = 16 + len(abPayload);
261 abMsg = array.array('B');
262 abMsg.extend(u32ToByteArray(cbMsg));
263 abMsg.extend((0, 0, 0, 0)); # uCrc32
264 try:
265 abMsg.extend(array.array('B', \
266 ( ord(sOpcode[0]), \
267 ord(sOpcode[1]), \
268 ord(sOpcode[2]), \
269 ord(sOpcode[3]), \
270 ord(sOpcode[4]), \
271 ord(sOpcode[5]), \
272 ord(sOpcode[6]), \
273 ord(sOpcode[7]) ) ) );
274 if abPayload:
275 abMsg.extend(abPayload);
276 except:
277 reporter.fatalXcpt('sendMsgInt: packing problem...');
278 return False;
279
280 # checksum it, padd it and send it off.
281 uCrc32 = zlib.crc32(abMsg[8:]);
282 abMsg[4:8] = u32ToByteArray(uCrc32);
283
284 while len(abMsg) % 16:
285 abMsg.append(0);
286
287 reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
288 return self.sendBytes(abMsg, cMsTimeout);
289
290 def recvMsg(self, cMsTimeout, fNoDataOk = False):
291 """
292 Receives a message from the TXS.
293
294 Returns the message three-tuple: length, opcode, payload.
295 Returns (None, None, None) on failure and error details in the log.
296 """
297
298 # Read the header.
299 if self.abReadAheadHdr:
300 assert(len(self.abReadAheadHdr) == 16);
301 abHdr = self.abReadAheadHdr;
302 self.abReadAheadHdr = array.array('B');
303 else:
304 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
305 if abHdr is None:
306 return (None, None, None);
307 if len(abHdr) != 16:
308 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
309 return (None, None, None);
310
311 # Unpack and validate the header.
312 cbMsg = getU32(abHdr, 0);
313 uCrc32 = getU32(abHdr, 4);
314
315 if sys.version_info < (3, 9, 0):
316 # Removed since Python 3.9.
317 sOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
318 else:
319 sOpcode = abHdr[8:16].tobytes();
320 sOpcode = sOpcode.decode('ascii');
321
322 if cbMsg < 16:
323 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
324 return (None, None, None);
325 if cbMsg > 1024*1024:
326 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
327 return (None, None, None);
328 if not isValidOpcodeEncoding(sOpcode):
329 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
330 return (None, None, None);
331
332 # Get the payload (if any), dropping the padding.
333 abPayload = array.array('B');
334 if cbMsg > 16:
335 if cbMsg % 16:
336 cbPadding = 16 - (cbMsg % 16);
337 else:
338 cbPadding = 0;
339 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
340 if abPayload is None:
341 self.abReadAheadHdr = abHdr;
342 if not fNoDataOk :
343 reporter.log('recvMsg: failed to recv payload bytes!');
344 return (None, None, None);
345
346 while cbPadding > 0:
347 abPayload.pop();
348 cbPadding = cbPadding - 1;
349
350 # Check the CRC-32.
351 if uCrc32 != 0:
352 uActualCrc32 = zlib.crc32(abHdr[8:]);
353 if cbMsg > 16:
354 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
355 uActualCrc32 = uActualCrc32 & 0xffffffff;
356 if uCrc32 != uActualCrc32:
357 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
358 return (None, None, None);
359
360 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
361 return (cbMsg, sOpcode, abPayload);
362
363 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
364 """
365 Sends a message (opcode + payload tuple).
366
367 Returns True on success.
368 Returns False on failure and error details in the log.
369 Returns None if you pass the incorrectly typed parameters.
370 """
371 # Encode the payload.
372 abPayload = array.array('B');
373 for o in aoPayload:
374 try:
375 if utils.isString(o):
376 if sys.version_info[0] >= 3:
377 abPayload.extend(o.encode('utf_8'));
378 else:
379 # the primitive approach...
380 sUtf8 = o.encode('utf_8');
381 for ch in sUtf8:
382 abPayload.append(ord(ch))
383 abPayload.append(0);
384 elif isinstance(o, (long, int)):
385 if o < 0 or o > 0xffffffff:
386 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
387 return None;
388 abPayload.extend(u32ToByteArray(o));
389 elif isinstance(o, array.array):
390 abPayload.extend(o);
391 else:
392 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
393 return None;
394 except:
395 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
396 return None;
397 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
398
399
400class Session(TdTaskBase):
401 """
402 A Test eXecution Service (TXS) client session.
403 """
404
405 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
406 """
407 Construct a TXS session.
408
409 This starts by connecting to the TXS and will enter the signalled state
410 when connected or the timeout has been reached.
411 """
412 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
413 self.oTransport = oTransport;
414 self.sStatus = "";
415 self.cMsTimeout = 0;
416 self.fErr = True; # Whether to report errors as error.
417 self.msStart = 0;
418 self.oThread = None;
419 self.fnTask = self.taskDummy;
420 self.aTaskArgs = None;
421 self.oTaskRc = None;
422 self.t3oReply = (None, None, None);
423 self.fScrewedUpMsgState = False;
424 self.fTryConnect = fTryConnect;
425
426 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
427 raise base.GenError("startTask failed");
428
429 def __del__(self):
430 """Make sure to cancel the task when deleted."""
431 self.cancelTask();
432
433 def toString(self):
434 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
435 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
436 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
437 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
438
439 def taskDummy(self):
440 """Place holder to catch broken state handling."""
441 raise Exception();
442
443 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
444 """
445 Kicks of a new task.
446
447 cMsTimeout: The task timeout in milliseconds. Values less than
448 500 ms will be adjusted to 500 ms. This means it is
449 OK to use negative value.
450 sStatus: The task status.
451 fnTask: The method that'll execute the task.
452 aArgs: Arguments to pass to fnTask.
453
454 Returns True on success, False + error in log on failure.
455 """
456 if not self.cancelTask():
457 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
458 return False;
459
460 # Change status and make sure we're the
461 self.lockTask();
462 if self.sStatus != "":
463 self.unlockTask();
464 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
465 return False;
466 self.sStatus = "setup";
467 self.oTaskRc = None;
468 self.t3oReply = (None, None, None);
469 self.resetTaskLocked();
470 self.unlockTask();
471
472 self.cMsTimeout = max(cMsTimeout, 500);
473 self.fErr = not fIgnoreErrors;
474 self.fnTask = fnTask;
475 self.aTaskArgs = aArgs;
476 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
477 self.oThread.setDaemon(True);
478 self.msStart = base.timestampMilli();
479
480 self.lockTask();
481 self.sStatus = sStatus;
482 self.unlockTask();
483 self.oThread.start();
484
485 return True;
486
487 def cancelTask(self, fSync = True):
488 """
489 Attempts to cancel any pending tasks.
490 Returns success indicator (True/False).
491 """
492 self.lockTask();
493
494 if self.sStatus == "":
495 self.unlockTask();
496 return True;
497 if self.sStatus == "setup":
498 self.unlockTask();
499 return False;
500 if self.sStatus == "cancelled":
501 self.unlockTask();
502 return False;
503
504 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
505 if self.sStatus == 'connecting':
506 self.oTransport.cancelConnect();
507
508 self.sStatus = "cancelled";
509 oThread = self.oThread;
510 self.unlockTask();
511
512 if not fSync:
513 return False;
514
515 oThread.join(61.0);
516
517 if sys.version_info < (3, 9, 0):
518 # Removed since Python 3.9.
519 return oThread.isAlive(); # pylint: disable=no-member
520 return oThread.is_alive();
521
522 def taskThread(self):
523 """
524 The task thread function.
525 This does some housekeeping activities around the real task method call.
526 """
527 if not self.isCancelled():
528 try:
529 fnTask = self.fnTask;
530 oTaskRc = fnTask(*self.aTaskArgs);
531 except:
532 reporter.fatalXcpt('taskThread', 15);
533 oTaskRc = None;
534 else:
535 reporter.log('taskThread: cancelled already');
536
537 self.lockTask();
538
539 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
540 self.oTaskRc = oTaskRc;
541 self.oThread = None;
542 self.sStatus = '';
543 self.signalTaskLocked();
544
545 self.unlockTask();
546 return None;
547
548 def isCancelled(self):
549 """Internal method for checking if the task has been cancelled."""
550 self.lockTask();
551 sStatus = self.sStatus;
552 self.unlockTask();
553 if sStatus == "cancelled":
554 return True;
555 return False;
556
557 def hasTimedOut(self):
558 """Internal method for checking if the task has timed out or not."""
559 cMsLeft = self.getMsLeft();
560 if cMsLeft <= 0:
561 return True;
562 return False;
563
564 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
565 """Gets the time left until the timeout."""
566 cMsElapsed = base.timestampMilli() - self.msStart;
567 if cMsElapsed < 0:
568 return cMsMin;
569 cMsLeft = self.cMsTimeout - cMsElapsed;
570 if cMsLeft <= cMsMin:
571 return cMsMin;
572 if cMsLeft > cMsMax > 0:
573 return cMsMax
574 return cMsLeft;
575
576 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
577 """
578 Wrapper for TransportBase.recvMsg that stashes the response away
579 so the client can inspect it later on.
580 """
581 if cMsTimeout is None:
582 cMsTimeout = self.getMsLeft(500);
583 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
584 self.lockTask();
585 self.t3oReply = (cbMsg, sOpcode, abPayload);
586 self.unlockTask();
587 return (cbMsg, sOpcode, abPayload);
588
589 def recvAck(self, fNoDataOk = False):
590 """
591 Receives an ACK or error response from the TXS.
592
593 Returns True on success.
594 Returns False on timeout or transport error.
595 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
596 and there are always details of some sort or another.
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
599 if cbMsg is None:
600 return False;
601 sOpcode = sOpcode.strip()
602 if sOpcode == "ACK":
603 return True;
604 return (sOpcode, getSZ(abPayload, 0, sOpcode));
605
606 def recvAckLogged(self, sCommand, fNoDataOk = False):
607 """
608 Wrapper for recvAck and logging.
609 Returns True on success (ACK).
610 Returns False on time, transport error and errors signalled by TXS.
611 """
612 rc = self.recvAck(fNoDataOk);
613 if rc is not True and not fNoDataOk:
614 if rc is False:
615 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
616 else:
617 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
618 rc = False;
619 return rc;
620
621 def recvTrueFalse(self, sCommand):
622 """
623 Receives a TRUE/FALSE response from the TXS.
624 Returns True on TRUE, False on FALSE and None on error/other (logged).
625 """
626 cbMsg, sOpcode, abPayload = self.recvReply();
627 if cbMsg is None:
628 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
629 return None;
630
631 sOpcode = sOpcode.strip()
632 if sOpcode == "TRUE":
633 return True;
634 if sOpcode == "FALSE":
635 return False;
636 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
637 return None;
638
639 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
640 """
641 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
642 """
643 if cMsTimeout is None:
644 cMsTimeout = self.getMsLeft(500);
645 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
646
647 def asyncToSync(self, fnAsync, *aArgs):
648 """
649 Wraps an asynchronous task into a synchronous operation.
650
651 Returns False on failure, task return status on success.
652 """
653 rc = fnAsync(*aArgs);
654 if rc is False:
655 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
656 return rc;
657
658 rc = self.waitForTask(self.cMsTimeout + 5000);
659 if rc is False:
660 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
661 self.cancelTask();
662 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
663 return False;
664
665 rc = self.getResult();
666 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
667 return rc;
668
669 #
670 # Connection tasks.
671 #
672
673 def taskConnect(self, cMsIdleFudge):
674 """Tries to connect to the TXS"""
675 while not self.isCancelled():
676 reporter.log2('taskConnect: connecting ...');
677 rc = self.oTransport.connect(self.getMsLeft(500));
678 if rc is True:
679 reporter.log('taskConnect: succeeded');
680 return self.taskGreet(cMsIdleFudge);
681 if rc is None:
682 reporter.log2('taskConnect: unable to connect');
683 return None;
684 if self.hasTimedOut():
685 reporter.log2('taskConnect: timed out');
686 if not self.fTryConnect:
687 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
688 return False;
689 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
690 if not self.fTryConnect:
691 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
692 return False;
693
694 def taskGreet(self, cMsIdleFudge):
695 """Greets the TXS"""
696 rc = self.sendMsg("HOWDY", ());
697 if rc is True:
698 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
699 if rc is True:
700 while cMsIdleFudge > 0:
701 cMsIdleFudge -= 1000;
702 time.sleep(1);
703 else:
704 self.oTransport.disconnect(self.fTryConnect);
705 return rc;
706
707 def taskBye(self):
708 """Says goodbye to the TXS"""
709 rc = self.sendMsg("BYE");
710 if rc is True:
711 rc = self.recvAckLogged("BYE");
712 self.oTransport.disconnect();
713 return rc;
714
715 def taskVer(self):
716 """Requests version information from TXS"""
717 rc = self.sendMsg("VER");
718 if rc is True:
719 rc = False;
720 cbMsg, sOpcode, abPayload = self.recvReply();
721 if cbMsg is not None:
722 sOpcode = sOpcode.strip();
723 if sOpcode == "ACK VER":
724 sVer = getSZ(abPayload, 0);
725 if sVer is not None:
726 rc = sVer;
727 else:
728 reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
729 else:
730 reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
731 return rc;
732
733 def taskUuid(self):
734 """Gets the TXS UUID"""
735 rc = self.sendMsg("UUID");
736 if rc is True:
737 rc = False;
738 cbMsg, sOpcode, abPayload = self.recvReply();
739 if cbMsg is not None:
740 sOpcode = sOpcode.strip()
741 if sOpcode == "ACK UUID":
742 sUuid = getSZ(abPayload, 0);
743 if sUuid is not None:
744 sUuid = '{%s}' % (sUuid,)
745 try:
746 _ = uuid.UUID(sUuid);
747 rc = sUuid;
748 except:
749 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
750 else:
751 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
752 else:
753 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
754 else:
755 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
756 return rc;
757
758 #
759 # Process task
760 # pylint: disable=missing-docstring
761 #
762
763 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
764 # Construct the payload.
765 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
766 for sArg in asArgs:
767 aoPayload.append('%s' % (sArg));
768 aoPayload.append(long(len(asAddEnv)));
769 for sPutEnv in asAddEnv:
770 aoPayload.append('%s' % (sPutEnv));
771 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
772 if utils.isString(o):
773 aoPayload.append(o);
774 elif o is not None:
775 aoPayload.append('|');
776 o.uTxsClientCrc32 = zlib.crc32(b'');
777 else:
778 aoPayload.append('');
779 aoPayload.append('%s' % (sAsUser));
780 aoPayload.append(long(self.cMsTimeout));
781
782 # Kick of the EXEC command.
783 rc = self.sendMsg('EXEC', aoPayload)
784 if rc is True:
785 rc = self.recvAckLogged('EXEC');
786 if rc is True:
787 # Loop till the process completes, feed input to the TXS and
788 # receive output from it.
789 sFailure = "";
790 msPendingInputReply = None;
791 cbMsg, sOpcode, abPayload = (None, None, None);
792 while True:
793 # Pending input?
794 if msPendingInputReply is None \
795 and oStdIn is not None \
796 and not utils.isString(oStdIn):
797 try:
798 sInput = oStdIn.read(65536);
799 except:
800 reporter.errorXcpt('read standard in');
801 sFailure = 'exception reading stdin';
802 rc = None;
803 break;
804 if sInput:
805 # Convert to a byte array before handing it of to sendMsg or the string
806 # will get some zero termination added breaking the CRC (and injecting
807 # unwanted bytes).
808 abInput = array.array('B', sInput.encode('utf-8'));
809 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
810 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
811 if rc is not True:
812 sFailure = 'sendMsg failure';
813 break;
814 msPendingInputReply = base.timestampMilli();
815 continue;
816
817 rc = self.sendMsg('STDINEOS');
818 oStdIn = None;
819 if rc is not True:
820 sFailure = 'sendMsg failure';
821 break;
822 msPendingInputReply = base.timestampMilli();
823
824 # Wait for input (500 ms timeout).
825 if cbMsg is None:
826 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
827 if cbMsg is None:
828 # Check for time out before restarting the loop.
829 # Note! Only doing timeout checking here does mean that
830 # the TXS may prevent us from timing out by
831 # flooding us with data. This is unlikely though.
832 if self.hasTimedOut() \
833 and ( msPendingInputReply is None \
834 or base.timestampMilli() - msPendingInputReply > 30000):
835 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
836 sFailure = 'timeout';
837 rc = None;
838 break;
839 # Check that the connection is OK.
840 if not self.oTransport.isConnectionOk():
841 self.oTransport.disconnect();
842 sFailure = 'disconnected';
843 rc = False;
844 break;
845 continue;
846
847 # Handle the response.
848 sOpcode = sOpcode.rstrip();
849 if sOpcode == 'STDOUT':
850 oOut = oStdOut;
851 elif sOpcode == 'STDERR':
852 oOut = oStdErr;
853 elif sOpcode == 'TESTPIPE':
854 oOut = oTestPipe;
855 else:
856 oOut = None;
857 if oOut is not None:
858 # Output from the process.
859 if len(abPayload) < 4:
860 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
861 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
862 rc = None;
863 break;
864 uStreamCrc32 = getU32(abPayload, 0);
865 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
866 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
867 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
868 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 try:
873 oOut.write(abPayload[4:]);
874 except:
875 sFailure = 'exception writing %s' % (sOpcode);
876 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
877 rc = None;
878 break;
879 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
880 # Standard input is ignored. Ignore this condition for now.
881 msPendingInputReply = None;
882 reporter.log('taskExecEx: Standard input is ignored... why?');
883 del oStdIn.uTxsClientCrc32;
884 oStdIn = '/dev/null';
885 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
886 and msPendingInputReply is not None:
887 # TXS STDIN error, abort.
888 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
889 msPendingInputReply = None;
890 sFailure = 'TXS is out of memory for std input buffering';
891 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
892 rc = None;
893 break;
894 elif sOpcode == 'ACK' and msPendingInputReply is not None:
895 msPendingInputReply = None;
896 elif sOpcode.startswith('PROC '):
897 # Process status message, handle it outside the loop.
898 rc = True;
899 break;
900 else:
901 sFailure = 'Unexpected opcode %s' % (sOpcode);
902 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
903 rc = None;
904 break;
905 # Clear the message.
906 cbMsg, sOpcode, abPayload = (None, None, None);
907
908 # If we sent an STDIN packet and didn't get a reply yet, we'll give
909 # TXS some 5 seconds to reply to this. If we don't wait here we'll
910 # get screwed later on if we mix it up with the reply to some other
911 # command. Hackish.
912 if msPendingInputReply is not None:
913 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
914 if cbMsg2 is not None:
915 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
916 % (cbMsg2, sOpcode2, abPayload2));
917 msPendingInputReply = None;
918 else:
919 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
920 self.fScrewedUpMsgState = True;
921
922 # Parse the exit status (True), abort (None) or do nothing (False).
923 if rc is True:
924 if sOpcode == 'PROC OK':
925 pass;
926 else:
927 rc = False;
928 # Do proper parsing some other day if needed:
929 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
930 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
931 if sOpcode == 'PROC DOO':
932 reporter.log('taskExecEx: PROC DOO[FUS]: %s' % (abPayload,));
933 elif sOpcode.startswith('PROC NOK'):
934 reporter.log('taskExecEx: PROC NOK: rcExit=%s' % (abPayload,));
935 elif abPayload and sOpcode.startswith('PROC '):
936 reporter.log('taskExecEx: %s payload=%s' % (sOpcode, abPayload,));
937
938 else:
939 if rc is None:
940 # Abort it.
941 reporter.log('taskExecEx: sending ABORT...');
942 rc = self.sendMsg('ABORT');
943 while rc is True:
944 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
945 if cbMsg is None:
946 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
947 self.fScrewedUpMsgState = True;
948 break;
949 if sOpcode.startswith('PROC '):
950 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
951 break;
952 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
953 # Check that the connection is OK before looping.
954 if not self.oTransport.isConnectionOk():
955 self.oTransport.disconnect();
956 break;
957
958 # Fake response with the reason why we quit.
959 if sFailure is not None:
960 self.t3oReply = (0, 'EXECFAIL', sFailure);
961 rc = None;
962 else:
963 rc = None;
964
965 # Cleanup.
966 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
967 if o is not None and not utils.isString(o):
968 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
969 # Make sure all files are closed
970 o.close(); # pylint: disable=maybe-no-member
971 reporter.log('taskExecEx: returns %s' % (rc));
972 return rc;
973
974 #
975 # Admin tasks
976 #
977
978 def hlpRebootShutdownWaitForAck(self, sCmd):
979 """Wait for reboot/shutodwn ACK."""
980 rc = self.recvAckLogged(sCmd);
981 if rc is True:
982 # poll a little while for server to disconnect.
983 uMsStart = base.timestampMilli();
984 while self.oTransport.isConnectionOk() \
985 and base.timestampMilli() - uMsStart >= 5000:
986 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
987 break;
988 self.oTransport.disconnect();
989 return rc;
990
991 def taskReboot(self):
992 rc = self.sendMsg('REBOOT');
993 if rc is True:
994 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
995 return rc;
996
997 def taskShutdown(self):
998 rc = self.sendMsg('SHUTDOWN');
999 if rc is True:
1000 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
1001 return rc;
1002
1003 #
1004 # CD/DVD control tasks.
1005 #
1006
1007 ## TODO
1008
1009 #
1010 # File system tasks
1011 #
1012
1013 def taskMkDir(self, sRemoteDir, fMode):
1014 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
1015 if rc is True:
1016 rc = self.recvAckLogged('MKDIR');
1017 return rc;
1018
1019 def taskMkDirPath(self, sRemoteDir, fMode):
1020 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
1021 if rc is True:
1022 rc = self.recvAckLogged('MKDRPATH');
1023 return rc;
1024
1025 def taskMkSymlink(self, sLinkTarget, sLink):
1026 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
1027 if rc is True:
1028 rc = self.recvAckLogged('MKSYMLNK');
1029 return rc;
1030
1031 def taskRmDir(self, sRemoteDir):
1032 rc = self.sendMsg('RMDIR', (sRemoteDir,));
1033 if rc is True:
1034 rc = self.recvAckLogged('RMDIR');
1035 return rc;
1036
1037 def taskRmFile(self, sRemoteFile):
1038 rc = self.sendMsg('RMFILE', (sRemoteFile,));
1039 if rc is True:
1040 rc = self.recvAckLogged('RMFILE');
1041 return rc;
1042
1043 def taskRmSymlink(self, sRemoteSymlink):
1044 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1045 if rc is True:
1046 rc = self.recvAckLogged('RMSYMLNK');
1047 return rc;
1048
1049 def taskRmTree(self, sRemoteTree):
1050 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1051 if rc is True:
1052 rc = self.recvAckLogged('RMTREE');
1053 return rc;
1054
1055 def taskChMod(self, sRemotePath, fMode):
1056 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1057 if rc is True:
1058 rc = self.recvAckLogged('CHMOD');
1059 return rc;
1060
1061 def taskChOwn(self, sRemotePath, idUser, idGroup):
1062 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1063 if rc is True:
1064 rc = self.recvAckLogged('CHOWN');
1065 return rc;
1066
1067 def taskIsDir(self, sRemoteDir):
1068 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1069 if rc is True:
1070 rc = self.recvTrueFalse('ISDIR');
1071 return rc;
1072
1073 def taskIsFile(self, sRemoteFile):
1074 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1075 if rc is True:
1076 rc = self.recvTrueFalse('ISFILE');
1077 return rc;
1078
1079 def taskIsSymlink(self, sRemoteSymlink):
1080 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1081 if rc is True:
1082 rc = self.recvTrueFalse('ISSYMLNK');
1083 return rc;
1084
1085 #def "STAT "
1086 #def "LSTAT "
1087 #def "LIST "
1088
1089 def taskCopyFile(self, sSrcFile, sDstFile, fMode, fFallbackOkay):
1090 """ Copies a file within the remote from source to destination. """
1091 # Note: If fMode is set to 0, it's up to the target OS' implementation with
1092 # what a file mode the destination file gets created (i.e. via umask).
1093 rc = self.sendMsg('CPFILE', (fMode, sSrcFile, sDstFile,));
1094 if rc is True:
1095 rc = self.recvAckLogged('CPFILE');
1096 return rc;
1097
1098 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1099 #
1100 # Open the local file (make sure it exist before bothering TXS) and
1101 # tell TXS that we want to upload a file.
1102 #
1103 try:
1104 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1105 except:
1106 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1107 return False;
1108
1109 # Common cause with taskUploadStr
1110 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1111
1112 # Cleanup.
1113 oLocalFile.close();
1114 return rc;
1115
1116 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1117 # Wrap sContent in a file like class.
1118 class InStringFile(object): # pylint: disable=too-few-public-methods
1119 def __init__(self, sContent):
1120 self.sContent = sContent;
1121 self.off = 0;
1122
1123 def read(self, cbMax):
1124 cbLeft = len(self.sContent) - self.off;
1125 if cbLeft == 0:
1126 return "";
1127 if cbLeft <= cbMax:
1128 sRet = self.sContent[self.off:(self.off + cbLeft)];
1129 else:
1130 sRet = self.sContent[self.off:(self.off + cbMax)];
1131 self.off = self.off + len(sRet);
1132 return sRet;
1133
1134 oLocalString = InStringFile(sContent);
1135 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1136
1137 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1138 """Common worker used by taskUploadFile and taskUploadString."""
1139 #
1140 # Command + ACK.
1141 #
1142 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1143 # Fall back on the old command if the new one is not known by the TXS.
1144 #
1145 if fMode == 0:
1146 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1147 if rc is True:
1148 rc = self.recvAckLogged('PUT FILE');
1149 else:
1150 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1151 if rc is True:
1152 rc = self.recvAck();
1153 if rc is False:
1154 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1155 elif rc is not True:
1156 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1157 # Fallback:
1158 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1159 if rc is True:
1160 rc = self.recvAckLogged('PUT FILE');
1161 else:
1162 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1163 rc = False;
1164 if rc is True:
1165 #
1166 # Push data packets until eof.
1167 #
1168 uMyCrc32 = zlib.crc32(b'');
1169 while True:
1170 # Read up to 64 KB of data.
1171 try:
1172 sRaw = oLocalFile.read(65536);
1173 except:
1174 rc = None;
1175 break;
1176
1177 # Convert to array - this is silly!
1178 abBuf = array.array('B');
1179 if utils.isString(sRaw):
1180 for i, _ in enumerate(sRaw):
1181 abBuf.append(ord(sRaw[i]));
1182 else:
1183 abBuf.extend(sRaw);
1184 sRaw = None;
1185
1186 # Update the file stream CRC and send it off.
1187 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1188 if not abBuf:
1189 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1190 else:
1191 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1192 if rc is False:
1193 break;
1194
1195 # Wait for the reply.
1196 rc = self.recvAck();
1197 if rc is not True:
1198 if rc is False:
1199 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1200 else:
1201 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1202 rc = False;
1203 break;
1204
1205 # EOF?
1206 if not abBuf:
1207 break;
1208
1209 # Send ABORT on ACK and I/O errors.
1210 if rc is None:
1211 rc = self.sendMsg('ABORT');
1212 if rc is True:
1213 self.recvAckLogged('ABORT');
1214 rc = False;
1215 return rc;
1216
1217 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1218 try:
1219 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1220 except:
1221 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1222 return False;
1223
1224 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1225
1226 oLocalFile.close();
1227 if rc is False:
1228 try:
1229 os.remove(sLocalFile);
1230 except:
1231 reporter.errorXcpt();
1232 return rc;
1233
1234 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1235 # Wrap sContent in a file like class.
1236 class OutStringFile(object): # pylint: disable=too-few-public-methods
1237 def __init__(self):
1238 self.asContent = [];
1239
1240 def write(self, sBuf):
1241 self.asContent.append(sBuf);
1242 return None;
1243
1244 oLocalString = OutStringFile();
1245 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1246 if rc is True:
1247 rc = '';
1248 for sBuf in oLocalString.asContent:
1249 if hasattr(sBuf, 'decode'):
1250 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1251 else:
1252 rc += sBuf;
1253 return rc;
1254
1255 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1256 """Common worker for taskDownloadFile and taskDownloadString."""
1257 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1258 if rc is True:
1259 #
1260 # Process data packets until eof.
1261 #
1262 uMyCrc32 = zlib.crc32(b'');
1263 while rc is True:
1264 cbMsg, sOpcode, abPayload = self.recvReply();
1265 if cbMsg is None:
1266 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1267 rc = None;
1268 break;
1269
1270 # Validate.
1271 sOpcode = sOpcode.rstrip();
1272 if sOpcode not in ('DATA', 'DATA EOF',):
1273 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1274 % (sOpcode, getSZ(abPayload, 0, "None")));
1275 rc = False;
1276 break;
1277 if sOpcode == 'DATA' and len(abPayload) < 4:
1278 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1279 rc = None;
1280 break;
1281 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1282 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1283 rc = None;
1284 break;
1285
1286 # Check the CRC (common for both packets).
1287 uCrc32 = getU32(abPayload, 0);
1288 if sOpcode == 'DATA':
1289 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1290 if uCrc32 != (uMyCrc32 & 0xffffffff):
1291 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1292 % (hex(uMyCrc32), hex(uCrc32)));
1293 rc = None;
1294 break;
1295 if sOpcode == 'DATA EOF':
1296 rc = self.sendMsg('ACK');
1297 break;
1298
1299 # Finally, push the data to the file.
1300 try:
1301 if sys.version_info < (3, 9, 0):
1302 # Removed since Python 3.9.
1303 abData = abPayload[4:].tostring();
1304 else:
1305 abData = abPayload[4:].tobytes();
1306 oLocalFile.write(abData);
1307 except:
1308 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1309 rc = None;
1310 break;
1311 rc = self.sendMsg('ACK');
1312
1313 # Send NACK on validation and I/O errors.
1314 if rc is None:
1315 rc = self.sendMsg('NACK');
1316 rc = False;
1317 return rc;
1318
1319 def taskPackFile(self, sRemoteFile, sRemoteSource):
1320 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1321 if rc is True:
1322 rc = self.recvAckLogged('PKFILE');
1323 return rc;
1324
1325 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1326 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1327 if rc is True:
1328 rc = self.recvAckLogged('UNPKFILE');
1329 return rc;
1330
1331 # pylint: enable=missing-docstring
1332
1333
1334 #
1335 # Public methods - generic task queries
1336 #
1337
1338 def isSuccess(self):
1339 """Returns True if the task completed successfully, otherwise False."""
1340 self.lockTask();
1341 sStatus = self.sStatus;
1342 oTaskRc = self.oTaskRc;
1343 self.unlockTask();
1344 if sStatus != "":
1345 return False;
1346 if oTaskRc is False or oTaskRc is None:
1347 return False;
1348 return True;
1349
1350 def getResult(self):
1351 """
1352 Returns the result of a completed task.
1353 Returns None if not completed yet or no previous task.
1354 """
1355 self.lockTask();
1356 sStatus = self.sStatus;
1357 oTaskRc = self.oTaskRc;
1358 self.unlockTask();
1359 if sStatus != "":
1360 return None;
1361 return oTaskRc;
1362
1363 def getLastReply(self):
1364 """
1365 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1366 Returns a None, None, None three-tuple if there was no last reply.
1367 """
1368 self.lockTask();
1369 t3oReply = self.t3oReply;
1370 self.unlockTask();
1371 return t3oReply;
1372
1373 #
1374 # Public methods - connection.
1375 #
1376
1377 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1378 """
1379 Initiates a disconnect task.
1380
1381 Returns True on success, False on failure (logged).
1382
1383 The task returns True on success and False on failure.
1384 """
1385 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1386
1387 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1388 """Synchronous version."""
1389 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1390
1391 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1392 """
1393 Initiates a task for getting the TXS version information.
1394
1395 Returns True on success, False on failure (logged).
1396
1397 The task returns the version string on success and False on failure.
1398 """
1399 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1400
1401 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1402 """Synchronous version."""
1403 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1404
1405 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1406 """
1407 Initiates a task for getting the TXS UUID.
1408
1409 Returns True on success, False on failure (logged).
1410
1411 The task returns UUID string (in {}) on success and False on failure.
1412 """
1413 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1414
1415 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1416 """Synchronous version."""
1417 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1418
1419 #
1420 # Public methods - execution.
1421 #
1422
1423 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1424 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1425 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1426 """
1427 Initiates a exec process task.
1428
1429 Returns True on success, False on failure (logged).
1430
1431 The task returns True if the process exited normally with status code 0.
1432 The task returns None if on failure prior to executing the process, and
1433 False if the process exited with a different status or in an abnormal
1434 manner. Both None and False are logged of course and further info can
1435 also be obtained by getLastReply().
1436
1437 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1438 these streams. If None, no special action is taken and the output goes
1439 to where ever the TXS sends its output, and ditto for input.
1440 - To send to / read from the bitbucket, pass '/dev/null'.
1441 - To redirect to/from a file, just specify the remote filename.
1442 - To append to a file use '>>' followed by the remote filename.
1443 - To pipe the stream to/from the TXS, specify a file like
1444 object. For StdIn a non-blocking read() method is required. For
1445 the other a write() method is required. Watch out for deadlock
1446 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1447 """
1448 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1449 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1450 oStdOut, oStdErr, oTestPipe, sAsUser));
1451
1452 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1453 oStdIn = '/dev/null', oStdOut = '/dev/null',
1454 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1455 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1456 """Synchronous version."""
1457 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1458 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1459
1460 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1461 cMsTimeout = 3600000, fIgnoreErrors = False):
1462 """
1463 Initiates a exec process test task.
1464
1465 Returns True on success, False on failure (logged).
1466
1467 The task returns True if the process exited normally with status code 0.
1468 The task returns None if on failure prior to executing the process, and
1469 False if the process exited with a different status or in an abnormal
1470 manner. Both None and False are logged of course and further info can
1471 also be obtained by getLastReply().
1472
1473 Standard in is taken from /dev/null. While both standard output and
1474 standard error goes directly to reporter.log(). The testpipe is piped
1475 to reporter.xxxx.
1476 """
1477
1478 sStdIn = '/dev/null';
1479 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1480 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1481 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1482 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1483
1484 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1485 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1486
1487 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1488 cMsTimeout = 3600000, fIgnoreErrors = False):
1489 """Synchronous version."""
1490 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1491 cMsTimeout, fIgnoreErrors);
1492
1493 #
1494 # Public methods - system
1495 #
1496
1497 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1498 """
1499 Initiates a reboot task.
1500
1501 Returns True on success, False on failure (logged).
1502
1503 The task returns True on success, False on failure (logged). The
1504 session will be disconnected on successful task completion.
1505 """
1506 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1507
1508 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1509 """Synchronous version."""
1510 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1511
1512 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1513 """
1514 Initiates a shutdown task.
1515
1516 Returns True on success, False on failure (logged).
1517
1518 The task returns True on success, False on failure (logged).
1519 """
1520 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1521
1522 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1523 """Synchronous version."""
1524 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1525
1526
1527 #
1528 # Public methods - file system
1529 #
1530
1531 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1532 """
1533 Initiates a mkdir task.
1534
1535 Returns True on success, False on failure (logged).
1536
1537 The task returns True on success, False on failure (logged).
1538 """
1539 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1540
1541 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1542 """Synchronous version."""
1543 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1544
1545 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1546 """
1547 Initiates a mkdir -p task.
1548
1549 Returns True on success, False on failure (logged).
1550
1551 The task returns True on success, False on failure (logged).
1552 """
1553 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1554
1555 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1556 """Synchronous version."""
1557 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1558
1559 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1560 """
1561 Initiates a symlink task.
1562
1563 Returns True on success, False on failure (logged).
1564
1565 The task returns True on success, False on failure (logged).
1566 """
1567 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1568
1569 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1570 """Synchronous version."""
1571 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1572
1573 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1574 """
1575 Initiates a rmdir task.
1576
1577 Returns True on success, False on failure (logged).
1578
1579 The task returns True on success, False on failure (logged).
1580 """
1581 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1582
1583 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1584 """Synchronous version."""
1585 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1586
1587 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1588 """
1589 Initiates a rmfile task.
1590
1591 Returns True on success, False on failure (logged).
1592
1593 The task returns True on success, False on failure (logged).
1594 """
1595 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1596
1597 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1598 """Synchronous version."""
1599 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1600
1601 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1602 """
1603 Initiates a rmsymlink task.
1604
1605 Returns True on success, False on failure (logged).
1606
1607 The task returns True on success, False on failure (logged).
1608 """
1609 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1610
1611 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1612 """Synchronous version."""
1613 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1614
1615 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1616 """
1617 Initiates a rmtree task.
1618
1619 Returns True on success, False on failure (logged).
1620
1621 The task returns True on success, False on failure (logged).
1622 """
1623 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1624
1625 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1626 """Synchronous version."""
1627 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1628
1629 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1630 """
1631 Initiates a chmod task.
1632
1633 Returns True on success, False on failure (logged).
1634
1635 The task returns True on success, False on failure (logged).
1636 """
1637 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1638
1639 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1640 """Synchronous version."""
1641 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1642
1643 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1644 """
1645 Initiates a chown task.
1646
1647 Returns True on success, False on failure (logged).
1648
1649 The task returns True on success, False on failure (logged).
1650 """
1651 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1652
1653 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1654 """Synchronous version."""
1655 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1656
1657 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1658 """
1659 Initiates a is-dir query task.
1660
1661 Returns True on success, False on failure (logged).
1662
1663 The task returns True if it's a directory, False if it isn't, and
1664 None on error (logged).
1665 """
1666 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1667
1668 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1669 """Synchronous version."""
1670 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1671
1672 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1673 """
1674 Initiates a is-file query task.
1675
1676 Returns True on success, False on failure (logged).
1677
1678 The task returns True if it's a file, False if it isn't, and None on
1679 error (logged).
1680 """
1681 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1682
1683 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1684 """Synchronous version."""
1685 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1686
1687 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1688 """
1689 Initiates a is-symbolic-link query task.
1690
1691 Returns True on success, False on failure (logged).
1692
1693 The task returns True if it's a symbolic linke, False if it isn't, and
1694 None on error (logged).
1695 """
1696 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1697
1698 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1699 """Synchronous version."""
1700 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1701
1702 #def "STAT "
1703 #def "LSTAT "
1704 #def "LIST "
1705
1706 @staticmethod
1707 def calcFileXferTimeout(cbFile):
1708 """
1709 Calculates a reasonable timeout for an upload/download given the file size.
1710
1711 Returns timeout in milliseconds.
1712 """
1713 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1714
1715 @staticmethod
1716 def calcUploadTimeout(sLocalFile):
1717 """
1718 Calculates a reasonable timeout for an upload given the file (will stat it).
1719
1720 Returns timeout in milliseconds.
1721 """
1722 try: cbFile = os.path.getsize(sLocalFile);
1723 except: cbFile = 1024*1024;
1724 return Session.calcFileXferTimeout(cbFile);
1725
1726 def asyncCopyFile(self, sSrcFile, sDstFile,
1727 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1728 """
1729 Initiates a file copying task on the remote.
1730
1731 Returns True on success, False on failure (logged).
1732
1733 The task returns True on success, False on failure (logged).
1734 """
1735 return self.startTask(cMsTimeout, fIgnoreErrors, "cpfile",
1736 self.taskCopyFile, (sSrcFile, sDstFile, fMode, fFallbackOkay));
1737
1738 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1739 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1740 """
1741 Initiates a download query task.
1742
1743 Returns True on success, False on failure (logged).
1744
1745 The task returns True on success, False on failure (logged).
1746 """
1747 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1748 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1749
1750 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1751 """Synchronous version."""
1752 if cMsTimeout <= 0:
1753 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1754 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1755
1756 def asyncUploadString(self, sContent, sRemoteFile,
1757 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1758 """
1759 Initiates a upload string task.
1760
1761 Returns True on success, False on failure (logged).
1762
1763 The task returns True on success, False on failure (logged).
1764 """
1765 if cMsTimeout <= 0:
1766 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1767 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1768 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1769
1770 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1771 """Synchronous version."""
1772 if cMsTimeout <= 0:
1773 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1774 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1775
1776 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1777 """
1778 Initiates a download file task.
1779
1780 Returns True on success, False on failure (logged).
1781
1782 The task returns True on success, False on failure (logged).
1783 """
1784 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1785
1786 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1787 """Synchronous version."""
1788 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1789
1790 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1791 cMsTimeout = 30000, fIgnoreErrors = False):
1792 """
1793 Initiates a download string task.
1794
1795 Returns True on success, False on failure (logged).
1796
1797 The task returns a byte string on success, False on failure (logged).
1798 """
1799 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1800 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1801
1802 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1803 cMsTimeout = 30000, fIgnoreErrors = False):
1804 """Synchronous version."""
1805 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1806 cMsTimeout, fIgnoreErrors);
1807
1808 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1809 """
1810 Initiates a packing file/directory task.
1811
1812 Returns True on success, False on failure (logged).
1813
1814 The task returns True on success, False on failure (logged).
1815 """
1816 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1817 (sRemoteFile, sRemoteSource));
1818
1819 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1820 """Synchronous version."""
1821 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1822
1823 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1824 """
1825 Initiates a unpack file task.
1826
1827 Returns True on success, False on failure (logged).
1828
1829 The task returns True on success, False on failure (logged).
1830 """
1831 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1832 (sRemoteFile, sRemoteDir));
1833
1834 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1835 """Synchronous version."""
1836 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1837
1838
1839class TransportTcp(TransportBase):
1840 """
1841 TCP transport layer for the TXS client session class.
1842 """
1843
1844 def __init__(self, sHostname, uPort, fReversedSetup):
1845 """
1846 Save the parameters. The session will call us back to make the
1847 connection later on its worker thread.
1848 """
1849 TransportBase.__init__(self, utils.getCallerName());
1850 self.sHostname = sHostname;
1851 self.fReversedSetup = fReversedSetup;
1852 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1853 self.oSocket = None;
1854 self.oWakeupW = None;
1855 self.oWakeupR = None;
1856 self.fConnectCanceled = False;
1857 self.fIsConnecting = False;
1858 self.oCv = threading.Condition();
1859 self.abReadAhead = array.array('B');
1860
1861 def toString(self):
1862 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1863 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1864 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1865 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1866
1867 def __isInProgressXcpt(self, oXcpt):
1868 """ In progress exception? """
1869 try:
1870 if isinstance(oXcpt, socket.error):
1871 try:
1872 if oXcpt.errno == errno.EINPROGRESS:
1873 return True;
1874 except: pass;
1875 # Windows?
1876 try:
1877 if oXcpt.errno == errno.EWOULDBLOCK:
1878 return True;
1879 except: pass;
1880 except:
1881 pass;
1882 return False;
1883
1884 def __isWouldBlockXcpt(self, oXcpt):
1885 """ Would block exception? """
1886 try:
1887 if isinstance(oXcpt, socket.error):
1888 try:
1889 if oXcpt.errno == errno.EWOULDBLOCK:
1890 return True;
1891 except: pass;
1892 try:
1893 if oXcpt.errno == errno.EAGAIN:
1894 return True;
1895 except: pass;
1896 except:
1897 pass;
1898 return False;
1899
1900 def __isConnectionReset(self, oXcpt):
1901 """ Connection reset by Peer or others. """
1902 try:
1903 if isinstance(oXcpt, socket.error):
1904 try:
1905 if oXcpt.errno == errno.ECONNRESET:
1906 return True;
1907 except: pass;
1908 try:
1909 if oXcpt.errno == errno.ENETRESET:
1910 return True;
1911 except: pass;
1912 except:
1913 pass;
1914 return False;
1915
1916 def _closeWakeupSockets(self):
1917 """ Closes the wakup sockets. Caller should own the CV. """
1918 oWakeupR = self.oWakeupR;
1919 self.oWakeupR = None;
1920 if oWakeupR is not None:
1921 oWakeupR.close();
1922
1923 oWakeupW = self.oWakeupW;
1924 self.oWakeupW = None;
1925 if oWakeupW is not None:
1926 oWakeupW.close();
1927
1928 return None;
1929
1930 def cancelConnect(self):
1931 # This is bad stuff.
1932 self.oCv.acquire();
1933 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1934 self.fConnectCanceled = True;
1935 if self.fIsConnecting:
1936 oSocket = self.oSocket;
1937 self.oSocket = None;
1938 if oSocket is not None:
1939 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1940 oSocket.close();
1941
1942 oWakeupW = self.oWakeupW;
1943 self.oWakeupW = None;
1944 if oWakeupW is not None:
1945 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1946 try: oWakeupW.send(b'cancelled!\n');
1947 except: reporter.logXcpt();
1948 try: oWakeupW.shutdown(socket.SHUT_WR);
1949 except: reporter.logXcpt();
1950 oWakeupW.close();
1951 self.oCv.release();
1952
1953 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1954 """ Connects to the TXS server as server, i.e. the reversed setup. """
1955 assert(self.fReversedSetup);
1956
1957 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1958
1959 # Workaround for bind() failure...
1960 try:
1961 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1962 except:
1963 reporter.errorXcpt('socket.listen(1) failed');
1964 return None;
1965
1966 # Bind the socket and make it listen.
1967 try:
1968 oSocket.bind((self.sHostname, self.uPort));
1969 except:
1970 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1971 return None;
1972 try:
1973 oSocket.listen(1);
1974 except:
1975 reporter.errorXcpt('socket.listen(1) failed');
1976 return None;
1977
1978 # Accept connections.
1979 oClientSocket = None;
1980 tClientAddr = None;
1981 try:
1982 (oClientSocket, tClientAddr) = oSocket.accept();
1983 except socket.error as e:
1984 if not self.__isInProgressXcpt(e):
1985 raise;
1986
1987 # Do the actual waiting.
1988 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1989 try:
1990 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1991 except socket.error as oXctp:
1992 if oXctp.errno != errno.EBADF or not self.fConnectCanceled:
1993 raise;
1994 reporter.log('socket.select() on accept was canceled');
1995 return None;
1996 except:
1997 reporter.logXcpt('socket.select() on accept');
1998
1999 # Try accept again.
2000 try:
2001 (oClientSocket, tClientAddr) = oSocket.accept();
2002 except socket.error as oXcpt:
2003 if not self.__isInProgressXcpt(e):
2004 if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
2005 raise;
2006 reporter.log('socket.accept() was canceled');
2007 return None;
2008 reporter.log('socket.accept() timed out');
2009 return False;
2010 except:
2011 reporter.errorXcpt('socket.accept() failed');
2012 return None;
2013 except:
2014 reporter.errorXcpt('socket.accept() failed');
2015 return None;
2016
2017 # Store the connected socket and throw away the server socket.
2018 self.oCv.acquire();
2019 if not self.fConnectCanceled:
2020 self.oSocket.close();
2021 self.oSocket = oClientSocket;
2022 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
2023 self.oCv.release();
2024 return True;
2025
2026 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
2027 """ Connects to the TXS server as client. """
2028 assert(not self.fReversedSetup);
2029
2030 # Connect w/ timeouts.
2031 rc = None;
2032 try:
2033 oSocket.connect((self.sHostname, self.uPort));
2034 rc = True;
2035 except socket.error as oXcpt:
2036 iRc = oXcpt.errno;
2037 if self.__isInProgressXcpt(oXcpt):
2038 # Do the actual waiting.
2039 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
2040 try:
2041 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
2042 if len(ttRc[1]) + len(ttRc[2]) == 0:
2043 raise socket.error(errno.ETIMEDOUT, 'select timed out');
2044 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
2045 rc = iRc == 0;
2046 except socket.error as oXcpt2:
2047 iRc = oXcpt2.errno;
2048 except:
2049 iRc = -42;
2050 reporter.fatalXcpt('socket.select() on connect failed');
2051
2052 if rc is True:
2053 pass;
2054 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
2055 rc = False; # try again.
2056 else:
2057 if iRc != errno.EBADF or not self.fConnectCanceled:
2058 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
2059 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
2060 except:
2061 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
2062 return rc;
2063
2064
2065 def connect(self, cMsTimeout):
2066 # Create a non-blocking socket.
2067 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
2068 try:
2069 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
2070 except:
2071 reporter.fatalXcpt('socket.socket() failed');
2072 return None;
2073 try:
2074 oSocket.setblocking(0);
2075 except:
2076 oSocket.close();
2077 reporter.fatalXcpt('socket.socket() failed');
2078 return None;
2079
2080 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
2081 oWakeupR = None;
2082 oWakeupW = None;
2083 if hasattr(socket, 'socketpair'):
2084 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
2085 except: reporter.logXcpt('socket.socketpair() failed');
2086
2087 # Update the state.
2088 self.oCv.acquire();
2089 rc = None;
2090 if not self.fConnectCanceled:
2091 self.oSocket = oSocket;
2092 self.oWakeupW = oWakeupW;
2093 self.oWakeupR = oWakeupR;
2094 self.fIsConnecting = True;
2095 self.oCv.release();
2096
2097 # Try connect.
2098 if oWakeupR is None:
2099 oWakeupR = oSocket; # Avoid select failure.
2100 if self.fReversedSetup:
2101 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
2102 else:
2103 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2104 oSocket = None;
2105
2106 # Update the state and cleanup on failure/cancel.
2107 self.oCv.acquire();
2108 if rc is True and self.fConnectCanceled:
2109 rc = False;
2110 self.fIsConnecting = False;
2111
2112 if rc is not True:
2113 if self.oSocket is not None:
2114 self.oSocket.close();
2115 self.oSocket = None;
2116 self._closeWakeupSockets();
2117 self.oCv.release();
2118
2119 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2120 return rc;
2121
2122 def disconnect(self, fQuiet = False):
2123 if self.oSocket is not None:
2124 self.abReadAhead = array.array('B');
2125
2126 # Try a shutting down the socket gracefully (draining it).
2127 try:
2128 self.oSocket.shutdown(socket.SHUT_WR);
2129 except:
2130 if not fQuiet:
2131 reporter.error('shutdown(SHUT_WR)');
2132 try:
2133 self.oSocket.setblocking(0); # just in case it's not set.
2134 sData = "1";
2135 while sData:
2136 sData = self.oSocket.recv(16384);
2137 except:
2138 pass;
2139
2140 # Close it.
2141 self.oCv.acquire();
2142 try: self.oSocket.setblocking(1);
2143 except: pass;
2144 self.oSocket.close();
2145 self.oSocket = None;
2146 else:
2147 self.oCv.acquire();
2148 self._closeWakeupSockets();
2149 self.oCv.release();
2150
2151 def sendBytes(self, abBuf, cMsTimeout):
2152 if self.oSocket is None:
2153 reporter.error('TransportTcp.sendBytes: No connection.');
2154 return False;
2155
2156 # Try send it all.
2157 try:
2158 cbSent = self.oSocket.send(abBuf);
2159 if cbSent == len(abBuf):
2160 return True;
2161 except Exception as oXcpt:
2162 if not self.__isWouldBlockXcpt(oXcpt):
2163 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2164 return False;
2165 cbSent = 0;
2166
2167 # Do a timed send.
2168 msStart = base.timestampMilli();
2169 while True:
2170 cMsElapsed = base.timestampMilli() - msStart;
2171 if cMsElapsed > cMsTimeout:
2172 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2173 break;
2174
2175 # wait.
2176 try:
2177 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2178 if ttRc[2] and not ttRc[1]:
2179 reporter.error('TranportTcp.sendBytes: select returned with exception');
2180 break;
2181 if not ttRc[1]:
2182 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2183 break;
2184 except:
2185 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2186 break;
2187
2188 # Try send more.
2189 try:
2190 cbSent += self.oSocket.send(abBuf[cbSent:]);
2191 if cbSent == len(abBuf):
2192 return True;
2193 except Exception as oXcpt:
2194 if not self.__isWouldBlockXcpt(oXcpt):
2195 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2196 break;
2197
2198 return False;
2199
2200 def __returnReadAheadBytes(self, cb):
2201 """ Internal worker for recvBytes. """
2202 assert(len(self.abReadAhead) >= cb);
2203 abRet = self.abReadAhead[:cb];
2204 self.abReadAhead = self.abReadAhead[cb:];
2205 return abRet;
2206
2207 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2208 if self.oSocket is None:
2209 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2210 return None;
2211
2212 # Try read in some more data without bothering with timeout handling first.
2213 if len(self.abReadAhead) < cb:
2214 try:
2215 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2216 if abBuf:
2217 self.abReadAhead.extend(array.array('B', abBuf));
2218 except Exception as oXcpt:
2219 if not self.__isWouldBlockXcpt(oXcpt):
2220 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2221 return None;
2222
2223 if len(self.abReadAhead) >= cb:
2224 return self.__returnReadAheadBytes(cb);
2225
2226 # Timeout loop.
2227 msStart = base.timestampMilli();
2228 while True:
2229 cMsElapsed = base.timestampMilli() - msStart;
2230 if cMsElapsed > cMsTimeout:
2231 if not fNoDataOk or self.abReadAhead:
2232 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2233 break;
2234
2235 # Wait.
2236 try:
2237 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2238 if ttRc[2] and not ttRc[0]:
2239 reporter.error('TranportTcp.recvBytes: select returned with exception');
2240 break;
2241 if not ttRc[0]:
2242 if not fNoDataOk or self.abReadAhead:
2243 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2244 % (len(self.abReadAhead), cb, fNoDataOk));
2245 break;
2246 except:
2247 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2248 break;
2249
2250 # Try read more.
2251 try:
2252 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2253 if not abBuf:
2254 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2255 % (len(self.abReadAhead), cb, fNoDataOk));
2256 self.disconnect();
2257 return None;
2258
2259 self.abReadAhead.extend(array.array('B', abBuf));
2260
2261 except Exception as oXcpt:
2262 reporter.log('recv => exception %s' % (oXcpt,));
2263 if not self.__isWouldBlockXcpt(oXcpt):
2264 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2265 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2266 break;
2267
2268 # Done?
2269 if len(self.abReadAhead) >= cb:
2270 return self.__returnReadAheadBytes(cb);
2271
2272 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2273 return None;
2274
2275 def isConnectionOk(self):
2276 if self.oSocket is None:
2277 return False;
2278 try:
2279 ttRc = select.select([], [], [self.oSocket], 0.0);
2280 if ttRc[2]:
2281 return False;
2282
2283 self.oSocket.send(array.array('B')); # send zero bytes.
2284 except:
2285 return False;
2286 return True;
2287
2288 def isRecvPending(self, cMsTimeout = 0):
2289 try:
2290 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2291 if not ttRc[0]:
2292 return False;
2293 except:
2294 pass;
2295 return True;
2296
2297
2298def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2299 """
2300 Opens a connection to a Test Execution Service via TCP, given its name.
2301
2302 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2303 or similar.
2304 """
2305 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2306 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2307 try:
2308 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2309 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2310 except:
2311 reporter.errorXcpt(None, 15);
2312 return None;
2313 return oSession;
2314
2315
2316def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2317 """
2318 Tries to open a connection to a Test Execution Service via TCP, given its name.
2319
2320 This differs from openTcpSession in that it won't log a connection failure
2321 as an error.
2322 """
2323 try:
2324 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2325 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2326 except:
2327 reporter.errorXcpt(None, 15);
2328 return None;
2329 return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette