00001 from tasks import *
00002 from taskinit import *
00003 from __main__ import inp
00004 from __main__ import default
00005 import os
00006 import shutil
00007 import unittest
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
# Absolute/relative tolerance used when comparing floating-point statistics.
epsilon = 1e-4

# Names of the input data sets; msfile is the MeasurementSet the tests copy
# into the working directory and run visstat on.
fits = 'ngc5921.fits'
msfile = 'ngc5921.ms'
00024
class visstat_test(unittest.TestCase):
    """Regression tests for the CASA ``visstat`` task on the ngc5921 MS.

    ``setUp`` copies the MeasurementSet into the working directory (if it is
    not already there) and ``tearDown`` removes it again, so flags applied by
    one test are not seen by the next one.

    Every test collects its failures in a list of messages and asserts once
    at the end, so a single run reports all mismatches rather than only the
    first one.
    """

    # Axes for which test3 expects visstat to return None rather than a
    # dictionary of statistics.
    _UNSUPPORTED_AXES = ("FLAG_CATEGORY", "EXPOSURE", "OBSERVATION_ID",
                         "PROCESSOR_ID", "STATE_ID", "TIME_CENTROID")

    @staticmethod
    def _differs(expected, actual, tolerance):
        """Return True when *actual* deviates from *expected*.

        An expected value of exactly zero must be matched exactly; any other
        value is compared with a relative tolerance.
        """
        if expected == 0:
            return actual != 0
        return abs(expected - actual) > tolerance * abs(expected)

    def _min_scan(self, **selection):
        """Run visstat on the scan_number axis for scan>=1 and return 'min'.

        Extra keyword arguments (e.g. ``useflags=False``) are passed through
        to visstat unchanged.
        """
        s = visstat(vis=msfile, axis='scan_number', scan='>=1', **selection)
        lowest = s['SCAN_NUMBER']['min']
        print("min =  %s" % lowest)
        return lowest

    def setUp(self):
        """Stage the MS, flag antenna 17 and reset the visstat parameters."""
        if not os.path.exists(msfile):
            casapath = os.environ.get('CASAPATH')
            if not casapath:
                # Fail with a readable message instead of an AttributeError
                # on None when the environment is not set up.
                raise RuntimeError(
                    "CASAPATH is not set; cannot locate the test data "
                    "for " + msfile)
            # The first whitespace-separated field of CASAPATH is used as
            # the root of the data repository.
            datapath = casapath.split()[0] + '/data/regression/unittest/visstat/'
            shutil.copytree(datapath + msfile, msfile)

        flagdata(vis=msfile, antenna='17')

        default('visstat')

    def tearDown(self):
        """Delete the working copy of the MS."""
        shutil.rmtree(msfile)

    def test1(self):
        '''Visstat 1: Default values'''
        expected = {'ngc5921.ms':
                    {'DATA': {'max': 73.75,
                              'mean': 4.8371031336376049,
                              'medabsdevmed': 0.045013353228569031,
                              'median': 0.053559452295303345,
                              'min': 2.2130521756480448e-05,
                              'npts': 2660994.0,
                              'quartile': 0.30496686697006226,
                              'rms': 17.081207275390625,
                              'stddev': 16.382008275102407,
                              'sum': 12871502.415990865,
                              'sumsq': 776391995.30866611,
                              'var': 268.37019512552371}}}
        reference = expected[msfile]
        errors = []

        s = visstat(vis=msfile, axis='amp', datacolumn='data')

        # Compare as sets so the check does not depend on key ordering.
        if set(s.keys()) != set(reference.keys()):
            errors.append(
                "\nError: Wrong dictionary keys. Expected %s, got %s"
                % (reference, s))

        print("Expected = %s" % (reference,))
        print("Got = %s" % (s,))

        if 'DATA' not in s:
            # Nothing to compare numerically; report instead of raising
            # KeyError on s['DATA'] below.
            errors.append(
                "\nError: Dictionary returned from visstat does not have key DATA")
        else:
            got = s['DATA']
            for stat in sorted(reference['DATA']):
                want = reference['DATA'][stat]
                if stat not in got:
                    errors.append(
                        "\nError: Missing statistic %s in result" % stat)
                    continue
                print("Checking %s: %s vs %s" % (stat, want, got[stat]))
                if self._differs(want, got[stat], epsilon):
                    errors.append(
                        "\nError: Numbers differ, expected %s, got %s"
                        % (str(want), str(got[stat])))

        self.assertTrue(not errors, ''.join(errors))

    def test2(self):
        '''Visstat 2: Check channel selections'''
        errors = []

        for ch in [1, 2, 4, 7, 13, 62]:
            for corr in ['ll', 'rr', 'll,rr']:
                print("Call with spw='0:1~" + str(ch) + "', correlation=" + corr)
                s = visstat(vis=msfile, axis='amp', datacolumn='data',
                            spw='0:1~' + str(ch), correlation=corr)
                print(s)

                # 2660994 is the npts value expected by test1 for the full
                # data set; presumably it covers 63 channels and both
                # correlations (TODO confirm). Floor division keeps the
                # count an integer on Python 2 and 3 alike.
                n_expected = 2660994 // 63 * ch
                if corr in ['ll', 'rr']:
                    n_expected //= 2

                n = int(s['DATA']['npts'])
                print("Checking npts: %s vs %s" % (n, n_expected))
                if n != n_expected:
                    errors.append(
                        "\nError:" + str(n_expected)
                        + " points expected, but npts = " + str(n))

        self.assertTrue(not errors, ''.join(errors))

    def test3(self):
        '''Visstat 3: Test on different columns and axis'''
        errors = []

        print("Create scratch columns.")
        cb.open(msfile)
        cb.close()

        tb.open(msfile)
        cols = tb.colnames()
        tb.close()

        # Axis names that are evaluated on a visibility data column.
        cplx = ['amp', 'amplitude', 'phase', 'imag', 'imaginary', 'real']
        cols.extend(cplx)
        print(cols)

        # The visibility columns are not axes themselves; they are exercised
        # through the datacolumn parameter further down.
        cols.remove('DATA')
        have_corr = 'CORRECTED_DATA' in cols
        if have_corr:
            cols.remove('CORRECTED_DATA')
        have_model = 'MODEL_DATA' in cols
        if have_model:
            cols.remove('MODEL_DATA')
        cols.append('UVRANGE')

        cols = [x.lower() for x in cols]

        print("Trying these column names %s" % (cols,))

        for col in cols:
            # '' means: call visstat without a datacolumn argument.
            data_cols = ['']
            print("col  %s" % (col,))
            print("cplx  %s" % (cplx,))
            if col in cplx:
                data_cols = ['data']
                if have_corr:
                    data_cols.append('corrected')
                if have_model:
                    data_cols.append('model')

            print(data_cols)
            for dc in data_cols:
                print("Call with axis = %s ; datacolumn = %s" % (col, dc))
                if dc != '':
                    s = visstat(vis=msfile, axis=col, datacolumn=dc)
                else:
                    s = visstat(vis=msfile, axis=col)
                print("Result was %s" % (s,))

                if col.upper() in self._UNSUPPORTED_AXES:
                    # These axes are expected to yield no result at all.
                    if s is not None:
                        errors.append("\nError: " + str(s))
                elif not isinstance(s, dict):
                    errors.append(
                        "\nError: Return value " + str(s)
                        + " is not a dictionary")
                else:
                    # Without a datacolumn the result is keyed by the axis
                    # name, optionally with a '_0' suffix.
                    if dc == '' and \
                            col.upper() not in s and \
                            col.upper() + '_0' not in s:
                        errors.append(
                            "\nError: Missing key " + col.upper()
                            + " in result")

                    # With a datacolumn the result is keyed by that column.
                    if dc != '' and dc.upper() not in s:
                        errors.append(
                            "\nError: Missing key " + dc.upper()
                            + " in result")

        print("retValue['success']  %s" % (not errors,))
        print("retValue['error_msgs']  %s" % (''.join(errors),))
        self.assertTrue(not errors, ''.join(errors))

    def test4(self):
        '''Visstat 4: Test of special cases'''
        errors = []

        for a in range(1, 5):
            s = visstat(vis=msfile, axis='ANTENNA1', antenna=str(a) + '&26')
            print("antenna = %s ; mean =  %s" % (a, s['ANTENNA1']['mean']))

            # For ngc5921.ms the reported mean is one less than the antenna
            # given in the selection string; presumably a 0-based vs 1-based
            # numbering difference (TODO confirm).
            if msfile == 'ngc5921.ms':
                offset = 1
            else:
                offset = 0
            if abs((s['ANTENNA1']['mean'] + offset) - a) > epsilon:
                errors.append(
                    "\nError: Failed when antenna= " + str(a) + '&26')

        for scan in range(1, 8):
            s = visstat(vis=msfile, axis='SCAN_NUMBER', scan=str(scan))

            print("scan = %s ; mean =  %s" % (scan, s['SCAN_NUMBER']['mean']))
            if abs(s['SCAN_NUMBER']['mean'] - scan) > epsilon:
                errors.append("\nError: Failed for scan = " + str(scan))

        self.assertTrue(not errors, ''.join(errors))

    def test5(self):
        '''Visstat 5: Test that flagging impact statistics'''
        errors = []

        # Nothing flagged by scan yet: the lowest scan number must be 1.
        if abs(self._min_scan() - 1) > epsilon:
            errors.append(
                "\nError: Failed with visstat(vis=msfile, axis='scan_number', scan='>=1') ")

        # With scan 1 flagged the minimum must move to 2.
        flagdata(vis=msfile, scan="1")
        if abs(self._min_scan() - 2) > epsilon:
            errors.append(
                "\nError: Failed with visstat(vis=msfile, axis='scan_number', scan='>=1') "
                + "when data is flagged as flagdata(vis=msfile, scan='1')")

        # With scan 2 flagged as well the minimum must move to 3.
        flagdata(vis=msfile, scan="2")
        if abs(self._min_scan() - 3) > epsilon:
            errors.append(
                "\nError: Failed with visstat(vis=msfile, axis='scan_number', scan='>=1') "
                + "when data is flagged as flagdata(vis=msfile, scan='2')")

        # Ignoring flags must bring scan 1 back into the statistics.
        if abs(self._min_scan(useflags=False) - 1) > epsilon:
            errors.append(
                "\nError: Failed with visstat(vis=msfile, axis='scan_number', useflags=False, scan='>=1')")

        self.assertTrue(not errors, ''.join(errors))

    def test6(self):
        '''Visstat 6: Test when all selected rows are flagged'''
        flagdata(vis=msfile, mode='manualflag', antenna='1;1&&1')
        res = visstat(vis=msfile, antenna='1', useflags=True)
        self.assertFalse(res, 'All data are flagged. An exception should have been raised')
00283
class visstat_cleanup(unittest.TestCase):
    """Pseudo test case whose only effect is removing the working MS."""

    def setUp(self):
        """Nothing to prepare."""

    def tearDown(self):
        """Remove the MS copy; a missing directory is silently ignored."""
        shutil.rmtree(path=msfile, ignore_errors=True)

    def test1a(self):
        '''Visstat: Cleanup'''
00296
def suite():
    """Return the list of test case classes that make up this test module."""
    test_classes = [visstat_test]
    return test_classes
00299
00300