1 | # test tools for the pyxpcom bindings
|
---|
2 | from xpcom import _xpcom
|
---|
3 | import unittest
|
---|
4 |
|
---|
5 | # export a "getmemusage()" function that returns a useful "bytes used" count
|
---|
6 | # for the current process. Growth in this when doing the same thing over and
|
---|
7 | # over implies a leak.
|
---|
8 |
|
---|
9 | try:
|
---|
10 | import win32api
|
---|
11 | import win32pdh
|
---|
12 | import win32pdhutil
|
---|
13 | have_pdh = 1
|
---|
14 | except ImportError:
|
---|
15 | have_pdh = 0
|
---|
16 |
|
---|
17 | # XXX - win32pdh is slow, particularly finding our current process.
|
---|
18 | # A better way would be good.
|
---|
19 |
|
---|
20 | # Our win32pdh specific functions - they can be at the top-level on all
|
---|
21 | # platforms, but will only actually be called if the modules are available.
|
---|
22 | def FindMyCounter():
|
---|
23 | pid_me = win32api.GetCurrentProcessId()
|
---|
24 |
|
---|
25 | object = "Process"
|
---|
26 | items, instances = win32pdh.EnumObjectItems(None,None,object, -1)
|
---|
27 | for instance in instances:
|
---|
28 | # We use 2 counters - "ID Process" and "Working Set"
|
---|
29 | counter = "ID Process"
|
---|
30 | format = win32pdh.PDH_FMT_LONG
|
---|
31 |
|
---|
32 | hq = win32pdh.OpenQuery()
|
---|
33 | path = win32pdh.MakeCounterPath( (None,object,instance, None, -1,"ID Process") )
|
---|
34 | hc1 = win32pdh.AddCounter(hq, path)
|
---|
35 | path = win32pdh.MakeCounterPath( (None,object,instance, None, -1,"Working Set") )
|
---|
36 | hc2 = win32pdh.AddCounter(hq, path)
|
---|
37 | win32pdh.CollectQueryData(hq)
|
---|
38 | type, pid = win32pdh.GetFormattedCounterValue(hc1, format)
|
---|
39 | if pid==pid_me:
|
---|
40 | win32pdh.RemoveCounter(hc1) # not needed any more
|
---|
41 | return hq, hc2
|
---|
42 | # Not mine - close the query and try again
|
---|
43 | win32pdh.RemoveCounter(hc1)
|
---|
44 | win32pdh.RemoveCounter(hc2)
|
---|
45 | win32pdh.CloseQuery(hq)
|
---|
46 | else:
|
---|
47 | raise RuntimeError, "Can't find myself!?"
|
---|
48 |
|
---|
49 | def CloseCounter(hq, hc):
|
---|
50 | win32pdh.RemoveCounter(hc)
|
---|
51 | win32pdh.CloseQuery(hq)
|
---|
52 |
|
---|
53 | def GetCounterValue(hq, hc):
|
---|
54 | win32pdh.CollectQueryData(hq)
|
---|
55 | format = win32pdh.PDH_FMT_LONG
|
---|
56 | type, val = win32pdh.GetFormattedCounterValue(hc, format)
|
---|
57 | return val
|
---|
58 |
|
---|
59 | g_pdh_data = None
|
---|
60 | # The pdh function that does the work
|
---|
61 | def pdh_getmemusage():
|
---|
62 | global g_pdh_data
|
---|
63 | if g_pdh_data is None:
|
---|
64 | hq, hc = FindMyCounter()
|
---|
65 | g_pdh_data = hq, hc
|
---|
66 | hq, hc = g_pdh_data
|
---|
67 | return GetCounterValue(hq, hc)
|
---|
68 |
|
---|
69 | # The public bit
|
---|
70 | if have_pdh:
|
---|
71 | getmemusage = pdh_getmemusage
|
---|
72 | else:
|
---|
73 | def getmemusage():
|
---|
74 | return 0
|
---|
75 |
|
---|
76 | # Test runner utilities, including some support for builtin leak tests.
|
---|
77 | class TestLoader(unittest.TestLoader):
|
---|
78 | def loadTestsFromTestCase(self, testCaseClass):
|
---|
79 | """Return a suite of all tests cases contained in testCaseClass"""
|
---|
80 | leak_tests = []
|
---|
81 | for name in self.getTestCaseNames(testCaseClass):
|
---|
82 | real_test = testCaseClass(name)
|
---|
83 | leak_test = self._getTestWrapper(real_test)
|
---|
84 | leak_tests.append(leak_test)
|
---|
85 | return self.suiteClass(leak_tests)
|
---|
86 | def _getTestWrapper(self, test):
|
---|
87 | # later! see pywin32's win32/test/util.py
|
---|
88 | return test
|
---|
89 | def loadTestsFromModule(self, mod):
|
---|
90 | if hasattr(mod, "suite"):
|
---|
91 | ret = mod.suite()
|
---|
92 | else:
|
---|
93 | ret = unittest.TestLoader.loadTestsFromModule(self, mod)
|
---|
94 | assert ret.countTestCases() > 0, "No tests in %r" % (mod,)
|
---|
95 | return ret
|
---|
96 | def loadTestsFromName(self, name, module=None):
|
---|
97 | test = unittest.TestLoader.loadTestsFromName(self, name, module)
|
---|
98 | if isinstance(test, unittest.TestSuite):
|
---|
99 | pass # hmmm? print "Don't wrap suites yet!", test._tests
|
---|
100 | elif isinstance(test, unittest.TestCase):
|
---|
101 | test = self._getTestWrapper(test)
|
---|
102 | else:
|
---|
103 | print "XXX - what is", test
|
---|
104 | return test
|
---|
105 |
|
---|
106 | # A base class our tests should derive from (well, one day it will be)
|
---|
107 | TestCase = unittest.TestCase
|
---|
108 |
|
---|
109 | def suite_from_functions(*funcs):
|
---|
110 | suite = unittest.TestSuite()
|
---|
111 | for func in funcs:
|
---|
112 | suite.addTest(unittest.FunctionTestCase(func))
|
---|
113 | return suite
|
---|
114 |
|
---|
115 | def testmain(*args, **kw):
|
---|
116 | new_kw = kw.copy()
|
---|
117 | if not new_kw.has_key('testLoader'):
|
---|
118 | new_kw['testLoader'] = TestLoader()
|
---|
119 | try:
|
---|
120 | unittest.main(*args, **new_kw)
|
---|
121 | finally:
|
---|
122 | _xpcom.NS_ShutdownXPCOM()
|
---|
123 | ni = _xpcom._GetInterfaceCount()
|
---|
124 | ng = _xpcom._GetGatewayCount()
|
---|
125 | if ni or ng:
|
---|
126 | print "********* WARNING - Leaving with %d/%d objects alive" % (ni,ng)
|
---|