VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testboxscript/testboxupgrade.py@ 95512

Last change on this file since 95512 was 94125, checked in by vboxsync, 3 years ago

ValKit/testboxscript: pylint 2.9.6 adjustments (mostly about using 'with' statements).

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 11.7 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: testboxupgrade.py 94125 2022-03-08 14:15:09Z vboxsync $
3
4"""
5TestBox Script - Upgrade from local file ZIP.
6"""
7
8__copyright__ = \
9"""
10Copyright (C) 2012-2022 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 94125 $"
30
31# Standard python imports.
32import os
33import shutil
34import sys
35import subprocess
36import threading
37import time
38import uuid;
39import zipfile
40
41# Validation Kit imports.
42from common import utils;
43import testboxcommons
44from testboxscript import TBS_EXITCODE_SYNTAX;
45
# Figure where we are.
try:
    __file__
except NameError:
    # __file__ is not defined in every execution mode; fall back on the
    # script path given on the command line.
    __file__ = sys.argv[0]
# Directory containing this script, and its parent (the validation kit root).
g_ksTestScriptDir = os.path.dirname(os.path.abspath(__file__))
g_ksValidationKitDir = os.path.dirname(g_ksTestScriptDir)
51
52
53def _doUpgradeThreadProc(oStdOut, asBuf):
54 """Thread procedure for the upgrade test drive."""
55 asBuf.append(oStdOut.read());
56 return True;
57
58
59def _doUpgradeCheckZip(oZip):
60 """
61 Check that the essential files are there.
62 Returns list of members on success, None on failure.
63 """
64 asMembers = oZip.namelist();
65 if ('testboxscript/testboxscript/testboxscript.py' not in asMembers) \
66 or ('testboxscript/testboxscript/testboxscript_real.py' not in asMembers):
67 testboxcommons.log('Missing one or both testboxscripts (members: %s)' % (asMembers,));
68 return None;
69
70 for sMember in asMembers:
71 if not sMember.startswith('testboxscript/'):
72 testboxcommons.log('zip file contains member outside testboxscript/: "%s"' % (sMember,));
73 return None;
74 if sMember.find('/../') > 0 or sMember.endswith('/..'):
75 testboxcommons.log('zip file contains member with escape sequence: "%s"' % (sMember,));
76 return None;
77
78 return asMembers;
79
80def _doUpgradeUnzipAndCheck(oZip, sUpgradeDir, asMembers):
81 """
82 Unzips the files into sUpdateDir, does chmod(755) on all files and
83 checks that there are no symlinks or special files.
84 Returns True/False.
85 """
86 #
87 # Extract the files.
88 #
89 if os.path.exists(sUpgradeDir):
90 shutil.rmtree(sUpgradeDir);
91 for sMember in asMembers:
92 if sMember.endswith('/'):
93 os.makedirs(os.path.join(sUpgradeDir, sMember.replace('/', os.path.sep)), 0o775);
94 else:
95 oZip.extract(sMember, sUpgradeDir);
96
97 #
98 # Make all files executable and make sure only owner can write to them.
99 # While at it, also check that there are only files and directory, no
100 # symbolic links or special stuff.
101 #
102 for sMember in asMembers:
103 sFull = os.path.join(sUpgradeDir, sMember);
104 if sMember.endswith('/'):
105 if not os.path.isdir(sFull):
106 testboxcommons.log('Not directory: "%s"' % sFull);
107 return False;
108 else:
109 if not os.path.isfile(sFull):
110 testboxcommons.log('Not regular file: "%s"' % sFull);
111 return False;
112 try:
113 os.chmod(sFull, 0o755);
114 except Exception as oXcpt:
115 testboxcommons.log('warning chmod error on %s: %s' % (sFull, oXcpt));
116 return True;
117
def _doUpgradeTestRun(sUpgradeDir):
    """
    Do a test run of the new script, to make sure it doesn't fail to run in
    any way because of old python, missing import or generally busted upgrade.

    The new testboxscript.py is started with --version.  The run is accepted
    when the child exits with TBS_EXITCODE_SYNTAX and its combined
    stdout/stderr output parses as an integer version.

    Returns True/False.
    """
    asArgs = [os.path.join(sUpgradeDir, 'testboxscript', 'testboxscript', 'testboxscript.py'), '--version']
    testboxcommons.log('Testing the new testbox script (%s)...' % (asArgs[0],))
    if sys.executable:
        asArgs.insert(0, sys.executable)
    oChild = subprocess.Popen(asArgs, shell = False, # pylint: disable=consider-using-with
                              stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # Collect the output on a separate thread so that we can bound the time
    # we are willing to wait for it.
    asBuf = []
    oThread = threading.Thread(target=_doUpgradeThreadProc, args=(oChild.stdout, asBuf))
    oThread.daemon = True
    oThread.start()
    oThread.join(30)

    # Give child up to 5 seconds to terminate after producing output.
    # Polling is used on all python versions: Popen.wait(timeout) raises
    # TimeoutExpired rather than returning, and isn't available before 3.3.
    iStatus = oChild.poll()
    for _ in range(50):
        if iStatus is not None:
            break
        time.sleep(0.1)
        iStatus = oChild.poll()
    if iStatus is None:
        testboxcommons.log('Checking the new testboxscript timed out.')
        oChild.terminate()
        oThread.join(5)
        return False
    if iStatus != TBS_EXITCODE_SYNTAX:
        testboxcommons.log('The new testboxscript returned %d instead of %d during check.' \
                           % (iStatus, TBS_EXITCODE_SYNTAX))
        return False

    # The child is gone, so the reader hits EOF; make sure it has stored the
    # output before we look at it.
    oThread.join(5)

    # Undecodable bytes are replaced so that they end up in the log message
    # below instead of raising here.
    sOutput = b''.join(asBuf).decode('utf-8', 'replace')
    sOutput = sOutput.strip()
    try:
        iNewVersion = int(sOutput)
    except ValueError:
        testboxcommons.log('The new testboxscript returned an unparseable version string: "%s"!' % (sOutput,))
        return False
    testboxcommons.log('New script version: %s' % (iNewVersion,))
    return True
167
def _doUpgradeApply(sUpgradeDir, asMembers):
    """
    Apply the directories and files from the upgrade.

    Each existing destination file is first moved aside to
    "<name>-delete-me-<uuid>" and remembered, so that it can be moved back
    should installing any of the new files fail.

    Returns True on success, False on failure (after attempting a rollback).
    Errors creating directories are raised as exceptions.
    """

    #
    # Create directories first since that's least intrusive.
    #
    for sMember in asMembers:
        if sMember.endswith('/'):
            sMember = sMember[len('testboxscript/'):]
            if sMember != '':
                sFull = os.path.join(g_ksValidationKitDir, sMember)
                if not os.path.isdir(sFull):
                    os.makedirs(sFull, 0o755)

    #
    # Move the files into place.
    #
    fRc = True
    asOldFiles = []     # The '-delete-me-' backups we can roll back from.
    for sMember in asMembers:
        if sMember.endswith('/'):
            continue
        sSrc = os.path.join(sUpgradeDir, sMember)
        sDst = os.path.join(g_ksValidationKitDir, sMember[len('testboxscript/'):])

        # Move the old file out of the way first.
        if os.path.exists(sDst):
            testboxcommons.log2('Info: Moving aside old "%s"' % (sDst,))
            sDstRm = '%s-delete-me-%s' % (sDst, uuid.uuid4(),)
            try:
                os.rename(sDst, sDstRm)
            except Exception as oXcpt:
                testboxcommons.log('Error: failed to rename (old) "%s" to "%s": %s' % (sDst, sDstRm, oXcpt))
                # Fall back on copy + unlink.
                try:
                    shutil.copy(sDst, sDstRm)
                except Exception as oXcpt2:
                    testboxcommons.log('Error: failed to copy (old) "%s" to "%s": %s' % (sDst, sDstRm, oXcpt2))
                    fRc = False
                    break
                # The backup exists from here on, so it must be rolled back
                # (or at least cleaned up) should anything fail later.
                asOldFiles.append(sDstRm)
                try:
                    os.unlink(sDst)
                except Exception as oXcpt2:
                    testboxcommons.log('Error: failed to unlink (old) "%s": %s' % (sDst, oXcpt2))
                    fRc = False
                    break
            else:
                asOldFiles.append(sDstRm)

        # Move/copy the new one into place.
        testboxcommons.log2('Info: Installing "%s"' % (sDst,))
        try:
            os.rename(sSrc, sDst)
        except Exception as oXcpt:
            testboxcommons.log('Warning: failed to rename (new) "%s" to "%s": %s' % (sSrc, sDst, oXcpt))
            try:
                shutil.copy(sSrc, sDst)
            except Exception as oXcpt2:
                testboxcommons.log('Error: failed to copy (new) "%s" to "%s": %s' % (sSrc, sDst, oXcpt2))
                fRc = False
                break

    #
    # Roll back on failure.
    #
    if fRc is not True:
        testboxcommons.log('Attempting to roll back old files...')
        for sDstRm in asOldFiles:
            # Strip the '-delete-me-<uuid>' suffix to get the original name.
            sDst = sDstRm[:sDstRm.rfind('-delete-me')]
            testboxcommons.log2('Info: Rolling back "%s" (%s)' % (sDst, os.path.basename(sDstRm)))
            try:
                shutil.move(sDstRm, sDst)
            except Exception as oXcpt:
                testboxcommons.log('Error: failed to rollback "%s" onto "%s": %s' % (sDstRm, sDst, oXcpt))
        return False
    return True
242
def _doUpgradeRemoveOldStuff(sUpgradeDir, asMembers):
    """
    Clean up all obsolete files and directories.

    Removes the sUpgradeDir staging tree, then walks the validation kit
    directory bottom-up and deletes every file and directory that is not
    part of asMembers.  Failures are logged as warnings only.

    Returns True (shouldn't fail or raise any exceptions).
    """

    try:
        shutil.rmtree(sUpgradeDir, ignore_errors = True)
    except Exception as oXcpt:
        testboxcommons.log('Warning: failed to remove "%s": %s' % (sUpgradeDir, oXcpt))

    #
    # Figure out what belongs to the new install.  Sets are used since each
    # path found by the walk below is looked up once.
    #
    sRoot = os.path.normpath(g_ksValidationKitDir)
    oKnownFiles = set()
    oKnownDirs = set()
    for sMember in asMembers:
        sMember = sMember[len('testboxscript/'):]
        if sMember == '':
            continue
        if sMember.endswith('/'):
            sFull = os.path.normpath(os.path.join(g_ksValidationKitDir, sMember[:-1]))
            oKnownDirs.add(sFull)
        else:
            sFull = os.path.normpath(os.path.join(g_ksValidationKitDir, sMember))
            oKnownFiles.add(sFull)

        # A zip need not carry an entry for every directory, so consider the
        # parents of each member as known too.  They hold known content, so
        # trying to remove them could only fail and produce a bogus warning.
        sParent = os.path.dirname(sFull)
        while len(sParent) > len(sRoot):
            oKnownDirs.add(sParent)
            sParent = os.path.dirname(sParent)

    #
    # Bottom-up walk, so that the content of an obsolete directory is gone
    # by the time we try to remove the directory itself.
    #
    for sDirPath, asDirs, asFiles in os.walk(g_ksValidationKitDir, topdown=False):
        for sDir in asDirs:
            sFull = os.path.normpath(os.path.join(sDirPath, sDir))
            if sFull not in oKnownDirs:
                testboxcommons.log2('Info: Removing obsolete directory "%s"' % (sFull,))
                try:
                    os.rmdir(sFull)
                except Exception as oXcpt:
                    testboxcommons.log('Warning: failed to rmdir obsolete dir "%s": %s' % (sFull, oXcpt))

        for sFile in asFiles:
            sFull = os.path.normpath(os.path.join(sDirPath, sFile))
            if sFull not in oKnownFiles:
                testboxcommons.log2('Info: Removing obsolete file "%s"' % (sFull,))
                try:
                    os.unlink(sFull)
                except Exception as oXcpt:
                    testboxcommons.log('Warning: failed to unlink obsolete file "%s": %s' % (sFull, oXcpt))
    return True
284
def upgradeFromZip(sZipFile):
    """
    Upgrade the testboxscript install using the specified zip file.

    The zip is checked, unpacked into the 'upgrade' subdirectory of the
    script directory and test run, before the new files are moved into place
    and obsolete ones are removed.

    Returns True/False.
    """

    # A little precaution.
    if utils.isRunningFromCheckout():
        testboxcommons.log('Use "svn up" to "upgrade" your source tree!')
        return False

    #
    # Prepare.
    #
    # Note! Don't bother cleaning up files and dirs in the error paths,
    #       they'll be restricted to the one zip and the one upgrade dir.
    #       We'll remove them next time we upgrade.
    #
    oZip = zipfile.ZipFile(sZipFile, 'r') # No 'with' support in 2.6 class: pylint: disable=consider-using-with
    try:
        asMembers = _doUpgradeCheckZip(oZip)
        if asMembers is None:
            return False

        sUpgradeDir = os.path.join(g_ksTestScriptDir, 'upgrade')
        testboxcommons.log('Unzipping "%s" to "%s"...' % (sZipFile, sUpgradeDir))
        if _doUpgradeUnzipAndCheck(oZip, sUpgradeDir, asMembers) is not True:
            return False
    finally:
        # Close the zip on the failure paths (and on exceptions) as well.
        oZip.close()

    if _doUpgradeTestRun(sUpgradeDir) is not True:
        return False

    #
    # Execute.
    #
    if _doUpgradeApply(sUpgradeDir, asMembers) is not True:
        return False
    _doUpgradeRemoveOldStuff(sUpgradeDir, asMembers)
    return True
324
325
# For testing purposes.
if __name__ == '__main__':
    # upgradeFromZip returns True/False.  Passing that straight to sys.exit
    # would yield exit status 1 for True (success) and 0 for False, so map
    # it to the conventional 0 = success, 1 = failure explicitly.
    sys.exit(0 if upgradeFromZip(sys.argv[1]) else 1)
329
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette