00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008 import sha
00009 import time
00010 import numpy
00011 import re
00012 import string
00013
00014 from sdimprocess import sdimprocess
00015 import asap as sd
00016
00017
00018
00019
00020
00021
00022
00023
class sdimprocess_unittest_base:
    """
    Base class for sdimprocess unit test

    Mix-in holding the task name, the location of the reference data and
    the assertion helpers shared by the test cases.  The helpers call
    assert* methods on self, so this class has to be combined with
    unittest.TestCase in the concrete test classes.
    """
    taskname='sdimprocess'
    # The data root is the first whitespace-separated token of CASAPATH.
    # An unset or blank CASAPATH gives an empty root instead of raising while
    # this module is being imported; the problem then surfaces when a test
    # tries to copy its input images from datapath.
    datapath=(os.environ.get('CASAPATH','').split() or [''])[0] + '/data/regression/unittest/sdimprocess/'

    def _checkfile( self, name ):
        """Fail the running test unless the path `name` exists."""
        self.assertTrue(os.path.exists(name),
                        msg='output file %s was not created because of the task failure'%(name))

    def _checkstats(self,name,ref):
        """
        Compare the statistics of the image `name` with the reference `ref`.

        name -- path of the image to examine; it must exist
        ref  -- dictionary holding the expected value of every statistic

        The image is opened through the global `ia` tool, which is expected
        to come from the star imports of this file (presumably the CASA
        image analysis tool -- confirm against taskinit).
        """
        self._checkfile(name)
        ia.open(name)
        try:
            stats=ia.statistics()
        finally:
            # Release the image even when statistics() raises so that a
            # failure here does not leave it open for the following tests.
            ia.close()
        self._comparestats(stats,ref)

    def _comparestats(self,stats,ref):
        """
        Assert that every entry of `stats` matches the entry of `ref`.

        String values must be equal; any other value is compared with
        numpy.allclose using its default tolerances.  A statistic without a
        reference value is reported as a test failure.
        """
        for key in stats:
            self.assertTrue(key in ref,
                            msg='reference value for statistics \'%s\' is missing'%(key))
            message='statistics \'%s\' does not match'%(key)
            value=stats[key]
            if isinstance(value,str):
                self.assertEqual(value,ref[key],
                                 msg=message)
            else:
                self.assertTrue(numpy.allclose(value,ref[key]),
                                msg=message)
00052
00053
00054
00055
class sdimprocess_test0(unittest.TestCase,sdimprocess_unittest_base):
    """
    Test on bad parameter setting

    test000 checks the return value of a call with default parameters.
    All other tests call the task with an invalid parameter combination
    and require an exception whose message contains a given text.
    """
    # Input images, copied from datapath into the working directory.
    rawfiles=['scan_x.im','scan_y.im']
    # Everything starting with this prefix is removed in tearDown.
    prefix=sdimprocess_unittest_base.taskname+'Test0'
    outfile=prefix+'.im'

    def setUp(self):
        """Copy the input images that are not present yet and call default()."""
        self.res=None
        for name in self.rawfiles:
            if (not os.path.exists(name)):
                shutil.copytree(self.datapath+name, name)

        # presumably restores the task's default parameters -- confirm
        default(sdimprocess)

    def tearDown(self):
        """Remove the copied input images and every output of this class."""
        for name in self.rawfiles:
            if (os.path.exists(name)):
                shutil.rmtree(name)
        os.system( 'rm -rf '+self.prefix+'*' )

    def _assert_task_raises(self, expected, errtype=Exception, **params):
        """
        Call sdimprocess(**params) and require it to fail as expected.

        expected -- text that must occur in the message of the exception
        errtype  -- exception class that is accepted; any other exception
                    fails the test
        params   -- keyword arguments passed to the task unchanged

        The "must throw" failure is raised in the else clause, outside the
        try body.  Raised inside the try body, its AssertionError would be
        caught by the handlers below and reported as an unexpected
        exception.
        """
        try:
            sdimprocess(**params)
        except errtype as e:
            self.assertNotEqual(str(e).find(expected),-1,
                                msg='Unexpected exception was thrown: %s'%(str(e)))
        except Exception as e:
            self.fail('Unexpected exception was thrown: %s'%(str(e)))
        else:
            self.fail('The task must throw exception')

    def test000(self):
        """Test 000: Default parameters"""
        res=sdimprocess()
        self.assertEqual(res,False)

    def test001(self):
        """Test 001: only 1 image is given for Basket-Weaving"""
        self._assert_task_raises('infiles should be a list of input images for Basket-Weaving.',
                                 infiles=[self.rawfiles[0]],mode='basket',direction=[0.])

    def test002(self):
        """Test 002: direction is not given for Basket-Weaving"""
        self._assert_task_raises('direction must have at least two different direction.',
                                 infiles=self.rawfiles,mode='basket')

    def test003(self):
        """Test 003: Multiple images are given for Press"""
        self._assert_task_raises('infiles allows only one input file for pressed-out method.',
                                 infiles=self.rawfiles,mode='press')

    def test004(self):
        """Test 004: direction is not given for Press"""
        self._assert_task_raises('direction allows only one direction for pressed-out method.',
                                 infiles=[self.rawfiles[0]],mode='press')

    def test005(self):
        """Test 005: Existing output image file"""
        # Create the output beforehand so that overwrite=False has to fail.
        shutil.copytree(self.datapath+self.rawfiles[0], self.outfile)
        # StandardError only exists in Python 2, which this file targets.
        self._assert_task_raises('%s exists'%(self.outfile), StandardError,
                                 infiles=self.rawfiles,mode='basket',direction=[0.,90.0],
                                 outfile=self.outfile,overwrite=False)

    def test006(self):
        """Test 006: Zero beamsize for Press"""
        self._assert_task_raises('Gaussian2DParam', StandardError,
                                 infiles=[self.rawfiles[0]],mode='press',beamsize=0.0,
                                 direction=[0.],outfile=self.outfile,overwrite=True)
