00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008
00009
00010 import numpy
00011
00012 import asap as sd
00013 from sdsmooth import sdsmooth
00014
00015
00016
00017
00018
00019
00020 class sdsmooth_unittest_base:
00021 """
00022 Base class for sdsmooth unit test
00023 """
00024
00025 datapath = os.environ.get('CASAPATH').split()[0] + \
00026 '/data/regression/unittest/sdsmooth/'
00027 taskname = "sdsmooth"
00028
00029 def _checkfile( self, name ):
00030 isthere=os.path.exists(name)
00031 self.assertTrue(isthere,
00032 msg='output file %s was not created because of the task failure'%(name))
00033
00034
00035 def _getStats(self,filename, linechan=None, basechan=None):
00036 if not linechan: linechan = self.linechan
00037 if not basechan: basechan = self.basechan
00038 self._checkfile(filename)
00039 scan = sd.scantable(filename,average=False)
00040
00041 linmsk = scan.create_mask(linechan)
00042 blmsk = scan.create_mask(basechan)
00043 linmax = scan.stats('max',mask=linmsk)[0]
00044 linmaxpos = scan.stats('max_abc',mask=linmsk)[0]
00045 lineqw = scan.stats('sum',mask=linmsk)[0]/linmax
00046 blrms = scan.stats('rms',mask=blmsk)[0]
00047 del scan, linmsk, blmsk
00048 retdic = {'linemax': linmax, 'linemaxpos': linmaxpos,
00049 'lineeqw': lineqw, 'baserms': blrms}
00050 del linmax, linmaxpos, lineqw, blrms
00051 return retdic
00052
00053 def _compareDictVal(self,testdict, refdict,places=4):
00054 for stat, refval in refdict.iteritems():
00055 self.assertTrue(testdict.has_key(stat),
00056 msg = "'%s' is not defined in the current run" % stat)
00057 print "Comparing '%s': %f (current run), %f (reference)" % \
00058 (stat,testdict[stat],refval)
00059 self.assertAlmostEqual(testdict[stat],refval,places=places)
00060
00061
00062 class sdsmooth_badinputs(sdsmooth_unittest_base,unittest.TestCase):
00063 """
00064 Tests on bad input parameter setting
00065 """
00066 inname = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00067 outname = sdsmooth_unittest_base.taskname+'_bad.asap'
00068
00069 badname = "bad"
00070 badquant = "5bad"
00071
00072 def setUp(self):
00073 if os.path.exists(self.inname):
00074 shutil.rmtree(self.inname)
00075 if os.path.exists(self.outname):
00076 shutil.rmtree(self.outname)
00077 shutil.copytree(self.datapath+self.inname, self.inname)
00078
00079 default(sdsmooth)
00080
00081 def tearDown(self):
00082 if (os.path.exists(self.inname)):
00083 shutil.rmtree(self.inname)
00084
00085
00086 def test_default(self):
00087 """Test Default parameters (raises an errror)"""
00088
00089 res = sdsmooth()
00090 self.assertFalse(res)
00091
00092 def testBad_kernel(self):
00093 """Test bad kernel name"""
00094 kernel = self.badname
00095
00096
00097 res = sdsmooth(infile=self.inname,outfile=self.outname,
00098 kernel=kernel)
00099 self.assertFalse(res)
00100
00101 def testBad_chanwidth(self):
00102 """Test bad chanwidth"""
00103 kernel = 'regrid'
00104 chanwidth = self.badquant
00105
00106 try:
00107 res = sdsmooth(infile=self.inname,outfile=self.outname,
00108 kernel=kernel,chanwidth=chanwidth)
00109 self.assertTrue(False,
00110 msg='The task must throw exception')
00111 except Exception, e:
00112 pos=str(e).find('Invalid quantity chanwidth 5bad')
00113 self.assertNotEqual(pos,-1,
00114 msg='Unexpected exception was thrown: %s'%(str(e)))
00115
00116
00117 class sdsmooth_basicTest(sdsmooth_unittest_base,unittest.TestCase):
00118 """
00119 Basic unit tests for task sdsmooth. No interactive testing.
00120
00121 The list of tests:
00122 test00 --- default parameters (raises an errror)
00123 test01 --- default + valid input filename (hanning smooth)
00124 test02-03 --- kernel = 'boxcar' w/ and w/o kwidth param
00125 test04-05 --- kernel = 'gaussian' w/ and w/o kwidth param
00126 test06 --- overwrite = True
00127 test07,10 --- kernel = 'regrid' with chwidth in channel unit
00128 test08,11 --- kernel = 'regrid' with chwidth in frequency unit
00129 test09,12 --- kernel = 'regrid' with chwidth in velocity unit
00130
00131 Note: input data is generated from a single dish regression data,
00132 'OrionS_rawACSmod', as follows:
00133 default(sdcal)
00134 sdcal(infile='OrionS_rawACSmod',scanlist=[20,21,22,23],
00135 calmode='ps',tau=0.09,outfile='temp.asap')
00136 default(sdcal)
00137 sdcal(infile='temp.asap',timeaverage=True,tweight='tintsys',
00138 polaverage=True,pweight='tsys',outfile='temp2.asap')
00139 default(sdbaseline)
00140 sdbaseline(infile='temp2.asap',iflist=[2],masklist=blchan2,
00141 maskmode='list',outfile=self.infile)
00142
00143 The reference stat values can be obtained by
00144 sdstat(infile=outfile,masklist=linechan)
00145 sdstat(infile=outfile,masklist=basechan)
00146 """
00147
00148
00149
00150
00151
00152 infile = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00153 outroot = sdsmooth_unittest_base.taskname+'_test'
00154 linechan = [[2951,3088]]
00155 basechan = [[500,2950],[3089,7692]]
00156 regridw = 4
00157 reglinech = [[737,772]]
00158 regbasech = [[125,736],[773,1923]]
00159
00160 def setUp(self):
00161 if os.path.exists(self.infile):
00162 shutil.rmtree(self.infile)
00163 shutil.copytree(self.datapath+self.infile, self.infile)
00164
00165 default(sdsmooth)
00166
00167 def tearDown(self):
00168 if (os.path.exists(self.infile)):
00169 shutil.rmtree(self.infile)
00170
00171 def test01(self):
00172 """Test 1: Default parameters + valid input filename (hanning smooth)"""
00173 tid="01"
00174 outfile = self.outroot+tid+'.asap'
00175
00176 result = sdsmooth(infile=self.infile,outfile=outfile)
00177 self.assertEqual(result,None,
00178 msg="The task returned '"+str(result)+"' instead of None")
00179
00180 refdic = {'linemax': 1.9679156541824341,
00181 'linemaxpos': 3049.0,
00182 'lineeqw': 61.521829228644677,
00183 'baserms': 0.17469978332519531}
00184 testval = self._getStats(outfile)
00185 self._compareDictVal(testval, refdic)
00186
00187 def test02(self):
00188 """Test 2: kernel = 'boxcar'"""
00189 tid="02"
00190 outfile = self.outroot+tid+'.asap'
00191 kernel = 'boxcar'
00192
00193 result =sdsmooth(infile=self.infile,outfile=outfile,kernel=kernel)
00194 self.assertEqual(result,None,
00195 msg="The task returned '"+str(result)+"' instead of None")
00196
00197 refdic = {'linemax': 1.9409093856811523,
00198 'linemaxpos': 3048.0,
00199 'lineeqw': 62.508279566880944,
00200 'baserms': 0.15886218845844269}
00201 testval = self._getStats(outfile)
00202 self._compareDictVal(testval, refdic)
00203
00204 def test03(self):
00205 """Test 3: kernel = 'boxcar' + kwidth = 16"""
00206 tid="03"
00207 outfile = self.outroot+tid+'.asap'
00208 kernel = 'boxcar'
00209 kwidth = 16
00210
00211 result =sdsmooth(infile=self.infile,outfile=outfile,
00212 kernel=kernel,kwidth=kwidth)
00213 self.assertEqual(result,None,
00214 msg="The task returned '"+str(result)+"' instead of None")
00215
00216 refdic = {'linemax': 1.8286358118057251,
00217 'linemaxpos': 3047.0,
00218 'lineeqw': 65.946254391136108,
00219 'baserms': 0.096009217202663422}
00220 testval = self._getStats(outfile)
00221 self._compareDictVal(testval, refdic)
00222
00223
00224 def test04(self):
00225 """Test 4: kernel = 'gaussian'"""
00226 tid="04"
00227 outfile = self.outroot+tid+'.asap'
00228 kernel = 'gaussian'
00229
00230 result =sdsmooth(infile=self.infile,outfile=outfile,kernel=kernel)
00231 self.assertEqual(result,None,
00232 msg="The task returned '"+str(result)+"' instead of None")
00233
00234 refdic = {'linemax': 1.9107955694198608,
00235 'linemaxpos': 3049.0,
00236 'lineeqw': 63.315141667417912,
00237 'baserms': 0.14576216042041779}
00238 testval = self._getStats(outfile)
00239 self._compareDictVal(testval, refdic)
00240
00241 def test05(self):
00242 """Test 5: kernel = 'gaussian' + kwidth = 16"""
00243 tid="05"
00244 outfile = self.outroot+tid+'.asap'
00245 kernel = 'gaussian'
00246 kwidth = 16
00247
00248 result =sdsmooth(infile=self.infile,outfile=outfile,
00249 kernel=kernel,kwidth=kwidth)
00250 self.assertEqual(result,None,
00251 msg="The task returned '"+str(result)+"' instead of None")
00252
00253 refdic = {'linemax': 1.762944221496582,
00254 'linemaxpos': 3046.0,
00255 'lineeqw': 68.064315277502047,
00256 'baserms': 0.074664078652858734}
00257 testval = self._getStats(outfile)
00258 self._compareDictVal(testval, refdic)
00259
00260 def test06(self):
00261 """Test 6: Test overwrite=True"""
00262 tid="06"
00263 outfile = self.outroot+tid+'.asap'
00264
00265
00266 print "The 1st run of sdsmooth (hanning smooth)"
00267 result1 =sdsmooth(infile=self.infile,outfile=outfile)
00268 self.assertEqual(result1,None,
00269 msg="The task returned '"+str(result1)+"' instead of None for the 1st run")
00270 self.assertTrue(os.path.exists(outfile),
00271 msg="Output file doesn't exist after the 1st run.")
00272
00273
00274
00275 kernel = 'gaussian'
00276 print "The 2nd run of sdsmooth (OVERWRITE with Gaussian smooth)"
00277 result2 =sdsmooth(infile=self.infile,outfile=outfile,
00278 kernel=kernel,overwrite=True)
00279 self.assertEqual(result2,None,
00280 msg="The task returned '"+str(result2)+"' instead of None for the 2nd run")
00281 self.assertTrue(os.path.exists(outfile),
00282 msg="Output file doesn't exist after the 2nd run.")
00283
00284
00285 refdic = {'linemax': 1.9107955694198608,
00286 'linemaxpos': 3049.0,
00287 'lineeqw': 63.315141667417912,
00288 'baserms': 0.14576216042041779}
00289 testval = self._getStats(outfile)
00290 self._compareDictVal(testval, refdic)
00291
00292
00293 def test07(self):
00294 """Test 7: kernel = 'regrid' + chanwidth in channel unit"""
00295 tid="07"
00296 unit='channel'
00297 scan = sd.scantable(self.infile, average=False)
00298 oldunit = scan.get_unit()
00299 scan.set_unit(unit)
00300 nch_old = scan.nchan(scan.getif(0))
00301 scan.set_unit(oldunit)
00302 del scan
00303
00304 outfile = self.outroot+tid+'.asap'
00305 kernel = 'regrid'
00306 chanwidth = str(self.regridw)
00307
00308 result =sdsmooth(infile=self.infile,outfile=outfile,
00309 kernel=kernel,chanwidth=chanwidth)
00310 self.assertEqual(result,None,
00311 msg="The task returned '"+str(result)+"' instead of None")
00312
00313 refdic = {'linemax': 1.9440968036651611,
00314 'linemaxpos': 762.0,
00315 'lineeqw': 15.466022935853511,
00316 'baserms': 0.16496795415878296}
00317 refsc = {"ch0": 44049935561.916985,
00318 "incr": 24416.931915037214}
00319 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00320 self._compareDictVal(testval, refdic)
00321
00322 scan = sd.scantable(outfile, average=False)
00323 scan.set_unit("Hz")
00324 sc_new = scan._getabcissa(0)
00325 del scan
00326 nch_new = len(sc_new)
00327 ch0_new = sc_new[0]
00328 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00329
00330 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00331 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00332 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00333
00334
00335 def test08(self):
00336 """Test 8: kernel = 'regrid' + chanwidth in frequency unit"""
00337 tid="08"
00338 unit="Hz"
00339
00340 scan = sd.scantable(self.infile, average=False)
00341 oldunit = scan.get_unit()
00342 scan.set_unit(unit)
00343 nch_old = scan.nchan(scan.getif(0))
00344 fx = scan._getabcissa(0)
00345 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00346 scan.set_unit(oldunit)
00347 del scan
00348 chw_new = chw_old*self.regridw
00349
00350 outfile = self.outroot+tid+'.asap'
00351 kernel = 'regrid'
00352 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00353
00354 result =sdsmooth(infile=self.infile,outfile=outfile,
00355 kernel=kernel,chanwidth=chanwidth)
00356 self.assertEqual(result,None,
00357 msg="The task returned '"+str(result)+"' instead of None")
00358
00359 refdic = {'linemax': 1.9440968036651611,
00360 'linemaxpos': 762.0,
00361 'lineeqw': 15.466022935853511,
00362 'baserms': 0.16496795415878296}
00363 refsc = {"ch0": 44049935561.916985,
00364 "incr": 24416.931914787496}
00365 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00366 self._compareDictVal(testval, refdic)
00367
00368 scan = sd.scantable(outfile, average=False)
00369 scan.set_unit("Hz")
00370 sc_new = scan._getabcissa(0)
00371 del scan
00372 nch_new = len(sc_new)
00373 ch0_new = sc_new[0]
00374 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00375
00376 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00377 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00378 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00379
00380
00381 def test09(self):
00382 """Test 9: kernel = 'regrid' + chanwidth in velocity unit"""
00383 tid="09"
00384 unit="km/s"
00385
00386 scan = sd.scantable(self.infile, average=False)
00387 oldunit = scan.get_unit()
00388 scan.set_unit(unit)
00389 nch_old = scan.nchan(scan.getif(0))
00390 fx = scan._getabcissa(0)
00391 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00392 scan.set_unit(oldunit)
00393 del scan
00394 chw_new = chw_old*self.regridw
00395
00396 outfile = self.outroot+tid+'.asap'
00397 kernel = 'regrid'
00398 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00399
00400 result =sdsmooth(infile=self.infile,outfile=outfile,
00401 kernel=kernel,chanwidth=chanwidth)
00402 self.assertEqual(result,None,
00403 msg="The task returned '"+str(result)+"' instead of None")
00404
00405 refdic = {'linemax': 1.9440964460372925,
00406 'linemaxpos': 762.0,
00407 'lineeqw': 15.466027743114028,
00408 'baserms': 0.16496789455413818}
00409 refsc = {"ch0": 44049935561.917,
00410 "incr": 24416.931942994266}
00411 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00412 self._compareDictVal(testval, refdic)
00413
00414 scan = sd.scantable(outfile, average=False)
00415 scan.set_unit("Hz")
00416 sc_new = scan._getabcissa(0)
00417 del scan
00418 nch_new = len(sc_new)
00419 ch0_new = sc_new[0]
00420 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00421
00422 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00423 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00424 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00425
00426 def test10(self):
00427 """Test 10: kernel = 'regrid' + negative chanwidth in channel unit"""
00428 tid="10"
00429 unit='channel'
00430 scan = sd.scantable(self.infile, average=False)
00431 oldunit = scan.get_unit()
00432 scan.set_unit(unit)
00433 nch_old = scan.nchan(scan.getif(0))
00434 scan.set_unit(oldunit)
00435 del scan
00436
00437 outfile = self.outroot+tid+'.asap'
00438 kernel = 'regrid'
00439 chanwidth = str(-self.regridw)
00440
00441 result =sdsmooth(infile=self.infile,outfile=outfile,
00442 kernel=kernel,chanwidth=chanwidth)
00443 self.assertEqual(result,None,
00444 msg="The task returned '"+str(result)+"' instead of None")
00445
00446 refdic = {'linemax': 1.9440968036651611,
00447 'linemaxpos': 1285.0,
00448 'lineeqw': 15.466022935853511,
00449 'baserms': 0.16496795415878296}
00450 refsc = {"ch0": 44099917021.547066,
00451 "incr": -24416.931915037214}
00452 nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00453 reglinech = []
00454 regbasech = []
00455 for chans in self.reglinech:
00456 reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00457 for chans in self.regbasech:
00458 regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00459 testval = self._getStats(outfile,reglinech,regbasech)
00460 self._compareDictVal(testval, refdic)
00461
00462 scan = sd.scantable(outfile, average=False)
00463 scan.set_unit("Hz")
00464 sc_new = scan._getabcissa(0)
00465 del scan
00466 nch_new = len(sc_new)
00467 ch0_new = sc_new[0]
00468 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00469
00470 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00471 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00472 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00473
00474 def test11(self):
00475 """Test11: kernel = 'regrid' + negative chanwidth in frequency unit"""
00476 tid="11"
00477 unit="Hz"
00478
00479 scan = sd.scantable(self.infile, average=False)
00480 oldunit = scan.get_unit()
00481 scan.set_unit(unit)
00482 nch_old = scan.nchan(scan.getif(0))
00483 fx = scan._getabcissa(0)
00484 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00485 scan.set_unit(oldunit)
00486 del scan
00487 chw_new = chw_old*(-self.regridw)
00488
00489 outfile = self.outroot+tid+'.asap'
00490 kernel = 'regrid'
00491 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00492
00493 result =sdsmooth(infile=self.infile,outfile=outfile,
00494 kernel=kernel,chanwidth=chanwidth)
00495 self.assertEqual(result,None,
00496 msg="The task returned '"+str(result)+"' instead of None")
00497
00498 refdic = {'linemax': 1.9440968036651611,
00499 'linemaxpos': 1285.0,
00500 'lineeqw': 15.466022935853511,
00501 'baserms': 0.16496795415878296}
00502 refsc = {"ch0": 44099917021.547066,
00503 "incr": -24416.931914787496}
00504 nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00505 reglinech = []
00506 regbasech = []
00507 for chans in self.reglinech:
00508 reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00509 for chans in self.regbasech:
00510 regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00511 testval = self._getStats(outfile,reglinech,regbasech)
00512 self._compareDictVal(testval, refdic)
00513
00514 scan = sd.scantable(outfile, average=False)
00515 scan.set_unit("Hz")
00516 sc_new = scan._getabcissa(0)
00517 del scan
00518 nch_new = len(sc_new)
00519 ch0_new = sc_new[0]
00520 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00521
00522 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00523 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00524 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00525
00526
00527 def test12(self):
00528 """Test 12: kernel = 'regrid' + positive chanwidth in velocity unit"""
00529 tid="12"
00530 unit="km/s"
00531
00532 scan = sd.scantable(self.infile, average=False)
00533 oldunit = scan.get_unit()
00534 scan.set_unit(unit)
00535 nch_old = scan.nchan(scan.getif(0))
00536 fx = scan._getabcissa(0)
00537 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00538 scan.set_unit(oldunit)
00539 del scan
00540 chw_new = chw_old*(-self.regridw)
00541
00542 outfile = self.outroot+tid+'.asap'
00543 kernel = 'regrid'
00544 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00545
00546 result =sdsmooth(infile=self.infile,outfile=outfile,
00547 kernel=kernel,chanwidth=chanwidth)
00548 self.assertEqual(result,None,
00549 msg="The task returned '"+str(result)+"' instead of None")
00550
00551 refdic = {'linemax': 1.9440964460372925,
00552 'linemaxpos': 1285.0,
00553 'lineeqw': 15.466027743114028,
00554 'baserms': 0.16496789455413818}
00555 refsc = {"ch0": 44099917021.547066,
00556 "incr": -24416.931942994266}
00557 nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00558 reglinech = []
00559 regbasech = []
00560 for chans in self.reglinech:
00561 reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00562 for chans in self.regbasech:
00563 regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00564 testval = self._getStats(outfile,reglinech,regbasech)
00565 self._compareDictVal(testval, refdic)
00566
00567 scan = sd.scantable(outfile, average=False)
00568 scan.set_unit("Hz")
00569 sc_new = scan._getabcissa(0)
00570 del scan
00571 nch_new = len(sc_new)
00572 ch0_new = sc_new[0]
00573 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00574
00575 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00576 self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00577 self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00578
00579
00580 class sdsmooth_storageTest( sdsmooth_unittest_base, unittest.TestCase ):
00581 """
00582 Unit tests for task sdsmooth. Test scantable sotrage and insitu
00583 parameters
00584
00585 The list of tests:
00586 testMTsm --- storage = 'memory', insitu = True (smooth)
00587 testMFsm --- storage = 'memory', insitu = False (smooth)
00588 testDTsm --- storage = 'disk', insitu = True (smooth)
00589 testDFsm --- storage = 'disk', insitu = False (smooth)
00590 testMTrg --- storage = 'memory', insitu = True (regrid)
00591 testMFrg --- storage = 'memory', insitu = False (regrid)
00592 testDTrg --- storage = 'disk', insitu = True (regrid)
00593 testDFrg --- storage = 'disk', insitu = False (regrid)
00594
00595 Note on handlings of disk storage:
00596 Returns new table after smooth and regrid.
00597 Returns new table after scantable.convert_flux.
00598 Task script restores unit and frame information.
00599 """
00600
00601 infile = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00602 outroot = sdsmooth_unittest_base.taskname+'_test'
00603 linechan = [[2951,3088]]
00604 basechan = [[500,2950],[3089,7692]]
00605 regridw = 4
00606 reglinech = [[737,772]]
00607 regbasech = [[125,736],[773,1923]]
00608
00609
00610 smrefdic = {'linemax': 1.8286358118057251,
00611 'linemaxpos': 3047.0,
00612 'lineeqw': 65.946254391136108,
00613 'baserms': 0.096009217202663422}
00614 rgrefdic = {'linemax': 1.9440968036651611,
00615 'linemaxpos': 762.0,
00616 'lineeqw': 15.466022935853511,
00617 'baserms': 0.16496795415878296}
00618 rgrefsc = {"ch0": 44049935561.916985,
00619 "incr": 24416.931914787496}
00620
00621
00622 def setUp( self ):
00623 if os.path.exists(self.infile):
00624 shutil.rmtree(self.infile)
00625 shutil.copytree(self.datapath+self.infile, self.infile)
00626
00627 default(sdsmooth)
00628
00629 def tearDown( self ):
00630 if (os.path.exists(self.infile)):
00631 shutil.rmtree(self.infile)
00632
00633 def testMTsm( self ):
00634 """Storage Test MTsm: kernel = 'boxcar' + kwidth = 16 on storage='memory' and insitu=T"""
00635 tid="MTsm"
00636 outfile = self.outroot+tid+'.asap'
00637 kernel = 'boxcar'
00638 kwidth = 16
00639
00640 sd.rcParams['scantable.storage'] = 'memory'
00641 sd.rcParams['insitu'] = True
00642 print "Running test with storage='%s' and insitu=%s" % \
00643 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00644
00645 initval = self._getStats(self.infile)
00646 result =sdsmooth(infile=self.infile,outfile=outfile,
00647 kernel=kernel,kwidth=kwidth)
00648 self.assertEqual(result,None,
00649 msg="The task returned '"+str(result)+"' instead of None")
00650
00651 print "Comparing INPUT statistics before/after smoothing"
00652 newinval = self._getStats(self.infile)
00653 self._compareDictVal(newinval, initval)
00654
00655 print "Testing OUTPUT statistics"
00656 testval = self._getStats(outfile)
00657 self._compareDictVal(testval, self.smrefdic)
00658
00659 def testMFsm( self ):
00660 """Storage Test MFsm: kernel = 'boxcar' + kwidth = 16 on storage='memory' and insitu=F"""
00661 tid="MFsm"
00662 outfile = self.outroot+tid+'.asap'
00663 kernel = 'boxcar'
00664 kwidth = 16
00665
00666 sd.rcParams['scantable.storage'] = 'memory'
00667 sd.rcParams['insitu'] = False
00668 print "Running test with storage='%s' and insitu=%s" % \
00669 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00670
00671 initval = self._getStats(self.infile)
00672 result =sdsmooth(infile=self.infile,outfile=outfile,
00673 kernel=kernel,kwidth=kwidth)
00674 self.assertEqual(result,None,
00675 msg="The task returned '"+str(result)+"' instead of None")
00676
00677 print "Comparing INPUT statistics before/after smoothing"
00678 newinval = self._getStats(self.infile)
00679 self._compareDictVal(newinval, initval)
00680
00681 print "Testing OUTPUT statistics"
00682 testval = self._getStats(outfile)
00683 self._compareDictVal(testval, self.smrefdic)
00684
00685 def testDTsm( self ):
00686 """Storage Test DTsm: kernel = 'boxcar' + kwidth = 16 on storage='disk' and insitu=T"""
00687 tid="DTsm"
00688 outfile = self.outroot+tid+'.asap'
00689 kernel = 'boxcar'
00690 kwidth = 16
00691
00692 sd.rcParams['scantable.storage'] = 'disk'
00693 sd.rcParams['insitu'] = True
00694 print "Running test with storage='%s' and insitu=%s" % \
00695 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00696
00697 initval = self._getStats(self.infile)
00698 result =sdsmooth(infile=self.infile,outfile=outfile,
00699 kernel=kernel,kwidth=kwidth)
00700 self.assertEqual(result,None,
00701 msg="The task returned '"+str(result)+"' instead of None")
00702
00703 print "Comparing INPUT statistics before/after smoothing"
00704 newinval = self._getStats(self.infile)
00705 self._compareDictVal(newinval, initval)
00706
00707 print "Testing OUTPUT statistics"
00708 testval = self._getStats(outfile)
00709 self._compareDictVal(testval, self.smrefdic)
00710
00711 def testDFsm( self ):
00712 """Storage Test DFsm: kernel = 'boxcar' + kwidth = 16 on storage='disk' and insitu=F"""
00713 tid="DFsm"
00714 outfile = self.outroot+tid+'.asap'
00715 kernel = 'boxcar'
00716 kwidth = 16
00717
00718 sd.rcParams['scantable.storage'] = 'disk'
00719 sd.rcParams['insitu'] = False
00720 print "Running test with storage='%s' and insitu=%s" % \
00721 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00722
00723 initval = self._getStats(self.infile)
00724 result =sdsmooth(infile=self.infile,outfile=outfile,
00725 kernel=kernel,kwidth=kwidth)
00726 self.assertEqual(result,None,
00727 msg="The task returned '"+str(result)+"' instead of None")
00728
00729 print "Comparing INPUT statistics before/after smoothing"
00730 newinval = self._getStats(self.infile)
00731 self._compareDictVal(newinval, initval)
00732
00733 print "Testing OUTPUT statistics"
00734 testval = self._getStats(outfile)
00735 self._compareDictVal(testval, self.smrefdic)
00736
00737 def testMTrg( self ):
00738 """Storage Test MTrg: kernel = 'regrid' on storage='memory' and insitu=T"""
00739 tid="MTrg"
00740 unit="Hz"
00741
00742 sd.rcParams['scantable.storage'] = 'memory'
00743 scan = sd.scantable(self.infile, average=False)
00744 oldunit = scan.get_unit()
00745 scan.set_unit(unit)
00746 nch_old = scan.nchan(scan.getif(0))
00747 fx = scan._getabcissa(0)
00748 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00749 scan.set_unit(oldunit)
00750 del scan
00751 chw_new = chw_old*self.regridw
00752
00753 outfile = self.outroot+tid+'.asap'
00754 kernel = 'regrid'
00755 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00756
00757 sd.rcParams['scantable.storage'] = 'memory'
00758 sd.rcParams['insitu'] = True
00759 print "Running test with storage='%s' and insitu=%s" % \
00760 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00761
00762 initval = self._getStats(self.infile,self.linechan,self.basechan)
00763 result =sdsmooth(infile=self.infile,outfile=outfile,
00764 kernel=kernel,chanwidth=chanwidth)
00765 self.assertEqual(result,None,
00766 msg="The task returned '"+str(result)+"' instead of None")
00767
00768 print "Comparing INPUT statistics before/after smoothing"
00769 newinval = self._getStats(self.infile,self.linechan,self.basechan)
00770 self._compareDictVal(newinval, initval)
00771
00772 print "Testing OUTPUT statistics"
00773 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00774 self._compareDictVal(testval, self.rgrefdic)
00775
00776 sd.rcParams['scantable.storage'] = 'memory'
00777 scan = sd.scantable(outfile, average=False)
00778 scan.set_unit("Hz")
00779 sc_new = scan._getabcissa(0)
00780 del scan
00781 nch_new = len(sc_new)
00782 ch0_new = sc_new[0]
00783 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00784
00785 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00786 self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00787 self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00788
00789 def testMFrg( self ):
00790 """Storage Test MFrg: kernel = 'regrid' on storage='memory' and insitu=F"""
00791 tid="MFrg"
00792 unit="Hz"
00793
00794 sd.rcParams['scantable.storage'] = 'memory'
00795 scan = sd.scantable(self.infile, average=False)
00796 oldunit = scan.get_unit()
00797 scan.set_unit(unit)
00798 nch_old = scan.nchan(scan.getif(0))
00799 fx = scan._getabcissa(0)
00800 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00801 scan.set_unit(oldunit)
00802 del scan
00803 chw_new = chw_old*self.regridw
00804
00805 outfile = self.outroot+tid+'.asap'
00806 kernel = 'regrid'
00807 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00808
00809 sd.rcParams['scantable.storage'] = 'memory'
00810 sd.rcParams['insitu'] = False
00811 print "Running test with storage='%s' and insitu=%s" % \
00812 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00813
00814 initval = self._getStats(self.infile,self.linechan,self.basechan)
00815 result =sdsmooth(infile=self.infile,outfile=outfile,
00816 kernel=kernel,chanwidth=chanwidth)
00817 self.assertEqual(result,None,
00818 msg="The task returned '"+str(result)+"' instead of None")
00819
00820 print "Comparing INPUT statistics before/after smoothing"
00821 newinval = self._getStats(self.infile,self.linechan,self.basechan)
00822 self._compareDictVal(newinval, initval)
00823
00824 print "Testing OUTPUT statistics"
00825 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00826 self._compareDictVal(testval, self.rgrefdic)
00827
00828 sd.rcParams['scantable.storage'] = 'memory'
00829 scan = sd.scantable(outfile, average=False)
00830 scan.set_unit("Hz")
00831 sc_new = scan._getabcissa(0)
00832 del scan
00833 nch_new = len(sc_new)
00834 ch0_new = sc_new[0]
00835 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00836
00837 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00838 self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00839 self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00840
00841 def testDTrg( self ):
00842 """Storage Test DTrg: kernel = 'regrid' on storage='disk' and insitu=T"""
00843 tid="DTrg"
00844 unit="Hz"
00845
00846 sd.rcParams['scantable.storage'] = 'memory'
00847 scan = sd.scantable(self.infile, average=False)
00848 oldunit = scan.get_unit()
00849 scan.set_unit(unit)
00850 nch_old = scan.nchan(scan.getif(0))
00851 fx = scan._getabcissa(0)
00852 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00853 scan.set_unit(oldunit)
00854 del scan
00855 chw_new = chw_old*self.regridw
00856
00857 outfile = self.outroot+tid+'.asap'
00858 kernel = 'regrid'
00859 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00860
00861 sd.rcParams['scantable.storage'] = 'disk'
00862 sd.rcParams['insitu'] = True
00863 print "Running test with storage='%s' and insitu=%s" % \
00864 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00865
00866 initval = self._getStats(self.infile,self.linechan,self.basechan)
00867 result =sdsmooth(infile=self.infile,outfile=outfile,
00868 kernel=kernel,chanwidth=chanwidth)
00869 self.assertEqual(result,None,
00870 msg="The task returned '"+str(result)+"' instead of None")
00871
00872 print "Comparing INPUT statistics before/after smoothing"
00873 newinval = self._getStats(self.infile,self.linechan,self.basechan)
00874 self._compareDictVal(newinval, initval)
00875
00876 print "Testing OUTPUT statistics"
00877 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00878 self._compareDictVal(testval, self.rgrefdic)
00879
00880 sd.rcParams['scantable.storage'] = 'memory'
00881 scan = sd.scantable(outfile, average=False)
00882 scan.set_unit("Hz")
00883 sc_new = scan._getabcissa(0)
00884 del scan
00885 nch_new = len(sc_new)
00886 ch0_new = sc_new[0]
00887 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00888
00889 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00890 self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00891 self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00892
00893 def testDFrg( self ):
00894 """Storage Test DFrg: kernel = 'regrid' on storage='disk' and insitu=F"""
00895 tid="DFrg"
00896 unit="Hz"
00897
00898 sd.rcParams['scantable.storage'] = 'memory'
00899 scan = sd.scantable(self.infile, average=False)
00900 oldunit = scan.get_unit()
00901 scan.set_unit(unit)
00902 nch_old = scan.nchan(scan.getif(0))
00903 fx = scan._getabcissa(0)
00904 chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00905 scan.set_unit(oldunit)
00906 del scan
00907 chw_new = chw_old*self.regridw
00908
00909 outfile = self.outroot+tid+'.asap'
00910 kernel = 'regrid'
00911 chanwidth = qa.tos(qa.quantity(chw_new,unit))
00912
00913 sd.rcParams['scantable.storage'] = 'disk'
00914 sd.rcParams['insitu'] = False
00915 print "Running test with storage='%s' and insitu=%s" % \
00916 (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00917
00918 initval = self._getStats(self.infile,self.linechan,self.basechan)
00919 result =sdsmooth(infile=self.infile,outfile=outfile,
00920 kernel=kernel,chanwidth=chanwidth)
00921 self.assertEqual(result,None,
00922 msg="The task returned '"+str(result)+"' instead of None")
00923
00924 print "Comparing INPUT statistics before/after smoothing"
00925 newinval = self._getStats(self.infile,self.linechan,self.basechan)
00926 self._compareDictVal(newinval, initval)
00927
00928 print "Testing OUTPUT statistics"
00929 testval = self._getStats(outfile,self.reglinech,self.regbasech)
00930 self._compareDictVal(testval, self.rgrefdic)
00931
00932 sd.rcParams['scantable.storage'] = 'memory'
00933 scan = sd.scantable(outfile, average=False)
00934 scan.set_unit("Hz")
00935 sc_new = scan._getabcissa(0)
00936 del scan
00937 nch_new = len(sc_new)
00938 ch0_new = sc_new[0]
00939 incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00940
00941 self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00942 self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00943 self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00944
00945
00946 def suite():
00947 return [sdsmooth_badinputs, sdsmooth_basicTest, sdsmooth_storageTest]