00001 import os
00002 import sys
00003 import shutil
00004 import string
00005 from __main__ import default
00006 import partitionhelper as ph
00007 from tasks import listpartition
00008 import unittest
00009
'''
Unit tests for the listpartition task. They exercise the following parameters:
vis:        wrong and correct values
createdict: True or False
listfile:   save the output to a file

These tests use auxiliary functions defined in partitionhelper.py.

'''
00019
00020
00021
00022
class test_base(unittest.TestCase):
    '''Common data-set fixtures shared by the listpartition test cases.

    Every setUp_* method stores the input name in self.vis, links the data
    into the working directory when it is not already there, and finally
    calls default(listpartition) (presumably to restore the task's default
    parameters -- default() is imported from __main__ and not visible here).
    '''

    # Directory holding the input data, appended to the root taken from
    # the first field of the CASAPATH environment variable.
    _DATA_SUBDIR = "/data/regression/unittest/partition/"

    def _link_data(self):
        '''Symlink self.vis into the working directory unless it exists.

        Raises:
            RuntimeError: the data has to be linked but CASAPATH is unset
                or empty.
            OSError: the symbolic link could not be created.
        '''
        if os.path.exists(self.vis):
            return

        print("Linking to data...")
        # CASAPATH is split on whitespace; only its first field is used as
        # the root of the data tree.
        casapath = os.environ.get('CASAPATH', '').split()
        if not casapath:
            raise RuntimeError('CASAPATH is not set; cannot locate the test '
                               'data for %s' % self.vis)

        # os.symlink avoids building a shell command by string concatenation,
        # so paths containing spaces work and failures are reported.
        os.symlink(casapath[0] + self._DATA_SUBDIR + self.vis, self.vis)

    def _prepare_data(self, vis, with_subms=False):
        '''Select *vis* as the input data set and get it ready for a test.

        Args:
            vis: name of the data set inside the partition test-data
                directory.
            with_subms: when true, also set self.visdata to the "SUBMSS/"
                directory inside the data set.
        '''
        self.vis = vis
        if with_subms:
            self.visdata = self.vis+"/SUBMSS/"

        self._link_data()
        default(listpartition)

    def setUp_MSdata(self):
        '''Use Four_ants_3C286.ms as input (self.visdata is not set).'''
        self._prepare_data("Four_ants_3C286.ms")

    def setUp_MMSdata1(self):
        '''Use pFourantsSpw.mms as input and point self.visdata at its SUBMSS/.'''
        self._prepare_data('pFourantsSpw.mms', with_subms=True)

    def setUp_MMSdata2(self):
        '''Use pFourantsScan.mms as input and point self.visdata at its SUBMSS/.'''
        self._prepare_data('pFourantsScan.mms', with_subms=True)

    def setUp_MMSdata3(self):
        '''Use pFourantsMix.mms as input and point self.visdata at its SUBMSS/.'''
        self._prepare_data('pFourantsMix.mms', with_subms=True)
00075
00076
class test_MS(test_base):
    '''listpartition tests that use the data set chosen by setUp_MSdata.'''

    def setUp(self):
        '''Prepare the input data through the base-class fixture.'''
        self.setUp_MSdata()

    def testMS1(self):
        '''listpartition MS1: Input MS

        With only vis given, the task is expected to return an empty
        dictionary.
        '''
        self.assertEqual(listpartition(vis=self.vis), {},
                         "It should return an empty dictionary")

    def testMS2(self):
        '''listpartition MS2: Save to a file

        Any leftover output file is removed first, then the task must
        create it again.
        '''
        listfile = 'listpartitionms.txt'
        if os.path.exists(listfile):
            os.system('rm -rf '+listfile)

        listpartition(vis=self.vis, listfile=listfile)
        created = os.path.exists(listfile)
        self.assertTrue(created, 'Output file does not exist')

    def testMS3(self):
        '''listpartition MS3: Create an output dictionary

        The entry stored under key 0 must name the input data set.
        '''
        summary = listpartition(vis=self.vis, createdict=True)
        first_entry = summary[0]
        self.assertEqual(first_entry['MS'], self.vis)