00156
00157
00158
00159
class sdimprocess_test1(unittest.TestCase,sdimprocess_unittest_base):
    """
    Test on Pressed method.

    Test data, scan_x.im, is artificial data, which is

       - 128x128 in R.A. and Dec.
       - 1 polarization component (Stokes I)
       - 1 spectral channel
       - Gaussian source in the center
       - 1% random noise
       - scanning noise in horizontal direction
       - smoothed by Gaussian kernel of FWHM of 10 pixel

    Each test runs the task in 'press' mode and compares the statistics of
    the output image with stored reference values.
    """
    rawfile='scan_x.im'
    prefix=sdimprocess_unittest_base.taskname+'Test1'
    outfile=prefix+'.im'
    mode='press'

    def setUp(self):
        """Copy the input image if it is absent and call default()."""
        self.res=None
        already_here=os.path.exists(self.rawfile)
        if not already_here:
            shutil.copytree(self.datapath+self.rawfile, self.rawfile)

        default(sdimprocess)

    def tearDown(self):
        """Remove the input image and everything starting with prefix."""
        if os.path.exists(self.rawfile):
            shutil.rmtree(self.rawfile)
        cleanup='rm -rf '+self.prefix+'*'
        os.system( cleanup )

    @staticmethod
    def _pixel(*index):
        """Return `index` as an int32 array."""
        return numpy.array(list(index), dtype=numpy.int32)

    @staticmethod
    def _value(number):
        """Return a one-element array holding `number`."""
        return numpy.array([number])

    def _press(self, **extra):
        """Run the task with the arguments common to all tests plus `extra`."""
        arguments=dict(infiles=self.rawfile,
                       mode=self.mode,
                       numpoly=2,
                       beamsize=300.0,
                       smoothsize=2.0,
                       direction=0.0,
                       outfile=self.outfile,
                       overwrite=True)
        arguments.update(extra)
        return sdimprocess(**arguments)

    def test100(self):
        """Test 100: Pressed method using whole pixels"""
        status=self._press()
        self.assertEqual(status,None,
                         msg='Any error occurred during imaging')
        pixel=self._pixel
        value=self._value
        expected={
            'blc': pixel(0, 0, 0, 0),
            'blcf': '00:00:00.000, +00.00.00.000, I, 1.415e+09Hz',
            'max': value(0.89482009),
            'maxpos': pixel(63, 63, 0, 0),
            'maxposf': '23:55:47.944, +01.03.00.212, I, 1.415e+09Hz',
            'mean': value(0.02900992),
            'medabsdevmed': value(0.00768787),
            'median': value(0.00427032),
            'min': value(-0.02924964),
            'minpos': pixel(79, 115, 0, 0),
            'minposf': '23:54:43.795, +01.55.01.288, I, 1.415e+09Hz',
            'npts': value(16384.),
            'quartile': value(0.01577091),
            'rms': value(0.10900906),
            'sigma': value(0.10508127),
            'sum': value(475.29847136),
            'sumsq': value(194.69064919),
            'trc': pixel(127, 127, 0, 0),
            'trcf': '23:51:31.537, +02.07.01.734, I, 1.415e+09Hz',
        }
        self._checkstats(self.outfile,expected)

    def test101(self):
        """Test 101: Pressed method with certain threshold"""
        status=self._press(tmax=0.5,tmin=-0.1)
        self.assertEqual(status,None,
                         msg='Any error occurred during imaging')
        pixel=self._pixel
        value=self._value
        expected={
            'blc': pixel(0, 0, 0, 0),
            'blcf': '00:00:00.000, +00.00.00.000, I, 1.415e+09Hz',
            'max': value(0.97879869),
            'maxpos': pixel(63, 63, 0, 0),
            'maxposf': '23:55:47.944, +01.03.00.212, I, 1.415e+09Hz',
            'mean': value(0.02900992),
            'medabsdevmed': value(0.00766596),
            'median': value(0.00407957),
            'min': value(-0.04284666),
            'minpos': pixel(90, 53, 0, 0),
            'minposf': '23:53:59.916, +00.53.00.126, I, 1.415e+09Hz',
            'npts': value(16384.),
            'quartile': value(0.0157865),
            'rms': value(0.11193633),
            'sigma': value(0.10811512),
            'sum': value(475.29845356),
            'sumsq': value(205.28728974),
            'trc': pixel(127, 127, 0, 0),
            'trcf': '23:51:31.537, +02.07.01.734, I, 1.415e+09Hz',
        }
        self._checkstats(self.outfile,expected)
00244
00245
00246
00247
class sdimprocess_test2(unittest.TestCase,sdimprocess_unittest_base):
    """
    Test on FFT based Basket-Weaving

    Test data, scan_x.im and scan_y.im, are artificial data, which is

       - 128x128 in R.A. and Dec.
       - 1 polarization component (Stokes I)
       - 1 spectral channel
       - Gaussian source in the center
       - 1% random noise
       - scanning noise in horizontal direction (scan_x.im)
         or vertical direction (scan_y.im)
       - smoothed by Gaussian kernel of FWHM of 10 pixel

    Each test runs the task in 'basket' mode on both images and compares
    the statistics of the output image with stored reference values.
    """
    rawfiles=['scan_x.im','scan_y.im']
    prefix=sdimprocess_unittest_base.taskname+'Test2'
    outfile=prefix+'.im'
    mode='basket'

    def setUp(self):
        """Copy the input images that are absent and call default()."""
        self.res=None
        absent=[name for name in self.rawfiles if not os.path.exists(name)]
        for name in absent:
            shutil.copytree(self.datapath+name, name)

        default(sdimprocess)

    def tearDown(self):
        """Remove the input images and everything starting with prefix."""
        for name in self.rawfiles:
            if not os.path.exists(name):
                continue
            shutil.rmtree(name)
        cleanup='rm -rf '+self.prefix+'*'
        os.system( cleanup )

    @staticmethod
    def _pixel(*index):
        """Return `index` as an int32 array."""
        return numpy.array(list(index), dtype=numpy.int32)

    @staticmethod
    def _value(number):
        """Return a one-element array holding `number`."""
        return numpy.array([number])

    def _weave(self, **extra):
        """Run the task with the arguments common to all tests plus `extra`."""
        arguments=dict(infiles=self.rawfiles,
                       mode=self.mode,
                       direction=[0.0,90.0],
                       masklist=20.0,
                       outfile=self.outfile,
                       overwrite=True)
        arguments.update(extra)
        return sdimprocess(**arguments)

    def test2000(self):
        """Test 200: FFT based Basket-Weaving using whole pixels"""
        self._weave()
        pixel=self._pixel
        value=self._value
        expected={
            'blc': pixel(0, 0, 0, 0),
            'blcf': '00:00:00.000, +00.00.00.000, I, 1.415e+09Hz',
            'max': value(0.92714936),
            'maxpos': pixel(64, 64, 0, 0),
            'maxposf': '23:55:43.941, +01.04.00.222, I, 1.415e+09Hz',
            'mean': value(0.02962625),
            'medabsdevmed': value(0.00571492),
            'median': value(0.00429045),
            'min': value(-0.02618393),
            'minpos': pixel(56, 107, 0, 0),
            'minposf': '23:56:15.881, +01.47.01.037, I, 1.415e+09Hz',
            'npts': value(16384.),
            'quartile': value(0.01154788),
            'rms': value(0.11236797),
            'sigma': value(0.1083954),
            'sum': value(485.39648429),
            'sumsq': value(206.87355986),
            'trc': pixel(127, 127, 0, 0),
            'trcf': '23:51:31.537, +02.07.01.734, I, 1.415e+09Hz',
        }
        self._checkstats(self.outfile,expected)

    def test2001(self):
        """Test 201: FFT based Basket-Weaving with certain threshold"""
        self._weave(tmax=0.5,tmin=-0.1)
        pixel=self._pixel
        value=self._value
        expected={
            'blc': pixel(0, 0, 0, 0),
            'blcf': '00:00:00.000, +00.00.00.000, I, 1.415e+09Hz',
            'max': value(0.99387228),
            'maxpos': pixel(63, 63, 0, 0),
            'maxposf': '23:55:47.944, +01.03.00.212, I, 1.415e+09Hz',
            'mean': value(0.02962625),
            'medabsdevmed': value(0.00570825),
            'median': value(0.00428429),
            'min': value(-0.0260052),
            'minpos': pixel(56, 107, 0, 0),
            'minposf': '23:56:15.881, +01.47.01.037, I, 1.415e+09Hz',
            'npts': value(16384.),
            'quartile': value(0.01155156),
            'rms': value(0.1128579),
            'sigma': value(0.10890324),
            'sum': value(485.39650849),
            'sumsq': value(208.68146098),
            'trc': pixel(127, 127, 0, 0),
            'trcf': '23:51:31.537, +02.07.01.734, I, 1.415e+09Hz',
        }
        self._checkstats(self.outfile,expected)
00331
00332
def suite():
    """Return the list of test case classes defined in this module."""
    test_cases=[sdimprocess_test0,
                sdimprocess_test1,
                sdimprocess_test2]
    return test_cases