00001 import math
00002 import os
00003 import shutil
00004 from taskinit import *
00005 from tasks import *
00006 from locatescript import copydata
00007
00008 def description():
00009
00010 return "Tests converting the equinoxes (B1950, J2000, etc.) of uvw data with fixvis."
00011
00012 def data():
00013 """As I understand it, this must return the filenames of needed input data."""
00014 return ['0420+417.ms']
00015
00016
00017 def run( fetch=False ):
00018 """Run the tasks and compare the results."""
00019
00020
00021 if fetch:
00022 for f in data( ):
00023 copydata( f, os.getcwd( ) )
00024
00025 nsigma = 3.0
00026 advice = None
00027 lazy = True
00028 equinoxes = ['B1950_VLA', 'B1950', 'J2000']
00029
00030 convert_phasecenter = False
00031 origvisses = data()
00032
00033 passes = True
00034 for origvis in origvisses:
00035 peaks = run_tasks(origvis, advice, lazy, equinoxes, convert_phasecenter)
00036 passes = passes and compare(peaks, nsigma)
00037
00038 if passes:
00039 print ''
00040 print 'Regression PASSED'
00041 print ''
00042
00043 return []
00044
00045 def conv_dirstr(orig_dirstr, newframe):
00046 """
00047 Returns a direction string (suitable for use in clean) made by converting
00048 orig_dirstr to newframe.
00049 """
00050 orig_dirmeas = me.direction(*(orig_dirstr.split()))
00051 converted_dirmeas = me.measure(orig_dirmeas, newframe)
00052 lon = qa.formxxx(converted_dirmeas['m0'], format='hms')
00053 lat = qa.formxxx(converted_dirmeas['m1'], format='dms')
00054 return "%s %s %s" % (converted_dirmeas['refer'], lon, lat)
00055
00056
00057 def run_tasks(origvis='0420+417.ms', advice=None, lazy=True,
00058 equinoxes=['B1950_VLA', 'B1950', 'J2000', 'BMEAN', 'BTRUE',
00059 'APP', 'JMEAN', 'JTRUE'],
00060 convert_phasecenter=False):
00061 """
00062 Fill the peaks dictionary by running fixvis, clean, and imfit for
00063 equinoxes, and the original one.
00064 """
00065 if not advice:
00066 im.open(origvis)
00067 advice = im.advise()
00068 im.close()
00069 origphasecenter = advice[4]
00070
00071 peaks = {}
00072 peaks['original'] = getpeak(origvis, advice)
00073
00074
00075
00076 for equinox in equinoxes:
00077
00078
00079
00080
00081 visparts = origvis.split('.')
00082 visparts.insert(-1, equinox)
00083 outputvis = '.'.join(visparts)
00084
00085 try:
00086 if os.path.isdir(outputvis):
00087 if not lazy:
00088 shutil.rmtree(outputvis)
00089 print "Running fixvis(%s, %s, refcode=%s)" % (origvis,
00090 outputvis,
00091 equinox)
00092 fixvis(origvis, outputvis, refcode=equinox)
00093 else:
00094 print "Running fixvis(%s, %s, refcode=%s)" % (origvis,
00095 outputvis,
00096 equinox)
00097 fixvis(origvis, outputvis, refcode=equinox)
00098
00099 if convert_phasecenter:
00100 advice[4] = conv_dirstr(origphasecenter, equinox)
00101
00102 peaks[equinox] = getpeak(outputvis, advice)
00103
00104 if convert_phasecenter:
00105 advice[4] = origphasecenter
00106
00107 except Exception, e:
00108 print "Error", e, "trying to test equinox", equinox
00109 return peaks
00110
00111 def getpeak(vis, advice):
00112 """Clean vis and find the peak of the resulting image."""
00113 print "Getting peak of", vis
00114 pixsize = str(advice[2]['value']) + advice[2]['unit']
00115 npix = advice[1]
00116
00117 imroot = vis.replace('.ms', '')
00118 for ext in ['image', 'psf', 'residual', 'model', 'flux']:
00119 if os.path.isdir(imroot + '.' + ext):
00120 shutil.rmtree(imroot + '.' + ext)
00121 clean(vis=vis, imagename=imroot,
00122 imsize=[npix, npix],
00123
00124 cell=[pixsize, pixsize],
00125 phasecenter=advice[4])
00126 for ext in ['residual', 'model', 'flux']:
00127 if os.path.isdir(imroot + '.' + ext):
00128 shutil.rmtree(imroot + '.' + ext)
00129
00130
00131
00132 npixm1 = str(npix - 1)
00133 return imfit(imroot + '.image',
00134 box='0,0,' + npixm1 + ',' + npixm1)['results']['component0']
00135
00136 def direction_var(direction):
00137 """
00138 Given a direction dictionary from imfit with estimated latitude and
00139 longitude errors, returns the position variance collapsed down to a
00140 single number, in arcsec**2.
00141 """
00142 dirvars = [qa.convert(direction['error'][way],
00143 'arcsec')['value']**2 for way in ['latitude', 'longitude']]
00144 return sum(dirvars)
00145
00146
00147 def compare(peaks, nsigma=3.0):
00148 origdir = peaks['original']['shape']['direction']
00149 origdirvar = direction_var(origdir)
00150
00151
00152 origflux = qa.convert(peaks['original']['flux'], 'Jy')['value'][0]
00153 origfluxvar = qa.convert({'unit': peaks['original']['flux']['unit'],
00154 'value': peaks['original']['flux']['error'][0]},
00155 'Jy')['value']**2
00156
00157 equinoxes = peaks.keys()
00158 equinoxes.remove('original')
00159 equinoxes.sort()
00160 result = True
00161 for equinox in equinoxes:
00162 try:
00163 dist_from_original = me.separation(origdir,
00164 peaks[equinox]['shape']['direction'])
00165 dist_from_original = qa.convert(dist_from_original, 'arcsec')
00166 peaks[equinox]['shape']['direction']['dist_from_original'] = dist_from_original
00167 dirvar = direction_var(peaks[equinox]['shape']['direction'])
00168 septol = nsigma * math.sqrt(dirvar + origdirvar)
00169 if dist_from_original['value'] > septol:
00170 errmsg = "The %s peak is %.1g\" away from the original peak.\n" % (equinox,
00171 dist_from_original['value'])
00172 errmsg += " (%.1f x the tolerance)" % (dist_from_original['value']
00173 / septol)
00174 raise Exception, errmsg
00175
00176
00177 newflux = qa.convert(peaks[equinox]['flux'], 'Jy')['value'][0]
00178 newfluxvar = qa.convert({'unit': peaks[equinox]['flux']['unit'],
00179 'value': peaks[equinox]['flux']['error'][0]},
00180 'Jy')['value']
00181 fluxtol = nsigma * math.sqrt(newfluxvar + origfluxvar)
00182 if abs(newflux - origflux) > fluxtol:
00183 print "The flux density of %s, %g Jy, is %.1f x the tolerance away from" % (equinox, newflux, abs(newflux - origflux) / fluxtol)
00184 print "the original flux density, %g Jy." % (origflux)
00185
00186
00187 for q in ('majoraxis', 'minoraxis', 'positionangle'):
00188 cmp_shape_param(peaks, equinox, q, nsigma)
00189 except Exception, e:
00190 result = False
00191 print "Error", e, "comparing to test equinox", equinox
00192
00193 return result
00194
00195
00196 def cmp_shape_param(peaks, equinox, q, nsigma):
00197 """
00198 Compare shape parameter q ('majoraxis', 'minoraxis', or 'positionangle') in
00199 peaks[equinox] to the originals.
00200 """
00201 qerr = q + 'error'
00202 unit = peaks['original']['shape'][qerr]['unit']
00203
00204 origq = qa.convert(peaks['original']['shape'][q], unit)['value']
00205 newq = qa.convert(peaks[equinox]['shape'][q], unit)['value']
00206
00207 origvar = peaks['original']['shape'][qerr]['value']**2
00208 newvar = qa.convert(peaks[equinox]['shape'][qerr], unit)['value']**2
00209
00210 tol = nsigma * math.sqrt(origvar + newvar)
00211 err = abs(newq - origq)
00212 if err > tol:
00213 print "The %s of %s, %g %s, is %.1f x the tolerance away from" % (q,
00214 equinox,
00215 newq,
00216 unit,
00217 err / tol)
00218 print "the original %s, %g %s." % (q, origq, unit)