00104
00105
class test_MMS_spw(test_base):
    '''listpartition tests that use the data set chosen by setUp_MMSdata1.'''

    def setUp(self):
        '''Prepare the input data through the base-class fixture.'''
        self.setUp_MMSdata1()

    def testspw1(self):
        '''listpartition MMS spw1: Input MMS

        With only vis given, the task is expected to return an empty
        dictionary.
        '''
        self.assertEqual(listpartition(vis=self.vis), {},
                         "It should return an empty dictionary")

    def testspw2(self):
        '''listpartition MMS spw2: Save to a file

        The output file must exist afterwards and contain 33 lines.
        '''
        listfile = 'listpartitionspw.txt'
        if os.path.exists(listfile):
            os.system('rm -rf '+listfile)

        listpartition(vis=self.vis, listfile=listfile)
        self.assertTrue(os.path.exists(listfile),
                        'Output file %s does not exist'%listfile)

        handle = open(listfile,'r')
        lines = handle.readlines()
        handle.close()
        self.assertEqual(len(lines), 33, 'Wrong number of lines in output')

    def testspw3(self):
        '''listpartition MMS spw3: Create an output dictionary

        The dictionary must hold 16 entries, and every per-scan row count
        must equal the value ph.getScanNrows reports for that sub-MS.
        '''
        summary = listpartition(vis=self.vis, createdict=True)
        self.assertEqual(len(summary), 16)

        for key in summary.keys():
            entry = summary[key]
            subms_path = self.visdata+'/'+entry['MS']
            per_scan = entry['scanId']
            for scan in per_scan.keys():
                listed = per_scan[scan]['nrows']
                expected = ph.getScanNrows(subms_path, scan)
                self.assertEqual(
                    listed, expected,
                    '%s, scan=%s, nrows=%s do not match reference nrows=%s'
                    %(subms_path, scan, listed, expected))
00151
class test_MMS_scan(test_base):
    '''listpartition tests that use the data set chosen by setUp_MMSdata2.'''

    def setUp(self):
        '''Prepare the input data through the base-class fixture.'''
        self.setUp_MMSdata2()

    def testscan1(self):
        '''listpartition MMS scan1: Input MMS

        With only vis given, the task is expected to return an empty
        dictionary.
        '''
        res = listpartition(vis=self.vis)
        self.assertEqual(res, {}, "It should return an empty dictionary")

    def testscan2(self):
        '''listpartition MMS scan2: Save to a file

        The output file must exist afterwards and contain 3 lines.
        '''
        output = 'listpartitionscan1.txt'
        if os.path.exists(output):
            os.system('rm -rf '+output)

        listpartition(vis=self.vis, listfile=output)
        self.assertTrue(os.path.exists(output),
                        'Output file %s does not exist'%output)

        # The context manager closes the file even if reading fails.
        with open(output, 'r') as listing:
            nlines = len(listing.readlines())
        self.assertEqual(nlines, 3, 'Wrong number of lines in output')

    def testscan3(self):
        '''listpartition MMS scan3: Create an output dictionary

        The dictionary must hold 2 entries, and every per-scan row count
        must equal the value ph.getScanNrows reports for that sub-MS.
        '''
        resdict = listpartition(vis=self.vis, createdict=True)
        self.assertEqual(len(resdict), 2)

        for k in resdict.keys():
            subms = resdict[k]['MS']
            MS = self.visdata+'/'+subms
            scans = resdict[k]['scanId']
            for s in scans.keys():
                nr = scans[s]['nrows']
                refN = ph.getScanNrows(MS, s)
                self.assertEqual(
                    nr, refN,
                    '%s, scan=%s, nrows=%s do not match reference nrows=%s'
                    %(MS, s, nr, refN))

    def testscan4(self):
        '''listpartition MMS scan4: check the sizes of the sub-MSs

        For every line of the listing except the first, the last
        space-separated field must equal ph.getDiskUsage of the sub-MS named
        by the first field.
        '''
        output = 'listpartitionscan2.txt'
        if os.path.exists(output):
            os.system('rm -rf '+output)

        listpartition(vis=self.vis, listfile=output)
        self.assertTrue(os.path.exists(output))

        # Read everything up front so the file is closed before any
        # assertion can fail; previously a failing assertion inside the
        # loop skipped the close() call.
        with open(output, 'r') as listing:
            mslist = listing.readlines()

        # The first line is skipped (presumably a column header -- confirm
        # against the listpartition output format).
        for line in mslist[1:]:
            line = line.rstrip()
            subms = line.partition(' ')[0]
            listed_size = line.rpartition(' ')[2]

            dusize = ph.getDiskUsage(self.visdata+subms)

            self.assertEqual(dusize, listed_size,
                             '%s is not equal to %s for %s'
                             %(dusize, listed_size, subms))
