VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 93718

Last change on this file since 93718 was 93156, checked in by vboxsync, 3 years ago

ValKit/txsclient.py: log more PROC payload [pylint fix]

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 87.7 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 93156 2022-01-09 13:22:31Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2022 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 93156 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 if sys.version_info < (3, 9, 0):
75 # Removed since Python 3.9.
76 sStr = abStr.tostring(); # pylint: disable=no-member
77 else:
78 sStr = abStr.tobytes();
79 return sStr.decode('utf_8');
80 except:
81 reporter.errorXcpt('getSZ(,%u)' % (off));
82 return sDefault;
83
84def getSZLen(abData, off):
85 """
86 Get the length of a zero-terminated string field, in bytes.
87 Returns -1 if off is beyond the data packet or not properly terminated.
88 """
89 cbData = len(abData);
90 if off >= cbData:
91 return -1;
92
93 offCur = off;
94 while abData[offCur] != 0:
95 offCur = offCur + 1;
96 if offCur >= cbData:
97 return -1;
98
99 return offCur - off;
100
101def isValidOpcodeEncoding(sOpcode):
102 """
103 Checks if the specified opcode is valid or not.
104 Returns True on success.
105 Returns False if it is invalid, details in the log.
106 """
107 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
108 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
109 if len(sOpcode) != 8:
110 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
111 return False;
112 for i in range(0, 1):
113 if sSet1.find(sOpcode[i]) < 0:
114 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
115 return False;
116 for i in range(2, 7):
117 if sSet2.find(sOpcode[i]) < 0:
118 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
119 return False;
120 return True;
121
122#
123# Helper for encoding data sent to the TXS.
124#
125
126def u32ToByteArray(u32):
127 """Encodes the u32 value as a little endian byte (B) array."""
128 return array.array('B',
129 ( u32 % 256,
130 (u32 // 256) % 256,
131 (u32 // 65536) % 256,
132 (u32 // 16777216) % 256) );
133
134def escapeString(sString):
135 """
136 Does $ escaping of the string so TXS doesn't try do variable expansion.
137 """
138 return sString.replace('$', '$$');
139
140
141
142class TransportBase(object):
143 """
144 Base class for the transport layer.
145 """
146
147 def __init__(self, sCaller):
148 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
149 self.fDummy = 0;
150 self.abReadAheadHdr = array.array('B');
151
152 def toString(self):
153 """
154 Stringify the instance for logging and debugging.
155 """
156 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
157
158 def __str__(self):
159 return self.toString();
160
161 def cancelConnect(self):
162 """
163 Cancels any pending connect() call.
164 Returns None;
165 """
166 return None;
167
168 def connect(self, cMsTimeout):
169 """
170 Quietly attempts to connect to the TXS.
171
172 Returns True on success.
173 Returns False on retryable errors (no logging).
174 Returns None on fatal errors with details in the log.
175
176 Override this method, don't call super.
177 """
178 _ = cMsTimeout;
179 return False;
180
181 def disconnect(self, fQuiet = False):
182 """
183 Disconnect from the TXS.
184
185 Returns True.
186
187 Override this method, don't call super.
188 """
189 _ = fQuiet;
190 return True;
191
192 def sendBytes(self, abBuf, cMsTimeout):
193 """
194 Sends the bytes in the buffer abBuf to the TXS.
195
196 Returns True on success.
197 Returns False on failure and error details in the log.
198
199 Override this method, don't call super.
200
201 Remarks: len(abBuf) is always a multiple of 16.
202 """
203 _ = abBuf; _ = cMsTimeout;
204 return False;
205
206 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
207 """
208 Receive cb number of bytes from the TXS.
209
210 Returns the bytes (array('B')) on success.
211 Returns None on failure and error details in the log.
212
213 Override this method, don't call super.
214
215 Remarks: cb is always a multiple of 16.
216 """
217 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
218 return None;
219
220 def isConnectionOk(self):
221 """
222 Checks if the connection is OK.
223
224 Returns True if it is.
225 Returns False if it isn't (caller should call diconnect).
226
227 Override this method, don't call super.
228 """
229 return True;
230
231 def isRecvPending(self, cMsTimeout = 0):
232 """
233 Checks if there is incoming bytes, optionally waiting cMsTimeout
234 milliseconds for something to arrive.
235
236 Returns True if there is, False if there isn't.
237
238 Override this method, don't call super.
239 """
240 _ = cMsTimeout;
241 return False;
242
243 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
244 """
245 Sends a message (opcode + encoded payload).
246
247 Returns True on success.
248 Returns False on failure and error details in the log.
249 """
250 # Fix + check the opcode.
251 if len(sOpcode) < 2:
252 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
253 return False;
254 sOpcode = sOpcode.ljust(8);
255 if not isValidOpcodeEncoding(sOpcode):
256 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
257 return False;
258
259 # Start construct the message.
260 cbMsg = 16 + len(abPayload);
261 abMsg = array.array('B');
262 abMsg.extend(u32ToByteArray(cbMsg));
263 abMsg.extend((0, 0, 0, 0)); # uCrc32
264 try:
265 abMsg.extend(array.array('B', \
266 ( ord(sOpcode[0]), \
267 ord(sOpcode[1]), \
268 ord(sOpcode[2]), \
269 ord(sOpcode[3]), \
270 ord(sOpcode[4]), \
271 ord(sOpcode[5]), \
272 ord(sOpcode[6]), \
273 ord(sOpcode[7]) ) ) );
274 if abPayload:
275 abMsg.extend(abPayload);
276 except:
277 reporter.fatalXcpt('sendMsgInt: packing problem...');
278 return False;
279
280 # checksum it, padd it and send it off.
281 uCrc32 = zlib.crc32(abMsg[8:]);
282 abMsg[4:8] = u32ToByteArray(uCrc32);
283
284 while len(abMsg) % 16:
285 abMsg.append(0);
286
287 reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
288 return self.sendBytes(abMsg, cMsTimeout);
289
290 def recvMsg(self, cMsTimeout, fNoDataOk = False):
291 """
292 Receives a message from the TXS.
293
294 Returns the message three-tuple: length, opcode, payload.
295 Returns (None, None, None) on failure and error details in the log.
296 """
297
298 # Read the header.
299 if self.abReadAheadHdr:
300 assert(len(self.abReadAheadHdr) == 16);
301 abHdr = self.abReadAheadHdr;
302 self.abReadAheadHdr = array.array('B');
303 else:
304 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
305 if abHdr is None:
306 return (None, None, None);
307 if len(abHdr) != 16:
308 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
309 return (None, None, None);
310
311 # Unpack and validate the header.
312 cbMsg = getU32(abHdr, 0);
313 uCrc32 = getU32(abHdr, 4);
314
315 if sys.version_info < (3, 9, 0):
316 # Removed since Python 3.9.
317 sOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
318 else:
319 sOpcode = abHdr[8:16].tobytes();
320 sOpcode = sOpcode.decode('ascii');
321
322 if cbMsg < 16:
323 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
324 return (None, None, None);
325 if cbMsg > 1024*1024:
326 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
327 return (None, None, None);
328 if not isValidOpcodeEncoding(sOpcode):
329 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
330 return (None, None, None);
331
332 # Get the payload (if any), dropping the padding.
333 abPayload = array.array('B');
334 if cbMsg > 16:
335 if cbMsg % 16:
336 cbPadding = 16 - (cbMsg % 16);
337 else:
338 cbPadding = 0;
339 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
340 if abPayload is None:
341 self.abReadAheadHdr = abHdr;
342 if not fNoDataOk :
343 reporter.log('recvMsg: failed to recv payload bytes!');
344 return (None, None, None);
345
346 while cbPadding > 0:
347 abPayload.pop();
348 cbPadding = cbPadding - 1;
349
350 # Check the CRC-32.
351 if uCrc32 != 0:
352 uActualCrc32 = zlib.crc32(abHdr[8:]);
353 if cbMsg > 16:
354 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
355 uActualCrc32 = uActualCrc32 & 0xffffffff;
356 if uCrc32 != uActualCrc32:
357 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
358 return (None, None, None);
359
360 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
361 return (cbMsg, sOpcode, abPayload);
362
363 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
364 """
365 Sends a message (opcode + payload tuple).
366
367 Returns True on success.
368 Returns False on failure and error details in the log.
369 Returns None if you pass the incorrectly typed parameters.
370 """
371 # Encode the payload.
372 abPayload = array.array('B');
373 for o in aoPayload:
374 try:
375 if utils.isString(o):
376 if sys.version_info[0] >= 3:
377 abPayload.extend(o.encode('utf_8'));
378 else:
379 # the primitive approach...
380 sUtf8 = o.encode('utf_8');
381 for ch in sUtf8:
382 abPayload.append(ord(ch))
383 abPayload.append(0);
384 elif isinstance(o, (long, int)):
385 if o < 0 or o > 0xffffffff:
386 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
387 return None;
388 abPayload.extend(u32ToByteArray(o));
389 elif isinstance(o, array.array):
390 abPayload.extend(o);
391 else:
392 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
393 return None;
394 except:
395 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
396 return None;
397 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
398
399
400class Session(TdTaskBase):
401 """
402 A Test eXecution Service (TXS) client session.
403 """
404
405 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
406 """
407 Construct a TXS session.
408
409 This starts by connecting to the TXS and will enter the signalled state
410 when connected or the timeout has been reached.
411 """
412 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
413 self.oTransport = oTransport;
414 self.sStatus = "";
415 self.cMsTimeout = 0;
416 self.fErr = True; # Whether to report errors as error.
417 self.msStart = 0;
418 self.oThread = None;
419 self.fnTask = self.taskDummy;
420 self.aTaskArgs = None;
421 self.oTaskRc = None;
422 self.t3oReply = (None, None, None);
423 self.fScrewedUpMsgState = False;
424 self.fTryConnect = fTryConnect;
425
426 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
427 raise base.GenError("startTask failed");
428
429 def __del__(self):
430 """Make sure to cancel the task when deleted."""
431 self.cancelTask();
432
433 def toString(self):
434 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
435 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
436 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
437 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
438
439 def taskDummy(self):
440 """Place holder to catch broken state handling."""
441 raise Exception();
442
443 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
444 """
445 Kicks of a new task.
446
447 cMsTimeout: The task timeout in milliseconds. Values less than
448 500 ms will be adjusted to 500 ms. This means it is
449 OK to use negative value.
450 sStatus: The task status.
451 fnTask: The method that'll execute the task.
452 aArgs: Arguments to pass to fnTask.
453
454 Returns True on success, False + error in log on failure.
455 """
456 if not self.cancelTask():
457 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
458 return False;
459
460 # Change status and make sure we're the
461 self.lockTask();
462 if self.sStatus != "":
463 self.unlockTask();
464 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
465 return False;
466 self.sStatus = "setup";
467 self.oTaskRc = None;
468 self.t3oReply = (None, None, None);
469 self.resetTaskLocked();
470 self.unlockTask();
471
472 self.cMsTimeout = max(cMsTimeout, 500);
473 self.fErr = not fIgnoreErrors;
474 self.fnTask = fnTask;
475 self.aTaskArgs = aArgs;
476 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
477 self.oThread.setDaemon(True);
478 self.msStart = base.timestampMilli();
479
480 self.lockTask();
481 self.sStatus = sStatus;
482 self.unlockTask();
483 self.oThread.start();
484
485 return True;
486
487 def cancelTask(self, fSync = True):
488 """
489 Attempts to cancel any pending tasks.
490 Returns success indicator (True/False).
491 """
492 self.lockTask();
493
494 if self.sStatus == "":
495 self.unlockTask();
496 return True;
497 if self.sStatus == "setup":
498 self.unlockTask();
499 return False;
500 if self.sStatus == "cancelled":
501 self.unlockTask();
502 return False;
503
504 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
505 if self.sStatus == 'connecting':
506 self.oTransport.cancelConnect();
507
508 self.sStatus = "cancelled";
509 oThread = self.oThread;
510 self.unlockTask();
511
512 if not fSync:
513 return False;
514
515 oThread.join(61.0);
516
517 if sys.version_info < (3, 9, 0):
518 # Removed since Python 3.9.
519 return oThread.isAlive(); # pylint: disable=no-member
520 return oThread.is_alive();
521
522 def taskThread(self):
523 """
524 The task thread function.
525 This does some housekeeping activities around the real task method call.
526 """
527 if not self.isCancelled():
528 try:
529 fnTask = self.fnTask;
530 oTaskRc = fnTask(*self.aTaskArgs);
531 except:
532 reporter.fatalXcpt('taskThread', 15);
533 oTaskRc = None;
534 else:
535 reporter.log('taskThread: cancelled already');
536
537 self.lockTask();
538
539 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
540 self.oTaskRc = oTaskRc;
541 self.oThread = None;
542 self.sStatus = '';
543 self.signalTaskLocked();
544
545 self.unlockTask();
546 return None;
547
548 def isCancelled(self):
549 """Internal method for checking if the task has been cancelled."""
550 self.lockTask();
551 sStatus = self.sStatus;
552 self.unlockTask();
553 if sStatus == "cancelled":
554 return True;
555 return False;
556
557 def hasTimedOut(self):
558 """Internal method for checking if the task has timed out or not."""
559 cMsLeft = self.getMsLeft();
560 if cMsLeft <= 0:
561 return True;
562 return False;
563
564 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
565 """Gets the time left until the timeout."""
566 cMsElapsed = base.timestampMilli() - self.msStart;
567 if cMsElapsed < 0:
568 return cMsMin;
569 cMsLeft = self.cMsTimeout - cMsElapsed;
570 if cMsLeft <= cMsMin:
571 return cMsMin;
572 if cMsLeft > cMsMax > 0:
573 return cMsMax
574 return cMsLeft;
575
576 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
577 """
578 Wrapper for TransportBase.recvMsg that stashes the response away
579 so the client can inspect it later on.
580 """
581 if cMsTimeout is None:
582 cMsTimeout = self.getMsLeft(500);
583 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
584 self.lockTask();
585 self.t3oReply = (cbMsg, sOpcode, abPayload);
586 self.unlockTask();
587 return (cbMsg, sOpcode, abPayload);
588
589 def recvAck(self, fNoDataOk = False):
590 """
591 Receives an ACK or error response from the TXS.
592
593 Returns True on success.
594 Returns False on timeout or transport error.
595 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
596 and there are always details of some sort or another.
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
599 if cbMsg is None:
600 return False;
601 sOpcode = sOpcode.strip()
602 if sOpcode == "ACK":
603 return True;
604 return (sOpcode, getSZ(abPayload, 0, sOpcode));
605
606 def recvAckLogged(self, sCommand, fNoDataOk = False):
607 """
608 Wrapper for recvAck and logging.
609 Returns True on success (ACK).
610 Returns False on time, transport error and errors signalled by TXS.
611 """
612 rc = self.recvAck(fNoDataOk);
613 if rc is not True and not fNoDataOk:
614 if rc is False:
615 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
616 else:
617 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
618 rc = False;
619 return rc;
620
621 def recvTrueFalse(self, sCommand):
622 """
623 Receives a TRUE/FALSE response from the TXS.
624 Returns True on TRUE, False on FALSE and None on error/other (logged).
625 """
626 cbMsg, sOpcode, abPayload = self.recvReply();
627 if cbMsg is None:
628 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
629 return None;
630
631 sOpcode = sOpcode.strip()
632 if sOpcode == "TRUE":
633 return True;
634 if sOpcode == "FALSE":
635 return False;
636 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
637 return None;
638
639 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
640 """
641 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
642 """
643 if cMsTimeout is None:
644 cMsTimeout = self.getMsLeft(500);
645 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
646
647 def asyncToSync(self, fnAsync, *aArgs):
648 """
649 Wraps an asynchronous task into a synchronous operation.
650
651 Returns False on failure, task return status on success.
652 """
653 rc = fnAsync(*aArgs);
654 if rc is False:
655 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
656 return rc;
657
658 rc = self.waitForTask(self.cMsTimeout + 5000);
659 if rc is False:
660 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
661 self.cancelTask();
662 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
663 return False;
664
665 rc = self.getResult();
666 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
667 return rc;
668
669 #
670 # Connection tasks.
671 #
672
673 def taskConnect(self, cMsIdleFudge):
674 """Tries to connect to the TXS"""
675 while not self.isCancelled():
676 reporter.log2('taskConnect: connecting ...');
677 rc = self.oTransport.connect(self.getMsLeft(500));
678 if rc is True:
679 reporter.log('taskConnect: succeeded');
680 return self.taskGreet(cMsIdleFudge);
681 if rc is None:
682 reporter.log2('taskConnect: unable to connect');
683 return None;
684 if self.hasTimedOut():
685 reporter.log2('taskConnect: timed out');
686 if not self.fTryConnect:
687 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
688 return False;
689 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
690 if not self.fTryConnect:
691 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
692 return False;
693
694 def taskGreet(self, cMsIdleFudge):
695 """Greets the TXS"""
696 rc = self.sendMsg("HOWDY", ());
697 if rc is True:
698 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
699 if rc is True:
700 while cMsIdleFudge > 0:
701 cMsIdleFudge -= 1000;
702 time.sleep(1);
703 else:
704 self.oTransport.disconnect(self.fTryConnect);
705 return rc;
706
707 def taskBye(self):
708 """Says goodbye to the TXS"""
709 rc = self.sendMsg("BYE");
710 if rc is True:
711 rc = self.recvAckLogged("BYE");
712 self.oTransport.disconnect();
713 return rc;
714
715 def taskVer(self):
716 """Requests version information from TXS"""
717 rc = self.sendMsg("VER");
718 if rc is True:
719 rc = False;
720 cbMsg, sOpcode, abPayload = self.recvReply();
721 if cbMsg is not None:
722 sOpcode = sOpcode.strip();
723 if sOpcode == "ACK VER":
724 sVer = getSZ(abPayload, 0);
725 if sVer is not None:
726 rc = sVer;
727 else:
728 reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
729 else:
730 reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
731 return rc;
732
733 def taskUuid(self):
734 """Gets the TXS UUID"""
735 rc = self.sendMsg("UUID");
736 if rc is True:
737 rc = False;
738 cbMsg, sOpcode, abPayload = self.recvReply();
739 if cbMsg is not None:
740 sOpcode = sOpcode.strip()
741 if sOpcode == "ACK UUID":
742 sUuid = getSZ(abPayload, 0);
743 if sUuid is not None:
744 sUuid = '{%s}' % (sUuid,)
745 try:
746 _ = uuid.UUID(sUuid);
747 rc = sUuid;
748 except:
749 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
750 else:
751 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
752 else:
753 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
754 else:
755 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
756 return rc;
757
758 #
759 # Process task
760 # pylint: disable=missing-docstring
761 #
762
763 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
764 # Construct the payload.
765 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
766 for sArg in asArgs:
767 aoPayload.append('%s' % (sArg));
768 aoPayload.append(long(len(asAddEnv)));
769 for sPutEnv in asAddEnv:
770 aoPayload.append('%s' % (sPutEnv));
771 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
772 if utils.isString(o):
773 aoPayload.append(o);
774 elif o is not None:
775 aoPayload.append('|');
776 o.uTxsClientCrc32 = zlib.crc32(b'');
777 else:
778 aoPayload.append('');
779 aoPayload.append('%s' % (sAsUser));
780 aoPayload.append(long(self.cMsTimeout));
781
782 # Kick of the EXEC command.
783 rc = self.sendMsg('EXEC', aoPayload)
784 if rc is True:
785 rc = self.recvAckLogged('EXEC');
786 if rc is True:
787 # Loop till the process completes, feed input to the TXS and
788 # receive output from it.
789 sFailure = "";
790 msPendingInputReply = None;
791 cbMsg, sOpcode, abPayload = (None, None, None);
792 while True:
793 # Pending input?
794 if msPendingInputReply is None \
795 and oStdIn is not None \
796 and not utils.isString(oStdIn):
797 try:
798 sInput = oStdIn.read(65536);
799 except:
800 reporter.errorXcpt('read standard in');
801 sFailure = 'exception reading stdin';
802 rc = None;
803 break;
804 if sInput:
805 # Convert to a byte array before handing it of to sendMsg or the string
806 # will get some zero termination added breaking the CRC (and injecting
807 # unwanted bytes).
808 abInput = array.array('B', sInput.encode('utf-8'));
809 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
810 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
811 if rc is not True:
812 sFailure = 'sendMsg failure';
813 break;
814 msPendingInputReply = base.timestampMilli();
815 continue;
816
817 rc = self.sendMsg('STDINEOS');
818 oStdIn = None;
819 if rc is not True:
820 sFailure = 'sendMsg failure';
821 break;
822 msPendingInputReply = base.timestampMilli();
823
824 # Wait for input (500 ms timeout).
825 if cbMsg is None:
826 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
827 if cbMsg is None:
828 # Check for time out before restarting the loop.
829 # Note! Only doing timeout checking here does mean that
830 # the TXS may prevent us from timing out by
831 # flooding us with data. This is unlikely though.
832 if self.hasTimedOut() \
833 and ( msPendingInputReply is None \
834 or base.timestampMilli() - msPendingInputReply > 30000):
835 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
836 sFailure = 'timeout';
837 rc = None;
838 break;
839 # Check that the connection is OK.
840 if not self.oTransport.isConnectionOk():
841 self.oTransport.disconnect();
842 sFailure = 'disconnected';
843 rc = False;
844 break;
845 continue;
846
847 # Handle the response.
848 sOpcode = sOpcode.rstrip();
849 if sOpcode == 'STDOUT':
850 oOut = oStdOut;
851 elif sOpcode == 'STDERR':
852 oOut = oStdErr;
853 elif sOpcode == 'TESTPIPE':
854 oOut = oTestPipe;
855 else:
856 oOut = None;
857 if oOut is not None:
858 # Output from the process.
859 if len(abPayload) < 4:
860 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
861 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
862 rc = None;
863 break;
864 uStreamCrc32 = getU32(abPayload, 0);
865 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
866 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
867 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
868 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 try:
873 oOut.write(abPayload[4:]);
874 except:
875 sFailure = 'exception writing %s' % (sOpcode);
876 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
877 rc = None;
878 break;
879 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
880 # Standard input is ignored. Ignore this condition for now.
881 msPendingInputReply = None;
882 reporter.log('taskExecEx: Standard input is ignored... why?');
883 del oStdIn.uTxsClientCrc32;
884 oStdIn = '/dev/null';
885 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
886 and msPendingInputReply is not None:
887 # TXS STDIN error, abort.
888 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
889 msPendingInputReply = None;
890 sFailure = 'TXS is out of memory for std input buffering';
891 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
892 rc = None;
893 break;
894 elif sOpcode == 'ACK' and msPendingInputReply is not None:
895 msPendingInputReply = None;
896 elif sOpcode.startswith('PROC '):
897 # Process status message, handle it outside the loop.
898 rc = True;
899 break;
900 else:
901 sFailure = 'Unexpected opcode %s' % (sOpcode);
902 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
903 rc = None;
904 break;
905 # Clear the message.
906 cbMsg, sOpcode, abPayload = (None, None, None);
907
908 # If we sent an STDIN packet and didn't get a reply yet, we'll give
909 # TXS some 5 seconds to reply to this. If we don't wait here we'll
910 # get screwed later on if we mix it up with the reply to some other
911 # command. Hackish.
912 if msPendingInputReply is not None:
913 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
914 if cbMsg2 is not None:
915 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
916 % (cbMsg2, sOpcode2, abPayload2));
917 msPendingInputReply = None;
918 else:
919 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
920 self.fScrewedUpMsgState = True;
921
922 # Parse the exit status (True), abort (None) or do nothing (False).
923 if rc is True:
924 if sOpcode == 'PROC OK':
925 pass;
926 else:
927 rc = False;
928 # Do proper parsing some other day if needed:
929 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
930 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
931 if sOpcode == 'PROC DOO':
932 reporter.log('taskExecEx: PROC DOO[FUS]: %s' % (abPayload,));
933 elif sOpcode.startswith('PROC NOK'):
934 reporter.log('taskExecEx: PROC NOK: rcExit=%s' % (abPayload,));
935 elif abPayload and sOpcode.startswith('PROC '):
936 reporter.log('taskExecEx: %s payload=%s' % (sOpcode, abPayload,));
937
938 else:
939 if rc is None:
940 # Abort it.
941 reporter.log('taskExecEx: sending ABORT...');
942 rc = self.sendMsg('ABORT');
943 while rc is True:
944 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
945 if cbMsg is None:
946 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
947 self.fScrewedUpMsgState = True;
948 break;
949 if sOpcode.startswith('PROC '):
950 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
951 break;
952 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
953 # Check that the connection is OK before looping.
954 if not self.oTransport.isConnectionOk():
955 self.oTransport.disconnect();
956 break;
957
958 # Fake response with the reason why we quit.
959 if sFailure is not None:
960 self.t3oReply = (0, 'EXECFAIL', sFailure);
961 rc = None;
962 else:
963 rc = None;
964
965 # Cleanup.
966 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
967 if o is not None and not utils.isString(o):
968 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
969 # Make sure all files are closed
970 o.close(); # pylint: disable=maybe-no-member
971 reporter.log('taskExecEx: returns %s' % (rc));
972 return rc;
973
974 #
975 # Admin tasks
976 #
977
978 def hlpRebootShutdownWaitForAck(self, sCmd):
979 """Wait for reboot/shutodwn ACK."""
980 rc = self.recvAckLogged(sCmd);
981 if rc is True:
982 # poll a little while for server to disconnect.
983 uMsStart = base.timestampMilli();
984 while self.oTransport.isConnectionOk() \
985 and base.timestampMilli() - uMsStart >= 5000:
986 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
987 break;
988 self.oTransport.disconnect();
989 return rc;
990
991 def taskReboot(self):
992 rc = self.sendMsg('REBOOT');
993 if rc is True:
994 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
995 return rc;
996
997 def taskShutdown(self):
998 rc = self.sendMsg('SHUTDOWN');
999 if rc is True:
1000 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
1001 return rc;
1002
1003 #
1004 # CD/DVD control tasks.
1005 #
1006
1007 ## TODO
1008
1009 #
1010 # File system tasks
1011 #
1012
1013 def taskMkDir(self, sRemoteDir, fMode):
1014 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
1015 if rc is True:
1016 rc = self.recvAckLogged('MKDIR');
1017 return rc;
1018
1019 def taskMkDirPath(self, sRemoteDir, fMode):
1020 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
1021 if rc is True:
1022 rc = self.recvAckLogged('MKDRPATH');
1023 return rc;
1024
1025 def taskMkSymlink(self, sLinkTarget, sLink):
1026 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
1027 if rc is True:
1028 rc = self.recvAckLogged('MKSYMLNK');
1029 return rc;
1030
1031 def taskRmDir(self, sRemoteDir):
1032 rc = self.sendMsg('RMDIR', (sRemoteDir,));
1033 if rc is True:
1034 rc = self.recvAckLogged('RMDIR');
1035 return rc;
1036
1037 def taskRmFile(self, sRemoteFile):
1038 rc = self.sendMsg('RMFILE', (sRemoteFile,));
1039 if rc is True:
1040 rc = self.recvAckLogged('RMFILE');
1041 return rc;
1042
1043 def taskRmSymlink(self, sRemoteSymlink):
1044 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1045 if rc is True:
1046 rc = self.recvAckLogged('RMSYMLNK');
1047 return rc;
1048
1049 def taskRmTree(self, sRemoteTree):
1050 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1051 if rc is True:
1052 rc = self.recvAckLogged('RMTREE');
1053 return rc;
1054
1055 def taskChMod(self, sRemotePath, fMode):
1056 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1057 if rc is True:
1058 rc = self.recvAckLogged('CHMOD');
1059 return rc;
1060
1061 def taskChOwn(self, sRemotePath, idUser, idGroup):
1062 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1063 if rc is True:
1064 rc = self.recvAckLogged('CHOWN');
1065 return rc;
1066
1067 def taskIsDir(self, sRemoteDir):
1068 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1069 if rc is True:
1070 rc = self.recvTrueFalse('ISDIR');
1071 return rc;
1072
1073 def taskIsFile(self, sRemoteFile):
1074 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1075 if rc is True:
1076 rc = self.recvTrueFalse('ISFILE');
1077 return rc;
1078
1079 def taskIsSymlink(self, sRemoteSymlink):
1080 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1081 if rc is True:
1082 rc = self.recvTrueFalse('ISSYMLNK');
1083 return rc;
1084
1085 #def "STAT "
1086 #def "LSTAT "
1087 #def "LIST "
1088
1089 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1090 #
1091 # Open the local file (make sure it exist before bothering TXS) and
1092 # tell TXS that we want to upload a file.
1093 #
1094 try:
1095 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1096 except:
1097 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1098 return False;
1099
1100 # Common cause with taskUploadStr
1101 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1102
1103 # Cleanup.
1104 oLocalFile.close();
1105 return rc;
1106
1107 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1108 # Wrap sContent in a file like class.
1109 class InStringFile(object): # pylint: disable=too-few-public-methods
1110 def __init__(self, sContent):
1111 self.sContent = sContent;
1112 self.off = 0;
1113
1114 def read(self, cbMax):
1115 cbLeft = len(self.sContent) - self.off;
1116 if cbLeft == 0:
1117 return "";
1118 if cbLeft <= cbMax:
1119 sRet = self.sContent[self.off:(self.off + cbLeft)];
1120 else:
1121 sRet = self.sContent[self.off:(self.off + cbMax)];
1122 self.off = self.off + len(sRet);
1123 return sRet;
1124
1125 oLocalString = InStringFile(sContent);
1126 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1127
1128 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1129 """Common worker used by taskUploadFile and taskUploadString."""
1130 #
1131 # Command + ACK.
1132 #
1133 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1134 # Fall back on the old command if the new one is not known by the TXS.
1135 #
1136 if fMode == 0:
1137 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1138 if rc is True:
1139 rc = self.recvAckLogged('PUT FILE');
1140 else:
1141 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1142 if rc is True:
1143 rc = self.recvAck();
1144 if rc is False:
1145 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1146 elif rc is not True:
1147 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1148 # Fallback:
1149 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1150 if rc is True:
1151 rc = self.recvAckLogged('PUT FILE');
1152 else:
1153 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1154 rc = False;
1155 if rc is True:
1156 #
1157 # Push data packets until eof.
1158 #
1159 uMyCrc32 = zlib.crc32(b'');
1160 while True:
1161 # Read up to 64 KB of data.
1162 try:
1163 sRaw = oLocalFile.read(65536);
1164 except:
1165 rc = None;
1166 break;
1167
1168 # Convert to array - this is silly!
1169 abBuf = array.array('B');
1170 if utils.isString(sRaw):
1171 for i, _ in enumerate(sRaw):
1172 abBuf.append(ord(sRaw[i]));
1173 else:
1174 abBuf.extend(sRaw);
1175 sRaw = None;
1176
1177 # Update the file stream CRC and send it off.
1178 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1179 if not abBuf:
1180 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1181 else:
1182 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1183 if rc is False:
1184 break;
1185
1186 # Wait for the reply.
1187 rc = self.recvAck();
1188 if rc is not True:
1189 if rc is False:
1190 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1191 else:
1192 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1193 rc = False;
1194 break;
1195
1196 # EOF?
1197 if not abBuf:
1198 break;
1199
1200 # Send ABORT on ACK and I/O errors.
1201 if rc is None:
1202 rc = self.sendMsg('ABORT');
1203 if rc is True:
1204 self.recvAckLogged('ABORT');
1205 rc = False;
1206 return rc;
1207
1208 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1209 try:
1210 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1211 except:
1212 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1213 return False;
1214
1215 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1216
1217 oLocalFile.close();
1218 if rc is False:
1219 try:
1220 os.remove(sLocalFile);
1221 except:
1222 reporter.errorXcpt();
1223 return rc;
1224
1225 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1226 # Wrap sContent in a file like class.
1227 class OutStringFile(object): # pylint: disable=too-few-public-methods
1228 def __init__(self):
1229 self.asContent = [];
1230
1231 def write(self, sBuf):
1232 self.asContent.append(sBuf);
1233 return None;
1234
1235 oLocalString = OutStringFile();
1236 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1237 if rc is True:
1238 rc = '';
1239 for sBuf in oLocalString.asContent:
1240 if hasattr(sBuf, 'decode'):
1241 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1242 else:
1243 rc += sBuf;
1244 return rc;
1245
1246 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1247 """Common worker for taskDownloadFile and taskDownloadString."""
1248 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1249 if rc is True:
1250 #
1251 # Process data packets until eof.
1252 #
1253 uMyCrc32 = zlib.crc32(b'');
1254 while rc is True:
1255 cbMsg, sOpcode, abPayload = self.recvReply();
1256 if cbMsg is None:
1257 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1258 rc = None;
1259 break;
1260
1261 # Validate.
1262 sOpcode = sOpcode.rstrip();
1263 if sOpcode not in ('DATA', 'DATA EOF',):
1264 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1265 % (sOpcode, getSZ(abPayload, 0, "None")));
1266 rc = False;
1267 break;
1268 if sOpcode == 'DATA' and len(abPayload) < 4:
1269 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1270 rc = None;
1271 break;
1272 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1273 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1274 rc = None;
1275 break;
1276
1277 # Check the CRC (common for both packets).
1278 uCrc32 = getU32(abPayload, 0);
1279 if sOpcode == 'DATA':
1280 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1281 if uCrc32 != (uMyCrc32 & 0xffffffff):
1282 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1283 % (hex(uMyCrc32), hex(uCrc32)));
1284 rc = None;
1285 break;
1286 if sOpcode == 'DATA EOF':
1287 rc = self.sendMsg('ACK');
1288 break;
1289
1290 # Finally, push the data to the file.
1291 try:
1292 if sys.version_info < (3, 9, 0):
1293 # Removed since Python 3.9.
1294 abData = abPayload[4:].tostring();
1295 else:
1296 abData = abPayload[4:].tobytes();
1297 oLocalFile.write(abData);
1298 except:
1299 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1300 rc = None;
1301 break;
1302 rc = self.sendMsg('ACK');
1303
1304 # Send NACK on validation and I/O errors.
1305 if rc is None:
1306 rc = self.sendMsg('NACK');
1307 rc = False;
1308 return rc;
1309
1310 def taskPackFile(self, sRemoteFile, sRemoteSource):
1311 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1312 if rc is True:
1313 rc = self.recvAckLogged('PKFILE');
1314 return rc;
1315
1316 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1317 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1318 if rc is True:
1319 rc = self.recvAckLogged('UNPKFILE');
1320 return rc;
1321
1322 # pylint: enable=missing-docstring
1323
1324
1325 #
1326 # Public methods - generic task queries
1327 #
1328
1329 def isSuccess(self):
1330 """Returns True if the task completed successfully, otherwise False."""
1331 self.lockTask();
1332 sStatus = self.sStatus;
1333 oTaskRc = self.oTaskRc;
1334 self.unlockTask();
1335 if sStatus != "":
1336 return False;
1337 if oTaskRc is False or oTaskRc is None:
1338 return False;
1339 return True;
1340
1341 def getResult(self):
1342 """
1343 Returns the result of a completed task.
1344 Returns None if not completed yet or no previous task.
1345 """
1346 self.lockTask();
1347 sStatus = self.sStatus;
1348 oTaskRc = self.oTaskRc;
1349 self.unlockTask();
1350 if sStatus != "":
1351 return None;
1352 return oTaskRc;
1353
1354 def getLastReply(self):
1355 """
1356 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1357 Returns a None, None, None three-tuple if there was no last reply.
1358 """
1359 self.lockTask();
1360 t3oReply = self.t3oReply;
1361 self.unlockTask();
1362 return t3oReply;
1363
1364 #
1365 # Public methods - connection.
1366 #
1367
1368 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1369 """
1370 Initiates a disconnect task.
1371
1372 Returns True on success, False on failure (logged).
1373
1374 The task returns True on success and False on failure.
1375 """
1376 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1377
1378 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1379 """Synchronous version."""
1380 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1381
1382 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1383 """
1384 Initiates a task for getting the TXS version information.
1385
1386 Returns True on success, False on failure (logged).
1387
1388 The task returns the version string on success and False on failure.
1389 """
1390 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1391
1392 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1393 """Synchronous version."""
1394 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1395
1396 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1397 """
1398 Initiates a task for getting the TXS UUID.
1399
1400 Returns True on success, False on failure (logged).
1401
1402 The task returns UUID string (in {}) on success and False on failure.
1403 """
1404 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1405
1406 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1407 """Synchronous version."""
1408 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1409
1410 #
1411 # Public methods - execution.
1412 #
1413
1414 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1415 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1416 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1417 """
1418 Initiates a exec process task.
1419
1420 Returns True on success, False on failure (logged).
1421
1422 The task returns True if the process exited normally with status code 0.
1423 The task returns None if on failure prior to executing the process, and
1424 False if the process exited with a different status or in an abnormal
1425 manner. Both None and False are logged of course and further info can
1426 also be obtained by getLastReply().
1427
1428 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1429 these streams. If None, no special action is taken and the output goes
1430 to where ever the TXS sends its output, and ditto for input.
1431 - To send to / read from the bitbucket, pass '/dev/null'.
1432 - To redirect to/from a file, just specify the remote filename.
1433 - To append to a file use '>>' followed by the remote filename.
1434 - To pipe the stream to/from the TXS, specify a file like
1435 object. For StdIn a non-blocking read() method is required. For
1436 the other a write() method is required. Watch out for deadlock
1437 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1438 """
1439 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1440 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1441 oStdOut, oStdErr, oTestPipe, sAsUser));
1442
1443 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1444 oStdIn = '/dev/null', oStdOut = '/dev/null',
1445 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1446 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1447 """Synchronous version."""
1448 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1449 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1450
1451 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1452 cMsTimeout = 3600000, fIgnoreErrors = False):
1453 """
1454 Initiates a exec process test task.
1455
1456 Returns True on success, False on failure (logged).
1457
1458 The task returns True if the process exited normally with status code 0.
1459 The task returns None if on failure prior to executing the process, and
1460 False if the process exited with a different status or in an abnormal
1461 manner. Both None and False are logged of course and further info can
1462 also be obtained by getLastReply().
1463
1464 Standard in is taken from /dev/null. While both standard output and
1465 standard error goes directly to reporter.log(). The testpipe is piped
1466 to reporter.xxxx.
1467 """
1468
1469 sStdIn = '/dev/null';
1470 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1471 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1472 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1473 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1474
1475 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1476 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1477
1478 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1479 cMsTimeout = 3600000, fIgnoreErrors = False):
1480 """Synchronous version."""
1481 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1482 cMsTimeout, fIgnoreErrors);
1483
1484 #
1485 # Public methods - system
1486 #
1487
1488 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1489 """
1490 Initiates a reboot task.
1491
1492 Returns True on success, False on failure (logged).
1493
1494 The task returns True on success, False on failure (logged). The
1495 session will be disconnected on successful task completion.
1496 """
1497 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1498
1499 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1500 """Synchronous version."""
1501 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1502
1503 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1504 """
1505 Initiates a shutdown task.
1506
1507 Returns True on success, False on failure (logged).
1508
1509 The task returns True on success, False on failure (logged).
1510 """
1511 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1512
1513 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1514 """Synchronous version."""
1515 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1516
1517
1518 #
1519 # Public methods - file system
1520 #
1521
1522 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1523 """
1524 Initiates a mkdir task.
1525
1526 Returns True on success, False on failure (logged).
1527
1528 The task returns True on success, False on failure (logged).
1529 """
1530 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1531
1532 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1533 """Synchronous version."""
1534 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1535
1536 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1537 """
1538 Initiates a mkdir -p task.
1539
1540 Returns True on success, False on failure (logged).
1541
1542 The task returns True on success, False on failure (logged).
1543 """
1544 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1545
1546 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1547 """Synchronous version."""
1548 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1549
1550 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1551 """
1552 Initiates a symlink task.
1553
1554 Returns True on success, False on failure (logged).
1555
1556 The task returns True on success, False on failure (logged).
1557 """
1558 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1559
1560 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1561 """Synchronous version."""
1562 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1563
1564 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1565 """
1566 Initiates a rmdir task.
1567
1568 Returns True on success, False on failure (logged).
1569
1570 The task returns True on success, False on failure (logged).
1571 """
1572 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1573
1574 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1575 """Synchronous version."""
1576 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1577
1578 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1579 """
1580 Initiates a rmfile task.
1581
1582 Returns True on success, False on failure (logged).
1583
1584 The task returns True on success, False on failure (logged).
1585 """
1586 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1587
1588 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1589 """Synchronous version."""
1590 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1591
1592 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1593 """
1594 Initiates a rmsymlink task.
1595
1596 Returns True on success, False on failure (logged).
1597
1598 The task returns True on success, False on failure (logged).
1599 """
1600 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1601
1602 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1603 """Synchronous version."""
1604 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1605
1606 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1607 """
1608 Initiates a rmtree task.
1609
1610 Returns True on success, False on failure (logged).
1611
1612 The task returns True on success, False on failure (logged).
1613 """
1614 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1615
1616 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1617 """Synchronous version."""
1618 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1619
1620 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1621 """
1622 Initiates a chmod task.
1623
1624 Returns True on success, False on failure (logged).
1625
1626 The task returns True on success, False on failure (logged).
1627 """
1628 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1629
1630 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1631 """Synchronous version."""
1632 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1633
1634 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1635 """
1636 Initiates a chown task.
1637
1638 Returns True on success, False on failure (logged).
1639
1640 The task returns True on success, False on failure (logged).
1641 """
1642 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1643
1644 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1645 """Synchronous version."""
1646 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1647
1648 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1649 """
1650 Initiates a is-dir query task.
1651
1652 Returns True on success, False on failure (logged).
1653
1654 The task returns True if it's a directory, False if it isn't, and
1655 None on error (logged).
1656 """
1657 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1658
1659 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1660 """Synchronous version."""
1661 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1662
1663 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1664 """
1665 Initiates a is-file query task.
1666
1667 Returns True on success, False on failure (logged).
1668
1669 The task returns True if it's a file, False if it isn't, and None on
1670 error (logged).
1671 """
1672 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1673
1674 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1675 """Synchronous version."""
1676 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1677
1678 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1679 """
1680 Initiates a is-symbolic-link query task.
1681
1682 Returns True on success, False on failure (logged).
1683
1684 The task returns True if it's a symbolic linke, False if it isn't, and
1685 None on error (logged).
1686 """
1687 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1688
1689 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1690 """Synchronous version."""
1691 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1692
1693 #def "STAT "
1694 #def "LSTAT "
1695 #def "LIST "
1696
1697 @staticmethod
1698 def calcFileXferTimeout(cbFile):
1699 """
1700 Calculates a reasonable timeout for an upload/download given the file size.
1701
1702 Returns timeout in milliseconds.
1703 """
1704 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1705
1706 @staticmethod
1707 def calcUploadTimeout(sLocalFile):
1708 """
1709 Calculates a reasonable timeout for an upload given the file (will stat it).
1710
1711 Returns timeout in milliseconds.
1712 """
1713 try: cbFile = os.path.getsize(sLocalFile);
1714 except: cbFile = 1024*1024;
1715 return Session.calcFileXferTimeout(cbFile);
1716
1717 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1718 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1719 """
1720 Initiates a download query task.
1721
1722 Returns True on success, False on failure (logged).
1723
1724 The task returns True on success, False on failure (logged).
1725 """
1726 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1727 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1728
1729 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1730 """Synchronous version."""
1731 if cMsTimeout <= 0:
1732 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1733 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1734
1735 def asyncUploadString(self, sContent, sRemoteFile,
1736 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1737 """
1738 Initiates a upload string task.
1739
1740 Returns True on success, False on failure (logged).
1741
1742 The task returns True on success, False on failure (logged).
1743 """
1744 if cMsTimeout <= 0:
1745 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1746 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1747 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1748
1749 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1750 """Synchronous version."""
1751 if cMsTimeout <= 0:
1752 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1753 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1754
1755 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1756 """
1757 Initiates a download file task.
1758
1759 Returns True on success, False on failure (logged).
1760
1761 The task returns True on success, False on failure (logged).
1762 """
1763 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1764
1765 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1766 """Synchronous version."""
1767 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1768
1769 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1770 cMsTimeout = 30000, fIgnoreErrors = False):
1771 """
1772 Initiates a download string task.
1773
1774 Returns True on success, False on failure (logged).
1775
1776 The task returns a byte string on success, False on failure (logged).
1777 """
1778 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1779 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1780
1781 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1782 cMsTimeout = 30000, fIgnoreErrors = False):
1783 """Synchronous version."""
1784 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1785 cMsTimeout, fIgnoreErrors);
1786
1787 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1788 """
1789 Initiates a packing file/directory task.
1790
1791 Returns True on success, False on failure (logged).
1792
1793 The task returns True on success, False on failure (logged).
1794 """
1795 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1796 (sRemoteFile, sRemoteSource));
1797
1798 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1799 """Synchronous version."""
1800 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1801
1802 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1803 """
1804 Initiates a unpack file task.
1805
1806 Returns True on success, False on failure (logged).
1807
1808 The task returns True on success, False on failure (logged).
1809 """
1810 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1811 (sRemoteFile, sRemoteDir));
1812
1813 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1814 """Synchronous version."""
1815 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1816
1817
1818class TransportTcp(TransportBase):
1819 """
1820 TCP transport layer for the TXS client session class.
1821 """
1822
1823 def __init__(self, sHostname, uPort, fReversedSetup):
1824 """
1825 Save the parameters. The session will call us back to make the
1826 connection later on its worker thread.
1827 """
1828 TransportBase.__init__(self, utils.getCallerName());
1829 self.sHostname = sHostname;
1830 self.fReversedSetup = fReversedSetup;
1831 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1832 self.oSocket = None;
1833 self.oWakeupW = None;
1834 self.oWakeupR = None;
1835 self.fConnectCanceled = False;
1836 self.fIsConnecting = False;
1837 self.oCv = threading.Condition();
1838 self.abReadAhead = array.array('B');
1839
1840 def toString(self):
1841 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1842 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1843 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1844 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1845
1846 def __isInProgressXcpt(self, oXcpt):
1847 """ In progress exception? """
1848 try:
1849 if isinstance(oXcpt, socket.error):
1850 try:
1851 if oXcpt.errno == errno.EINPROGRESS:
1852 return True;
1853 except: pass;
1854 # Windows?
1855 try:
1856 if oXcpt.errno == errno.EWOULDBLOCK:
1857 return True;
1858 except: pass;
1859 except:
1860 pass;
1861 return False;
1862
1863 def __isWouldBlockXcpt(self, oXcpt):
1864 """ Would block exception? """
1865 try:
1866 if isinstance(oXcpt, socket.error):
1867 try:
1868 if oXcpt.errno == errno.EWOULDBLOCK:
1869 return True;
1870 except: pass;
1871 try:
1872 if oXcpt.errno == errno.EAGAIN:
1873 return True;
1874 except: pass;
1875 except:
1876 pass;
1877 return False;
1878
1879 def __isConnectionReset(self, oXcpt):
1880 """ Connection reset by Peer or others. """
1881 try:
1882 if isinstance(oXcpt, socket.error):
1883 try:
1884 if oXcpt.errno == errno.ECONNRESET:
1885 return True;
1886 except: pass;
1887 try:
1888 if oXcpt.errno == errno.ENETRESET:
1889 return True;
1890 except: pass;
1891 except:
1892 pass;
1893 return False;
1894
1895 def _closeWakeupSockets(self):
1896 """ Closes the wakup sockets. Caller should own the CV. """
1897 oWakeupR = self.oWakeupR;
1898 self.oWakeupR = None;
1899 if oWakeupR is not None:
1900 oWakeupR.close();
1901
1902 oWakeupW = self.oWakeupW;
1903 self.oWakeupW = None;
1904 if oWakeupW is not None:
1905 oWakeupW.close();
1906
1907 return None;
1908
1909 def cancelConnect(self):
1910 # This is bad stuff.
1911 self.oCv.acquire();
1912 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1913 self.fConnectCanceled = True;
1914 if self.fIsConnecting:
1915 oSocket = self.oSocket;
1916 self.oSocket = None;
1917 if oSocket is not None:
1918 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1919 oSocket.close();
1920
1921 oWakeupW = self.oWakeupW;
1922 self.oWakeupW = None;
1923 if oWakeupW is not None:
1924 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1925 try: oWakeupW.send(b'cancelled!\n');
1926 except: reporter.logXcpt();
1927 try: oWakeupW.shutdown(socket.SHUT_WR);
1928 except: reporter.logXcpt();
1929 oWakeupW.close();
1930 self.oCv.release();
1931
1932 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1933 """ Connects to the TXS server as server, i.e. the reversed setup. """
1934 assert(self.fReversedSetup);
1935
1936 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1937
1938 # Workaround for bind() failure...
1939 try:
1940 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1941 except:
1942 reporter.errorXcpt('socket.listen(1) failed');
1943 return None;
1944
1945 # Bind the socket and make it listen.
1946 try:
1947 oSocket.bind((self.sHostname, self.uPort));
1948 except:
1949 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1950 return None;
1951 try:
1952 oSocket.listen(1);
1953 except:
1954 reporter.errorXcpt('socket.listen(1) failed');
1955 return None;
1956
1957 # Accept connections.
1958 oClientSocket = None;
1959 tClientAddr = None;
1960 try:
1961 (oClientSocket, tClientAddr) = oSocket.accept();
1962 except socket.error as e:
1963 if not self.__isInProgressXcpt(e):
1964 raise;
1965
1966 # Do the actual waiting.
1967 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1968 try:
1969 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1970 except socket.error as oXctp:
1971 if oXctp.errno != errno.EBADF or not self.fConnectCanceled:
1972 raise;
1973 reporter.log('socket.select() on accept was canceled');
1974 return None;
1975 except:
1976 reporter.logXcpt('socket.select() on accept');
1977
1978 # Try accept again.
1979 try:
1980 (oClientSocket, tClientAddr) = oSocket.accept();
1981 except socket.error as oXcpt:
1982 if not self.__isInProgressXcpt(e):
1983 if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
1984 raise;
1985 reporter.log('socket.accept() was canceled');
1986 return None;
1987 reporter.log('socket.accept() timed out');
1988 return False;
1989 except:
1990 reporter.errorXcpt('socket.accept() failed');
1991 return None;
1992 except:
1993 reporter.errorXcpt('socket.accept() failed');
1994 return None;
1995
1996 # Store the connected socket and throw away the server socket.
1997 self.oCv.acquire();
1998 if not self.fConnectCanceled:
1999 self.oSocket.close();
2000 self.oSocket = oClientSocket;
2001 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
2002 self.oCv.release();
2003 return True;
2004
2005 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
2006 """ Connects to the TXS server as client. """
2007 assert(not self.fReversedSetup);
2008
2009 # Connect w/ timeouts.
2010 rc = None;
2011 try:
2012 oSocket.connect((self.sHostname, self.uPort));
2013 rc = True;
2014 except socket.error as oXcpt:
2015 iRc = oXcpt.errno;
2016 if self.__isInProgressXcpt(oXcpt):
2017 # Do the actual waiting.
2018 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
2019 try:
2020 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
2021 if len(ttRc[1]) + len(ttRc[2]) == 0:
2022 raise socket.error(errno.ETIMEDOUT, 'select timed out');
2023 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
2024 rc = iRc == 0;
2025 except socket.error as oXcpt2:
2026 iRc = oXcpt2.errno;
2027 except:
2028 iRc = -42;
2029 reporter.fatalXcpt('socket.select() on connect failed');
2030
2031 if rc is True:
2032 pass;
2033 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
2034 rc = False; # try again.
2035 else:
2036 if iRc != errno.EBADF or not self.fConnectCanceled:
2037 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
2038 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
2039 except:
2040 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
2041 return rc;
2042
2043
2044 def connect(self, cMsTimeout):
2045 # Create a non-blocking socket.
2046 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
2047 try:
2048 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
2049 except:
2050 reporter.fatalXcpt('socket.socket() failed');
2051 return None;
2052 try:
2053 oSocket.setblocking(0);
2054 except:
2055 oSocket.close();
2056 reporter.fatalXcpt('socket.socket() failed');
2057 return None;
2058
2059 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
2060 oWakeupR = None;
2061 oWakeupW = None;
2062 if hasattr(socket, 'socketpair'):
2063 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
2064 except: reporter.logXcpt('socket.socketpair() failed');
2065
2066 # Update the state.
2067 self.oCv.acquire();
2068 rc = None;
2069 if not self.fConnectCanceled:
2070 self.oSocket = oSocket;
2071 self.oWakeupW = oWakeupW;
2072 self.oWakeupR = oWakeupR;
2073 self.fIsConnecting = True;
2074 self.oCv.release();
2075
2076 # Try connect.
2077 if oWakeupR is None:
2078 oWakeupR = oSocket; # Avoid select failure.
2079 if self.fReversedSetup:
2080 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
2081 else:
2082 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2083 oSocket = None;
2084
2085 # Update the state and cleanup on failure/cancel.
2086 self.oCv.acquire();
2087 if rc is True and self.fConnectCanceled:
2088 rc = False;
2089 self.fIsConnecting = False;
2090
2091 if rc is not True:
2092 if self.oSocket is not None:
2093 self.oSocket.close();
2094 self.oSocket = None;
2095 self._closeWakeupSockets();
2096 self.oCv.release();
2097
2098 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2099 return rc;
2100
2101 def disconnect(self, fQuiet = False):
2102 if self.oSocket is not None:
2103 self.abReadAhead = array.array('B');
2104
2105 # Try a shutting down the socket gracefully (draining it).
2106 try:
2107 self.oSocket.shutdown(socket.SHUT_WR);
2108 except:
2109 if not fQuiet:
2110 reporter.error('shutdown(SHUT_WR)');
2111 try:
2112 self.oSocket.setblocking(0); # just in case it's not set.
2113 sData = "1";
2114 while sData:
2115 sData = self.oSocket.recv(16384);
2116 except:
2117 pass;
2118
2119 # Close it.
2120 self.oCv.acquire();
2121 try: self.oSocket.setblocking(1);
2122 except: pass;
2123 self.oSocket.close();
2124 self.oSocket = None;
2125 else:
2126 self.oCv.acquire();
2127 self._closeWakeupSockets();
2128 self.oCv.release();
2129
2130 def sendBytes(self, abBuf, cMsTimeout):
2131 if self.oSocket is None:
2132 reporter.error('TransportTcp.sendBytes: No connection.');
2133 return False;
2134
2135 # Try send it all.
2136 try:
2137 cbSent = self.oSocket.send(abBuf);
2138 if cbSent == len(abBuf):
2139 return True;
2140 except Exception as oXcpt:
2141 if not self.__isWouldBlockXcpt(oXcpt):
2142 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2143 return False;
2144 cbSent = 0;
2145
2146 # Do a timed send.
2147 msStart = base.timestampMilli();
2148 while True:
2149 cMsElapsed = base.timestampMilli() - msStart;
2150 if cMsElapsed > cMsTimeout:
2151 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2152 break;
2153
2154 # wait.
2155 try:
2156 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2157 if ttRc[2] and not ttRc[1]:
2158 reporter.error('TranportTcp.sendBytes: select returned with exception');
2159 break;
2160 if not ttRc[1]:
2161 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2162 break;
2163 except:
2164 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2165 break;
2166
2167 # Try send more.
2168 try:
2169 cbSent += self.oSocket.send(abBuf[cbSent:]);
2170 if cbSent == len(abBuf):
2171 return True;
2172 except Exception as oXcpt:
2173 if not self.__isWouldBlockXcpt(oXcpt):
2174 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2175 break;
2176
2177 return False;
2178
2179 def __returnReadAheadBytes(self, cb):
2180 """ Internal worker for recvBytes. """
2181 assert(len(self.abReadAhead) >= cb);
2182 abRet = self.abReadAhead[:cb];
2183 self.abReadAhead = self.abReadAhead[cb:];
2184 return abRet;
2185
2186 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2187 if self.oSocket is None:
2188 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2189 return None;
2190
2191 # Try read in some more data without bothering with timeout handling first.
2192 if len(self.abReadAhead) < cb:
2193 try:
2194 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2195 if abBuf:
2196 self.abReadAhead.extend(array.array('B', abBuf));
2197 except Exception as oXcpt:
2198 if not self.__isWouldBlockXcpt(oXcpt):
2199 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2200 return None;
2201
2202 if len(self.abReadAhead) >= cb:
2203 return self.__returnReadAheadBytes(cb);
2204
2205 # Timeout loop.
2206 msStart = base.timestampMilli();
2207 while True:
2208 cMsElapsed = base.timestampMilli() - msStart;
2209 if cMsElapsed > cMsTimeout:
2210 if not fNoDataOk or self.abReadAhead:
2211 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2212 break;
2213
2214 # Wait.
2215 try:
2216 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2217 if ttRc[2] and not ttRc[0]:
2218 reporter.error('TranportTcp.recvBytes: select returned with exception');
2219 break;
2220 if not ttRc[0]:
2221 if not fNoDataOk or self.abReadAhead:
2222 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2223 % (len(self.abReadAhead), cb, fNoDataOk));
2224 break;
2225 except:
2226 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2227 break;
2228
2229 # Try read more.
2230 try:
2231 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2232 if not abBuf:
2233 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2234 % (len(self.abReadAhead), cb, fNoDataOk));
2235 self.disconnect();
2236 return None;
2237
2238 self.abReadAhead.extend(array.array('B', abBuf));
2239
2240 except Exception as oXcpt:
2241 reporter.log('recv => exception %s' % (oXcpt,));
2242 if not self.__isWouldBlockXcpt(oXcpt):
2243 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2244 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2245 break;
2246
2247 # Done?
2248 if len(self.abReadAhead) >= cb:
2249 return self.__returnReadAheadBytes(cb);
2250
2251 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2252 return None;
2253
2254 def isConnectionOk(self):
2255 if self.oSocket is None:
2256 return False;
2257 try:
2258 ttRc = select.select([], [], [self.oSocket], 0.0);
2259 if ttRc[2]:
2260 return False;
2261
2262 self.oSocket.send(array.array('B')); # send zero bytes.
2263 except:
2264 return False;
2265 return True;
2266
2267 def isRecvPending(self, cMsTimeout = 0):
2268 try:
2269 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2270 if not ttRc[0]:
2271 return False;
2272 except:
2273 pass;
2274 return True;
2275
2276
2277def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2278 """
2279 Opens a connection to a Test Execution Service via TCP, given its name.
2280
2281 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2282 or similar.
2283 """
2284 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2285 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2286 try:
2287 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2288 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2289 except:
2290 reporter.errorXcpt(None, 15);
2291 return None;
2292 return oSession;
2293
2294
2295def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2296 """
2297 Tries to open a connection to a Test Execution Service via TCP, given its name.
2298
2299 This differs from openTcpSession in that it won't log a connection failure
2300 as an error.
2301 """
2302 try:
2303 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2304 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2305 except:
2306 reporter.errorXcpt(None, 15);
2307 return None;
2308 return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette