00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008
00009
00010 from sdcoadd import sdcoadd
00011 import asap as sd
00012 from asap.scantable import is_scantable, is_ms
00013
00014
class sdcoadd_unittest_base:
    """
    Base class for sdcoadd unit test

    Holds the settings shared by the sdcoadd test cases and helper
    methods to collect row counts and ID lists from a scantable and to
    compare them with reference values.  The comparison helpers call
    self.assertTrue(), so this class has to be combined with
    unittest.TestCase in the concrete test classes.
    """
    # Directory of the input data: first word of $CASAPATH followed by the
    # sdcoadd unit test data directory.  An unset or empty CASAPATH yields
    # the bare relative-to-root path instead of failing at class creation,
    # so that the problem is reported when the data are actually copied.
    _casaroot = (os.environ.get('CASAPATH') or '').split()
    datapath = (_casaroot[0] if _casaroot else '') + \
               '/data/regression/unittest/sdcoadd/'
    del _casaroot
    taskname = "sdcoadd"

    # ID column of the main table -> name of the subtable whose rows are
    # counted in _get_scantable_params().
    mergeids = {"FOCUS_ID": "FOCUS", "FREQ_ID": "FREQUENCIES",
                "MOLECULE_ID": "MOLECULES"}
    # Additional subtables whose rows are counted (none by default).
    addedtabs = []

    def _get_scantable_params(self, scanname):
        """
        Returns a dictionary which contains row numbers and a set of
        data in scantable for later verifications

        scanname : name of the scantable on disk

        The returned dictionary has
          "n<SUBTABLE>" : number of rows of each subtable listed in
                          addedtabs and in the values of mergeids
          "nMAIN"       : number of rows of the main table
          "<COLUMN>S"   : sorted list of the unique values of SCANNO and
                          of each column listed in the keys of mergeids

        Raises an Exception if scanname does not exist or is not a
        scantable.
        """
        if not os.path.exists(scanname):
            raise Exception("A scantable '%s' does not exists." % scanname)
        if not is_scantable(scanname):
            raise Exception("Input file '%s' is not a scantable." % scanname)

        res = {}
        for tab in list(self.addedtabs) + list(self.mergeids.values()):
            tb.open(scanname + "/" + tab)
            try:
                res["n" + tab] = tb.nrows()
            finally:
                # close the table even if reading it failed
                tb.close()

        tb.open(scanname)
        try:
            res["nMAIN"] = tb.nrows()
            for col in ["SCANNO"] + list(self.mergeids.keys()):
                # sorted() makes the element-wise comparison with the
                # reference lists independent of the set iteration order
                res[col + "S"] = sorted(set(tb.getcol(col)))
        finally:
            tb.close()
        return res

    def _test_merged_scantable(self, res, ref):
        """
        Compare the parameters of a merged scantable (res) with the
        reference values (ref) using a relative tolerance of 1.0e-5.
        """
        reltol = 1.0e-5
        self._compareDictVal(res, ref, reltol=reltol)

    def _compareDictVal(self, testdict, refdict, reltol=1.0e-5, complist=None):
        """
        Compare the values of two dictionaries key by key.

        testdict : dictionary of the current results
        refdict  : dictionary of the reference values
        reltol   : allowed relative difference of numerical values
        complist : list of keys to compare.  All keys of refdict are
                   compared if it is not specified (or empty).

        Values are converted to lists and compared element by element.
        Elements whose reference is a string have to be equal, all the
        other elements have to agree within reltol.
        """
        self.assertTrue(isinstance(testdict, dict) and
                        isinstance(refdict, dict),
                        "Need to specify two dictionaries to compare")
        if complist:
            keylist = complist
        else:
            keylist = refdict.keys()

        for key in keylist:
            self.assertTrue(key in testdict,
                            msg="%s is not defined in the current results."
                            % key)
            self.assertTrue(key in refdict,
                            msg="%s is not defined in the reference data."
                            % key)
            testval = self._to_list(testdict[key])
            refval = self._to_list(refdict[key])
            if testval is None or refval is None:
                # _to_list() passes None through; it only matches None
                self.assertTrue(testval is None and refval is None,
                                msg="%s differs: %s (expected: %s) " %
                                (key, str(testval), str(refval)))
                continue
            self.assertTrue(len(testval) == len(refval),
                            "Number of elemnets differs.")
            for i, (tval, rval) in enumerate(zip(testval, refval)):
                msg = "%s[%d] differs: %s (expected: %s) " % \
                      (key, i, str(tval), str(rval))
                if isinstance(rval, str):
                    self.assertTrue(tval == rval, msg=msg)
                else:
                    self.assertTrue(self._isInAllowedRange(tval, rval, reltol),
                                    msg=msg)

    def _isInAllowedRange(self, testval, refval, reltol=1.0e-5):
        """
        Check if a test value is within permissive relative difference from refval.
        Returns a boolean.
        testval & refval : two numerical values to compare
        reltol : allowed relative difference to consider the two
                 values to be equal. (default 1.0e-5)

        If refval is zero, testval is used as the denominator of the
        relative difference (two zeros are always equal).
        """
        if refval == 0:
            if testval == 0:
                return True
            denom = testval
        else:
            denom = refval
        # The factor 1.0 forces a true division: with two integers Python 2
        # would truncate e.g. (17-16)/16 to 0 and accept the mismatch.
        rdiff = (testval - refval) * 1.0 / denom
        return abs(rdiff) <= reltol

    def _to_list(self, input):
        """
        Convert input to a list
        If input is None, this method simply returns None.
        Lists, tuples and numpy arrays are converted element by element,
        any other value is wrapped in a one-element list.
        """
        import numpy
        listtypes = (list, tuple, numpy.ndarray)
        # 'is None' rather than '== None': the latter is evaluated
        # element-wise for numpy arrays.
        if input is None:
            return None
        if isinstance(input, listtypes):
            return list(input)
        return [input]
00119
00120
class sdcoadd_basicTest( sdcoadd_unittest_base, unittest.TestCase ):
    """
    Basic unit tests for task sdcoadd. No averaging and interactive testing.

    The list of tests:
    test00 --- default parameters (raises an error)
    test01 --- valid infiles (scantable) WITHOUT outfile
    test02 --- valid infiles (scantable) with outfile
    test03 --- outform='MS'
    test04 --- overwrite=False
    test05 --- overwrite=True
    test06 --- merge 3 scantables
    test07 --- specify only one scantable (raises an error)
    """
    # Input scantables, copied from datapath to the working directory
    inlist = ['orions_calSave_21.asap','orions_calSave_25.asap','orions_calSave_23.asap']
    outname = "sdcoadd_out.asap"

    # Reference values for the merge of the first two input scantables
    ref2125 = {"nMAIN": 16, "nFOCUS": 1, "nFREQUENCIES": 8, "nMOLECULES": 2,
               "SCANNOS": list(range(2)), "FOCUS_IDS": list(range(1)),
               "FREQ_IDS": list(range(8)), "MOLECULE_IDS": list(range(2))}

    # Reference values for the merge of all three input scantables
    ref212523 = {"nMAIN": 24, "nFOCUS": 1, "nFREQUENCIES": 8, "nMOLECULES": 2,
                 "SCANNOS": list(range(3)), "FOCUS_IDS": list(range(1)),
                 "FREQ_IDS": list(range(8)), "MOLECULE_IDS": list(range(2))}

    def _ms_outname( self ):
        """Return the output name of test03: outname with the extension '.ms'."""
        # splitext() removes the suffix; str.rstrip('.asap') would strip
        # any trailing run of the characters '.', 'a', 's' and 'p'.
        return os.path.splitext(self.outname)[0] + '.ms'

    def _default_outname( self ):
        """Return the output name expected by test01 when outfile is not given."""
        return self.inlist[0] + "_coadd"

    def _assert_task_fails( self, expected, **taskargs ):
        """
        Run sdcoadd with taskargs and verify that it raises an exception
        whose message contains the string 'expected'.
        """
        try:
            sdcoadd(**taskargs)
        except Exception as e:
            pos = str(e).find(expected)
            self.assertNotEqual(pos, -1,
                                msg='Unexpected exception was thrown: %s' % (str(e)))
        else:
            # Reported outside of the try block so that the failure is not
            # caught by the except clause above and misreported.
            self.fail('The task must throw exception')

    def setUp( self ):
        """Copy fresh input scantables and reset the task parameters."""
        for infile in self.inlist:
            if os.path.exists(infile):
                shutil.rmtree(infile)
            shutil.copytree(self.datapath+infile, infile)

        default(sdcoadd)

    def tearDown( self ):
        """Remove the input copies and every output a test may have written."""
        outputs = [self.outname, self._ms_outname(), self._default_outname()]
        for thefile in self.inlist + outputs:
            if os.path.exists(thefile):
                shutil.rmtree(thefile)

    def test00( self ):
        """Test 0: Default parameters (raises an error)"""
        self._assert_task_fails('Need at least two data file names')

    def test01( self ):
        """Test 1: valid infiles (scantables) WITHOUT outfile"""
        infiles = self.inlist[0:2]
        result = sdcoadd(infiles=infiles)

        self.assertEqual(result,None)
        # the output is expected at '<first input name>_coadd'
        outname = self._default_outname()
        self.assertTrue(os.path.exists(outname),msg="No output written")
        merged = self._get_scantable_params(outname)
        self._test_merged_scantable(merged, self.ref2125)

    def test02( self ):
        """Test 2: valid infiles (scantable) with outfile"""
        infiles = self.inlist[0:2]
        outfile = self.outname
        result = sdcoadd(infiles=infiles,outfile=outfile)

        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        merged = self._get_scantable_params(outfile)
        self._test_merged_scantable(merged, self.ref2125)

    def test03( self ):
        """Test 3: outform='MS'"""
        infiles = self.inlist[0:2]
        outfile = self._ms_outname()
        outform = 'MS2'
        result = sdcoadd(infiles=infiles,outfile=outfile,outform=outform)
        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        self.assertTrue(is_ms(outfile),msg="Output file is not an MS")

    def test04( self ):
        """Test 4: overwrite=False (raises an error)"""
        # create the output file first
        infiles = self.inlist[1:3]
        outfile = self.outname
        result = sdcoadd(infiles=infiles,outfile=outfile)
        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        # the second run on the same outfile must be refused
        infiles = self.inlist[0:2]
        self._assert_task_fails('Output file \'%s\' exists.'%(outfile),
                                infiles=infiles,outfile=outfile)

    def test05( self ):
        """Test 5: overwrite=True"""
        # create the output file first
        infiles = self.inlist[1:3]
        outfile = self.outname
        result = sdcoadd(infiles=infiles,outfile=outfile)
        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        # overwrite it with the merge of another pair of scantables
        infiles = self.inlist[0:2]
        overwrite = True
        result2 = sdcoadd(infiles=infiles,outfile=outfile,overwrite=overwrite)

        self.assertEqual(result2,None)

        merged = self._get_scantable_params(outfile)
        self._test_merged_scantable(merged, self.ref2125)

    def test06( self ):
        """Test 6: Merge 3 scantables"""
        infiles = self.inlist
        outfile = self.outname
        result = sdcoadd(infiles=infiles,outfile=outfile)

        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        merged = self._get_scantable_params(outfile)
        self._test_merged_scantable(merged, self.ref212523)

    def test07( self ):
        """Test 7: specify only one scantable (raises an error)"""
        infiles = self.inlist[0]
        outfile = self.outname
        self._assert_task_fails('Need at least two data file names',
                                infiles=infiles,outfile=outfile)
