VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/useraccount.py@ 63694

Last change on this file since 63694 was 62484, checked in by vboxsync, 8 years ago

(C) 2016

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 10.3 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: useraccount.py 62484 2016-07-22 18:35:33Z vboxsync $
3
4"""
5Test Manager - User DB records management.
6"""
7
8__copyright__ = \
9"""
10Copyright (C) 2012-2016 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 62484 $"
30
31
32# Standard python imports.
33import unittest;
34
35# Validation Kit imports.
36from testmanager import config;
37from testmanager.core.base import ModelDataBase, ModelLogicBase, ModelDataBaseTestCase, TMTooManyRows, TMRowNotFound;
38
39
class UserAccountData(ModelDataBase):
    """
    Data object holding one user account record (a row of the Users table).
    """

    # Name of the attribute that identifies a record.
    ksIdAttr = 'uid';

    # Parameter names, one per attribute.
    ksParam_uid = 'UserAccount_uid'
    ksParam_tsExpire = 'UserAccount_tsExpire'
    ksParam_tsEffective = 'UserAccount_tsEffective'
    ksParam_uidAuthor = 'UserAccount_uidAuthor'
    ksParam_sLoginName = 'UserAccount_sLoginName'
    ksParam_sUsername = 'UserAccount_sUsername'
    ksParam_sEmail = 'UserAccount_sEmail'
    ksParam_sFullName = 'UserAccount_sFullName'

    # Attributes that are allowed to be None.
    kasAllowNullAttributes = ['uid', 'tsEffective', 'tsExpire', 'uidAuthor'];


    def __init__(self):
        """Creates an empty record with every attribute set to None."""
        ModelDataBase.__init__(self);
        self.uid = None;
        self.tsEffective = None;
        self.tsExpire = None;
        self.uidAuthor = None;
        self.sUsername = None;
        self.sEmail = None;
        self.sFullName = None;
        self.sLoginName = None;

    def initFromDbRow(self, aoRow):
        """
        Populates the attributes from a database row, by column position.

        Returns self.  Raises TMRowNotFound if the row is None.
        """
        if aoRow is None:
            raise TMRowNotFound('User not found.');

        # Attribute receiving each positional column of the row (index 0..7).
        asAttribs = ('uid', 'tsEffective', 'tsExpire', 'uidAuthor',
                     'sUsername', 'sEmail', 'sFullName', 'sLoginName');
        for iColumn, sAttrib in enumerate(asAttribs):
            setattr(self, sAttrib, aoRow[iColumn]);
        return self;

    def initFromDbWithId(self, oDb, uid, tsNow = None, sPeriodBack = None):
        """
        Loads the Users row with the given uid from the database into self.

        The tsNow and sPeriodBack arguments are handed to
        formatSimpleNowAndPeriodQuery together with the base query.

        Returns self.  Raises TMRowNotFound if no row was fetched.
        """
        sBaseQuery = ('SELECT *\n'
                      'FROM Users\n'
                      'WHERE uid = %s\n');
        sQuery = self.formatSimpleNowAndPeriodQuery(oDb, sBaseQuery, ( uid, ), tsNow, sPeriodBack);
        oDb.execute(sQuery);

        aoFetched = oDb.fetchOne()
        if aoFetched is None:
            raise TMRowNotFound('uid=%s not found (tsNow=%s sPeriodBack=%s)' % (uid, tsNow, sPeriodBack,));
        return self.initFromDbRow(aoFetched);

    def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb):
        """
        Attribute validation override: the email address gets its own
        validator, the login name may get lowercased, and everything is
        otherwise left to ModelDataBase.
        """
        # The email field bypasses the generic validation entirely.
        if sAttr == 'sEmail':
            return ModelDataBase.validateEmail(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull);

        # With case insensitive login name matching enabled, the login name is
        # lowercased before validation.  (The feature assumes lower case in DB.)
        if sAttr == 'sLoginName':
            if oValue is not None:
                if config.g_kfLoginNameCaseInsensitive:
                    oValue = oValue.lower();

        return ModelDataBase._validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb);
114
115
class UserAccountLogic(ModelLogicBase):
    """
    User account logic (for the Users table).
    """

    def __init__(self, oDb):
        """Binds the logic object to the given database connection object."""
        ModelLogicBase.__init__(self, oDb)
        # Object cache used by cachedLookup; obtained from the DB object on first use.
        self.dCache = None;

    def fetchForListing(self, iStart, cMaxRows, tsNow):
        """
        Fetches user accounts.

        iStart is used as the query OFFSET and cMaxRows as the LIMIT.  When
        tsNow is None the rows with an 'infinity' tsExpire are selected,
        otherwise the rows whose tsEffective/tsExpire range covers tsNow.

        Returns an array (list) of UserAccountData items, empty list if none.
        Raises exception on error.
        """
        if tsNow is None:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (cMaxRows, iStart,));
        else:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire > %s\n'
                              ' AND tsEffective <= %s\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (tsNow, tsNow, cMaxRows, iStart,));

        # One fetchOne() call per row reported by the query.
        return [UserAccountData().initFromDbRow(self._oDb.fetchOne())
                for _ in range(self._oDb.getRowCount())];

    def addEntry(self, oData, uidAuthor, fCommit = False):
        """
        Add user account entry to the DB.

        Calls the UserAccountLogic_addEntry procedure with the account fields
        of oData, then hands fCommit to maybeCommit.  Returns True.
        """
        self._oDb.callProc('UserAccountLogic_addEntry',
                           (uidAuthor, oData.sUsername, oData.sEmail, oData.sFullName, oData.sLoginName,));
        self._oDb.maybeCommit(fCommit);
        return True;

    def editEntry(self, oData, uidAuthor, fCommit = False):
        """
        Modify user account.

        Calls the UserAccountLogic_editEntry procedure with oData.uid and the
        account fields of oData, then hands fCommit to maybeCommit.
        Returns True.
        """
        self._oDb.callProc('UserAccountLogic_editEntry',
                           (uidAuthor, oData.uid, oData.sUsername, oData.sEmail, oData.sFullName, oData.sLoginName,));
        self._oDb.maybeCommit(fCommit);
        return True;

    def removeEntry(self, uidAuthor, uid, fCascade = False, fCommit = False):
        """
        Delete user account.

        Calls the UserAccountLogic_delEntry procedure, then hands fCommit to
        maybeCommit.  The fCascade argument is accepted but not used.
        Returns True.
        """
        _ = fCascade;
        self._oDb.callProc('UserAccountLogic_delEntry', (uidAuthor, uid));
        self._oDb.maybeCommit(fCommit);
        return True;

    def _getByField(self, sField, sValue):
        """
        Get user account record by its field value.

        Returns the raw database row if exactly one row with an 'infinity'
        tsExpire matches, an empty list if none does.
        Raises TMTooManyRows if more than one row matches.
        """
        # NOTE(review): sField is spliced into the SQL text as a column name,
        # so it must only ever be a trusted identifier (the caller in this
        # class passes the literal 'uid').  sValue is bound as a parameter.
        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                          ' AND ' + sField + ' = %s'
                          , (sValue,))

        aRows = self._oDb.fetchAll()
        if len(aRows) > 1:
            raise TMTooManyRows('Found more than one user account with the same credentials. Database structure is corrupted.')

        if not aRows:
            return [];
        return aRows[0];

    def getById(self, idUserId):
        """
        Get user account information by ID.

        Returns the raw database row, or an empty list if there is no match.
        """
        return self._getByField('uid', idUserId)

    def tryFetchAccountByLoginName(self, sLoginName):
        """
        Try get user account information by login name.

        Returns UserAccountData if found, None if not.
        Raises exception on DB error.
        """
        # Only lowercase an actual string; a None login name must not blow up
        # with an AttributeError here (same guard as the attribute validation
        # in UserAccountData applies before lowercasing).
        if sLoginName is not None and config.g_kfLoginNameCaseInsensitive:
            sLoginName = sLoginName.lower();

        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE sLoginName = %s\n'
                          ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                          , (sLoginName, ));
        cRows = self._oDb.getRowCount();
        if cRows == 1:
            return UserAccountData().initFromDbRow(self._oDb.fetchOne());
        if cRows != 0:
            # More than one row for the same login name.
            raise self._oDb.integrityException('%u rows in Users with sLoginName="%s"'
                                               % (cRows, sLoginName));
        return None;

    def cachedLookup(self, uid):
        """
        Looks up the current UserAccountData object for uid via an object cache.

        Returns a shared UserAccountData object.  None if not found.
        Raises exception on DB error.
        """
        if self.dCache is None:
            self.dCache = self._oDb.getCache('UserAccount');

        oUser = self.dCache.get(uid, None);
        if oUser is not None:
            return oUser;

        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE uid = %s\n'
                          ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                          , (uid, ));
        if self._oDb.getRowCount() == 0:
            # Maybe it was deleted, try get the last entry.
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE uid = %s\n'
                              'ORDER BY tsExpire DESC\n'
                              'LIMIT 1\n'
                              , (uid, ));
        elif self._oDb.getRowCount() > 1:
            raise self._oDb.integrityException('%s infinity rows for %s' % (self._oDb.getRowCount(), uid));

        # The row count is read again because the fallback query above may
        # have replaced the result set.  Only hits are put into the cache.
        if self._oDb.getRowCount() == 1:
            oUser = UserAccountData().initFromDbRow(self._oDb.fetchOne());
            self.dCache[uid] = oUser;
        return oUser;

    def resolveChangeLogAuthors(self, aoEntries):
        """
        Given an array of ChangeLogEntry instances, set sAuthor to whatever
        uidAuthor resolves to.  Entries whose author cannot be looked up are
        left untouched.

        Returns aoEntries.
        Raises exception on DB error.
        """
        for oEntry in aoEntries:
            oUser = self.cachedLookup(oEntry.uidAuthor)
            if oUser is not None:
                oEntry.sAuthor = oUser.sUsername;
        return aoEntries;
273
274
275#
276# Unit testing.
277#
278
279# pylint: disable=C0111
class UserAccountDataTestCase(ModelDataBaseTestCase):
    """
    Test case supplying UserAccountData samples; the samples are presumably
    consumed by the tests inherited from ModelDataBaseTestCase.
    """

    def setUp(self):
        """Sets aoSamples to a list with one default constructed UserAccountData."""
        oSample = UserAccountData();
        self.aoSamples = [oSample];
283
# Run the unit tests when the module is executed as a script.
if __name__ == '__main__':
    unittest.main();
    # unittest.main() does not return here.
287
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette