casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
test_simplecluster.py
Go to the documentation of this file.
00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008 import time
00009 from simple_cluster import *
00010 import glob
00011 import multiprocessing
00012 import testhelper
00013 from parallel.parallel_task_helper import ParallelTaskHelper
00014 
00015 class test_simplecluster(unittest.TestCase):
00016 
00017     projectname="test_simplecluster"
00018     clusterfile="test_simplecluster_config.txt"
00019     monitorFile="monitoring.log"
00020     cluster=None
00021 
00022     # Get local host configuration parameters
00023     host=os.uname()[1]
00024     cwd=os.getcwd()
00025     ncpu=multiprocessing.cpu_count()
00026     
00027     # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested 
00028     if os.environ.has_key('BYPASS_SEQUENTIAL_PROCESSING'):
00029         ParallelTaskHelper.bypassParallelProcessing(1)
00030         bypassParallelProcessing = True
00031     else:
00032         bypassParallelProcessing = False
00033         
00034     vis = ""
00035     ref = ""
00036     aux = ""
00037 
00038     def stopCluster(self):
00039         # Stop thread services and cluster
00040         self.cluster.stop_cluster()
00041         # Remove log files, cluster files, and monitoring files
00042         self.cleanUp()
00043 
00044     def cleanUp(self):
00045         logfiles=glob.glob("engine-*.log")
00046         for i in logfiles:
00047             os.remove(i)
00048         if os.path.exists(self.clusterfile):
00049             os.remove(self.clusterfile)
00050         if os.path.exists(self.monitorFile):
00051             os.remove(self.monitorFile)
00052 
00053     def initCluster(self,userMonitorFile="",max_engines=0.,max_memory=0.,memory_per_engine=512.):
00054         # First of all clean up files from previous sessions
00055         self.cleanUp()
00056         # Create cluster object
00057         if (len(userMonitorFile) > 0):
00058             self.cluster = simple_cluster(userMonitorFile)
00059             self.monitorFile = userMonitorFile
00060         else:
00061             self.cluster = simple_cluster()
00062             self.monitorFile = "monitoring.log"
00063         # Create cluster configuration file
00064         self.createClusterFile(max_engines,max_memory,memory_per_engine)
00065         # Initialize cluster object
00066         if (self.cluster.init_cluster(self.clusterfile, self.projectname)):
00067             # Wait unit cluster is producing monitoring info
00068             if (len(userMonitorFile) > 0):
00069                 self.waitForFile(userMonitorFile, 20)
00070             else:
00071                 self.waitForFile('monitoring.log', 20)
00072 
00073     def createClusterFile(self,max_engines=0.,max_memory=0.,memory_per_engine=512.):
00074             
00075         msg=self.host + ', ' + str(max_engines) + ', ' + self.cwd + ', ' + str(max_memory) + ', ' + str(memory_per_engine) 
00076         f=open(self.clusterfile, 'w')
00077         f.write(msg)
00078         f.close()
00079         self.waitForFile(self.clusterfile, 10)
00080 
00081     def waitForFile(self, file, seconds):
00082         for i in range(0,seconds):
00083             if (os.path.isfile(file)):
00084                 return
00085             time.sleep(1)
00086             
00087     def setUpFile(self,file,type_file):
00088         
00089         if type(file) is list:
00090             for file_i in file:
00091                 self.setUpFileCore(file_i,type_file)
00092         else:
00093             self.setUpFileCore(file,type_file)
00094                 
00095         if type_file=='vis':
00096             self.vis = file
00097         elif type_file =='ref':
00098             self.ref = file
00099         elif type_file=='aux':
00100             self.aux = file
00101                 
00102     def setUpFileCore(self,file,type_file):
00103         
00104         if os.path.exists(file):
00105              print "%s file %s is already in the working area, deleting ..." % (type_file,file)
00106              os.system('rm -rf ' + file)
00107         print "Copy %s file %s into the working area..." % (type_file,file)
00108         os.system('cp -r ' + os.environ.get('CASAPATH').split()[0] +
00109                   '/data/regression/unittest/simplecluster/' + file + ' ' + file)
00110     
00111     def create_input(self,str_text, filename):
00112         """Save the string in a text file"""
00113     
00114         inp = filename
00115         cmd = str_text
00116     
00117         # remove file first
00118         if os.path.exists(inp):
00119             os.system('rm -f '+ inp)
00120         
00121         # save to a file    
00122         fid = open(inp, 'w')
00123         fid.write(cmd)
00124         
00125         # close file
00126         fid.close()
00127 
00128         # wait until file is visible for the filesystem
00129         self.waitForFile(filename, 10)
00130     
00131         return
00132 
00133     def test1_defaultCluster(self):
00134         """Test 1: Create a default cluster"""
00135 
00136         # Create cluster file
00137         self.initCluster()
00138 
00139         cluster_list = self.cluster.get_hosts()
00140         self.assertTrue(cluster_list[0][0]==self.host)
00141         self.assertTrue(cluster_list[0][1]<=self.ncpu)
00142         self.assertTrue(cluster_list[0][2]==self.cwd)
00143 
00144         self.stopCluster()
00145         
00146     def test2_availableResourcesCluster(self):
00147         """Test 2: Create a custom cluster to use all the available resources"""
00148 
00149         # Create cluster file
00150         self.initCluster(max_engines=1.,max_memory=1.,memory_per_engine=1024.)
00151 
00152         cluster_list = self.cluster.get_hosts()
00153         self.assertTrue(cluster_list[0][0]==self.host)
00154         self.assertTrue(cluster_list[0][1]<=self.ncpu)
00155         self.assertTrue(cluster_list[0][2]==self.cwd)
00156 
00157         self.stopCluster()        
00158         
00159     def test3_halfCPUCluster(self):
00160         """Test 3: Create a custom cluster to use half of available CPU capacity"""
00161 
00162         # Create cluster file
00163         self.initCluster(max_engines=0.5,max_memory=1.,memory_per_engine=512.)
00164 
00165         cluster_list = self.cluster.get_hosts()
00166         self.assertTrue(cluster_list[0][0]==self.host)
00167         self.assertTrue(cluster_list[0][1]<=int(0.5*self.ncpu))
00168         self.assertTrue(cluster_list[0][2]==self.cwd)
00169 
00170         self.stopCluster()    
00171         
00172     def test3_halfMemoryCluster(self):
00173         """Test 3: Create a custom cluster to use half of available RAM memory"""
00174 
00175         # Create cluster file
00176         self.initCluster(max_engines=1.,max_memory=0.5,memory_per_engine=512.)
00177 
00178         cluster_list = self.cluster.get_hosts()
00179         self.assertTrue(cluster_list[0][0]==self.host)
00180         self.assertTrue(cluster_list[0][2]==self.cwd)
00181 
00182         self.stopCluster()                
00183 
00184     def test4_monitoringDefault(self):
00185         """Test 4: Check default monitoring file exists"""
00186         
00187         # Create cluster file
00188         self.initCluster()
00189             
00190         fid = open('monitoring.log', 'r')
00191         line = fid.readline()
00192         self.assertTrue(line.find('Host')>=0)
00193         self.assertTrue(line.find('Engine')>=0)
00194         self.assertTrue(line.find('Status')>=0)
00195         self.assertTrue(line.find('CPU[%]')>=0)
00196         self.assertTrue(line.find('Memory[%]')>=0)
00197         self.assertTrue(line.find('Time[s]')>=0)
00198         self.assertTrue(line.find('Read[MB]')>=0)
00199         self.assertTrue(line.find('Write[MB]')>=0)
00200         self.assertTrue(line.find('Read[MB/s]')>=0)
00201         self.assertTrue(line.find('Write[MB/s]')>=0)
00202         self.assertTrue(line.find('Job')>=0)
00203         self.assertTrue(line.find('Sub-MS')>=0)
00204 
00205         self.stopCluster()
00206 
00207     def test5_monitoringUser(self):
00208         """Test 5: Check custom monitoring file exists"""
00209         
00210         # Create cluster file
00211         self.initCluster('userMonitorFile.log')
00212 
00213         fid = open('userMonitorFile.log', 'r')
00214         line = fid.readline()
00215         self.assertTrue(line.find('Host')>=0)
00216         self.assertTrue(line.find('Engine')>=0)
00217         self.assertTrue(line.find('Status')>=0)
00218         self.assertTrue(line.find('CPU[%]')>=0)
00219         self.assertTrue(line.find('Memory[%]')>=0)
00220         self.assertTrue(line.find('Time[s]')>=0)
00221         self.assertTrue(line.find('Read[MB]')>=0)
00222         self.assertTrue(line.find('Write[MB]')>=0)
00223         self.assertTrue(line.find('Read[MB/s]')>=0)
00224         self.assertTrue(line.find('Write[MB/s]')>=0)
00225         self.assertTrue(line.find('Job')>=0)
00226         self.assertTrue(line.find('Sub-MS')>=0)
00227 
00228         self.stopCluster()
00229 
00230     def test6_monitoringStandAlone(self):
00231         """Test 6: Check the dict structure of the stand-alone method """
00232         
00233         # Create cluster file
00234         self.initCluster('userMonitorFile.log')
00235                 
00236         state = self.cluster.show_resource(True)
00237         cluster_list = self.cluster.get_hosts()
00238         for engine in range(cluster_list[0][1]):
00239             self.assertTrue(state[self.host][engine].has_key('Status'))
00240             self.assertTrue(state[self.host][engine].has_key('Sub-MS'))
00241             self.assertTrue(state[self.host][engine].has_key('Read'))
00242             self.assertTrue(state[self.host][engine].has_key('Write'))
00243             self.assertTrue(state[self.host][engine].has_key('Job'))
00244             self.assertTrue(state[self.host][engine].has_key('Memory'))
00245             self.assertTrue(state[self.host][engine].has_key('ReadRate'))
00246             self.assertTrue(state[self.host][engine].has_key('WriteRate'))
00247 
00248         self.stopCluster()
00249         
00250     def test7_bypassParallelProcessing(self):
00251         """Test 7: Bypass Parallel Processing mode """        
00252                
00253         simple_cluster.setDefaults(default_mem_per_engine=33554432)
00254         
00255         # Prepare MMS
00256         self.setUpFile("Four_ants_3C286.mms",'vis')
00257         
00258         # Create list file
00259         text = "mode='unflag'\n"\
00260                "mode='clip' clipminmax=[0,0.1]"
00261         filename = 'list_flagdata.txt'
00262         self.create_input(text, filename)
00263 
00264         # step 1: Do unflag+clip
00265         flagdata(vis=self.vis, mode='list', inpfile=filename)
00266 
00267         # step 2: Now do summary
00268         ret_dict = flagdata(vis=self.vis, mode='summary')
00269 
00270         # Check summary
00271         self.assertTrue(ret_dict['name']=='Summary')
00272         self.assertTrue(ret_dict['spw']['15']['flagged'] == 96284.0)
00273         self.assertTrue(ret_dict['spw']['0']['flagged'] == 129711.0)
00274         self.assertTrue(ret_dict['spw']['1']['flagged'] == 128551.0)
00275         self.assertTrue(ret_dict['spw']['2']['flagged'] == 125686.0)
00276         self.assertTrue(ret_dict['spw']['3']['flagged'] == 122862.0)
00277         self.assertTrue(ret_dict['spw']['4']['flagged'] == 109317.0)
00278         self.assertTrue(ret_dict['spw']['5']['flagged'] == 24481.0)
00279         self.assertTrue(ret_dict['spw']['6']['flagged'] == 0)
00280         self.assertTrue(ret_dict['spw']['7']['flagged'] == 0)
00281         self.assertTrue(ret_dict['spw']['8']['flagged'] == 0)
00282         self.assertTrue(ret_dict['spw']['9']['flagged'] == 27422.0)
00283         self.assertTrue(ret_dict['spw']['10']['flagged'] == 124638.0)
00284         self.assertTrue(ret_dict['spw']['11']['flagged'] == 137813.0)
00285         self.assertTrue(ret_dict['spw']['12']['flagged'] == 131896.0)
00286         self.assertTrue(ret_dict['spw']['13']['flagged'] == 125074.0)
00287         self.assertTrue(ret_dict['spw']['14']['flagged'] == 118039.0)    
00288         
00289         # Remove MMS
00290         os.system('rm -rf ' + self.vis)        
00291         
00292         # Restore default values
00293         simple_cluster.setDefaults(default_mem_per_engine=512)
00294         if not self.bypassParallelProcessing:
00295             ParallelTaskHelper.bypassParallelProcessing(0)
00296             
00297     def test8_IgnoreNullSelectionError(self):
00298         """Test 8: Check that NullSelection errors happening for some sub-MSs are ignored  """
00299         """Note: In this test we also check simple_cluster initialization via ParallelTaskHelper  """
00300         
00301         # Prepare MMS
00302         self.setUpFile("Four_ants_3C286.mms",'vis')          
00303         
00304         # Unflag entire MMS
00305         flagdata(vis=self.vis, mode='unflag')
00306         
00307         # Manually flag scan 30
00308         flagdata(vis=self.vis, mode='manual', scan='30')
00309 
00310         # step 2: Now do summary
00311         ret_dict = flagdata(vis=self.vis, mode='summary')
00312 
00313         # Check summary
00314         self.assertTrue(ret_dict['scan']['30']['flagged'] == 2187264.0)
00315         self.assertTrue(ret_dict['scan']['31']['flagged'] == 0)  
00316         
00317         # Stop cluster if it was started
00318         self.cluster = simple_cluster.getCluster()
00319         if (self.cluster != None):
00320             self.stopCluster()
00321             
00322         # Remove MMS
00323         os.system('rm -rf ' + self.vis)
00324         
00325 
00326 
00327 class test_flagdata_mms(test_simplecluster):
00328 
00329     def setUp(self):
00330         # Prepare MMS
00331         self.setUpFile("Four_ants_3C286.mms",'vis')
00332         
00333         # Startup cluster
00334         if not self.bypassParallelProcessing:            
00335             self.initCluster()
00336 
00337     def tearDown(self):
00338         # Stop cluster
00339         if not self.bypassParallelProcessing:
00340             self.stopCluster()
00341         # Remove MMS
00342         os.system('rm -rf ' + self.vis)
00343     
00344     def test1_flagdata_list_return(self):
00345         """Test 1: Test support for MMS using flagdata in unflag+clip mode"""
00346 
00347         # Create list file
00348         text = "mode='unflag'\n"\
00349                "mode='clip' clipminmax=[0,0.1]"
00350         filename = 'list_flagdata.txt'
00351         self.create_input(text, filename)
00352 
00353         # step 1: Do unflag+clip
00354         flagdata(vis=self.vis, mode='list', inpfile=filename)
00355 
00356         # step 2: Now do summary
00357         ret_dict = flagdata(vis=self.vis, mode='summary')
00358 
00359         # Check summary
00360         self.assertTrue(ret_dict['name']=='Summary')
00361         self.assertTrue(ret_dict['spw']['15']['flagged'] == 96284.0)
00362         self.assertTrue(ret_dict['spw']['0']['flagged'] == 129711.0)
00363         self.assertTrue(ret_dict['spw']['1']['flagged'] == 128551.0)
00364         self.assertTrue(ret_dict['spw']['2']['flagged'] == 125686.0)
00365         self.assertTrue(ret_dict['spw']['3']['flagged'] == 122862.0)
00366         self.assertTrue(ret_dict['spw']['4']['flagged'] == 109317.0)
00367         self.assertTrue(ret_dict['spw']['5']['flagged'] == 24481.0)
00368         self.assertTrue(ret_dict['spw']['6']['flagged'] == 0)
00369         self.assertTrue(ret_dict['spw']['7']['flagged'] == 0)
00370         self.assertTrue(ret_dict['spw']['8']['flagged'] == 0)
00371         self.assertTrue(ret_dict['spw']['9']['flagged'] == 27422.0)
00372         self.assertTrue(ret_dict['spw']['10']['flagged'] == 124638.0)
00373         self.assertTrue(ret_dict['spw']['11']['flagged'] == 137813.0)
00374         self.assertTrue(ret_dict['spw']['12']['flagged'] == 131896.0)
00375         self.assertTrue(ret_dict['spw']['13']['flagged'] == 125074.0)
00376         self.assertTrue(ret_dict['spw']['14']['flagged'] == 118039.0)
00377 
00378 
00379 class test_setjy_mms(test_simplecluster):
00380 
00381     def setUp(self):
00382         # Prepare MMS
00383         self.setUpFile("ngc5921.applycal.mms",'vis')
00384         
00385         # Startup cluster
00386         if not self.bypassParallelProcessing:            
00387             self.initCluster()
00388 
00389     def tearDown(self):
00390         # Stop cluster
00391         if not self.bypassParallelProcessing:
00392             self.stopCluster()
00393         # Remove MMS
00394         os.system('rm -rf ' + self.vis)
00395 
00396     def test1_setjy_scratchless_mode_single_model(self):
00397         """Test 1: Set vis model header in one single field """
00398 
00399         retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=False)
00400         self.assertTrue(retval, "setjy run failed")
00401         
00402         tblocal = tbtool()
00403         tblocal.open(self.vis)
00404         model_0 = tblocal.getkeyword('model_0')
00405         self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)      
00406         
00407         mslocal = mstool()
00408         mslocal.open(self.vis)
00409         listSubMSs = mslocal.getreferencedtables()
00410         mslocal.close()
00411         for subMS in listSubMSs:
00412             tblocal = tbtool()
00413             tblocal.open(subMS)
00414             fieldId = tblocal.getcell('FIELD_ID',1)
00415             if (fieldId == 0):
00416                 model_0 = tblocal.getkeyword('model_0')
00417                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00418             elif (fieldId == 1):
00419                 model_0 = tblocal.getkeyword('model_0')
00420                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00421             elif (fieldId == 2):
00422                 model_0 = tblocal.getkeyword('model_0')
00423                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00424             else:
00425                 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00426                 tblocal.close()
00427             tblocal.close()
00428             
00429     def test2_setjy_scratchless_mode_multiple_model(self):
00430         """Test 2: Set vis model header in one multiple fields """
00431 
00432         retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=False)
00433         self.assertTrue(retval, "setjy run failed")
00434         retval = setjy(vis=self.vis, field='1445+099*',fluxdensity=[1445.,0.,0.,0.], scalebychan=False, usescratch=False)
00435         self.assertTrue(retval, "setjy run failed")
00436         
00437         tblocal = tbtool()
00438         tblocal.open(self.vis)
00439         model_0 = tblocal.getkeyword('model_0')
00440         self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00441         model_1 = tblocal.getkeyword('model_1')
00442         self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00443         
00444         mslocal = mstool()
00445         mslocal.open(self.vis)
00446         listSubMSs = mslocal.getreferencedtables()
00447         mslocal.close()
00448         for subMS in listSubMSs:
00449             tblocal.open(subMS)
00450             fieldId = tblocal.getcell('FIELD_ID',1)
00451             if (fieldId == 0):
00452                 model_0 = tblocal.getkeyword('model_0')
00453                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00454                 model_1 = tblocal.getkeyword('model_1')
00455                 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00456             elif (fieldId == 1):
00457                 model_0 = tblocal.getkeyword('model_0')
00458                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00459                 model_1 = tblocal.getkeyword('model_1')
00460                 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00461             elif (fieldId == 2):
00462                 model_0 = tblocal.getkeyword('model_0')
00463                 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00464                 model_1 = tblocal.getkeyword('model_1')
00465                 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00466             else:
00467                 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00468                 tblocal.close()
00469             tblocal.close()
00470             
00471     def test3_setjy_scratch_mode_single_model(self):
00472         """Test 3: Set MODEL_DATA in one single field"""
00473 
00474         retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False,usescratch=True)
00475         self.assertTrue(retval, "setjy run failed")
00476         
00477         mslocal = mstool()
00478         mslocal.open(self.vis)
00479         listSubMSs = mslocal.getreferencedtables()
00480         mslocal.close()
00481         for subMS in listSubMSs:
00482             tblocal = tbtool()
00483             tblocal.open(subMS)
00484             fieldId = tblocal.getcell('FIELD_ID',1)
00485             if (fieldId == 0):
00486                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1331.0)
00487             elif (fieldId == 1):
00488                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00489             elif (fieldId == 2):
00490                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00491             else:
00492                 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00493                 tblocal.close()
00494             tblocal.close()
00495 
00496     def test4_setjy_scratch_mode_multiple_model(self):
00497         """Test 4: Set MODEL_DATA in multiple fields"""
00498 
00499         retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=True)
00500         self.assertTrue(retval, "setjy run failed")
00501         retval = setjy(vis=self.vis, field='1445+099*',fluxdensity=[1445.,0.,0.,0.], scalebychan=False, usescratch=True)
00502         self.assertTrue(retval, "setjy run failed")
00503         
00504         mslocal = mstool()
00505         mslocal.open(self.vis)
00506         listSubMSs = mslocal.getreferencedtables()
00507         mslocal.close()
00508         for subMS in listSubMSs:
00509             tblocal = tbtool()
00510             tblocal.open(subMS)
00511             fieldId = tblocal.getcell('FIELD_ID',1)
00512             if (fieldId == 0):
00513                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1331.0)
00514             elif (fieldId == 1):
00515                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1445.0)
00516             elif (fieldId == 2):
00517                 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00518             else:
00519                 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00520                 tblocal.close()
00521             tblocal.close()
00522             
00523 class test_applycal_mms(test_simplecluster):
00524 
00525     def setUp(self):
00526         # Set-up MMS
00527         self.setUpFile("ngc5921.applycal.mms",'vis')
00528         # Set-up reference MMS
00529         self.setUpFile("ngc5921.split.mms",'ref')
00530         # Set-up auxiliary files
00531         self.setUpFile(["ngc5921.fluxscale", "ngc5921.gcal", "ngc5921.bcal"],'aux')
00532         
00533         # Startup cluster
00534         if not self.bypassParallelProcessing:            
00535             self.initCluster()
00536 
00537     def tearDown(self):
00538         # Stop cluster
00539         if not self.bypassParallelProcessing:
00540             self.stopCluster()
00541         # Remove MMS
00542         os.system('rm -rf ' + self.vis) 
00543         # Remove ref MMS
00544         os.system('rm -rf ' + self.ref) 
00545         # Remove aux files
00546         for file in self.aux:
00547             os.system('rm -rf ' + file)         
00548         
00549     def test1_applycal_fluxscale_gcal_bcal(self):
00550         """Test 1: Apply calibration using fluxscal gcal and bcal tables"""
00551 
00552         applycal(vis=self.vis,field='',spw='',selectdata=False,gaintable=self.aux,
00553                  gainfield=['nearest','nearest','0'],interp=['linear', 'linear','nearest'],spwmap=[],gaincurve=False,opacity=0.0)
00554         
00555         compare = testhelper.compTables(self.ref,self.vis,['FLAG_CATEGORY'])
00556         
00557         self.assertTrue(compare)        
00558         
00559         
00560 class test_uvcont_mms(test_simplecluster):
00561 
00562     def setUp(self):           
00563         # Set-up MMS
00564         self.setUpFile("ngc5921.uvcont.mms",'vis')
00565         # Set-up reference MMS
00566         self.setUpFile(["ngc5921.mms.cont", "ngc5921.mms.contsub"],'ref')      
00567         
00568         # Startup cluster
00569         if not self.bypassParallelProcessing:            
00570             self.initCluster()
00571 
00572     def tearDown(self):
00573         # Stop cluster
00574         if not self.bypassParallelProcessing:
00575             self.stopCluster()
00576         # Remove MMS
00577         os.system('rm -rf ' + self.vis)
00578         # Remove ref MMS
00579         for file in self.ref:
00580             os.system('rm -rf ' + file) 
00581         
00582     def test1_uvcont_single_spw(self):
00583         """Test 1: Extract continuum from one single SPW using uvcontsub"""
00584 
00585         uvcontsub(vis=self.vis,field = 'N5921*',fitspw='0:4~6;50~59',spw = '0',solint = 'int',fitorder = 0,want_cont = True) 
00586         
00587         compare_cont = testhelper.compTables(self.ref[0],self.vis+".cont",['FLAG_CATEGORY'])
00588         self.assertTrue(compare_cont)
00589         compare_contsub = testhelper.compTables(self.ref[1],self.vis+".contsub",['FLAG_CATEGORY'])
00590         self.assertTrue(compare_contsub)             
00591 
00592 class testJobData(unittest.TestCase):
00593     '''
00594     This class tests the JobData class in the simple_cluster.
00595     '''
00596 
00597     def testSimpleJob(self):
00598         jd = JobData('myJob')
00599         self.assertEqual('returnVar0 = myJob()', jd.getCommandLine())
00600         self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00601 
00602     def testJobWithOneArg(self):
00603         jd = JobData('myJob',{'arg1':1})
00604         self.assertEqual('returnVar0 = myJob(arg1 = 1)', jd.getCommandLine())
00605         self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00606 
00607     def testJobWithMultipleArg(self):
00608         jd = JobData('myJob',{'arg1':1,'arg2':2,'arg3':'three'})
00609         self.assertEqual\
00610              ("returnVar0 = myJob(arg1 = 1, arg2 = 2, arg3 = 'three')",
00611               jd.getCommandLine())
00612         self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00613 
00614     def testJobWithMultipleCommands(self):
00615         jd = JobData('myJob1')
00616         jd.addCommand('myJob2',{'arg1':1})
00617         self.assertEqual('returnVar0 = myJob1(); '+\
00618                          'returnVar1 = myJob2(arg1 = 1)',
00619                          jd.getCommandLine())
00620         self.assertEqual(['returnVar0', 'returnVar1'],
00621                          jd.getReturnVariableList())
00622 
00623     def testGetCommandArguments(self):
00624         # Test with a single command
00625         jd = JobData('myCommand1',{'arg1':1})
00626         resp = jd.getCommandArguments()
00627         self.assertTrue(isinstance(resp,dict))
00628         self.assertEqual(len(resp),1)
00629         self.assertEqual(resp['arg1'],1)
00630 
00631         # Now handle the cases of multiple commands
00632         jd.addCommand('myCommand2',{'arg2':2})
00633         resp = jd.getCommandArguments()
00634         self.assertTrue(isinstance(resp,dict))
00635         self.assertEqual(len(resp),2)
00636         self.assertTrue(isinstance(resp['myCommand1'],dict))
00637         self.assertTrue(isinstance(resp['myCommand2'],dict))
00638         self.assertEqual(resp['myCommand1']['arg1'],1)
00639         self.assertEqual(resp['myCommand2']['arg2'],2)
00640 
00641         # Now get just a single commands response
00642         resp = jd.getCommandArguments('myCommand1')
00643         self.assertTrue(isinstance(resp,dict))
00644         self.assertEqual(len(resp),1)
00645         self.assertEqual(resp['arg1'],1)
00646 
00647 
00648 class testJobQueueManager(unittest.TestCase):
00649     '''
00650     This class tests the Job Queue Manager.
00651     '''
00652     def __init__(self, methodName = 'runTest'):
00653         self.setUpCluster()
00654         unittest.TestCase.__init__(self, methodName)
00655 
00656     
00657     def testJobExecutionSingleReturn(self):
00658         # Test a single job with a single return
00659         queue = JobQueueManager()
00660         queue.addJob(JobData('echoFunction', {'input':'inputVar'}))
00661         queue.executeQueue()
00662 
00663         jobList = queue.getOutputJobs('done')
00664         self.assertEqual(len(jobList),1)
00665         self.assertEqual(jobList[0].getReturnValues(),'inputVar')
00666         
00667         queue.clearJobs()
00668         for idx in xrange(16):
00669             queue.addJob(JobData('echoFunction',
00670                                  {'input':'inputVar%d' % idx}))
00671         queue.executeQueue()
00672 
00673         jobList = queue.getOutputJobs('done')
00674         self.assertEqual(len(jobList),16)
00675         for job in jobList:
00676             self.assertEqual(job.getReturnValues(),
00677                              job._commandList[0].commandInfo['input'])
00678 
00679     def testJobExectionWithoutReturn(self):
00680         queue = JobQueueManager()
00681         queue.addJob(JobData('noReturn'))
00682         queue.executeQueue()
00683 
00684         jobList = queue.getOutputJobs('done')
00685         self.assertEqual(len(jobList),1)
00686         self.assertEqual(jobList[0].getReturnValues(),None)
00687 
00688         # Test a mixed return case
00689         queue.clearJobs()
00690         for idx in xrange(16):
00691             if idx % 2:
00692                 queue.addJob(JobData('echoFunction',
00693                                      {'input':'inputVar%d' % idx}))
00694             else:
00695                 queue.addJob(JobData('noReturn'))
00696         queue.executeQueue()
00697 
00698         jobList = queue.getOutputJobs('done')
00699         self.assertEqual(len(jobList),16)
00700         for job in jobList:
00701             if job._commandList[0].commandName == 'echoFunction':
00702                 self.assertEqual(job.getReturnValues(),
00703                                  job._commandList[0].commandInfo['input'])
00704             else:
00705                 self.assertEqual(job.getReturnValues(), None)
00706   
00707     def testFailedJobs(self):
00708         queue = JobQueueManager()
00709         queue.addJob(JobData('setErrror',{'setError':True,
00710                                           'returnValue':1}))
00711         queue.executeQueue()
00712 
00713         jobList = queue.getOutputJobs()
00714         self.assertEqual(len(jobList),1)
00715         self.assertEqual(jobList[0].status,'broken')
00716 
00717         # Test a mixed return case
00718         queue.clearJobs()
00719         for idx in xrange(16):
00720             queue.addJob(JobData('setError', {'setError': idx %2,
00721                                               'returnValue': idx}))
00722         queue.executeQueue()
00723         jobList = queue.getOutputJobs('done')
00724         self.assertEqual(len(jobList),8)
00725 
00726         jobList = queue.getOutputJobs()
00727         self.assertEqual(len(jobList),16)
00728         for job in jobList:
00729             if job._commandList[0].commandInfo['setError']:
00730                 self.assertEqual(job.getReturnValues(), None)
00731                 self.assertEqual(job.status, 'broken')
00732             else:
00733                 self.assertEqual(job.getReturnValues(),
00734                          job._commandList[0].commandInfo['returnValue'])
00735                 self.assertEqual(job.status, 'done')
00736 
00737           
00738     def setUpCluster(self):
00739         '''
00740         This method defines three methods on the cluster:
00741         echoFunction - returns whatever is sent
00742         noReturn - has no return value
00743         errorMethod - which will raise an exception if the argument is true
00744         '''
00745         cluster = simple_cluster.getCluster()
00746             
00747         command = '''
00748         def echoFunction(input = ''):
00749             return input
00750         '''
00751         for engine in cluster.use_engines():
00752             cluster.do_and_record(command, engine)
00753 
00754 
00755         command = '''
00756         def noReturn():
00757             pass
00758         '''
00759         
00760         for engine in cluster.use_engines():
00761             cluster.do_and_record(command, engine)
00762 
00763         command = '''
00764         def setError(setError, returnValue):
00765             if setError:
00766                 raise Exception, "Error Condition"
00767             return returnValue
00768         '''
00769 
00770         for engine in cluster.use_engines():
00771             cluster.do_and_record(command, engine)
00772 
00773         # Wait for all of the engines to return
00774         counter = 30
00775         while counter > 0 and (len(cluster.get_status(True)) <
00776                                len(cluster.use_engines())):
00777             time.sleep(1)
00778             counter -= 1
00779         cluster.remove_record()
00780 
00781 def suite():
00782     return [test_simplecluster,test_flagdata_mms,test_setjy_mms,test_applycal_mms,test_uvcont_mms]
00783      
00784 if __name__ == '__main__':
00785     testSuite = []
00786     for testClass in suite():
00787         testSuite.append(unittest.makeSuite(testClass,'test'))
00788     allTests = unittest.TestSuite(testSuite)
00789     unittest.TextTestRunner(verbosity=2).run(allTests)