00229
class test_MMS_mix(test_base):
    '''listpartition tests that use the data set chosen by setUp_MMSdata3.'''

    def setUp(self):
        '''Prepare the input data through the base-class fixture.'''
        self.setUp_MMSdata3()

    def testmix1(self):
        '''listpartition MMS mix1: Input MMS

        With only vis given, the task is expected to return an empty
        dictionary.
        '''
        res = listpartition(vis=self.vis)
        self.assertEqual(res, {}, "It should return an empty dictionary")

    def testmix2(self):
        '''listpartition MMS mix2: Save to a file

        The output file must exist afterwards and contain 33 lines.
        '''
        output = 'listpartitionmix1.txt'
        if os.path.exists(output):
            os.system('rm -rf '+output)

        listpartition(vis=self.vis, listfile=output)
        self.assertTrue(os.path.exists(output),
                        'Output file %s does not exist'%output)

        # The context manager closes the file even if reading fails.
        with open(output, 'r') as listing:
            nlines = len(listing.readlines())
        self.assertEqual(nlines, 33, 'Wrong number of lines in output')

    def testmix3(self):
        '''listpartition MMS mix3: Create an output dictionary

        The dictionary must hold 32 entries, and every per-scan row count
        must equal the value ph.getScanNrows reports for that sub-MS.
        '''
        resdict = listpartition(vis=self.vis, createdict=True)
        self.assertEqual(len(resdict), 32)

        for k in resdict.keys():
            subms = resdict[k]['MS']
            MS = self.visdata+'/'+subms
            scans = resdict[k]['scanId']
            for s in scans.keys():
                nr = scans[s]['nrows']
                refN = ph.getScanNrows(MS, s)
                self.assertEqual(
                    nr, refN,
                    '%s, scan=%s, nrows=%s do not match reference nrows=%s'
                    %(MS, s, nr, refN))

    def testmix4(self):
        '''listpartition MMS mix4: check the sizes of the sub-MSs

        For every line of the listing except the first, the last
        space-separated field must equal ph.getDiskUsage of the sub-MS named
        by the first field.
        '''
        output = 'listpartitionmix2.txt'
        if os.path.exists(output):
            os.system('rm -rf '+output)

        listpartition(vis=self.vis, listfile=output)
        self.assertTrue(os.path.exists(output))

        # Read everything up front so the file is closed before any
        # assertion can fail; previously a failing assertion inside the
        # loop skipped the close() call.
        with open(output, 'r') as listing:
            mslist = listing.readlines()

        # The first line is skipped (presumably a column header -- confirm
        # against the listpartition output format).
        for line in mslist[1:]:
            line = line.rstrip()
            subms = line.partition(' ')[0]
            listed_size = line.rpartition(' ')[2]

            dusize = ph.getDiskUsage(self.visdata+subms)

            self.assertEqual(dusize, listed_size,
                             '%s is not equal to %s for %s'
                             %(dusize, listed_size, subms))
00307
class listpartition_cleanup(unittest.TestCase):
    '''Pseudo test case whose tearDown deletes the files left by the tests.'''

    def setUp(self):
        '''Nothing to prepare.'''
        return None

    def tearDown(self):
        '''Remove the input data sets and the listpartition text outputs.'''
        # Patterns are expanded by the shell that os.system starts.
        leftovers = ('Four_ants_3C286.ms',
                     'pFourants*.mms',
                     'listpartition*.txt')
        for target in leftovers:
            os.system('rm -rf ' + target)

    def test_run(self):
        '''listpartition: Cleanup'''
        # Intentionally empty: the work happens in tearDown.
        return None
00323
def suite():
    '''Return the test-case classes of this module, cleanup class last.'''
    test_classes = [test_MS, test_MMS_spw, test_MMS_scan, test_MMS_mix]
    test_classes.append(listpartition_cleanup)
    return test_classes
00330
00331
00332