casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
test_sdsmooth.py
Go to the documentation of this file.
00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008 #
00009 #import listing
00010 import numpy
00011 
00012 import asap as sd
00013 from sdsmooth import sdsmooth
00014 #from sdstat import sdstat
00015 
00016 #
00017 # Unit test of sdsmooth task.
00018 #
00019 
00020 class sdsmooth_unittest_base:
00021     """
00022     Base class for sdsmooth unit test
00023     """
00024     # Data path of input/output
00025     datapath = os.environ.get('CASAPATH').split()[0] + \
00026                '/data/regression/unittest/sdsmooth/'
00027     taskname = "sdsmooth"
00028 
00029     def _checkfile( self, name ):
00030         isthere=os.path.exists(name)
00031         self.assertTrue(isthere,
00032                          msg='output file %s was not created because of the task failure'%(name))
00033 
00034     ### helper function to calculate necessary stat
00035     def _getStats(self,filename, linechan=None, basechan=None):
00036         if not linechan: linechan = self.linechan
00037         if not basechan: basechan = self.basechan
00038         self._checkfile(filename)
00039         scan = sd.scantable(filename,average=False)
00040         # only the first row is calculated
00041         linmsk = scan.create_mask(linechan)
00042         blmsk = scan.create_mask(basechan)
00043         linmax = scan.stats('max',mask=linmsk)[0]
00044         linmaxpos = scan.stats('max_abc',mask=linmsk)[0]
00045         lineqw = scan.stats('sum',mask=linmsk)[0]/linmax
00046         blrms = scan.stats('rms',mask=blmsk)[0]
00047         del scan, linmsk, blmsk
00048         retdic = {'linemax': linmax, 'linemaxpos': linmaxpos,
00049                   'lineeqw': lineqw, 'baserms': blrms}
00050         del linmax, linmaxpos, lineqw, blrms
00051         return retdic
00052 
00053     def _compareDictVal(self,testdict, refdict,places=4):
00054         for stat, refval in refdict.iteritems():
00055             self.assertTrue(testdict.has_key(stat),
00056                             msg = "'%s' is not defined in the current run" % stat)
00057             print "Comparing '%s': %f (current run), %f (reference)" % \
00058                   (stat,testdict[stat],refval)
00059             self.assertAlmostEqual(testdict[stat],refval,places=places)
00060 
00061 
00062 class sdsmooth_badinputs(sdsmooth_unittest_base,unittest.TestCase):
00063     """
00064     Tests on bad input parameter setting
00065     """
00066     inname = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00067     outname = sdsmooth_unittest_base.taskname+'_bad.asap'
00068 
00069     badname = "bad"
00070     badquant = "5bad"
00071     
00072     def setUp(self):
00073         if os.path.exists(self.inname):
00074             shutil.rmtree(self.inname)
00075         if os.path.exists(self.outname):
00076             shutil.rmtree(self.outname)
00077         shutil.copytree(self.datapath+self.inname, self.inname)
00078 
00079         default(sdsmooth)
00080 
00081     def tearDown(self):
00082         if (os.path.exists(self.inname)):
00083             shutil.rmtree(self.inname)
00084 
00085     # Tests on invalid parameter sets
00086     def test_default(self):
00087         """Test Default parameters (raises an errror)"""
00088         # argument verification error
00089         res = sdsmooth()
00090         self.assertFalse(res)
00091 
00092     def testBad_kernel(self):
00093         """Test bad kernel name"""
00094         kernel = self.badname
00095 
00096         # argument verification error
00097         res = sdsmooth(infile=self.inname,outfile=self.outname,
00098                        kernel=kernel)
00099         self.assertFalse(res)
00100 
00101     def testBad_chanwidth(self):
00102         """Test bad chanwidth"""
00103         kernel = 'regrid'
00104         chanwidth = self.badquant
00105 
00106         try:
00107             res = sdsmooth(infile=self.inname,outfile=self.outname,
00108                            kernel=kernel,chanwidth=chanwidth)
00109             self.assertTrue(False,
00110                             msg='The task must throw exception')
00111         except Exception, e:
00112             pos=str(e).find('Invalid quantity chanwidth 5bad')
00113             self.assertNotEqual(pos,-1,
00114                                 msg='Unexpected exception was thrown: %s'%(str(e)))
00115 
00116 
00117 class sdsmooth_basicTest(sdsmooth_unittest_base,unittest.TestCase):
00118     """
00119     Basic unit tests for task sdsmooth. No interactive testing.
00120 
00121     The list of tests:
00122     test00    --- default parameters (raises an errror)
00123     test01    --- default + valid input filename (hanning smooth)
00124     test02-03 --- kernel = 'boxcar' w/ and w/o kwidth param
00125     test04-05 --- kernel = 'gaussian' w/ and w/o kwidth param
00126     test06    --- overwrite = True
00127     test07,10 --- kernel = 'regrid' with chwidth in channel unit
00128     test08,11 --- kernel = 'regrid' with chwidth in frequency unit
00129     test09,12 --- kernel = 'regrid' with chwidth in velocity unit
00130 
00131     Note: input data is generated from a single dish regression data,
00132     'OrionS_rawACSmod', as follows:
00133       default(sdcal)
00134       sdcal(infile='OrionS_rawACSmod',scanlist=[20,21,22,23],
00135                 calmode='ps',tau=0.09,outfile='temp.asap')
00136       default(sdcal)
00137       sdcal(infile='temp.asap',timeaverage=True,tweight='tintsys',
00138                 polaverage=True,pweight='tsys',outfile='temp2.asap')
00139       default(sdbaseline)
00140       sdbaseline(infile='temp2.asap',iflist=[2],masklist=blchan2,
00141                  maskmode='list',outfile=self.infile)
00142 
00143       The reference stat values can be obtained by
00144       sdstat(infile=outfile,masklist=linechan)
00145       sdstat(infile=outfile,masklist=basechan)
00146     """
00147     ### TODO:
00148     ### - need checking for flag application
00149     ### - comparison with simple and known spectral shape
00150 
00151     # Input and output names
00152     infile = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00153     outroot = sdsmooth_unittest_base.taskname+'_test'
00154     linechan = [[2951,3088]]
00155     basechan = [[500,2950],[3089,7692]]
00156     regridw = 4
00157     reglinech = [[737,772]]
00158     regbasech = [[125,736],[773,1923]]
00159 
00160     def setUp(self):
00161         if os.path.exists(self.infile):
00162             shutil.rmtree(self.infile)
00163         shutil.copytree(self.datapath+self.infile, self.infile)
00164 
00165         default(sdsmooth)
00166 
00167     def tearDown(self):
00168         if (os.path.exists(self.infile)):
00169             shutil.rmtree(self.infile)
00170 
00171     def test01(self):
00172         """Test 1: Default parameters + valid input filename (hanning smooth)"""
00173         tid="01"
00174         outfile = self.outroot+tid+'.asap'
00175 
00176         result = sdsmooth(infile=self.infile,outfile=outfile)
00177         self.assertEqual(result,None,
00178                          msg="The task returned '"+str(result)+"' instead of None")
00179         ### reference values ( ASAP r2084 + CASA r14498)
00180         refdic = {'linemax': 1.9679156541824341,
00181                   'linemaxpos': 3049.0,
00182                   'lineeqw': 61.521829228644677,
00183                   'baserms': 0.17469978332519531}
00184         testval = self._getStats(outfile)
00185         self._compareDictVal(testval, refdic)
00186 
00187     def test02(self):
00188         """Test 2: kernel = 'boxcar'"""
00189         tid="02"
00190         outfile = self.outroot+tid+'.asap'
00191         kernel = 'boxcar'
00192 
00193         result =sdsmooth(infile=self.infile,outfile=outfile,kernel=kernel)
00194         self.assertEqual(result,None,
00195                          msg="The task returned '"+str(result)+"' instead of None")
00196         ### reference values ( ASAP r2084 + CASA r14498)
00197         refdic = {'linemax': 1.9409093856811523,
00198                   'linemaxpos': 3048.0,
00199                   'lineeqw': 62.508279566880944,
00200                   'baserms': 0.15886218845844269}
00201         testval = self._getStats(outfile)
00202         self._compareDictVal(testval, refdic)
00203 
00204     def test03(self):
00205         """Test 3: kernel = 'boxcar' + kwidth = 16"""
00206         tid="03"
00207         outfile = self.outroot+tid+'.asap'
00208         kernel = 'boxcar'
00209         kwidth = 16
00210 
00211         result =sdsmooth(infile=self.infile,outfile=outfile,
00212                          kernel=kernel,kwidth=kwidth)
00213         self.assertEqual(result,None,
00214                          msg="The task returned '"+str(result)+"' instead of None")
00215         ### reference values ( ASAP r2084 + CASA r14498)
00216         refdic = {'linemax': 1.8286358118057251,
00217                   'linemaxpos': 3047.0,
00218                   'lineeqw': 65.946254391136108,
00219                   'baserms': 0.096009217202663422}
00220         testval = self._getStats(outfile)
00221         self._compareDictVal(testval, refdic)
00222 
00223 
00224     def test04(self):
00225         """Test 4: kernel = 'gaussian'"""
00226         tid="04"
00227         outfile = self.outroot+tid+'.asap'
00228         kernel = 'gaussian'
00229 
00230         result =sdsmooth(infile=self.infile,outfile=outfile,kernel=kernel)
00231         self.assertEqual(result,None,
00232                          msg="The task returned '"+str(result)+"' instead of None")
00233         ### reference values ( ASAP r2084 + CASA r14498)
00234         refdic = {'linemax': 1.9107955694198608,
00235                   'linemaxpos': 3049.0,
00236                   'lineeqw': 63.315141667417912,
00237                   'baserms': 0.14576216042041779}
00238         testval = self._getStats(outfile)
00239         self._compareDictVal(testval, refdic)
00240 
00241     def test05(self):
00242         """Test 5: kernel = 'gaussian' + kwidth = 16"""
00243         tid="05"
00244         outfile = self.outroot+tid+'.asap'
00245         kernel = 'gaussian'
00246         kwidth = 16
00247 
00248         result =sdsmooth(infile=self.infile,outfile=outfile,
00249                          kernel=kernel,kwidth=kwidth)
00250         self.assertEqual(result,None,
00251                          msg="The task returned '"+str(result)+"' instead of None")
00252         ### reference values ( ASAP r2084 + CASA r14498)
00253         refdic = {'linemax': 1.762944221496582,
00254                   'linemaxpos': 3046.0,
00255                   'lineeqw': 68.064315277502047,
00256                   'baserms': 0.074664078652858734}
00257         testval = self._getStats(outfile)
00258         self._compareDictVal(testval, refdic)
00259 
00260     def test06(self):
00261         """Test 6: Test overwrite=True"""
00262         tid="06"
00263         outfile = self.outroot+tid+'.asap'
00264 
00265         # the fist run with hanning smooth
00266         print "The 1st run of sdsmooth (hanning smooth)"
00267         result1 =sdsmooth(infile=self.infile,outfile=outfile)
00268         self.assertEqual(result1,None,
00269                          msg="The task returned '"+str(result1)+"' instead of None for the 1st run")
00270         self.assertTrue(os.path.exists(outfile),
00271                         msg="Output file doesn't exist after the 1st run.")
00272 
00273         # overwrite 'outfile'
00274         # the second run with kernel = 'gaussian'
00275         kernel = 'gaussian'
00276         print "The 2nd run of sdsmooth (OVERWRITE with Gaussian smooth)"
00277         result2 =sdsmooth(infile=self.infile,outfile=outfile,
00278                           kernel=kernel,overwrite=True)
00279         self.assertEqual(result2,None,
00280                          msg="The task returned '"+str(result2)+"' instead of None for the 2nd run")
00281         self.assertTrue(os.path.exists(outfile),
00282                         msg="Output file doesn't exist after the 2nd run.")
00283         # Should be the same value as test04
00284         ### reference values ( ASAP r2084 + CASA r14498)
00285         refdic = {'linemax': 1.9107955694198608,
00286                   'linemaxpos': 3049.0,
00287                   'lineeqw': 63.315141667417912,
00288                   'baserms': 0.14576216042041779}
00289         testval = self._getStats(outfile)
00290         self._compareDictVal(testval, refdic)
00291 
00292 
00293     def test07(self):
00294         """Test 7: kernel = 'regrid' + chanwidth in channel unit"""
00295         tid="07"
00296         unit='channel'
00297         scan = sd.scantable(self.infile, average=False)
00298         oldunit = scan.get_unit()
00299         scan.set_unit(unit)
00300         nch_old = scan.nchan(scan.getif(0))
00301         scan.set_unit(oldunit)
00302         del scan
00303         
00304         outfile = self.outroot+tid+'.asap'
00305         kernel = 'regrid'
00306         chanwidth = str(self.regridw)
00307 
00308         result =sdsmooth(infile=self.infile,outfile=outfile,
00309                          kernel=kernel,chanwidth=chanwidth)
00310         self.assertEqual(result,None,
00311                          msg="The task returned '"+str(result)+"' instead of None")
00312         ### reference values ( ASAP r2435 + CASA r18782)
00313         refdic = {'linemax': 1.9440968036651611,
00314                   'linemaxpos': 762.0,
00315                   'lineeqw': 15.466022935853511,
00316                   'baserms': 0.16496795415878296}
00317         refsc = {"ch0": 44049935561.916985,
00318                  "incr": 24416.931915037214}
00319         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00320         self._compareDictVal(testval, refdic)
00321         # spectral coordinate check
00322         scan = sd.scantable(outfile, average=False)
00323         scan.set_unit("Hz")
00324         sc_new = scan._getabcissa(0)
00325         del scan
00326         nch_new = len(sc_new)
00327         ch0_new = sc_new[0]
00328         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00329         
00330         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00331         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00332         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00333 
00334 
00335     def test08(self):
00336         """Test 8: kernel = 'regrid' + chanwidth in frequency unit"""
00337         tid="08"
00338         unit="Hz"
00339         # get channel number and width in input data
00340         scan = sd.scantable(self.infile, average=False)
00341         oldunit = scan.get_unit()
00342         scan.set_unit(unit)
00343         nch_old = scan.nchan(scan.getif(0))
00344         fx = scan._getabcissa(0)
00345         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00346         scan.set_unit(oldunit)
00347         del scan
00348         chw_new = chw_old*self.regridw
00349         
00350         outfile = self.outroot+tid+'.asap'
00351         kernel = 'regrid'
00352         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00353 
00354         result =sdsmooth(infile=self.infile,outfile=outfile,
00355                          kernel=kernel,chanwidth=chanwidth)
00356         self.assertEqual(result,None,
00357                          msg="The task returned '"+str(result)+"' instead of None")
00358         ### reference values ( ASAP r2435 + CASA r18782)
00359         refdic = {'linemax': 1.9440968036651611,
00360                   'linemaxpos': 762.0,
00361                   'lineeqw': 15.466022935853511,
00362                   'baserms': 0.16496795415878296}
00363         refsc = {"ch0": 44049935561.916985,
00364                  "incr": 24416.931914787496}
00365         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00366         self._compareDictVal(testval, refdic)
00367         # spectral coordinate check
00368         scan = sd.scantable(outfile, average=False)
00369         scan.set_unit("Hz")
00370         sc_new = scan._getabcissa(0)
00371         del scan
00372         nch_new = len(sc_new)
00373         ch0_new = sc_new[0]
00374         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00375         
00376         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00377         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00378         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00379 
00380 
00381     def test09(self):
00382         """Test 9: kernel = 'regrid' + chanwidth in velocity unit"""
00383         tid="09"
00384         unit="km/s"
00385         # get channel number and width in input data
00386         scan = sd.scantable(self.infile, average=False)
00387         oldunit = scan.get_unit()
00388         scan.set_unit(unit)
00389         nch_old = scan.nchan(scan.getif(0))
00390         fx = scan._getabcissa(0)
00391         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00392         scan.set_unit(oldunit)
00393         del scan
00394         chw_new = chw_old*self.regridw
00395 
00396         outfile = self.outroot+tid+'.asap'
00397         kernel = 'regrid'
00398         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00399 
00400         result =sdsmooth(infile=self.infile,outfile=outfile,
00401                          kernel=kernel,chanwidth=chanwidth)
00402         self.assertEqual(result,None,
00403                          msg="The task returned '"+str(result)+"' instead of None")
00404         ### reference values ( ASAP r2435 + CASA r18782)
00405         refdic = {'linemax': 1.9440964460372925,
00406                   'linemaxpos': 762.0,
00407                   'lineeqw': 15.466027743114028,
00408                   'baserms': 0.16496789455413818}
00409         refsc = {"ch0": 44049935561.917,
00410                  "incr": 24416.931942994266}
00411         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00412         self._compareDictVal(testval, refdic)
00413         # spectral coordinate check
00414         scan = sd.scantable(outfile, average=False)
00415         scan.set_unit("Hz")
00416         sc_new = scan._getabcissa(0)
00417         del scan
00418         nch_new = len(sc_new)
00419         ch0_new = sc_new[0]
00420         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00421         
00422         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00423         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00424         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00425 
00426     def test10(self):
00427         """Test 10: kernel = 'regrid' + negative chanwidth in channel unit"""
00428         tid="10"
00429         unit='channel'
00430         scan = sd.scantable(self.infile, average=False)
00431         oldunit = scan.get_unit()
00432         scan.set_unit(unit)
00433         nch_old = scan.nchan(scan.getif(0))
00434         scan.set_unit(oldunit)
00435         del scan
00436         
00437         outfile = self.outroot+tid+'.asap'
00438         kernel = 'regrid'
00439         chanwidth = str(-self.regridw)
00440 
00441         result =sdsmooth(infile=self.infile,outfile=outfile,
00442                          kernel=kernel,chanwidth=chanwidth)
00443         self.assertEqual(result,None,
00444                          msg="The task returned '"+str(result)+"' instead of None")
00445         ### reference values ( ASAP r2435 + CASA r18782)
00446         refdic = {'linemax': 1.9440968036651611,
00447                   'linemaxpos': 1285.0,
00448                   'lineeqw': 15.466022935853511,
00449                   'baserms': 0.16496795415878296}
00450         refsc = {"ch0": 44099917021.547066,
00451                  "incr": -24416.931915037214}
00452         nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00453         reglinech = []
00454         regbasech = []
00455         for chans in self.reglinech:
00456             reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00457         for chans in self.regbasech:
00458             regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00459         testval = self._getStats(outfile,reglinech,regbasech)
00460         self._compareDictVal(testval, refdic)
00461         # spectral coordinate check
00462         scan = sd.scantable(outfile, average=False)
00463         scan.set_unit("Hz")
00464         sc_new = scan._getabcissa(0)
00465         del scan
00466         nch_new = len(sc_new)
00467         ch0_new = sc_new[0]
00468         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00469         
00470         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00471         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00472         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00473 
00474     def test11(self):
00475         """Test11: kernel = 'regrid' + negative chanwidth in frequency unit"""
00476         tid="11"
00477         unit="Hz"
00478         # get channel number and width in input data
00479         scan = sd.scantable(self.infile, average=False)
00480         oldunit = scan.get_unit()
00481         scan.set_unit(unit)
00482         nch_old = scan.nchan(scan.getif(0))
00483         fx = scan._getabcissa(0)
00484         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00485         scan.set_unit(oldunit)
00486         del scan
00487         chw_new = chw_old*(-self.regridw)
00488         
00489         outfile = self.outroot+tid+'.asap'
00490         kernel = 'regrid'
00491         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00492 
00493         result =sdsmooth(infile=self.infile,outfile=outfile,
00494                          kernel=kernel,chanwidth=chanwidth)
00495         self.assertEqual(result,None,
00496                          msg="The task returned '"+str(result)+"' instead of None")
00497         ### reference values ( ASAP r2435 + CASA r18782)
00498         refdic = {'linemax': 1.9440968036651611,
00499                   'linemaxpos': 1285.0,
00500                   'lineeqw': 15.466022935853511,
00501                   'baserms': 0.16496795415878296}
00502         refsc = {"ch0": 44099917021.547066,
00503                  "incr": -24416.931914787496}
00504         nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00505         reglinech = []
00506         regbasech = []
00507         for chans in self.reglinech:
00508             reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00509         for chans in self.regbasech:
00510             regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00511         testval = self._getStats(outfile,reglinech,regbasech)
00512         self._compareDictVal(testval, refdic)
00513         # spectral coordinate check
00514         scan = sd.scantable(outfile, average=False)
00515         scan.set_unit("Hz")
00516         sc_new = scan._getabcissa(0)
00517         del scan
00518         nch_new = len(sc_new)
00519         ch0_new = sc_new[0]
00520         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00521         
00522         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00523         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00524         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00525 
00526 
00527     def test12(self):
00528         """Test 12: kernel = 'regrid' + positive chanwidth in velocity unit"""
00529         tid="12"
00530         unit="km/s"
00531         # get channel number and width in input data
00532         scan = sd.scantable(self.infile, average=False)
00533         oldunit = scan.get_unit()
00534         scan.set_unit(unit)
00535         nch_old = scan.nchan(scan.getif(0))
00536         fx = scan._getabcissa(0)
00537         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00538         scan.set_unit(oldunit)
00539         del scan
00540         chw_new = chw_old*(-self.regridw)
00541 
00542         outfile = self.outroot+tid+'.asap'
00543         kernel = 'regrid'
00544         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00545 
00546         result =sdsmooth(infile=self.infile,outfile=outfile,
00547                          kernel=kernel,chanwidth=chanwidth)
00548         self.assertEqual(result,None,
00549                          msg="The task returned '"+str(result)+"' instead of None")
00550         ### reference values ( ASAP r2435 + CASA r18782)
00551         refdic = {'linemax': 1.9440964460372925,
00552                   'linemaxpos': 1285.0,
00553                   'lineeqw': 15.466027743114028,
00554                   'baserms': 0.16496789455413818}
00555         refsc = {"ch0": 44099917021.547066,
00556                  "incr": -24416.931942994266}
00557         nnew = int(numpy.ceil(nch_old/float(self.regridw)))
00558         reglinech = []
00559         regbasech = []
00560         for chans in self.reglinech:
00561             reglinech.append([nnew-1-chans[0],nnew-1-chans[1]])
00562         for chans in self.regbasech:
00563             regbasech.append([nnew-1-chans[0],nnew-1-chans[1]])
00564         testval = self._getStats(outfile,reglinech,regbasech)
00565         self._compareDictVal(testval, refdic)
00566         # spectral coordinate check
00567         scan = sd.scantable(outfile, average=False)
00568         scan.set_unit("Hz")
00569         sc_new = scan._getabcissa(0)
00570         del scan
00571         nch_new = len(sc_new)
00572         ch0_new = sc_new[0]
00573         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00574         
00575         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00576         self.assertAlmostEqual((ch0_new-refsc['ch0'])/refsc['ch0'],0.,places=5)
00577         self.assertAlmostEqual((incr_new-refsc['incr'])/refsc['incr'],0.,places=5)
00578 
00579 
00580 class sdsmooth_storageTest( sdsmooth_unittest_base, unittest.TestCase ):
00581     """
00582     Unit tests for task sdsmooth. Test scantable sotrage and insitu
00583     parameters
00584 
00585     The list of tests:
00586     testMTsm  --- storage = 'memory', insitu = True (smooth)
00587     testMFsm  --- storage = 'memory', insitu = False (smooth)
00588     testDTsm  --- storage = 'disk', insitu = True (smooth)
00589     testDFsm  --- storage = 'disk', insitu = False (smooth)
00590     testMTrg  --- storage = 'memory', insitu = True (regrid)
00591     testMFrg  --- storage = 'memory', insitu = False (regrid)
00592     testDTrg  --- storage = 'disk', insitu = True (regrid)
00593     testDFrg  --- storage = 'disk', insitu = False (regrid)
00594 
00595     Note on handlings of disk storage:
00596        Returns new table after smooth and regrid.
00597        Returns new table after scantable.convert_flux.
00598        Task script restores unit and frame information.
00599     """
00600     # Input and output names
00601     infile = 'OrionS_rawACSmod_if2_calTPave_bl.asap'
00602     outroot = sdsmooth_unittest_base.taskname+'_test'
00603     linechan = [[2951,3088]]
00604     basechan = [[500,2950],[3089,7692]]
00605     regridw = 4
00606     reglinech = [[737,772]]
00607     regbasech = [[125,736],[773,1923]]
00608 
00609     ### reference values ( ASAP r2435 + CASA r18782)
00610     smrefdic = {'linemax': 1.8286358118057251,
00611                 'linemaxpos': 3047.0,
00612                 'lineeqw': 65.946254391136108,
00613                 'baserms': 0.096009217202663422}
00614     rgrefdic = {'linemax': 1.9440968036651611,
00615                 'linemaxpos': 762.0,
00616                 'lineeqw': 15.466022935853511,
00617                 'baserms': 0.16496795415878296}
00618     rgrefsc = {"ch0": 44049935561.916985,
00619                "incr": 24416.931914787496}
00620 
00621 
00622     def setUp( self ):
00623         if os.path.exists(self.infile):
00624             shutil.rmtree(self.infile)
00625         shutil.copytree(self.datapath+self.infile, self.infile)
00626 
00627         default(sdsmooth)
00628 
00629     def tearDown( self ):
00630         if (os.path.exists(self.infile)):
00631             shutil.rmtree(self.infile)
00632 
00633     def testMTsm( self ):
00634         """Storage Test MTsm: kernel = 'boxcar' + kwidth = 16 on storage='memory' and insitu=T"""
00635         tid="MTsm"
00636         outfile = self.outroot+tid+'.asap'
00637         kernel = 'boxcar'
00638         kwidth = 16
00639 
00640         sd.rcParams['scantable.storage'] = 'memory'
00641         sd.rcParams['insitu'] = True
00642         print "Running test with storage='%s' and insitu=%s" % \
00643               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00644 
00645         initval = self._getStats(self.infile)
00646         result =sdsmooth(infile=self.infile,outfile=outfile,
00647                          kernel=kernel,kwidth=kwidth)
00648         self.assertEqual(result,None,
00649                          msg="The task returned '"+str(result)+"' instead of None")
00650         # Test input data
00651         print "Comparing INPUT statistics before/after smoothing"
00652         newinval = self._getStats(self.infile)
00653         self._compareDictVal(newinval, initval)
00654         # Test output data
00655         print "Testing OUTPUT statistics"
00656         testval = self._getStats(outfile)
00657         self._compareDictVal(testval, self.smrefdic)
00658 
00659     def testMFsm( self ):
00660         """Storage Test MFsm: kernel = 'boxcar' + kwidth = 16 on storage='memory' and insitu=F"""
00661         tid="MFsm"
00662         outfile = self.outroot+tid+'.asap'
00663         kernel = 'boxcar'
00664         kwidth = 16
00665 
00666         sd.rcParams['scantable.storage'] = 'memory'
00667         sd.rcParams['insitu'] = False
00668         print "Running test with storage='%s' and insitu=%s" % \
00669               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00670 
00671         initval = self._getStats(self.infile)
00672         result =sdsmooth(infile=self.infile,outfile=outfile,
00673                          kernel=kernel,kwidth=kwidth)
00674         self.assertEqual(result,None,
00675                          msg="The task returned '"+str(result)+"' instead of None")
00676         # Test input data
00677         print "Comparing INPUT statistics before/after smoothing"
00678         newinval = self._getStats(self.infile)
00679         self._compareDictVal(newinval, initval)
00680         # Test output data
00681         print "Testing OUTPUT statistics"
00682         testval = self._getStats(outfile)
00683         self._compareDictVal(testval, self.smrefdic)
00684 
00685     def testDTsm( self ):
00686         """Storage Test DTsm: kernel = 'boxcar' + kwidth = 16 on storage='disk' and insitu=T"""
00687         tid="DTsm"
00688         outfile = self.outroot+tid+'.asap'
00689         kernel = 'boxcar'
00690         kwidth = 16
00691 
00692         sd.rcParams['scantable.storage'] = 'disk'
00693         sd.rcParams['insitu'] = True
00694         print "Running test with storage='%s' and insitu=%s" % \
00695               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00696 
00697         initval = self._getStats(self.infile)
00698         result =sdsmooth(infile=self.infile,outfile=outfile,
00699                          kernel=kernel,kwidth=kwidth)
00700         self.assertEqual(result,None,
00701                          msg="The task returned '"+str(result)+"' instead of None")
00702         # Test input data
00703         print "Comparing INPUT statistics before/after smoothing"
00704         newinval = self._getStats(self.infile)
00705         self._compareDictVal(newinval, initval)
00706         # Test output data
00707         print "Testing OUTPUT statistics"
00708         testval = self._getStats(outfile)
00709         self._compareDictVal(testval, self.smrefdic)
00710 
00711     def testDFsm( self ):
00712         """Storage Test DFsm: kernel = 'boxcar' + kwidth = 16 on storage='disk' and insitu=F"""
00713         tid="DFsm"
00714         outfile = self.outroot+tid+'.asap'
00715         kernel = 'boxcar'
00716         kwidth = 16
00717 
00718         sd.rcParams['scantable.storage'] = 'disk'
00719         sd.rcParams['insitu'] = False
00720         print "Running test with storage='%s' and insitu=%s" % \
00721               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00722 
00723         initval = self._getStats(self.infile)
00724         result =sdsmooth(infile=self.infile,outfile=outfile,
00725                          kernel=kernel,kwidth=kwidth)
00726         self.assertEqual(result,None,
00727                          msg="The task returned '"+str(result)+"' instead of None")
00728         # Test input data
00729         print "Comparing INPUT statistics before/after smoothing"
00730         newinval = self._getStats(self.infile)
00731         self._compareDictVal(newinval, initval)
00732         # Test output data
00733         print "Testing OUTPUT statistics"
00734         testval = self._getStats(outfile)
00735         self._compareDictVal(testval, self.smrefdic)
00736 
00737     def testMTrg( self ):
00738         """Storage Test MTrg: kernel = 'regrid' on storage='memory' and insitu=T"""
00739         tid="MTrg"
00740         unit="Hz"
00741         # get channel number and width in input data
00742         sd.rcParams['scantable.storage'] = 'memory'
00743         scan = sd.scantable(self.infile, average=False)
00744         oldunit = scan.get_unit()
00745         scan.set_unit(unit)
00746         nch_old = scan.nchan(scan.getif(0))
00747         fx = scan._getabcissa(0)
00748         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00749         scan.set_unit(oldunit)
00750         del scan
00751         chw_new = chw_old*self.regridw
00752         
00753         outfile = self.outroot+tid+'.asap'
00754         kernel = 'regrid'
00755         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00756 
00757         sd.rcParams['scantable.storage'] = 'memory'
00758         sd.rcParams['insitu'] = True
00759         print "Running test with storage='%s' and insitu=%s" % \
00760               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00761 
00762         initval = self._getStats(self.infile,self.linechan,self.basechan)
00763         result =sdsmooth(infile=self.infile,outfile=outfile,
00764                          kernel=kernel,chanwidth=chanwidth)
00765         self.assertEqual(result,None,
00766                          msg="The task returned '"+str(result)+"' instead of None")
00767         # Test input data
00768         print "Comparing INPUT statistics before/after smoothing"
00769         newinval = self._getStats(self.infile,self.linechan,self.basechan)
00770         self._compareDictVal(newinval, initval)
00771         # Test output data
00772         print "Testing OUTPUT statistics"
00773         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00774         self._compareDictVal(testval, self.rgrefdic)
00775         # spectral coordinate check
00776         sd.rcParams['scantable.storage'] = 'memory'
00777         scan = sd.scantable(outfile, average=False)
00778         scan.set_unit("Hz")
00779         sc_new = scan._getabcissa(0)
00780         del scan
00781         nch_new = len(sc_new)
00782         ch0_new = sc_new[0]
00783         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00784         
00785         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00786         self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00787         self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00788 
00789     def testMFrg( self ):
00790         """Storage Test MFrg: kernel = 'regrid' on storage='memory' and insitu=F"""
00791         tid="MFrg"
00792         unit="Hz"
00793         # get channel number and width in input data
00794         sd.rcParams['scantable.storage'] = 'memory'
00795         scan = sd.scantable(self.infile, average=False)
00796         oldunit = scan.get_unit()
00797         scan.set_unit(unit)
00798         nch_old = scan.nchan(scan.getif(0))
00799         fx = scan._getabcissa(0)
00800         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00801         scan.set_unit(oldunit)
00802         del scan
00803         chw_new = chw_old*self.regridw
00804         
00805         outfile = self.outroot+tid+'.asap'
00806         kernel = 'regrid'
00807         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00808 
00809         sd.rcParams['scantable.storage'] = 'memory'
00810         sd.rcParams['insitu'] = False
00811         print "Running test with storage='%s' and insitu=%s" % \
00812               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00813 
00814         initval = self._getStats(self.infile,self.linechan,self.basechan)
00815         result =sdsmooth(infile=self.infile,outfile=outfile,
00816                          kernel=kernel,chanwidth=chanwidth)
00817         self.assertEqual(result,None,
00818                          msg="The task returned '"+str(result)+"' instead of None")
00819         # Test input data
00820         print "Comparing INPUT statistics before/after smoothing"
00821         newinval = self._getStats(self.infile,self.linechan,self.basechan)
00822         self._compareDictVal(newinval, initval)
00823         # Test output data
00824         print "Testing OUTPUT statistics"
00825         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00826         self._compareDictVal(testval, self.rgrefdic)
00827         # spectral coordinate check
00828         sd.rcParams['scantable.storage'] = 'memory'
00829         scan = sd.scantable(outfile, average=False)
00830         scan.set_unit("Hz")
00831         sc_new = scan._getabcissa(0)
00832         del scan
00833         nch_new = len(sc_new)
00834         ch0_new = sc_new[0]
00835         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00836         
00837         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00838         self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00839         self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00840 
00841     def testDTrg( self ):
00842         """Storage Test DTrg: kernel = 'regrid' on storage='disk' and insitu=T"""
00843         tid="DTrg"
00844         unit="Hz"
00845         # get channel number and width in input data
00846         sd.rcParams['scantable.storage'] = 'memory'
00847         scan = sd.scantable(self.infile, average=False)
00848         oldunit = scan.get_unit()
00849         scan.set_unit(unit)
00850         nch_old = scan.nchan(scan.getif(0))
00851         fx = scan._getabcissa(0)
00852         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00853         scan.set_unit(oldunit)
00854         del scan
00855         chw_new = chw_old*self.regridw
00856         
00857         outfile = self.outroot+tid+'.asap'
00858         kernel = 'regrid'
00859         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00860 
00861         sd.rcParams['scantable.storage'] = 'disk'
00862         sd.rcParams['insitu'] = True
00863         print "Running test with storage='%s' and insitu=%s" % \
00864               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00865 
00866         initval = self._getStats(self.infile,self.linechan,self.basechan)
00867         result =sdsmooth(infile=self.infile,outfile=outfile,
00868                          kernel=kernel,chanwidth=chanwidth)
00869         self.assertEqual(result,None,
00870                          msg="The task returned '"+str(result)+"' instead of None")
00871         # Test input data
00872         print "Comparing INPUT statistics before/after smoothing"
00873         newinval = self._getStats(self.infile,self.linechan,self.basechan)
00874         self._compareDictVal(newinval, initval)
00875         # Test output data
00876         print "Testing OUTPUT statistics"
00877         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00878         self._compareDictVal(testval, self.rgrefdic)
00879         # spectral coordinate check
00880         sd.rcParams['scantable.storage'] = 'memory'
00881         scan = sd.scantable(outfile, average=False)
00882         scan.set_unit("Hz")
00883         sc_new = scan._getabcissa(0)
00884         del scan
00885         nch_new = len(sc_new)
00886         ch0_new = sc_new[0]
00887         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00888         
00889         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00890         self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00891         self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00892 
00893     def testDFrg( self ):
00894         """Storage Test DFrg: kernel = 'regrid' on storage='disk' and insitu=F"""
00895         tid="DFrg"
00896         unit="Hz"
00897         # get channel number and width in input data
00898         sd.rcParams['scantable.storage'] = 'memory'
00899         scan = sd.scantable(self.infile, average=False)
00900         oldunit = scan.get_unit()
00901         scan.set_unit(unit)
00902         nch_old = scan.nchan(scan.getif(0))
00903         fx = scan._getabcissa(0)
00904         chw_old = (fx[nch_old-1]-fx[0])/float(nch_old-1)
00905         scan.set_unit(oldunit)
00906         del scan
00907         chw_new = chw_old*self.regridw
00908         
00909         outfile = self.outroot+tid+'.asap'
00910         kernel = 'regrid'
00911         chanwidth = qa.tos(qa.quantity(chw_new,unit))
00912 
00913         sd.rcParams['scantable.storage'] = 'disk'
00914         sd.rcParams['insitu'] = False
00915         print "Running test with storage='%s' and insitu=%s" % \
00916               (sd.rcParams['scantable.storage'], str(sd.rcParams['insitu']))
00917 
00918         initval = self._getStats(self.infile,self.linechan,self.basechan)
00919         result =sdsmooth(infile=self.infile,outfile=outfile,
00920                          kernel=kernel,chanwidth=chanwidth)
00921         self.assertEqual(result,None,
00922                          msg="The task returned '"+str(result)+"' instead of None")
00923         # Test input data
00924         print "Comparing INPUT statistics before/after smoothing"
00925         newinval = self._getStats(self.infile,self.linechan,self.basechan)
00926         self._compareDictVal(newinval, initval)
00927         # Test output data
00928         print "Testing OUTPUT statistics"
00929         testval = self._getStats(outfile,self.reglinech,self.regbasech)
00930         self._compareDictVal(testval, self.rgrefdic)
00931         # spectral coordinate check
00932         sd.rcParams['scantable.storage'] = 'memory'
00933         scan = sd.scantable(outfile, average=False)
00934         scan.set_unit("Hz")
00935         sc_new = scan._getabcissa(0)
00936         del scan
00937         nch_new = len(sc_new)
00938         ch0_new = sc_new[0]
00939         incr_new = (sc_new[nch_new-1]-sc_new[0])/float(nch_new-1)
00940         
00941         self.assertEqual(nch_new,numpy.ceil(nch_old/self.regridw))
00942         self.assertAlmostEqual((ch0_new-self.rgrefsc['ch0'])/self.rgrefsc['ch0'],0.,places=5)
00943         self.assertAlmostEqual((incr_new-self.rgrefsc['incr'])/self.rgrefsc['incr'],0.,places=5)
00944 
00945 
00946 def suite():
00947     return [sdsmooth_badinputs, sdsmooth_basicTest, sdsmooth_storageTest]