import os
import math
import shutil
import string
import time
import re
import copy
import pdb

from casatools import synthesisimager, synthesisnormalizer
from casatasks import casalog

from .imager_base import PySynthesisImager
from .parallel_imager_helper import PyParallelImagerHelper
synth_imager_name = 'synthesisimager'
synth_imager_import = 'from casatools import synthesisimager'

'''
An implementation of parallel continuum imaging, using synthesisxxxx tools.

Datasets are partitioned by row, and the major cycles are parallelized.
Gathers and normalization are done before passing the images to a
non-parallel minor cycle. The output model image is then scattered to
all the nodes for the next parallel major cycle.

There are N synthesisimager objects.
There is 1 instance per image field, of the normalizer and deconvolver.
There is 1 iterbot.

'''
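# Illustrative driver sketch (approximate, for orientation only; it is not executed here).
# The task-level code constructs this class with the usual parameter-container object and
# then runs the standard PySynthesisImager sequence. The method names below come from this
# file and its base class in imager_base.py; check there (and in the tclean task helpers)
# for the exact sequence before relying on it.
#
#     params = ImagerParameters(...)                 # parameter setup (see input_parameters.py)
#     imager = PyParallelContSynthesisImager(params)
#     imager.initializeImagers()                     # main-node init, then per-node init
#     imager.initializeNormalizers()                 # one normalizer per image field
#     imager.setWeighting()                          # per-node weighting (+ gather/scatter)
#     imager.makePSF()                               # PSFs made via the parallel makePSFCore()
#     ...                                            # deconvolution / major-minor cycle loop
#     imager.deleteCluster()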

#############################################
#############################################
## Parallelize only major cycle.
#############################################
class PyParallelContSynthesisImager(PySynthesisImager):

    def __init__(self,params=None):

        PySynthesisImager.__init__(self,params)

        self.PH = PyParallelImagerHelper()
        self.NN = self.PH.NN
        self.selpars = self.allselpars;
        self.allselpars = self.PH.partitionContDataSelection(self.allselpars)
        # self.allcflist = self.PH.partitionCFCacheList(self.cfcachepars['cflist']);
        # self.allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']);
        self.listOfNodes = self.PH.getNodeList();
        self.coordsyspars = {};
        self.toolsi=None

#############################################
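    # resetSaveModelParams copies the task-level 'readonly' and 'usescratch' flags from the
    # supplied parameter object into every per-node, per-MS selection record, so that
    # model-saving behaves the same on all nodes.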

    def resetSaveModelParams(self, params=None):
        mainparams = params.getSelPars()
        for n in self.allselpars: # for all nodes
            for v in self.allselpars[n]: # for all MSes
                self.allselpars[n][v]['readonly']=mainparams[v]['readonly']
                self.allselpars[n][v]['usescratch']=mainparams[v]['usescratch']

#############################################
#    def initializeImagers(self):
#        ### Drygridding, and Coordsys comes from a single imager on MAIN node.
#        ### No startmodel confusion. It's created only once and then scattered.
#        #self.initializeImagers()
#        pdb.set_trace()
#        super().initializeImagers()
#        ### Note : Leftover from CAS-9977
#        ### There is a coord system mismatch at scatter/gather, if the MAIN version already
#        ### exists on disk. With startmodel, it's xxx.model. With aproject, it's xxx.residual.
#        ### There is an exception in SIImageStore::openImage to handle this.
#        ### Turn on casalog.filter('DEBUG1') to see the warning message.

#############################################
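    # initializeImagersBase is called twice from initializeImagers():
    #   - partialSelPars==False : run selectdata/defineimage once on the MAIN-node tool
    #     (self.toolsi) to fix the global coordinate system (and any startmodel).
    #   - partialSelPars==True  : start a 'toolsi' on every node and initialize it with
    #     that node's partitioned data selection and per-node image names.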

    def initializeImagersBase(self,thisSelPars,partialSelPars):

        if partialSelPars==False: ## Init only on the zero'th node

            #
            # Use the already-created imager on MAIN node
            #
            ##self.toolsi = synthesisimager()

            #
            # Select data.
            #
            for mss in sorted( self.selpars.keys() ):
                self.toolsi.selectdata( thisSelPars[mss] )

            # Defineimage.
            # This makes the global csys. Get csys to distribute to other nodes.
            # It also sets 'startmodel' if available (this is later scattered to nodes).
            for fld in range(0,self.NF):
                tmpimpars = copy.deepcopy(self.allimpars[str(fld)])
                #if tmpimpars.has_key('startmodel'):
                #    tmpimpars.pop('startmodel')
                self.toolsi.defineimage( impars=tmpimpars, gridpars = self.allgridpars[str(fld)] )
                fullcoords = self.toolsi.getcsys()
                self.coordsyspars[str(fld)] = fullcoords

            # Modify the coordsys inputs
            for fld in range(0, self.NF):
                self.allimpars[str(fld)]['csys']=self.coordsyspars[str(fld)]['coordsys'].copy()

            # Call the global defineimage again
            # (to get around a later error of different coordsys latpoles (CAS-9977)).
            #for fld in range(0,self.NF):
            #    self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars = self.allgridpars[str(fld)] )

        else: ## partialSelPars==True , the actual initialization on all nodes.

            #
            # Start the imagers on all nodes.
            #
            joblist=[]
            for node in self.listOfNodes:
                joblist.append( self.PH.runcmd('{0}; toolsi = {1}()'.format(
                    synth_imager_import, synth_imager_name),
                    node) );
            self.PH.checkJobs(joblist);

            #
            # Select data. If partialSelPars is True, use the thisSelPars
            # data structure as a list of partitioned selections.
            #
            joblist=[];
            nodes=self.listOfNodes;#[1];
            for node in nodes:
                for mss in sorted( self.selpars.keys() ):
                    selStr=str(thisSelPars[str(node-1)][mss]);
                    joblist.append( self.PH.runcmd("toolsi.selectdata( "+selStr+")", node) )
            self.PH.checkJobs(joblist);

            #
            # Call defineimage at each node.
            #
            joblist=[];
            for node in nodes:
                ## For each image-field, define imaging parameters
                nimpars = copy.deepcopy(self.allimpars)
                # casalog.post("nimpars = "+str(nimpars))
                ngridpars = copy.deepcopy(self.allgridpars)
                for fld in range(0,self.NF):
                    if self.NN>1:
                        nimpars[str(fld)]['imagename'] = self.PH.getpartimagename( nimpars[str(fld)]['imagename'], node )

                    ## Pop out the startmodel, as it would already have been created on the main node.
                    tmpimpars = nimpars[str(fld)]
                    if 'startmodel' in tmpimpars:
                        tmpimpars.pop('startmodel')

                    joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] )
                                                   + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
            self.PH.checkJobs(joblist);


#############################################
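    # initializeImagers() drives the two-pass setup:
    #   1. decide whether an 'awproject'-style CF cache already exists on disk,
    #   2. create one synthesisimager on the MAIN node and run the first
    #      initializeImagersBase() pass to fix the coordinate systems,
    #   3. for multifield briggs/uniform weighting, build the weight density once here,
    #   4. run the second initializeImagersBase() pass to start imagers on all nodes,
    #   5. if the CF cache was missing, fill it (serially) and reload it on all nodes.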


    def initializeImagers(self):
        #---------------------------------------
        # Check if the cfcache exists.
        #
        cfCacheName=''
        cfcExists=False
        if(self.allgridpars['0']['gridder'].startswith('awpr')):
            cfCacheName=self.allgridpars['0']['cfcache']
        else:
            self.allgridpars['0']['cfcache']=''
            cfcExists=True
        if(self.allgridpars['0']['gridder'] == 'awproject' or self.allgridpars['0']['gridder'] == 'awprojectft'):
            if (cfCacheName == ''):
                cfCacheName = self.allimpars['0']['imagename'] + '.cf'
                cfCacheName=self.allgridpars['0']['cfcache'] = cfCacheName
            self.allgridpars['0']['cfcache']= cfCacheName
            cfcExists = (os.path.exists(cfCacheName) and os.path.isdir(cfCacheName));
            if (cfcExists):
                nCFs = len(os.listdir(cfCacheName));
                if (nCFs == 0):
                    casalog.post(cfCacheName + " exists, but is empty. An attempt is being made to fill it now.","WARN")
                    cfcExists = False;
        # casalog.post("##########################################")
        # casalog.post("CFCACHE = "+cfCacheName,cfcExists)
        # casalog.post("##########################################")

        # Start one imager on MAIN node
        self.toolsi = synthesisimager()

        # Init one SI tool ( it records the csys per field in self.coordsyspars )
        self.initializeImagersBase(self.selpars,False);

        # Modify the coordsys inputs
#        for fld in range(0, self.NF):
#            self.allimpars[str(fld)]['csys']=self.coordsyspars[str(fld)]['coordsys'].copy()

        # Dry Gridding on the MAIN node ( i.e. on self.toolsi)
        #if (not cfcExists):
        #    self.dryGridding();

        ## Weighting with multifield=True
        if( ( ( (self.weightpars['type'].count('briggs') or self.weightpars['type'].count('uniform')) > 0) and (self.weightpars['multifield']) ) ):
            self.toolsi.setweighting(**self.weightpars)
            ### The master creates the weight density for all fields
            self.toolsi.getweightdensity()

        # Clean up the single imager (MAIN node)
        #self.toolsi.done()
        #self.toolsi = None

        # Do the second round, initializing imagers on ALL nodes
        self.initializeImagersBase(self.allselpars,True);

        # Fill the CFCache - it uses all nodes.
        if (not cfcExists):
            self.SItool=self.toolsi
            #super().initializeImagers()
            ### Doing this serially, as in parallel it randomly hits a race condition
            ### about table.dat not being available.
            super().makeCFCache(False)
            #self.fillCFCache()
            self.reloadCFCache();
            self.SItool=None
        self.toolsi.done()
        self.toolsi = None


######################################################################################################################################
        #---------------------------------------
        # 4. call setdata() for images on all nodes
        #
        # joblist=[];
        # for node in self.listOfNodes:
        #     ## Send in Selection parameters for all MSs in the list
        #     #### MPIInterface related changes (the -1 in the expression below)
        #     for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
        #         joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
        # self.PH.checkJobs(joblist);

        #---------------------------------------
        # 5. Call defineImage() on all nodes. This sets up the FTMs.
        #
#        joblist=[];
#        for node in self.listOfNodes:
#            ## For each image-field, define imaging parameters
#            nimpars = copy.deepcopy(self.allimpars)
#            #casalog.post("nimpars = "+str(nimpars))
#            ngridpars = copy.deepcopy(self.allgridpars)
#            for fld in range(0,self.NF):
#                if self.NN>1:
#                    nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
#                    ### nimpars[str(fld)]['imagename'] = self.allnormpars[str(fld)]['workdir'] + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
#                    ### nimpars[str(fld)]['imagename'] = nimpars[str(fld)]['imagename']+'.n'+str(node)

#                # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']+'.n'+str(node)
#                # # Give the same CFCache name to all nodes
#                ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];

#                joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
#        self.PH.checkJobs(joblist);

        #---------------------------------------
        # 6. If the cfcache does not exist, call fillCFCache()
        #    This will fill the "empty" CFCache in parallel.
        # 7. Now call reloadCFCache() on all nodes.
        #    This reloads the latest cfcache.

        # TRY: Start all over again!
        # self.deleteImagers();

        # joblist=[]

        # for node in self.listOfNodes:
        #     joblist.append( self.PH.runcmd("toolsi = casac.synthesisimager()", node) );
        # self.PH.checkJobs(joblist);

        # joblist=[];
        # nodes=self.listOfNodes;#[1];
        # for node in nodes:
        #     for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
        #         joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
        #     # for mss in sorted( self.selpars.keys() ):
        #     #     joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.selpars[mss])+")", node) )
        # self.PH.checkJobs(joblist);

        # joblist=[];
        # for node in self.listOfNodes:
        #     nimpars = copy.deepcopy(self.allimpars)
        #     ngridpars = copy.deepcopy(self.allgridpars)
        #     for fld in range(0,self.NF):
        #         if self.NN>1:
        #             nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
        #         # # Give the same CFCache name to all nodes
        #         ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];

        #         joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
        # self.PH.checkJobs(joblist);

#############################################

#############################################
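    # initializeNormalizers() creates one synthesisnormalizer per image field. When running
    # with more than one node, it also records the per-node 'part' image names (and deletes
    # any stale part images so that restarts behave), so the normalizer can gather and
    # normalize the partial images produced by the parallel major cycles.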


    def initializeNormalizers(self):
        for immod in range(0,self.NF):
            self.PStools.append(synthesisnormalizer())
            self.localnormpars = copy.deepcopy( self.allnormpars[str(immod)] )
            partnames = []
            if(self.NN>1):
                #### MPIInterface related changes
                #for node in range(0,self.NN):
                for node in self.listOfNodes:
                    partnames.append( self.PH.getpartimagename( self.allimpars[str(immod)]['imagename'], node ) )
                    #onename = self.allimpars[str(immod)]['imagename']+'.n'+str(node)
                    #partnames.append( self.PH.getpath(node) + '/' + onename )
                    #self.PH.deletepartimages( self.PH.getpath(node), onename ) # To ensure restarts work properly.
                    self.PH.deletepartimages( self.allimpars[str(immod)]['imagename'] , node ) # To ensure restarts work properly.
                self.localnormpars['partimagenames'] = partnames

            self.PStools[immod].setupnormalizer(normpars=self.localnormpars)

#############################################
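    # setWeighting(): for multifield briggs/uniform weighting, the weight density is built
    # once on the master (select/defineimage/setweighting/getweightdensity), moved aside as
    # '<name>_moswt', and each node is pointed at it via setweightdensity(). Otherwise each
    # node runs setweighting() itself, and for a single field with non-natural/non-radial
    # weighting the per-node gridwt images are gathered, summed, and scattered back.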

    def setWeighting(self):
        ## Set weight parameters and accumulate weight density (natural)
        joblist=[];
        if( ( ((self.weightpars['type'].count('briggs') or self.weightpars['type'].count('uniform')) >0) and (self.weightpars['multifield']) ) ):
            ### The master creates the weight density for all fields.
            ## Should have been in initializeImagersBase_New but it is not being called!
            self.toolsi = synthesisimager()
            for mss in sorted( self.selpars.keys() ):
                self.toolsi.selectdata( self.selpars[mss] )
            for fld in range(0,self.NF):
                self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars = self.allgridpars[str(fld)] )
            self.toolsi.setweighting(**self.weightpars)
            ### The master creates the weight density for all fields.
            weightimage=self.toolsi.getweightdensity()
            self.toolsi.done()
            self.toolsi=None
            destWgtim=weightimage+'_moswt'
            if( os.path.exists(destWgtim)):
                shutil.rmtree(destWgtim)
            shutil.move(weightimage, destWgtim)
            joblist=[];
            for node in self.listOfNodes:
                joblist.append( self.PH.runcmd("toolsi.setweightdensity('"+str(destWgtim)+"')", node ) )
            self.PH.checkJobs( joblist )
            #for node in self.listOfNodes:
            #    ## Set weighting pars
            #    joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) )
            #self.PH.checkJobs( joblist )
            #joblist=[];
            #for node in self.listOfNodes:
            #    joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) )
            #self.PH.checkJobs( joblist )

            #for immod in range(0,self.NF):
            #    #self.PStools[immod].gatherweightdensity()
            #    self.PStools[immod].scatterweightdensity()
            ## Set the weight density for each node
            #joblist=[];
            #for node in self.listOfNodes:
            #    joblist.append( self.PH.runcmd("toolsi.setweightdensity()", node ) )
            #self.PH.checkJobs( joblist )
            #### end of multifield or mosweight
        else:
            joblist=[];
            for node in self.listOfNodes:
                ## Set weighting pars
                joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) )
            self.PH.checkJobs( joblist )

            ## If only one field, do the get/gather/set of the weight density.
            if self.NF == 1: # and self.allimpars['0']['stokes']=="I": ## Remove after gridded wts appear for all fields correctly (i.e. new FTM).

                if not ( (self.weightpars['type'] == 'natural') or (self.weightpars['type'] == 'radial')) : ## For natural and radial, this array isn't created at all.
                    ## Remove when we switch to the new FTM.

                    casalog.post("Gathering/Merging/Scattering Weight Density for PSF generation","INFO")

                    joblist=[];
                    for node in self.listOfNodes:
                        joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) )
                    self.PH.checkJobs( joblist )

                    ## Gather the weight density, sum, and scatter.
                    casalog.post("******************************************************")
                    casalog.post(" gather and scatter now ")
                    casalog.post("******************************************************")
                    locpstool=synthesisnormalizer()
                    locpstool.setupnormalizer(normpars=self.localnormpars)

                    locpstool.gatherweightdensity()
                    sumgridname=locpstool.scatterweightdensity()
                    resname=sumgridname.replace(".gridwt", ".residual")
                    #print("%%%%%%%%", sumgridname)
                    if(os.path.exists(sumgridname+"_temp") and (os.path.exists(resname) or os.path.exists(resname+".tt0")) ): # a restart
                        shutil.rmtree(sumgridname, True)
                        shutil.move(sumgridname+"_temp", sumgridname)

                    ## Set the weight density for each node.
                    joblist=[];
                    for node in self.listOfNodes:
                        joblist.append( self.PH.runcmd("toolsi.setweightdensity('"+str(sumgridname)+"')", node ) )
                    self.PH.checkJobs( joblist )
                    ### For some reason we cannot stop the psf being made along with the gridwt image,
                    ### and it may have the wrong shape at this stage.
                    #shutil.rmtree(sumgridname)
                    shutil.rmtree(sumgridname+"_temp", True)
                    shutil.move(sumgridname, sumgridname+"_temp")

                    tmppsfname=sumgridname.replace(".gridwt", ".psf")
                    resname=sumgridname.replace(".gridwt", ".residual")
                    if(not os.path.exists(resname)) : # not a restart, so the psf shape may be different if full pol...delete it
                        shutil.rmtree(tmppsfname, True)
                    if(not os.path.exists(resname+".tt0")) :
                        shutil.rmtree(tmppsfname+".tt0", True)

            else:
                if not ( (self.weightpars['type'] == 'natural') or (self.weightpars['type'] == 'radial')) :
                    casalog.post("Parallel-Continuum-multifield with briggs weighting will give different weighting schemes with number of processes used", "WARN")


    def deleteImagers(self):
        self.PH.runcmd("toolsi.done()")

    def deleteWorkDir(self):
        ## Delete the contents of the .workdirectory
        for immod in range(0,self.NF):
            normpars = copy.deepcopy( self.allnormpars[str(immod)] )
            if(self.NN>1):
                for node in self.listOfNodes:
                    self.PH.deletepartimages( self.allimpars[str(immod)]['imagename'], node ,deldir=True )

#            ## Delete the workdirectory
#            casalog.post("Deleting workdirectory : "+self.PH.getworkdir(imagename, node))
#            shutil.rmtree( self.PH.getworkdir(imagename, node) )

    def deleteCluster(self):
        self.PH.takedownCluster()

#    #############################################
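    # dryGridding() runs a "dry" gridding pass on the MAIN-node tool; for the awproject-style
    # gridders this is intended to work out which convolution functions an (empty) CF cache
    # will need (compare the commented-out dryGridding_Old and fillCFCache variants below).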

    def dryGridding(self):
        dummy=['']
        self.toolsi.drygridding(dummy)

#    def dryGridding_Old(self):
#        nodes=[1];
#        joblist=[];
#        for node in nodes:
#            dummy=[''];
#            cmd = "toolsi.drygridding("+str(dummy)+")";
#            joblist.append(self.PH.runcmd(cmd,node));
#        self.PH.checkJobs(joblist);

#############################################
    def reloadCFCache(self):
        joblist=[];
        for node in self.listOfNodes:
            cmd = "toolsi.reloadcfcache()";
            casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd))
            joblist.append(self.PH.runcmd(cmd,node));
        self.PH.checkJobs(joblist);
#############################################

#    def fillCFCache(self):
#        #casalog.post("-----------------------fillCFCache------------------------------------")
#        # cflist=[f for f in os.listdir(self.allgridpars['cfcache']) if re.match(r'CFS*', f)];
#        # partCFList =
#        if(not str(self.allgridpars['0']['gridder']).startswith("awp")):
#            return
#        allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']);
#        cfcPath = "\""+str(self.allgridpars['0']['cfcache'])+"\"";
#        ftmname = "\""+str(self.allgridpars['0']['gridder'])+"\"";
#        psTermOn = str(self.allgridpars['0']['psterm']);
#        aTermOn = str(self.allgridpars['0']['aterm']);
#        conjBeams = str(self.allgridpars['0']['conjbeams']);
#        #aTermOn = str(True);
#        # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
#        # casalog.post("AllCFList = ",allcflist)
#        m = len(allcflist);
#        # casalog.post("No. of nodes used: " + m,cfcPath,ftmname)
#        # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

#        joblist=[];
#        for node in self.listOfNodes[:m]:
#            # casalog.post("#!$#!%#!$#@$#@$ " + allcflist)
#            cmd = "toolsi.fillcfcache("+str(allcflist[node])+","+str(ftmname)+","+str(cfcPath)+","+psTermOn+","+aTermOn+","+conjBeams+")";
#            # casalog.post("CMD = " + str(node) +" " + cmd)
#            joblist.append(self.PH.runcmd(cmd,node));
#        self.PH.checkJobs(joblist);

#        # Linear code
#        cfcName = self.allgridpars['0']['cfcache'];
#        cflist=[f for f in os.listdir(cfcName) if re.match(r'CFS*', f)];
#        self.cfcachepars['cflist']=cflist;
#        self.toolsi.fillcfcache(cflist, self.allgridpars['0']['gridder'],
#                                cfcName,
#                                self.allgridpars['0']['psterm'],
#                                self.allgridpars['0']['aterm'],
#                                self.allgridpars['0']['conjbeams']);
#        # self.SItool.fillcfcache(**(self.cfcachepars)) ;
#############################################
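    # The *Core methods below all follow the same pattern: queue a tool command string on
    # every node with self.PH.runcmd(...) and then block on self.PH.checkJobs(joblist) until
    # all nodes have finished. makePBCore() is the exception: the PB only needs to be made
    # once, so it runs on the first node only.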

    def makePSFCore(self):
        ### Make PSFs
        joblist=[]
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.makepsf()",node) )
        self.PH.checkJobs( joblist ) # this call blocks until all are done.

#############################################
    def makePBCore(self):
        joblist=[]
        # Only one node needs to make the PB. It reads the freq from the image coordsys.
        joblist.append( self.PH.runcmd("toolsi.makepb()",self.listOfNodes[0]) )
        self.PH.checkJobs( joblist )

#############################################

    def runMajorCycleCore(self, lastcycle):
        casalog.post("----------------------------- Running Parallel Major Cycle ----------------------------","INFO")
        ### Run the major cycle
        joblist=[]
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.executemajorcycle(controls={'lastcycle':"+str(lastcycle)+"})",node) )
        self.PH.checkJobs( joblist ) # this call blocks until all are done.

#############################################
    def predictModelCore(self):
        joblist=[]
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.predictmodel()",node) )
        self.PH.checkJobs( joblist ) # this call blocks until all are done.

    def estimatememory(self):
        joblist=[]
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.estimatememory()", node) )
        self.PH.checkJobs( joblist )