00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008
00009 '''
00010 Unit tests for task fixvis.
00011
00012 Features tested:
00013 1. Do converted directions in the FIELD table have the right shape?
00014 2. Does the phase center shifting result in the expected shifts?
00015
00016 Note: The equinox_vis regression is a more general test of fixvis.
00017 '''
# First whitespace-separated token of $CASAPATH, used as the root of the data
# paths below.  Checked explicitly so that a missing variable gives a clear
# message instead of an AttributeError on None.
_casapath = os.environ.get('CASAPATH', '').split()
if not _casapath:
    raise RuntimeError('CASAPATH is not set; cannot locate the fixvis test data.')
_casa_root = _casapath[0]

# Input/output MS names used by test1.
datapath = _casa_root + '/data/regression/0420+417/'
inpms = '0420+417.ms'
outms = 'output.ms'

# Input/output MS names used by the phase-center shift tests (test2-test12).
datapath2 = _casa_root + '/data/regression/unittest/fixvis/'
inpms2 = 'twocenteredpointsources.ms'
outms2 = 'testx.ms'
00024
00025 class fixvis_test1(unittest.TestCase):
00026 def setUp(self):
00027 res = None
00028 if not os.path.exists(inpms):
00029 shutil.copytree(datapath + inpms, inpms)
00030 if not os.path.exists(inpms2):
00031 shutil.copytree(datapath2 + inpms2, inpms2)
00032 default(fixvis)
00033 shutil.rmtree(outms, ignore_errors=True)
00034 shutil.rmtree(outms2, ignore_errors=True)
00035
00036 def tearDown(self):
00037 shutil.rmtree(inpms)
00038 shutil.rmtree(inpms2)
00039 shutil.rmtree(outms, ignore_errors=True)
00040 shutil.rmtree(outms2, ignore_errors=True)
00041 os.system('rm -rf test[yz]*')
00042
00043 def test1(self):
00044 '''Test1: Do converted directions in the FIELD table have the right shape?'''
00045 refcode = 'J2000'
00046 self.res = fixvis(inpms, outms, refcode=refcode)
00047 tb.open(outms + '/FIELD')
00048 retValue = {'success': True, 'msgs': "", 'error_msgs': '' }
00049
00050 def record_error(errmsg, retValue):
00051 """Helper function to print and update retValue on an error."""
00052 print "test_fixvis.test1: Error:", errmsg
00053 retValue['success'] = False
00054 retValue['error_msgs'] += errmsg + "\n"
00055
00056 mscomponents = set(["table.dat",
00057 "table.f0",
00058 "table.f1",
00059 "table.f2",
00060 "table.f3",
00061 "table.f4",
00062 "table.f5",
00063 "table.f6",
00064 "table.f7",
00065 "table.f8",
00066 "table.f9",
00067 "table.f10",
00068 "table.f11",
00069 "FIELD/table.dat",
00070 "FIELD/table.f0"])
00071 for name in mscomponents:
00072 if not os.access(outms + "/" + name, os.F_OK):
00073 record_error(outms + "/" + name + " does not exist.", retValue)
00074
00075 if retValue['success']:
00076 exp_npoly = 0
00077 try:
00078 tb.open(outms + '/FIELD')
00079 npoly = tb.getcell('NUM_POLY', 0)
00080 if npoly != exp_npoly:
00081 record_error('FIELD/NUM_POLY[0], ' + str(npoly) + ' != expected '
00082 + str(exp_npoly), retValue)
00083 exp_shape = '[2, ' + str(npoly + 1) + ']'
00084 for dircol in ('PHASE_DIR', 'DELAY_DIR', 'REFERENCE_DIR'):
00085 ref = tb.getcolkeywords(dircol)['MEASINFO']['Ref']
00086 if ref != refcode:
00087 record_error(dircol + "'s stated frame, " + ref
00088 + ', != expected ' + refcode, retValue)
00089 dirshape = tb.getcolshapestring(dircol)
00090 if dirshape[0] != exp_shape:
00091 record_error(dircol + "'s shape, " + dirshape
00092 + ', != expected ' + exp_shape)
00093 except:
00094 record_error('Error: Cannot get FIELD directions.', retValue)
00095 else:
00096 tb.close()
00097
00098 self.assertTrue(retValue['success'])
00099
00100 def _fixvis_and_get_stats(self, phasecent):
00101 refcode = 'J2000'
00102 shutil.rmtree(outms2, ignore_errors=True)
00103 mystats = ''
00104 try:
00105 self.res = fixvis(inpms2, outms2, field='0', refcode=refcode,
00106 phasecenter=phasecent)
00107 self.assertTrue(self.res)
00108 mystats = self._get_stats(0, 'testy')
00109 except:
00110 print "*** Unexpected error ***"
00111
00112 return mystats
00113
00114 def _get_stats(self, fld, imgname):
00115 os.system('rm -rf ' + imgname + '*')
00116 clean(vis=outms2, imagename=imgname, field=str(fld), niter=10, threshold='0.1mJy',
00117 psfmode='clark', imagermode='csclean', ftmachine='ft', mask=True,
00118 imsize=[128, 128], cell=['0.100000080arcsec', '0.100000080arcsec'],
00119 weighting='natural', uvtaper=True,
00120 outertaper=[''], innertaper=[])
00121 return imstat(imgname + '.image')
00122
00123 def test2(self):
00124 '''Test2: Apply trivial phase center shift, i.e. none.'''
00125 refcode = 'J2000'
00126 shutil.rmtree(outms2, ignore_errors=True)
00127
00128 mystats = self._fixvis_and_get_stats('J2000 18h00m02.3092s -29d59m29.9987s')
00129 self.assertTrue(mystats['maxposf'] == '18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00130 (mystats['maxpos'] == [64, 64, 0, 0]).all())
00131
00132 def test3(self):
00133 '''Test3: Apply positive phase center shift along DEC.'''
00134 refcode = 'J2000'
00135 shutil.rmtree(outms2, ignore_errors=True)
00136 mystats = self._fixvis_and_get_stats('J2000 18h00m02.3092s -29d59m26.9987s')
00137 self.assertTrue(mystats['maxposf'] == '18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00138 (mystats['maxpos'] == [64,34,0,0]).all())
00139
00140 def test4(self):
00141 '''Test4: Apply negative phase center shift along DEC using offset syntax.'''
00142 refcode = 'J2000'
00143 shutil.rmtree(outms2, ignore_errors=True)
00144 mystats = self._fixvis_and_get_stats('0h -0d0m3s')
00145 self.assertTrue(mystats['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00146 (mystats['maxpos']==[64,94,0,0]).all())
00147
00148 def test5(self):
00149 '''Test5: Apply positive phase center shift along RA.'''
00150 mystats = self._fixvis_and_get_stats('J2000 18h00m02.5401s -29d59m29.9987s')
00151 self.assertTrue(mystats['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00152 (mystats['maxpos']==[94,64,0,0]).all())
00153
00154 def test6(self):
00155 '''Test6: Apply negative phase shift along RA using offset syntax (offset is an angle).'''
00156 mystats = self._fixvis_and_get_stats('-0d0m3s 0deg')
00157 self.assertTrue(mystats['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00158 (mystats['maxpos']==[34,64,0,0]).all())
00159
00160 def test7(self):
00161 '''Test7: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time), no shift in field 0.'''
00162 refcode = 'J2000'
00163 shutil.rmtree(outms2, ignore_errors=True)
00164 os.system('cp -R ' + inpms2 + ' ' + outms2)
00165
00166 mystats0 = ''
00167 mystats1 = ''
00168 try:
00169 x = qa.div(qa.quantity(-3./3600., 'deg'),
00170 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00171 phc = str(x) + 's 0deg'
00172 self.res = fixvis(vis=outms2, outputvis=outms2, field='1', refcode=refcode,
00173 phasecenter=phc, datacolumn='all')
00174 self.assertTrue(self.res)
00175 mystats0 = self._get_stats(0, 'testy')
00176 mystats1 = self._get_stats(1, 'testz')
00177 except:
00178 print "*** Unexpected error ***"
00179 self.assertFalse(True)
00180
00181 self.assertTrue(mystats0['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00182 (mystats0['maxpos']==[64,64,0,0]).all() and
00183 mystats1['maxposf']=='18:00:02.333, -30.59.29.999, I, 2.26e+11Hz' and
00184 (mystats1['maxpos']==[34,64,0,0]).all())
00185
00186 def test8(self):
00187 '''Test8: Apply negative phase shift along RA in field 0 (using offset syntax, offset is a time), execise datacolumn par = corrected'''
00188 refcode = 'J2000'
00189 shutil.rmtree(outms2, ignore_errors=True)
00190 os.system('cp -R ' + inpms2 + ' ' + outms2)
00191
00192 mystats0 = ''
00193 mystats1 = ''
00194 try:
00195 x = qa.div(qa.quantity(-3./3600., 'deg'),
00196 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00197 phc = str(x) + 's 0deg'
00198 self.res = fixvis(vis=outms2, outputvis=outms2, field='0', refcode=refcode,
00199 phasecenter=phc, datacolumn='DATA,CORRECTED')
00200 self.assertTrue(self.res)
00201 mystats0 = self._get_stats(0, 'testy')
00202
00203 except:
00204 print "*** Unexpected error ***"
00205 self.assertFalse(True)
00206
00207 self.assertTrue(mystats0['maxposf']=='18:00:02.307, -29.59.29.999, I, 2.26e+11Hz' and
00208 (mystats0['maxpos']==[34,64,0,0]).all())
00209
00210 def test9(self):
00211 '''Test9: Apply negative phase shift along RA in field 0 (using offset syntax, offset is a time), exercise datacolumn parameter.'''
00212 refcode = 'J2000'
00213 shutil.rmtree(outms2, ignore_errors=True)
00214 os.system('cp -R ' + inpms2 + ' ' + outms2)
00215
00216 mystats0 = ''
00217 mystats1 = ''
00218 try:
00219 x = qa.div(qa.quantity(-3./3600., 'deg'),
00220 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00221 phc = str(x) + 's 0deg'
00222 self.res = fixvis(vis=outms2, outputvis=outms2, field='0', refcode=refcode,
00223 phasecenter=phc, datacolumn='DATA')
00224 self.assertTrue(self.res)
00225 mystats0 = self._get_stats(0, 'testy')
00226
00227
00228 except:
00229 print "*** Unexpected error ***"
00230 self.assertFalse(True)
00231
00232 self.assertTrue(mystats0['maxposf']=='18:00:02.076, -29.59.29.999, I, 2.26e+11Hz' and
00233 (mystats0['maxpos']==[64,64,0,0]).all())
00234
00235 def test10(self):
00236 '''Test10: exercise datacolumn parameter - non-existent data column with valid name'''
00237 refcode = 'J2000'
00238 shutil.rmtree(outms2, ignore_errors=True)
00239 os.system('cp -R ' + inpms2 + ' ' + outms2)
00240
00241 try:
00242 x = qa.div(qa.quantity(-3./3600., 'deg'),
00243 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00244 phc = str(x) + 's 0deg'
00245
00246 print "\nTesting non-existing datacolumn - expected error"
00247 shutil.rmtree('test9tmp.ms', ignore_errors=True)
00248 split(vis=outms2, outputvis='test9tmp.ms', datacolumn='corrected')
00249 shutil.rmtree(outms2)
00250 self.res = fixvis(vis='test9tmp.ms', outputvis=outms2, field='0', refcode=refcode,
00251 phasecenter=phc, datacolumn='CORRECTED')
00252 self.assertFalse(self.res)
00253
00254 except:
00255 print "*** Expected error ***"
00256
00257
00258 def test11(self):
00259 '''Test11: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time) with only one data column selected'''
00260 refcode = 'J2000'
00261 shutil.rmtree(outms2, ignore_errors=True)
00262 os.system('cp -R ' + inpms2 + ' ' + outms2)
00263
00264 mystats0 = ''
00265 mystats1 = ''
00266 try:
00267 x = qa.div(qa.quantity(-3./3600., 'deg'),
00268 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00269 phc = str(x) + 's 0deg'
00270 shutil.rmtree('test9tmp.ms', ignore_errors=True)
00271 split(vis=outms2, outputvis='test9tmp.ms', datacolumn='corrected')
00272 shutil.rmtree(outms2)
00273 self.res = fixvis(vis='test9tmp.ms', outputvis=outms2, field='1', refcode=refcode,
00274 phasecenter=phc, datacolumn='DATA')
00275 self.assertTrue(self.res)
00276
00277 mystats0 = self._get_stats(0, 'testy')
00278 mystats1 = self._get_stats(1, 'testz')
00279
00280 except:
00281 print "*** Unexpected error ***"
00282 self.assertFalse(True)
00283
00284 self.assertTrue(mystats0['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00285 (mystats0['maxpos']==[64,64,0,0]).all() and
00286 mystats1['maxposf']=='18:00:02.333, -30.59.29.999, I, 2.26e+11Hz' and
00287 (mystats1['maxpos']==[34,64,0,0]).all())
00288
00289 def test12(self):
00290 '''Test12: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time) w/o scratch columns'''
00291 refcode = 'J2000'
00292 shutil.rmtree(outms2, ignore_errors=True)
00293 os.system('cp -R ' + inpms2 + ' ' + outms2)
00294
00295 mystats0 = ''
00296 mystats1 = ''
00297 try:
00298 x = qa.div(qa.quantity(-3./3600., 'deg'),
00299 qa.cos(qa.quantity(31.,'deg')))['value']*24./360.*3600.
00300 phc = str(x) + 's 0deg'
00301 shutil.rmtree('test9tmp.ms', ignore_errors=True)
00302 split(vis=outms2, outputvis='test9tmp.ms', datacolumn='corrected')
00303 shutil.rmtree(outms2)
00304 self.res = fixvis(vis='test9tmp.ms', outputvis=outms2, field='1', refcode=refcode,
00305 phasecenter=phc)
00306 self.assertTrue(self.res)
00307
00308 mystats0 = self._get_stats(0, 'testy')
00309 mystats1 = self._get_stats(1, 'testz')
00310
00311 except:
00312 print "*** Unexpected error ***"
00313 self.assertFalse(True)
00314
00315 self.assertTrue(mystats0['maxposf']=='18:00:02.309, -29.59.29.999, I, 2.26e+11Hz' and
00316 (mystats0['maxpos']==[64,64,0,0]).all() and
00317 mystats1['maxposf']=='18:00:02.333, -30.59.29.999, I, 2.26e+11Hz' and
00318 (mystats1['maxpos']==[34,64,0,0]).all())
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
def suite():
    """Return the list of test case classes defined in this module."""
    test_classes = [fixvis_test1]
    return test_classes