00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008
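'''
Unit tests for task hanningsmooth. It tests the following parameters:
    vis:        wrong and correct values
    datacolumn: check that the requested output column is created
    outputvis:  check that the output MS is created

Other tests: check that theoretical and calculated values of the smoothed
             column (CORRECTED_DATA or DATA) are the same, and that the
             flags of the first and last channels are set after smoothing,
             both with hanningsmooth and with cvel(hanning=True).
'''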
class hanningsmooth_test(unittest.TestCase):
    """Unit tests for hanningsmooth and for hanning smoothing done by cvel.

    setUp() copies the reference MS ``msfile`` into the current directory
    before each test and tearDown() removes it (and any output the tests
    produced) afterwards, so every test starts from the same input.
    """

    # Set up the test
    msfile = 'ngc5921_ut.ms'
    res = None
    out = 'hanningsmooth_test'

    # (channel, expected FLAG value) pairs checked on row 'r1', index 0.
    # Before smoothing none of the inspected channels is flagged; afterwards
    # the first and the last channel (0 and 62) are expected to be flagged.
    _EDGES_UNFLAGGED = ((0, False), (1, False), (61, False), (62, False))
    _EDGES_FLAGGED = ((0, True), (1, False), (61, False), (62, True))

    @staticmethod
    def _remove_path(path):
        '''Best-effort removal of a file or directory tree; never raises.'''
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                # Clean-up must not turn a passing test into an error.
                pass

    def setUp(self):
        '''Reset the task parameters and copy the input MS into the cwd.

        Raises RuntimeError when CASAPATH is not set, because the location
        of the reference data is derived from its first field.
        '''
        self.res = None
        default(hanningsmooth)
        casapath = (os.environ.get('CASAPATH') or '').split()
        if not casapath:
            raise RuntimeError(
                'CASAPATH is not set; cannot locate the hanningsmooth test data')
        datapath = os.path.join(
            casapath[0], 'data', 'regression', 'unittest', 'hanningsmooth')
        # A copy left behind by an aborted run would make copytree() fail.
        self._remove_path(self.msfile)
        shutil.copytree(os.path.join(datapath, self.msfile), self.msfile)

    def tearDown(self):
        '''Remove the MS copy and every output the tests may have created.'''
        for leftover in (self.msfile, 'cvelngc.ms', self.out, 'mynewms.ms'):
            self._remove_path(leftover)

    def checkcol(self, table, colname):
        '''Check if requested column exists

        Returns True when reading the column yields a non-empty result and
        False otherwise. Errors raised while reading are propagated.
        '''
        return bool(self.getvarcol(table, colname))

    def calc(self, dataB, data, dataA):
        '''Calculate the hanning smooth of each element

        dataB, data and dataA are the values of the channel before, the
        channel itself and the channel after; the result is the weighted
        sum 0.25*dataB + 0.5*data + 0.25*dataA.
        '''
        const0 = 0.25
        const1 = 0.5
        const2 = 0.25
        S = const0*dataB + const1*data + const2*dataA
        return S

    def getvarcol(self, table, colname):
        '''Return the requested column

        The value is whatever tb.getvarcol() returns; the tests index it
        with row keys of the form 'r1', 'r2', ...
        '''
        tb.open(table)
        try:
            return tb.getvarcol(colname)
        finally:
            # Always release the table, also when the read fails.
            tb.close()

    def _assert_flags(self, table, expected):
        '''Assert FLAG values of row 'r1', index 0, in ``table``.

        expected is a sequence of (channel, flag) pairs.
        '''
        flags = self.getvarcol(table, 'FLAG')['r1'][0]
        for chan, value in expected:
            self.assertTrue(flags[chan] == [value])

    def _assert_smoothed(self, data_col, corr_col, tolerance=1e-05):
        '''Assert that corr_col is the hanning smooth of data_col.

        Every second row (r1, r3, ...), indices 0 and 1 on the first axis and
        channels 1..61 are compared; the edge channels are left out because
        they have no neighbour on one side.
        '''
        nrows = len(corr_col)
        for i in range(1, nrows, 2):
            row = 'r%s' % i
            for pol in range(0, 2):
                unsmoothed = data_col[row][pol]
                smoothed = corr_col[row][pol]
                for chan in range(1, 62):
                    expected = self.calc(unsmoothed[chan - 1],
                                         unsmoothed[chan],
                                         unsmoothed[chan + 1])
                    self.assertTrue(abs(smoothed[chan] - expected) < tolerance)

    def test0(self):
        '''Test 0: Default values'''
        self.res = hanningsmooth()
        self.assertFalse(self.res)

    def test1(self):
        """Test 1: Wrong input MS should return False"""
        msfile = 'badmsfile'
        self.res = hanningsmooth(vis=msfile)
        self.assertFalse(self.res)

    def test2(self):
        '''Test 2: Check that output column is created'''
        self.res = hanningsmooth(vis=self.msfile, datacolumn='corrected')
        self.assertEqual(self.res, None)
        self.assertTrue(self.checkcol(self.msfile, 'CORRECTED_DATA'))

    def test3(self):
        '''Test 3: Theoretical and calculated values should be the same with datacolumn==CORRECTED'''
        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_UNFLAGGED)

        self.res = hanningsmooth(vis=self.msfile, datacolumn='corrected')

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_FLAGGED)

        self.assertTrue(self.checkcol(self.msfile, 'CORRECTED_DATA'))
        data_col = self.getvarcol(self.msfile, 'DATA')
        corr_col = self.getvarcol(self.msfile, 'CORRECTED_DATA')

        # Compare the smoothed column with the values computed from DATA.
        self._assert_smoothed(data_col, corr_col)

    def test4(self):
        '''Test 4: Theoretical and calculated values should be the same with datacolumn==DATA'''
        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_UNFLAGGED)

        # DATA is read before the task runs: it is the column being
        # smoothed here, so the second read holds the smoothed values.
        data_col = self.getvarcol(self.msfile, 'DATA')
        self.res = hanningsmooth(vis=self.msfile, datacolumn='data')
        corr_col = self.getvarcol(self.msfile, 'DATA')

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_FLAGGED)

        self._assert_smoothed(data_col, corr_col)

    def test5(self):
        '''Test 5: Check that output MS is created '''
        self._remove_path('mynewms.ms')
        self.res = hanningsmooth(vis=self.msfile, outputvis='mynewms.ms')
        self.assertTrue(self.checkcol('mynewms.ms', 'DATA'))

    def test6(self):
        '''Test 6: Flagging should be correct with datacolumn==ALL'''
        clearcal(vis=self.msfile)

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_UNFLAGGED)

        self.res = hanningsmooth(vis=self.msfile, datacolumn='all')

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_FLAGGED)

    def test7(self):
        '''Test 7: Flagging should be correct when hanning smoothing within cvel (no transform)'''
        clearcal(vis=self.msfile)

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_UNFLAGGED)

        self.res = cvel(vis=self.msfile, outputvis='cvelngc.ms', hanning=True)

        # check correct flagging (just for one row as a sample)
        self._assert_flags('cvelngc.ms', self._EDGES_FLAGGED)

    def test8(self):
        '''Test 8: Flagging should be correct when hanning smoothing within cvel (with transform)'''
        clearcal(vis=self.msfile)

        # check correct flagging (just for one row as a sample)
        self._assert_flags(self.msfile, self._EDGES_UNFLAGGED)

        self.res = cvel(vis=self.msfile, outputvis='cvelngc.ms', hanning=True, outframe='cmb')

        # check correct flagging (just for one row as a sample); with the
        # frame transform channel 61 is expected to be flagged as well.
        self._assert_flags('cvelngc.ms',
                           ((0, True), (1, False), (2, False),
                            (60, False), (61, True), (62, True)))
00235
def suite():
    '''Return the list of test classes defined in this module.'''
    test_classes = [hanningsmooth_test]
    return test_classes
00238
00239