00001 import os
00002 import sys
00003 import shutil
00004 from __main__ import default
00005 from tasks import *
00006 from taskinit import *
00007 import unittest
00008 import time
00009 from simple_cluster import *
00010 import glob
00011 import multiprocessing
00012 import testhelper
00013 from parallel.parallel_task_helper import ParallelTaskHelper
00014
00015 class test_simplecluster(unittest.TestCase):
00016
00017 projectname="test_simplecluster"
00018 clusterfile="test_simplecluster_config.txt"
00019 monitorFile="monitoring.log"
00020 cluster=None
00021
00022
00023 host=os.uname()[1]
00024 cwd=os.getcwd()
00025 ncpu=multiprocessing.cpu_count()
00026
00027
00028 if os.environ.has_key('BYPASS_SEQUENTIAL_PROCESSING'):
00029 ParallelTaskHelper.bypassParallelProcessing(1)
00030 bypassParallelProcessing = True
00031 else:
00032 bypassParallelProcessing = False
00033
00034 vis = ""
00035 ref = ""
00036 aux = ""
00037
00038 def stopCluster(self):
00039
00040 self.cluster.stop_cluster()
00041
00042 self.cleanUp()
00043
00044 def cleanUp(self):
00045 logfiles=glob.glob("engine-*.log")
00046 for i in logfiles:
00047 os.remove(i)
00048 if os.path.exists(self.clusterfile):
00049 os.remove(self.clusterfile)
00050 if os.path.exists(self.monitorFile):
00051 os.remove(self.monitorFile)
00052
00053 def initCluster(self,userMonitorFile="",max_engines=0.,max_memory=0.,memory_per_engine=512.):
00054
00055 self.cleanUp()
00056
00057 if (len(userMonitorFile) > 0):
00058 self.cluster = simple_cluster(userMonitorFile)
00059 self.monitorFile = userMonitorFile
00060 else:
00061 self.cluster = simple_cluster()
00062 self.monitorFile = "monitoring.log"
00063
00064 self.createClusterFile(max_engines,max_memory,memory_per_engine)
00065
00066 if (self.cluster.init_cluster(self.clusterfile, self.projectname)):
00067
00068 if (len(userMonitorFile) > 0):
00069 self.waitForFile(userMonitorFile, 20)
00070 else:
00071 self.waitForFile('monitoring.log', 20)
00072
00073 def createClusterFile(self,max_engines=0.,max_memory=0.,memory_per_engine=512.):
00074
00075 msg=self.host + ', ' + str(max_engines) + ', ' + self.cwd + ', ' + str(max_memory) + ', ' + str(memory_per_engine)
00076 f=open(self.clusterfile, 'w')
00077 f.write(msg)
00078 f.close()
00079 self.waitForFile(self.clusterfile, 10)
00080
00081 def waitForFile(self, file, seconds):
00082 for i in range(0,seconds):
00083 if (os.path.isfile(file)):
00084 return
00085 time.sleep(1)
00086
00087 def setUpFile(self,file,type_file):
00088
00089 if type(file) is list:
00090 for file_i in file:
00091 self.setUpFileCore(file_i,type_file)
00092 else:
00093 self.setUpFileCore(file,type_file)
00094
00095 if type_file=='vis':
00096 self.vis = file
00097 elif type_file =='ref':
00098 self.ref = file
00099 elif type_file=='aux':
00100 self.aux = file
00101
00102 def setUpFileCore(self,file,type_file):
00103
00104 if os.path.exists(file):
00105 print "%s file %s is already in the working area, deleting ..." % (type_file,file)
00106 os.system('rm -rf ' + file)
00107 print "Copy %s file %s into the working area..." % (type_file,file)
00108 os.system('cp -r ' + os.environ.get('CASAPATH').split()[0] +
00109 '/data/regression/unittest/simplecluster/' + file + ' ' + file)
00110
00111 def create_input(self,str_text, filename):
00112 """Save the string in a text file"""
00113
00114 inp = filename
00115 cmd = str_text
00116
00117
00118 if os.path.exists(inp):
00119 os.system('rm -f '+ inp)
00120
00121
00122 fid = open(inp, 'w')
00123 fid.write(cmd)
00124
00125
00126 fid.close()
00127
00128
00129 self.waitForFile(filename, 10)
00130
00131 return
00132
00133 def test1_defaultCluster(self):
00134 """Test 1: Create a default cluster"""
00135
00136
00137 self.initCluster()
00138
00139 cluster_list = self.cluster.get_hosts()
00140 self.assertTrue(cluster_list[0][0]==self.host)
00141 self.assertTrue(cluster_list[0][1]<=self.ncpu)
00142 self.assertTrue(cluster_list[0][2]==self.cwd)
00143
00144 self.stopCluster()
00145
00146 def test2_availableResourcesCluster(self):
00147 """Test 2: Create a custom cluster to use all the available resources"""
00148
00149
00150 self.initCluster(max_engines=1.,max_memory=1.,memory_per_engine=1024.)
00151
00152 cluster_list = self.cluster.get_hosts()
00153 self.assertTrue(cluster_list[0][0]==self.host)
00154 self.assertTrue(cluster_list[0][1]<=self.ncpu)
00155 self.assertTrue(cluster_list[0][2]==self.cwd)
00156
00157 self.stopCluster()
00158
00159 def test3_halfCPUCluster(self):
00160 """Test 3: Create a custom cluster to use half of available CPU capacity"""
00161
00162
00163 self.initCluster(max_engines=0.5,max_memory=1.,memory_per_engine=512.)
00164
00165 cluster_list = self.cluster.get_hosts()
00166 self.assertTrue(cluster_list[0][0]==self.host)
00167 self.assertTrue(cluster_list[0][1]<=int(0.5*self.ncpu))
00168 self.assertTrue(cluster_list[0][2]==self.cwd)
00169
00170 self.stopCluster()
00171
00172 def test3_halfMemoryCluster(self):
00173 """Test 3: Create a custom cluster to use half of available RAM memory"""
00174
00175
00176 self.initCluster(max_engines=1.,max_memory=0.5,memory_per_engine=512.)
00177
00178 cluster_list = self.cluster.get_hosts()
00179 self.assertTrue(cluster_list[0][0]==self.host)
00180 self.assertTrue(cluster_list[0][2]==self.cwd)
00181
00182 self.stopCluster()
00183
00184 def test4_monitoringDefault(self):
00185 """Test 4: Check default monitoring file exists"""
00186
00187
00188 self.initCluster()
00189
00190 fid = open('monitoring.log', 'r')
00191 line = fid.readline()
00192 self.assertTrue(line.find('Host')>=0)
00193 self.assertTrue(line.find('Engine')>=0)
00194 self.assertTrue(line.find('Status')>=0)
00195 self.assertTrue(line.find('CPU[%]')>=0)
00196 self.assertTrue(line.find('Memory[%]')>=0)
00197 self.assertTrue(line.find('Time[s]')>=0)
00198 self.assertTrue(line.find('Read[MB]')>=0)
00199 self.assertTrue(line.find('Write[MB]')>=0)
00200 self.assertTrue(line.find('Read[MB/s]')>=0)
00201 self.assertTrue(line.find('Write[MB/s]')>=0)
00202 self.assertTrue(line.find('Job')>=0)
00203 self.assertTrue(line.find('Sub-MS')>=0)
00204
00205 self.stopCluster()
00206
00207 def test5_monitoringUser(self):
00208 """Test 5: Check custom monitoring file exists"""
00209
00210
00211 self.initCluster('userMonitorFile.log')
00212
00213 fid = open('userMonitorFile.log', 'r')
00214 line = fid.readline()
00215 self.assertTrue(line.find('Host')>=0)
00216 self.assertTrue(line.find('Engine')>=0)
00217 self.assertTrue(line.find('Status')>=0)
00218 self.assertTrue(line.find('CPU[%]')>=0)
00219 self.assertTrue(line.find('Memory[%]')>=0)
00220 self.assertTrue(line.find('Time[s]')>=0)
00221 self.assertTrue(line.find('Read[MB]')>=0)
00222 self.assertTrue(line.find('Write[MB]')>=0)
00223 self.assertTrue(line.find('Read[MB/s]')>=0)
00224 self.assertTrue(line.find('Write[MB/s]')>=0)
00225 self.assertTrue(line.find('Job')>=0)
00226 self.assertTrue(line.find('Sub-MS')>=0)
00227
00228 self.stopCluster()
00229
00230 def test6_monitoringStandAlone(self):
00231 """Test 6: Check the dict structure of the stand-alone method """
00232
00233
00234 self.initCluster('userMonitorFile.log')
00235
00236 state = self.cluster.show_resource(True)
00237 cluster_list = self.cluster.get_hosts()
00238 for engine in range(cluster_list[0][1]):
00239 self.assertTrue(state[self.host][engine].has_key('Status'))
00240 self.assertTrue(state[self.host][engine].has_key('Sub-MS'))
00241 self.assertTrue(state[self.host][engine].has_key('Read'))
00242 self.assertTrue(state[self.host][engine].has_key('Write'))
00243 self.assertTrue(state[self.host][engine].has_key('Job'))
00244 self.assertTrue(state[self.host][engine].has_key('Memory'))
00245 self.assertTrue(state[self.host][engine].has_key('ReadRate'))
00246 self.assertTrue(state[self.host][engine].has_key('WriteRate'))
00247
00248 self.stopCluster()
00249
00250 def test7_bypassParallelProcessing(self):
00251 """Test 7: Bypass Parallel Processing mode """
00252
00253 simple_cluster.setDefaults(default_mem_per_engine=33554432)
00254
00255
00256 self.setUpFile("Four_ants_3C286.mms",'vis')
00257
00258
00259 text = "mode='unflag'\n"\
00260 "mode='clip' clipminmax=[0,0.1]"
00261 filename = 'list_flagdata.txt'
00262 self.create_input(text, filename)
00263
00264
00265 flagdata(vis=self.vis, mode='list', inpfile=filename)
00266
00267
00268 ret_dict = flagdata(vis=self.vis, mode='summary')
00269
00270
00271 self.assertTrue(ret_dict['name']=='Summary')
00272 self.assertTrue(ret_dict['spw']['15']['flagged'] == 96284.0)
00273 self.assertTrue(ret_dict['spw']['0']['flagged'] == 129711.0)
00274 self.assertTrue(ret_dict['spw']['1']['flagged'] == 128551.0)
00275 self.assertTrue(ret_dict['spw']['2']['flagged'] == 125686.0)
00276 self.assertTrue(ret_dict['spw']['3']['flagged'] == 122862.0)
00277 self.assertTrue(ret_dict['spw']['4']['flagged'] == 109317.0)
00278 self.assertTrue(ret_dict['spw']['5']['flagged'] == 24481.0)
00279 self.assertTrue(ret_dict['spw']['6']['flagged'] == 0)
00280 self.assertTrue(ret_dict['spw']['7']['flagged'] == 0)
00281 self.assertTrue(ret_dict['spw']['8']['flagged'] == 0)
00282 self.assertTrue(ret_dict['spw']['9']['flagged'] == 27422.0)
00283 self.assertTrue(ret_dict['spw']['10']['flagged'] == 124638.0)
00284 self.assertTrue(ret_dict['spw']['11']['flagged'] == 137813.0)
00285 self.assertTrue(ret_dict['spw']['12']['flagged'] == 131896.0)
00286 self.assertTrue(ret_dict['spw']['13']['flagged'] == 125074.0)
00287 self.assertTrue(ret_dict['spw']['14']['flagged'] == 118039.0)
00288
00289
00290 os.system('rm -rf ' + self.vis)
00291
00292
00293 simple_cluster.setDefaults(default_mem_per_engine=512)
00294 if not self.bypassParallelProcessing:
00295 ParallelTaskHelper.bypassParallelProcessing(0)
00296
00297 def test8_IgnoreNullSelectionError(self):
00298 """Test 8: Check that NullSelection errors happening for some sub-MSs are ignored """
00299 """Note: In this test we also check simple_cluster initialization via ParallelTaskHelper """
00300
00301
00302 self.setUpFile("Four_ants_3C286.mms",'vis')
00303
00304
00305 flagdata(vis=self.vis, mode='unflag')
00306
00307
00308 flagdata(vis=self.vis, mode='manual', scan='30')
00309
00310
00311 ret_dict = flagdata(vis=self.vis, mode='summary')
00312
00313
00314 self.assertTrue(ret_dict['scan']['30']['flagged'] == 2187264.0)
00315 self.assertTrue(ret_dict['scan']['31']['flagged'] == 0)
00316
00317
00318 self.cluster = simple_cluster.getCluster()
00319 if (self.cluster != None):
00320 self.stopCluster()
00321
00322
00323 os.system('rm -rf ' + self.vis)
00324
00325
00326
00327 class test_flagdata_mms(test_simplecluster):
00328
00329 def setUp(self):
00330
00331 self.setUpFile("Four_ants_3C286.mms",'vis')
00332
00333
00334 if not self.bypassParallelProcessing:
00335 self.initCluster()
00336
00337 def tearDown(self):
00338
00339 if not self.bypassParallelProcessing:
00340 self.stopCluster()
00341
00342 os.system('rm -rf ' + self.vis)
00343
00344 def test1_flagdata_list_return(self):
00345 """Test 1: Test support for MMS using flagdata in unflag+clip mode"""
00346
00347
00348 text = "mode='unflag'\n"\
00349 "mode='clip' clipminmax=[0,0.1]"
00350 filename = 'list_flagdata.txt'
00351 self.create_input(text, filename)
00352
00353
00354 flagdata(vis=self.vis, mode='list', inpfile=filename)
00355
00356
00357 ret_dict = flagdata(vis=self.vis, mode='summary')
00358
00359
00360 self.assertTrue(ret_dict['name']=='Summary')
00361 self.assertTrue(ret_dict['spw']['15']['flagged'] == 96284.0)
00362 self.assertTrue(ret_dict['spw']['0']['flagged'] == 129711.0)
00363 self.assertTrue(ret_dict['spw']['1']['flagged'] == 128551.0)
00364 self.assertTrue(ret_dict['spw']['2']['flagged'] == 125686.0)
00365 self.assertTrue(ret_dict['spw']['3']['flagged'] == 122862.0)
00366 self.assertTrue(ret_dict['spw']['4']['flagged'] == 109317.0)
00367 self.assertTrue(ret_dict['spw']['5']['flagged'] == 24481.0)
00368 self.assertTrue(ret_dict['spw']['6']['flagged'] == 0)
00369 self.assertTrue(ret_dict['spw']['7']['flagged'] == 0)
00370 self.assertTrue(ret_dict['spw']['8']['flagged'] == 0)
00371 self.assertTrue(ret_dict['spw']['9']['flagged'] == 27422.0)
00372 self.assertTrue(ret_dict['spw']['10']['flagged'] == 124638.0)
00373 self.assertTrue(ret_dict['spw']['11']['flagged'] == 137813.0)
00374 self.assertTrue(ret_dict['spw']['12']['flagged'] == 131896.0)
00375 self.assertTrue(ret_dict['spw']['13']['flagged'] == 125074.0)
00376 self.assertTrue(ret_dict['spw']['14']['flagged'] == 118039.0)
00377
00378
00379 class test_setjy_mms(test_simplecluster):
00380
00381 def setUp(self):
00382
00383 self.setUpFile("ngc5921.applycal.mms",'vis')
00384
00385
00386 if not self.bypassParallelProcessing:
00387 self.initCluster()
00388
00389 def tearDown(self):
00390
00391 if not self.bypassParallelProcessing:
00392 self.stopCluster()
00393
00394 os.system('rm -rf ' + self.vis)
00395
00396 def test1_setjy_scratchless_mode_single_model(self):
00397 """Test 1: Set vis model header in one single field """
00398
00399 retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=False)
00400 self.assertTrue(retval, "setjy run failed")
00401
00402 tblocal = tbtool()
00403 tblocal.open(self.vis)
00404 model_0 = tblocal.getkeyword('model_0')
00405 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00406
00407 mslocal = mstool()
00408 mslocal.open(self.vis)
00409 listSubMSs = mslocal.getreferencedtables()
00410 mslocal.close()
00411 for subMS in listSubMSs:
00412 tblocal = tbtool()
00413 tblocal.open(subMS)
00414 fieldId = tblocal.getcell('FIELD_ID',1)
00415 if (fieldId == 0):
00416 model_0 = tblocal.getkeyword('model_0')
00417 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00418 elif (fieldId == 1):
00419 model_0 = tblocal.getkeyword('model_0')
00420 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00421 elif (fieldId == 2):
00422 model_0 = tblocal.getkeyword('model_0')
00423 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00424 else:
00425 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00426 tblocal.close()
00427 tblocal.close()
00428
00429 def test2_setjy_scratchless_mode_multiple_model(self):
00430 """Test 2: Set vis model header in one multiple fields """
00431
00432 retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=False)
00433 self.assertTrue(retval, "setjy run failed")
00434 retval = setjy(vis=self.vis, field='1445+099*',fluxdensity=[1445.,0.,0.,0.], scalebychan=False, usescratch=False)
00435 self.assertTrue(retval, "setjy run failed")
00436
00437 tblocal = tbtool()
00438 tblocal.open(self.vis)
00439 model_0 = tblocal.getkeyword('model_0')
00440 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00441 model_1 = tblocal.getkeyword('model_1')
00442 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00443
00444 mslocal = mstool()
00445 mslocal.open(self.vis)
00446 listSubMSs = mslocal.getreferencedtables()
00447 mslocal.close()
00448 for subMS in listSubMSs:
00449 tblocal.open(subMS)
00450 fieldId = tblocal.getcell('FIELD_ID',1)
00451 if (fieldId == 0):
00452 model_0 = tblocal.getkeyword('model_0')
00453 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00454 model_1 = tblocal.getkeyword('model_1')
00455 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00456 elif (fieldId == 1):
00457 model_0 = tblocal.getkeyword('model_0')
00458 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00459 model_1 = tblocal.getkeyword('model_1')
00460 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00461 elif (fieldId == 2):
00462 model_0 = tblocal.getkeyword('model_0')
00463 self.assertEqual(model_0['cl_0']['container']['component0']['flux']['value'][0],1331.)
00464 model_1 = tblocal.getkeyword('model_1')
00465 self.assertEqual(model_1['cl_0']['container']['component0']['flux']['value'][0],1445.)
00466 else:
00467 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00468 tblocal.close()
00469 tblocal.close()
00470
00471 def test3_setjy_scratch_mode_single_model(self):
00472 """Test 3: Set MODEL_DATA in one single field"""
00473
00474 retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False,usescratch=True)
00475 self.assertTrue(retval, "setjy run failed")
00476
00477 mslocal = mstool()
00478 mslocal.open(self.vis)
00479 listSubMSs = mslocal.getreferencedtables()
00480 mslocal.close()
00481 for subMS in listSubMSs:
00482 tblocal = tbtool()
00483 tblocal.open(subMS)
00484 fieldId = tblocal.getcell('FIELD_ID',1)
00485 if (fieldId == 0):
00486 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1331.0)
00487 elif (fieldId == 1):
00488 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00489 elif (fieldId == 2):
00490 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00491 else:
00492 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00493 tblocal.close()
00494 tblocal.close()
00495
00496 def test4_setjy_scratch_mode_multiple_model(self):
00497 """Test 4: Set MODEL_DATA in multiple fields"""
00498
00499 retval = setjy(vis=self.vis, field='1331+305*',fluxdensity=[1331.,0.,0.,0.], scalebychan=False, usescratch=True)
00500 self.assertTrue(retval, "setjy run failed")
00501 retval = setjy(vis=self.vis, field='1445+099*',fluxdensity=[1445.,0.,0.,0.], scalebychan=False, usescratch=True)
00502 self.assertTrue(retval, "setjy run failed")
00503
00504 mslocal = mstool()
00505 mslocal.open(self.vis)
00506 listSubMSs = mslocal.getreferencedtables()
00507 mslocal.close()
00508 for subMS in listSubMSs:
00509 tblocal = tbtool()
00510 tblocal.open(subMS)
00511 fieldId = tblocal.getcell('FIELD_ID',1)
00512 if (fieldId == 0):
00513 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1331.0)
00514 elif (fieldId == 1):
00515 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1445.0)
00516 elif (fieldId == 2):
00517 self.assertEqual(tblocal.getcell('MODEL_DATA',1)[0][0].real,1.0)
00518 else:
00519 raise AssertionError, "Unrecognized field [%s] found in Sub-MS [%s]" %(str(fieldId),subMS)
00520 tblocal.close()
00521 tblocal.close()
00522
00523 class test_applycal_mms(test_simplecluster):
00524
00525 def setUp(self):
00526
00527 self.setUpFile("ngc5921.applycal.mms",'vis')
00528
00529 self.setUpFile("ngc5921.split.mms",'ref')
00530
00531 self.setUpFile(["ngc5921.fluxscale", "ngc5921.gcal", "ngc5921.bcal"],'aux')
00532
00533
00534 if not self.bypassParallelProcessing:
00535 self.initCluster()
00536
00537 def tearDown(self):
00538
00539 if not self.bypassParallelProcessing:
00540 self.stopCluster()
00541
00542 os.system('rm -rf ' + self.vis)
00543
00544 os.system('rm -rf ' + self.ref)
00545
00546 for file in self.aux:
00547 os.system('rm -rf ' + file)
00548
00549 def test1_applycal_fluxscale_gcal_bcal(self):
00550 """Test 1: Apply calibration using fluxscal gcal and bcal tables"""
00551
00552 applycal(vis=self.vis,field='',spw='',selectdata=False,gaintable=self.aux,
00553 gainfield=['nearest','nearest','0'],interp=['linear', 'linear','nearest'],spwmap=[],gaincurve=False,opacity=0.0)
00554
00555 compare = testhelper.compTables(self.ref,self.vis,['FLAG_CATEGORY'])
00556
00557 self.assertTrue(compare)
00558
00559
00560 class test_uvcont_mms(test_simplecluster):
00561
00562 def setUp(self):
00563
00564 self.setUpFile("ngc5921.uvcont.mms",'vis')
00565
00566 self.setUpFile(["ngc5921.mms.cont", "ngc5921.mms.contsub"],'ref')
00567
00568
00569 if not self.bypassParallelProcessing:
00570 self.initCluster()
00571
00572 def tearDown(self):
00573
00574 if not self.bypassParallelProcessing:
00575 self.stopCluster()
00576
00577 os.system('rm -rf ' + self.vis)
00578
00579 for file in self.ref:
00580 os.system('rm -rf ' + file)
00581
00582 def test1_uvcont_single_spw(self):
00583 """Test 1: Extract continuum from one single SPW using uvcontsub"""
00584
00585 uvcontsub(vis=self.vis,field = 'N5921*',fitspw='0:4~6;50~59',spw = '0',solint = 'int',fitorder = 0,want_cont = True)
00586
00587 compare_cont = testhelper.compTables(self.ref[0],self.vis+".cont",['FLAG_CATEGORY'])
00588 self.assertTrue(compare_cont)
00589 compare_contsub = testhelper.compTables(self.ref[1],self.vis+".contsub",['FLAG_CATEGORY'])
00590 self.assertTrue(compare_contsub)
00591
00592 class testJobData(unittest.TestCase):
00593 '''
00594 This class tests the JobData class in the simple_cluster.
00595 '''
00596
00597 def testSimpleJob(self):
00598 jd = JobData('myJob')
00599 self.assertEqual('returnVar0 = myJob()', jd.getCommandLine())
00600 self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00601
00602 def testJobWithOneArg(self):
00603 jd = JobData('myJob',{'arg1':1})
00604 self.assertEqual('returnVar0 = myJob(arg1 = 1)', jd.getCommandLine())
00605 self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00606
00607 def testJobWithMultipleArg(self):
00608 jd = JobData('myJob',{'arg1':1,'arg2':2,'arg3':'three'})
00609 self.assertEqual\
00610 ("returnVar0 = myJob(arg1 = 1, arg2 = 2, arg3 = 'three')",
00611 jd.getCommandLine())
00612 self.assertEqual(['returnVar0'], jd.getReturnVariableList())
00613
00614 def testJobWithMultipleCommands(self):
00615 jd = JobData('myJob1')
00616 jd.addCommand('myJob2',{'arg1':1})
00617 self.assertEqual('returnVar0 = myJob1(); '+\
00618 'returnVar1 = myJob2(arg1 = 1)',
00619 jd.getCommandLine())
00620 self.assertEqual(['returnVar0', 'returnVar1'],
00621 jd.getReturnVariableList())
00622
00623 def testGetCommandArguments(self):
00624
00625 jd = JobData('myCommand1',{'arg1':1})
00626 resp = jd.getCommandArguments()
00627 self.assertTrue(isinstance(resp,dict))
00628 self.assertEqual(len(resp),1)
00629 self.assertEqual(resp['arg1'],1)
00630
00631
00632 jd.addCommand('myCommand2',{'arg2':2})
00633 resp = jd.getCommandArguments()
00634 self.assertTrue(isinstance(resp,dict))
00635 self.assertEqual(len(resp),2)
00636 self.assertTrue(isinstance(resp['myCommand1'],dict))
00637 self.assertTrue(isinstance(resp['myCommand2'],dict))
00638 self.assertEqual(resp['myCommand1']['arg1'],1)
00639 self.assertEqual(resp['myCommand2']['arg2'],2)
00640
00641
00642 resp = jd.getCommandArguments('myCommand1')
00643 self.assertTrue(isinstance(resp,dict))
00644 self.assertEqual(len(resp),1)
00645 self.assertEqual(resp['arg1'],1)
00646
00647
00648 class testJobQueueManager(unittest.TestCase):
00649 '''
00650 This class tests the Job Queue Manager.
00651 '''
00652 def __init__(self, methodName = 'runTest'):
00653 self.setUpCluster()
00654 unittest.TestCase.__init__(self, methodName)
00655
00656
00657 def testJobExecutionSingleReturn(self):
00658
00659 queue = JobQueueManager()
00660 queue.addJob(JobData('echoFunction', {'input':'inputVar'}))
00661 queue.executeQueue()
00662
00663 jobList = queue.getOutputJobs('done')
00664 self.assertEqual(len(jobList),1)
00665 self.assertEqual(jobList[0].getReturnValues(),'inputVar')
00666
00667 queue.clearJobs()
00668 for idx in xrange(16):
00669 queue.addJob(JobData('echoFunction',
00670 {'input':'inputVar%d' % idx}))
00671 queue.executeQueue()
00672
00673 jobList = queue.getOutputJobs('done')
00674 self.assertEqual(len(jobList),16)
00675 for job in jobList:
00676 self.assertEqual(job.getReturnValues(),
00677 job._commandList[0].commandInfo['input'])
00678
00679 def testJobExectionWithoutReturn(self):
00680 queue = JobQueueManager()
00681 queue.addJob(JobData('noReturn'))
00682 queue.executeQueue()
00683
00684 jobList = queue.getOutputJobs('done')
00685 self.assertEqual(len(jobList),1)
00686 self.assertEqual(jobList[0].getReturnValues(),None)
00687
00688
00689 queue.clearJobs()
00690 for idx in xrange(16):
00691 if idx % 2:
00692 queue.addJob(JobData('echoFunction',
00693 {'input':'inputVar%d' % idx}))
00694 else:
00695 queue.addJob(JobData('noReturn'))
00696 queue.executeQueue()
00697
00698 jobList = queue.getOutputJobs('done')
00699 self.assertEqual(len(jobList),16)
00700 for job in jobList:
00701 if job._commandList[0].commandName == 'echoFunction':
00702 self.assertEqual(job.getReturnValues(),
00703 job._commandList[0].commandInfo['input'])
00704 else:
00705 self.assertEqual(job.getReturnValues(), None)
00706
00707 def testFailedJobs(self):
00708 queue = JobQueueManager()
00709 queue.addJob(JobData('setErrror',{'setError':True,
00710 'returnValue':1}))
00711 queue.executeQueue()
00712
00713 jobList = queue.getOutputJobs()
00714 self.assertEqual(len(jobList),1)
00715 self.assertEqual(jobList[0].status,'broken')
00716
00717
00718 queue.clearJobs()
00719 for idx in xrange(16):
00720 queue.addJob(JobData('setError', {'setError': idx %2,
00721 'returnValue': idx}))
00722 queue.executeQueue()
00723 jobList = queue.getOutputJobs('done')
00724 self.assertEqual(len(jobList),8)
00725
00726 jobList = queue.getOutputJobs()
00727 self.assertEqual(len(jobList),16)
00728 for job in jobList:
00729 if job._commandList[0].commandInfo['setError']:
00730 self.assertEqual(job.getReturnValues(), None)
00731 self.assertEqual(job.status, 'broken')
00732 else:
00733 self.assertEqual(job.getReturnValues(),
00734 job._commandList[0].commandInfo['returnValue'])
00735 self.assertEqual(job.status, 'done')
00736
00737
00738 def setUpCluster(self):
00739 '''
00740 This method defines three methods on the cluster:
00741 echoFunction - returns whatever is sent
00742 noReturn - has no return value
00743 errorMethod - which will raise an exception if the argument is true
00744 '''
00745 cluster = simple_cluster.getCluster()
00746
00747 command = '''
00748 def echoFunction(input = ''):
00749 return input
00750 '''
00751 for engine in cluster.use_engines():
00752 cluster.do_and_record(command, engine)
00753
00754
00755 command = '''
00756 def noReturn():
00757 pass
00758 '''
00759
00760 for engine in cluster.use_engines():
00761 cluster.do_and_record(command, engine)
00762
00763 command = '''
00764 def setError(setError, returnValue):
00765 if setError:
00766 raise Exception, "Error Condition"
00767 return returnValue
00768 '''
00769
00770 for engine in cluster.use_engines():
00771 cluster.do_and_record(command, engine)
00772
00773
00774 counter = 30
00775 while counter > 0 and (len(cluster.get_status(True)) <
00776 len(cluster.use_engines())):
00777 time.sleep(1)
00778 counter -= 1
00779 cluster.remove_record()
00780
00781 def suite():
00782 return [test_simplecluster,test_flagdata_mms,test_setjy_mms,test_applycal_mms,test_uvcont_mms]
00783
00784 if __name__ == '__main__':
00785 testSuite = []
00786 for testClass in suite():
00787 testSuite.append(unittest.makeSuite(testClass,'test'))
00788 allTests = unittest.TestSuite(testSuite)
00789 unittest.TextTestRunner(verbosity=2).run(allTests)