00276
00277
class sdcoadd_mergeTest( sdcoadd_unittest_base, unittest.TestCase ):
    """
    Test capabilities of sd.merge(). No averaging and interactive testing.

    The list of tests:
    testmerge01 --- test merge of HISTORY subtables
    testmerge02 --- test proper merge of FREQUENCIES subtables
    testmerge03 --- test proper merge of MOLECULES subtables
    """
    # datapath is inherited from sdcoadd_unittest_base.

    # Input scantables; each test sets its own list before copying them.
    inlist = []
    outname = "sdcoadd_out.asap"

    def setUp( self ):
        """Reset the task parameters (inputs are copied by each test)."""
        default(sdcoadd)

    def tearDown( self ):
        """Remove the input copies and the output scantable."""
        for thefile in self.inlist + [self.outname]:
            if os.path.exists(thefile):
                shutil.rmtree(thefile)

    def _copy_inputs( self ):
        """Copy fresh versions of the scantables in self.inlist from datapath."""
        for infile in self.inlist:
            if os.path.exists(infile):
                shutil.rmtree(infile)
            shutil.copytree(self.datapath+infile, infile)

    def _count_rows( self, tablename ):
        """Return the number of rows of the table 'tablename'."""
        tb.open(tablename)
        try:
            return tb.nrows()
        finally:
            # close the table even if reading it failed
            tb.close()

    def _run_merge( self ):
        """
        Copy the inputs, merge self.inlist into self.outname with sdcoadd
        and verify that the output was written.  Returns the output name.
        """
        self._copy_inputs()
        outfile = self.outname
        result = sdcoadd(infiles=self.inlist,outfile=outfile)
        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")
        return outfile

    def _assert_ids( self, merged, key, refids ):
        """
        Verify that the ID list merged[key] consists of refids.
        The IDs are sorted before the comparison because they are
        collected from a set, whose order is not defined.
        """
        ids = sorted(merged[key])
        self.assertTrue(ids==list(refids),
                        msg="%s = %s (expected: %s)" %
                        (key, str(ids), str(list(refids))))

    def testmerge01( self ):
        """Merge Test 1: merge of HISTORY subtables"""
        self.inlist = ['orions_calSave_21if03.asap','orions_calSave_23.asap']
        self._copy_inputs()

        # total number of HISTORY rows of the inputs
        htab="/HISTORY"
        innhist = 0
        for tab in self.inlist:
            innhist += self._count_rows(tab+htab)

        outfile = self._run_merge()

        # the output must hold at least the history of all the inputs
        outnhist = self._count_rows(outfile+htab)
        self.assertTrue(outnhist >= innhist,
                        msg="nHIST = %d (expected: >= %d)" % (outnhist, innhist))

    def testmerge02( self ):
        """Merge Test 2: proper merge of FREQUENCIES subtables"""
        self.inlist = ['orions_calSave_21if03.asap','orions_calSave_23.asap']
        outfile = self._run_merge()

        merged = self._get_scantable_params(outfile)
        # check FREQ_ID
        self.assertEqual(merged["nFREQUENCIES"],4,msg="nFREQUENCIES is not correct")
        self._assert_ids(merged, "FREQ_IDS", range(4))
        # check main row number
        self.assertEqual(merged["nMAIN"],12,msg="nMAIN is not correct")

    def testmerge03( self ):
        """Merge Test 3: proper merge of MOLECULES subtables"""
        self.inlist = ['orions_calSave_21if03.asap','orions_calSave_2327.asap']
        outfile = self._run_merge()

        merged = self._get_scantable_params(outfile)
        # check MOLECULE_ID
        self.assertEqual(merged["nMOLECULES"],2,msg="nMOLECULES is wrong")
        self._assert_ids(merged, "MOLECULE_IDS", range(2))
        # check main row number
        self.assertEqual(merged["nMAIN"],20,msg="nMAIN is wrong")
        # check FREQ_ID
        refFID = list(range(8))
        self.assertEqual(merged["nFREQUENCIES"],len(refFID),
                         msg="nFREQUENCIES is wrong")
        self._assert_ids(merged, "FREQ_IDS", refFID)
