casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
test_fixvis.py
Go to the documentation of this file.
00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008 
00009 '''
00010 Unit tests for task fixvis.
00011 
00012 Features tested:                                                       
00013   1. Do converted directions in the FIELD table have the right shape? 
00014   2. Does the phase center shifting result in the expected shifts?
00015 
00016 Note: The equinox_vis regression is a more general test of fixvis.
00017 '''
# Measurement set names: test1 works on inpms/outms, the phase-shift tests
# work on inpms2/outms2.
inpms = '0420+417.ms'
outms = 'output.ms'
inpms2 = 'twocenteredpointsources.ms'
outms2 = 'testx.ms'

# Directories holding the input data; both are rooted at the first
# whitespace-separated token of the CASAPATH environment variable.
datapath = '%s/data/regression/0420+417/' % os.environ.get('CASAPATH').split()[0]
datapath2 = '%s/data/regression/unittest/fixvis/' % os.environ.get('CASAPATH').split()[0]
00024 
class fixvis_test1(unittest.TestCase):
    """Unit tests for the fixvis task.

    test1 inspects the FIELD table of a converted MS.  test2 to test12 apply
    a phase center shift with fixvis, image the result with clean and compare
    the position of the image maximum reported by imstat with expected values.
    """

    # Reference frame requested from fixvis in every test.
    refcode = 'J2000'
    # Scratch MS written by split() in test10, test11 and test12.
    tmpms = 'test9tmp.ms'
    # World coordinates of the image maximum expected for the unshifted
    # field 0 and for field 1 after its shift.
    field0_peak = '18:00:02.309, -29.59.29.999, I, 2.26e+11Hz'
    field1_peak = '18:00:02.333, -30.59.29.999, I, 2.26e+11Hz'

    def setUp(self):
        """Copy the input MSes into the working directory and clear old outputs."""
        for src_dir, ms in ((datapath, inpms), (datapath2, inpms2)):
            if not os.path.exists(ms):
                shutil.copytree(src_dir + ms, ms)
        default(fixvis)
        shutil.rmtree(outms, ignore_errors=True)
        shutil.rmtree(outms2, ignore_errors=True)

    def tearDown(self):
        """Remove the copied inputs and everything the tests produced."""
        shutil.rmtree(inpms)
        shutil.rmtree(inpms2)
        shutil.rmtree(outms, ignore_errors=True)
        shutil.rmtree(outms2, ignore_errors=True)
        # The 'test[yz]*' pattern below does not match the split() scratch MS,
        # so it has to be removed explicitly.
        shutil.rmtree(self.tmpms, ignore_errors=True)
        os.system('rm -rf test[yz]*')

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _ra_offset_phasecenter(self):
        """Return the RA offset phase center string shared by test7 to test12.

        The offset is -3 arcsec divided by cos(31 deg), converted to seconds
        of time, combined with a zero offset in the second coordinate.
        """
        x = qa.div(qa.quantity(-3./3600., 'deg'),
                   qa.cos(qa.quantity(31., 'deg')))['value']*24./360.*3600.  # (seconds)
        return str(x) + 's 0deg'

    def _copy_input_to_output(self):
        """Replace outms2 by a fresh copy of inpms2."""
        shutil.rmtree(outms2, ignore_errors=True)
        # copytree raises on failure, unlike a shelled-out 'cp -R'.
        shutil.copytree(inpms2, outms2)

    def _split_corrected_to_tmp(self):
        """Split the corrected column of a fresh input copy into self.tmpms.

        outms2 is removed afterwards so that fixvis can create it again.
        """
        self._copy_input_to_output()
        shutil.rmtree(self.tmpms, ignore_errors=True)
        split(vis=outms2, outputvis=self.tmpms, datacolumn='corrected')
        shutil.rmtree(outms2)

    def _assert_peak(self, stats, maxposf, maxpos):
        """Check the image maximum in an imstat result.

        stats   -- dictionary returned by imstat
        maxposf -- expected value of stats['maxposf']
        maxpos  -- expected pixel coordinates (list) of stats['maxpos']
        """
        self.assertEqual(stats['maxposf'], maxposf)
        self.assertTrue((stats['maxpos'] == maxpos).all(),
                        'maxpos %s != expected %s' % (stats['maxpos'], maxpos))

    def _fixvis_and_get_stats(self, phasecent):
        """Run fixvis on field 0 of inpms2 with phase center phasecent.

        Returns the imstat result for the image of field 0 of outms2.
        Failures of fixvis, clean or imstat are not caught here so that
        unittest reports the original error.
        """
        shutil.rmtree(outms2, ignore_errors=True)
        self.res = fixvis(inpms2, outms2, field='0', refcode=self.refcode,
                          phasecenter=phasecent)
        self.assertTrue(self.res)
        return self._get_stats(0, 'testy')

    def _get_stats(self, fld, imgname):
        """Image field fld of outms2 into imgname and return its imstat result."""
        os.system('rm -rf ' + imgname + '*')
        clean(vis=outms2, imagename=imgname, field=str(fld), niter=10, threshold='0.1mJy',
              psfmode='clark', imagermode='csclean', ftmachine='ft', mask=True,
              imsize=[128, 128], cell=['0.100000080arcsec', '0.100000080arcsec'],
              weighting='natural', uvtaper=True,
              outertaper=[''], innertaper=[])
        return imstat(imgname + '.image')

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test1(self):
        '''Test1: Do converted directions in the FIELD table have the right shape?'''
        refcode = self.refcode
        self.res = fixvis(inpms, outms, refcode=refcode)
        retValue = {'success': True, 'msgs': "", 'error_msgs': ''}

        def record_error(errmsg, retValue):
            """Helper function to print and update retValue on an error."""
            print("test_fixvis.test1: Error: " + errmsg)
            retValue['success'] = False
            retValue['error_msgs'] += errmsg + "\n"

        mscomponents = (["table.dat"]
                        + ["table.f%d" % i for i in range(12)]
                        + ["FIELD/table.dat", "FIELD/table.f0"])
        for name in mscomponents:
            path = outms + "/" + name
            if not os.access(path, os.F_OK):
                record_error(path + " does not exist.", retValue)

        if retValue['success']:
            exp_npoly = 0
            try:
                tb.open(outms + '/FIELD')
                try:
                    npoly = tb.getcell('NUM_POLY', 0)
                    if npoly != exp_npoly:
                        record_error('FIELD/NUM_POLY[0], ' + str(npoly) + ' != expected '
                                     + str(exp_npoly), retValue)
                    exp_shape = '[2, ' + str(npoly + 1) + ']'
                    for dircol in ('PHASE_DIR', 'DELAY_DIR', 'REFERENCE_DIR'):
                        ref = tb.getcolkeywords(dircol)['MEASINFO']['Ref']
                        if ref != refcode:
                            record_error(dircol + "'s stated frame, " + ref
                                         + ', != expected ' + refcode, retValue)
                        dirshape = tb.getcolshapestring(dircol)
                        if dirshape[0] != exp_shape:
                            # dirshape is indexed above, so report the entry
                            # that was actually compared.
                            record_error(dircol + "'s shape, " + str(dirshape[0])
                                         + ', != expected ' + exp_shape, retValue)
                finally:
                    # Close the table on the error path as well.
                    tb.close()
            except Exception as exc:
                record_error('Error: Cannot get FIELD directions. ' + str(exc), retValue)

        self.assertTrue(retValue['success'], retValue['error_msgs'])

    def test2(self):
        '''Test2: Apply trivial phase center shift, i.e. none.'''
        mystats = self._fixvis_and_get_stats('J2000 18h00m02.3092s -29d59m29.9987s')
        self._assert_peak(mystats, self.field0_peak, [64, 64, 0, 0])

    def test3(self):
        '''Test3: Apply positive phase center shift along DEC.'''
        mystats = self._fixvis_and_get_stats('J2000 18h00m02.3092s -29d59m26.9987s')
        self._assert_peak(mystats, self.field0_peak, [64, 34, 0, 0])

    def test4(self):
        '''Test4: Apply negative phase center shift along DEC using offset syntax.'''
        mystats = self._fixvis_and_get_stats('0h -0d0m3s')
        self._assert_peak(mystats, self.field0_peak, [64, 94, 0, 0])

    def test5(self):
        '''Test5: Apply positive phase center shift along RA.'''
        mystats = self._fixvis_and_get_stats('J2000 18h00m02.5401s -29d59m29.9987s')
        self._assert_peak(mystats, self.field0_peak, [94, 64, 0, 0])

    def test6(self):
        '''Test6: Apply negative phase shift along RA using offset syntax (offset is an angle).'''
        mystats = self._fixvis_and_get_stats('-0d0m3s 0deg')
        self._assert_peak(mystats, self.field0_peak, [34, 64, 0, 0])

    def test7(self):
        '''Test7: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time), no shift in field 0.'''
        self._copy_input_to_output()
        self.res = fixvis(vis=outms2, outputvis=outms2, field='1', refcode=self.refcode,
                          phasecenter=self._ra_offset_phasecenter(), datacolumn='all')
        self.assertTrue(self.res)

        mystats0 = self._get_stats(0, 'testy')
        mystats1 = self._get_stats(1, 'testz')
        self._assert_peak(mystats0, self.field0_peak, [64, 64, 0, 0])
        self._assert_peak(mystats1, self.field1_peak, [34, 64, 0, 0])

    def test8(self):
        '''Test8: Apply negative phase shift along RA in field 0 (using offset syntax, offset is a time), exercise datacolumn par = corrected'''
        self._copy_input_to_output()
        self.res = fixvis(vis=outms2, outputvis=outms2, field='0', refcode=self.refcode,
                          phasecenter=self._ra_offset_phasecenter(),
                          datacolumn='DATA,CORRECTED')
        self.assertTrue(self.res)

        mystats0 = self._get_stats(0, 'testy')
        self._assert_peak(mystats0, '18:00:02.307, -29.59.29.999, I, 2.26e+11Hz',
                          [34, 64, 0, 0])

    def test9(self):
        '''Test9: Apply negative phase shift along RA in field 0 (using offset syntax, offset is a time), exercise datacolumn parameter.'''
        self._copy_input_to_output()
        self.res = fixvis(vis=outms2, outputvis=outms2, field='0', refcode=self.refcode,
                          phasecenter=self._ra_offset_phasecenter(), datacolumn='DATA')
        self.assertTrue(self.res)

        # source should still be centered because CORRECTED was not changed
        # but the phase center change affects the world coordinates anyway
        mystats0 = self._get_stats(0, 'testy')
        self._assert_peak(mystats0, '18:00:02.076, -29.59.29.999, I, 2.26e+11Hz',
                          [64, 64, 0, 0])

    def test10(self):
        '''Test10: exercise datacolumn parameter - non-existent data column with valid name'''
        phc = self._ra_offset_phasecenter()

        print("\nTesting non-existing datacolumn - expected error")
        # split out the unchanged column
        self._split_corrected_to_tmp()

        # Only a failure of fixvis itself is the expected error; the check of
        # its return value stays outside the try so that it can fail the test.
        try:
            self.res = fixvis(vis=self.tmpms, outputvis=outms2, field='0',
                              refcode=self.refcode, phasecenter=phc,
                              datacolumn='CORRECTED')
        except Exception:
            print("*** Expected error ***")
            return
        self.assertFalse(self.res)

    def test11(self):
        '''Test11: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time) with only one data column selected'''
        phc = self._ra_offset_phasecenter()
        self._split_corrected_to_tmp()
        self.res = fixvis(vis=self.tmpms, outputvis=outms2, field='1',
                          refcode=self.refcode, phasecenter=phc, datacolumn='DATA')
        self.assertTrue(self.res)

        mystats0 = self._get_stats(0, 'testy')
        mystats1 = self._get_stats(1, 'testz')
        self._assert_peak(mystats0, self.field0_peak, [64, 64, 0, 0])
        self._assert_peak(mystats1, self.field1_peak, [34, 64, 0, 0])

    def test12(self):
        '''Test12: Apply negative phase shift along RA in field 1 (using offset syntax, offset is a time) w/o scratch columns'''
        phc = self._ra_offset_phasecenter()
        self._split_corrected_to_tmp()
        self.res = fixvis(vis=self.tmpms, outputvis=outms2, field='1',
                          refcode=self.refcode, phasecenter=phc)
        self.assertTrue(self.res)

        mystats0 = self._get_stats(0, 'testy')
        mystats1 = self._get_stats(1, 'testz')
        self._assert_peak(mystats0, self.field0_peak, [64, 64, 0, 0])
        self._assert_peak(mystats1, self.field1_peak, [34, 64, 0, 0])

    # Disabled test, kept as found.
    ## def test_obs_pc(self):
    ##     """Prevent phasecenter modification for a subset of obsIDs"""
    ##     refcode = 'J2000'
    ##     shutil.rmtree(outms2, ignore_errors=True)
    ##     self.res = fixvis(inpms2, outms2, field='0', refcode=refcode,
    ##                       phasecenter='-0d0m3s 0deg', observation='0')
    ##     self.assertFalse(self.res)
00330     
def suite():
    """Return the list of test case classes defined in this module."""
    test_classes = [fixvis_test1]
    return test_classes