00380
00381
class sdcoadd_storageTest( sdcoadd_unittest_base, unittest.TestCase ):
    """
    Unit tests for task sdcoadd. Test scantable storage and insitu
    parameters

    The list of tests:
    testMT --- storage = 'memory', insitu = True
    testMF --- storage = 'memory', insitu = False
    testDT --- storage = 'disk', insitu = True
    testDF --- storage = 'disk', insitu = False

    Note on handlings of disk storage:
       Task script restores unit and frame information.
       scantable.convert_flux returns new table.

    Tested items:
       1. Number of rows in (sub-)tables and list of IDs of output scantable.
       2. Units and coordinates of output scantable.
       3. units and coordinates of input scantables before/after run.
    """
    # Input scantables, copied from datapath to the working directory
    inlist = ['orions_calSave_21.asap','orions_calSave_25.asap']
    # Prefix of the output names; each test appends its own ID
    outname = sdcoadd_unittest_base.taskname+".merged"
    # IDs of the tests, used as the suffix of their output names
    tids = ("MT", "MF", "DT", "DF")

    # Reference values of the merged scantable
    ref2125 = {"nMAIN": 16, "nFOCUS": 1, "nFREQUENCIES": 8, "nMOLECULES": 2,
               "SCANNOS": list(range(2)), "FOCUS_IDS": list(range(1)),
               "FREQ_IDS": list(range(8)), "MOLECULE_IDS": list(range(2))}
    # Units and coordinates requested for (and expected in) the output
    merge_uc = {'spunit': 'GHz', 'flunit': 'Jy',
                'frame': 'LSRD', 'doppler': 'OPTICAL'}

    def setUp( self ):
        """
        Copy fresh input scantables, remember the current storage and
        insitu settings of sd.rcParams and reset the task parameters.
        """
        for infile in self.inlist:
            if os.path.exists(infile):
                shutil.rmtree(infile)
            shutil.copytree(self.datapath+infile, infile)
        # back up the original settings
        self.storage = sd.rcParams['scantable.storage']
        self.insitu = sd.rcParams['insitu']

        default(sdcoadd)

    def tearDown( self ):
        """
        Restore the storage and insitu settings and remove the input
        copies and the output scantables of all the tests.
        """
        sd.rcParams['scantable.storage'] = self.storage
        sd.rcParams['insitu'] = self.insitu
        outputs = [self.outname+tid for tid in self.tids]
        for thefile in self.inlist + outputs:
            if os.path.exists(thefile):
                shutil.rmtree(thefile)

    def _get_unit_coord( self, scanname ):
        """
        Returns a dictionary which stores the spectral unit ('spunit'),
        flux unit ('flunit'), frame ('frame') and doppler ('doppler')
        of the scantable 'scanname'.
        """
        self.assertTrue(os.path.exists(scanname),
                        "'%s' does not exists." % scanname)
        self.assertTrue(is_scantable(scanname),
                        "Input table is not a scantable: %s" % scanname)
        scan = sd.scantable(scanname, average=False,parallactify=False)
        retdict = {}
        retdict['spunit'] = scan.get_unit()
        retdict['flunit'] = scan.get_fluxunit()
        coord = scan._getcoordinfo()
        retdict['frame'] = coord[1]
        retdict['doppler'] = coord[2]
        return retdict

    def _get_uclist( self, stlist ):
        """
        Returns a list of the dictionaries of units and coordinates
        (see _get_unit_coord) of the scantables in stlist.
        """
        retlist = []
        for scanname in stlist:
            retlist.append(self._get_unit_coord(scanname))
        print(retlist)
        return retlist

    def _comp_unit_coord( self, stlist, before):
        """
        Compare the units and coordinates of scantables with reference
        values.

        stlist : a name or a list of names of scantables to test
        before : a dictionary or a list of dictionaries of the reference
                 values (see _get_unit_coord), one for each scantable

        Raises an Exception if the numbers of scantables and of reference
        dictionaries are different.
        """
        if isinstance(stlist,str):
            stlist = [ stlist ]
        if isinstance(before, dict):
            before = [ before ]
        if len(stlist) != len(before):
            raise Exception("Number of scantables in list is different from reference data.")
        # every reference is checked, which is also safe for empty lists
        for ref in before:
            self.assertTrue(isinstance(ref,dict),
                            "Reference data should be (a list of) dictionary")

        after = self._get_uclist(stlist)
        for i in range(len(stlist)):
            print("Comparing units and coordinates of '%s'" %
                  stlist[i])
            self._compareDictVal(after[i],before[i])

    def _run_storage_test( self, tid, storage, insitu ):
        """
        Common body of the storage tests.

        tid     : ID of the test, appended to outname to get the output name
        storage : value set to sd.rcParams['scantable.storage']
        insitu  : value set to sd.rcParams['insitu']

        Runs sdcoadd on inlist with the units and coordinates of merge_uc
        and verifies the row numbers, IDs, units and coordinates of the
        output as well as that the units and coordinates of the inputs
        are the same before and after the run.
        """
        infiles = self.inlist
        outfile = self.outname+tid

        # units and coordinates of the inputs before the run
        initval = self._get_uclist(infiles)

        sd.rcParams['scantable.storage'] = storage
        sd.rcParams['insitu'] = insitu
        print("Running test with storage='%s' and insitu=%s" %
              (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu'])))
        result = sdcoadd(infiles=infiles,outfile=outfile,
                         fluxunit=self.merge_uc['flunit'],
                         specunit=self.merge_uc['spunit'],
                         frame=self.merge_uc['frame'],
                         doppler=self.merge_uc['doppler'])

        self.assertEqual(result,None)
        self.assertTrue(os.path.exists(outfile),msg="No output written")

        # test the merged scantable
        merged = self._get_scantable_params(outfile)
        self._test_merged_scantable(merged, self.ref2125)
        self._comp_unit_coord(outfile,self.merge_uc)

        # units and coordinates of the inputs must not have changed
        self._comp_unit_coord(infiles,initval)

    def testMT( self ):
        """Storage Test MT: sdcoadd on storage='memory' and insitu=T"""
        self._run_storage_test("MT", 'memory', True)

    def testMF( self ):
        """Storage Test MF: sdcoadd on storage='memory' and insitu=F"""
        self._run_storage_test("MF", 'memory', False)

    def testDT( self ):
        """Storage Test DT: sdcoadd on storage='disk' and insitu=T"""
        self._run_storage_test("DT", 'disk', True)

    def testDF( self ):
        """Storage Test DF: sdcoadd on storage='disk' and insitu=F"""
        self._run_storage_test("DF", 'disk', False)
00606
def suite():
    """Return the list of the test case classes of the sdcoadd unit test."""
    test_classes = [sdcoadd_basicTest]
    test_classes.append(sdcoadd_mergeTest)
    test_classes.append(sdcoadd_storageTest)
    return test_classes