Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/imagerhelpers/imager_parallel_continuum.py: 14%

223 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1import os 

2import math 

3import shutil 

4import string 

5import time 

6import re 

7import copy 

8import pdb 

9 

10from casatools import synthesisimager, synthesisnormalizer 

11from casatasks import casalog 

12 

13from .imager_base import PySynthesisImager 

14from .parallel_imager_helper import PyParallelImagerHelper 

15 

16synth_imager_name = "synthesisimager" 

17synth_imager_import = "from casatools import synthesisimager" 

18 

19 

20""" 

21An implementation of parallel continuum imaging, using synthesisxxxx tools 

22 

23Datasets are partitioned by row and major cycles are parallelized.  

24Gathers and normalization are done before passing the images to a 

25non-parallel minor cycle. The output model image is them scattered to 

26all the nodes for the next parallel major cycle. 

27 

28There are N synthesisimager objects. 

29There is 1 instance per image field, of the normalizer and deconvolver. 

30There is 1 iterbot.  

31  

32""" 

33 

34 

35############################################# 

36############################################# 

37## Parallelize only major cycle. 

38############################################# 

39class PyParallelContSynthesisImager(PySynthesisImager): 

40 

41 def __init__(self, params=None): 

42 

43 PySynthesisImager.__init__(self, params) 

44 

45 self.PH = PyParallelImagerHelper() 

46 self.NN = self.PH.NN 

47 self.selpars = self.allselpars 

48 self.allselpars = self.PH.partitionContDataSelection(self.allselpars) 

49 # self.allcflist = self.PH.partitionCFCacheList(self.cfcachepars['cflist']); 

50 # self.allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']); 

51 self.listOfNodes = self.PH.getNodeList() 

52 self.coordsyspars = {} 

53 self.toolsi = None 

54 

55 ############################################# 

56 def resetSaveModelParams(self, params=None): 

57 mainparams = params.getSelPars() 

58 for n in self.allselpars: # for all nodes 

59 for v in self.allselpars[n]: # for all MSes 

60 self.allselpars[n][v]["readonly"] = mainparams[v]["readonly"] 

61 self.allselpars[n][v]["usescratch"] = mainparams[v][ 

62 "usescratch" 

63 ] 

64 

65 ############################################# 

66 # def initializeImagers(self): 

67 # ### Drygridding, and Coordsys comes from a single imager on MAIN node. 

68 # ### No startmodel confusion. It's created only once and then scattered. 

69 # self.initializeImagers() 

70 # pdb.set_trace() 

71 # super().initializeImagers() 

72 # ### Note : Leftover from CAS-9977 

73 # ### There is a coord system mismatch at scatter/gather, if the MAIN version already 

74 # ### exists on disk. With startmodel, it's xxx.model. With aproject, it's xxx.residual. 

75 # ### There is an exception in SIImageStore::openImage to handle this. 

76 # ### Turn on casalog.filter('DEBUG1') to see the warning message. 

77 

78 ############################################# 

79 def initializeImagersBase(self, thisSelPars, partialSelPars): 

80 

81 if partialSelPars == False: ## Init only on the zero'th node 

82 

83 # 

84 # Use the already-created imager on MAIN node 

85 # 

86 ##self.toolsi = synthesisimager() 

87 

88 # 

89 # Select data. 

90 # 

91 for mss in sorted(self.selpars.keys()): 

92 self.toolsi.selectdata(thisSelPars[mss]) 

93 

94 # Defineimage. 

95 # This makes the global csys. Get csys to distribute to other nodes 

96 # It also sets 'startmodel' if available (this is later scattered to nodes) 

97 for fld in range(0, self.NF): 

98 tmpimpars = copy.deepcopy(self.allimpars[str(fld)]) 

99 # if tmpimpars.has_key('startmodel'): 

100 # tmpimpars.pop('startmodel') 

101 self.toolsi.defineimage( 

102 impars=tmpimpars, gridpars=self.allgridpars[str(fld)] 

103 ) 

104 fullcoords = self.toolsi.getcsys() 

105 self.coordsyspars[str(fld)] = fullcoords 

106 

107 # Modify the coordsys inputs 

108 for fld in range(0, self.NF): 

109 self.allimpars[str(fld)]["csys"] = self.coordsyspars[str(fld)][ 

110 "coordsys" 

111 ].copy() 

112 

113 # Call the global defineimage again 

114 # (to get around later error of different coordsys latpoles! (CAs-9977) 

115 # for fld in range(0,self.NF): 

116 # self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars = self.allgridpars[str(fld)] ) 

117 

118 else: ## partialSelPars==True , The actual initialization on all nodes. 

119 

120 # 

121 # Start the imagers on all nodes. 

122 # 

123 joblist = [] 

124 for node in self.listOfNodes: 

125 joblist.append( 

126 self.PH.runcmd( 

127 "{0}; toolsi = {1}()".format( 

128 synth_imager_import, synth_imager_name 

129 ), 

130 node, 

131 ) 

132 ) 

133 self.PH.checkJobs(joblist) 

134 

135 # 

136 # Select data. If partialSelPars is True, use the thisSelPars 

137 # data structure as a list of partitioned selections. 

138 # 

139 joblist = [] 

140 nodes = self.listOfNodes 

141 # [1]; 

142 for node in nodes: 

143 for mss in sorted(self.selpars.keys()): 

144 selStr = str(thisSelPars[str(node - 1)][mss]) 

145 joblist.append( 

146 self.PH.runcmd( 

147 "toolsi.selectdata( " + selStr + ")", node 

148 ) 

149 ) 

150 self.PH.checkJobs(joblist) 

151 

152 # 

153 # Call defineimage at each node. 

154 # 

155 joblist = [] 

156 for node in nodes: 

157 ## For each image-field, define imaging parameters 

158 nimpars = copy.deepcopy(self.allimpars) 

159 # casalog.post("nimpars = "+str(nimpars)) 

160 ngridpars = copy.deepcopy(self.allgridpars) 

161 for fld in range(0, self.NF): 

162 if self.NN > 1: 

163 nimpars[str(fld)]["imagename"] = ( 

164 self.PH.getpartimagename( 

165 nimpars[str(fld)]["imagename"], node 

166 ) 

167 ) 

168 

169 ## Pop out the startmodel, as it would already have been created on the main node,. 

170 tmpimpars = nimpars[str(fld)] 

171 if "startmodel" in tmpimpars: 

172 tmpimpars.pop("startmodel") 

173 

174 joblist.append( 

175 self.PH.runcmd( 

176 "toolsi.defineimage( impars=" 

177 + str(nimpars[str(fld)]) 

178 + ", gridpars=" 

179 + str(ngridpars[str(fld)]) 

180 + ")", 

181 node, 

182 ) 

183 ) 

184 self.PH.checkJobs(joblist) 

185 

186 ############################################# 

187 

188 def initializeImagers(self): 

189 # --------------------------------------- 

190 # Check if cfcache exists. 

191 # 

192 cfCacheName = "" 

193 cfcExists = False 

194 if self.allgridpars["0"]["gridder"].startswith("awpr"): 

195 cfCacheName = self.allgridpars["0"]["cfcache"] 

196 else: 

197 self.allgridpars["0"]["cfcache"] = "" 

198 cfcExists = True 

199 if ( 

200 self.allgridpars["0"]["gridder"] == "awproject" 

201 or self.allgridpars["0"]["gridder"] == "awprojectft" 

202 ): 

203 if cfCacheName == "": 

204 cfCacheName = self.allimpars["0"]["imagename"] + ".cf" 

205 cfCacheName = self.allgridpars["0"]["cfcache"] = cfCacheName 

206 self.allgridpars["0"]["cfcache"] = cfCacheName 

207 cfcExists = os.path.exists(cfCacheName) and os.path.isdir( 

208 cfCacheName 

209 ) 

210 if cfcExists: 

211 nCFs = len(os.listdir(cfCacheName)) 

212 if nCFs == 0: 

213 casalog.post( 

214 cfCacheName 

215 + " exists, but is empty. Attempt is being made to fill it now.", 

216 "WARN", 

217 ) 

218 cfcExists = False 

219 # casalog.post("##########################################") 

220 # casalog.post("CFCACHE = "+cfCacheName,cfcExists) 

221 # casalog.post("##########################################") 

222 

223 # Start one imager on MAIN node 

224 self.toolsi = synthesisimager() 

225 

226 # Init one SI tool ( it records the csys per field in self.coordsyspars ) 

227 self.initializeImagersBase(self.selpars, False) 

228 

229 # Modify the coordsys inputs 

230 # for fld in range(0, self.NF): 

231 # self.allimpars[str(fld)]['csys']=self.coordsyspars[str(fld)]['coordsys'].copy() 

232 

233 # Dry Gridding on the MAIN node ( i.e. on self.toolsi) 

234 # if (not cfcExists): 

235 # self.dryGridding(); 

236 

237 ##weighting with mosfield=True 

238 if ( 

239 ( 

240 self.weightpars["type"].count("briggs") 

241 or self.weightpars["type"].count("uniform") 

242 ) 

243 > 0 

244 ) and (self.weightpars["multifield"]): 

245 self.toolsi.setweighting(**self.weightpars) 

246 ###master create the weight density for all fields 

247 self.toolsi.getweightdensity() 

248 

249 # Clean up the single imager (MAIN node) 

250 # self.toolsi.done() 

251 # self.toolsi = None 

252 

253 # Do the second round, initializing imagers on ALL nodes 

254 self.initializeImagersBase(self.allselpars, True) 

255 

256 # Fill CFCache - it uses all nodes. 

257 if not cfcExists: 

258 self.SItool = self.toolsi 

259 # super().initializeImagers() 

260 ###Doing this serially as in parallel it randomly has race condition 

261 ###about table.dat not available 

262 super().makeCFCache(False) 

263 # self.fillCFCache() 

264 self.reloadCFCache() 

265 self.SItool = None 

266 self.toolsi.done() 

267 self.toolsi = None 

268 

269 ###################################################################################################################################### 

270 # --------------------------------------- 

271 # 4. call setdata() for images on all nodes 

272 # 

273 # joblist=[]; 

274 # for node in self.listOfNodes: 

275 # ## Send in Selection parameters for all MSs in the list 

276 # #### MPIInterface related changes (the -1 in the expression below) 

277 # for mss in sorted( (self.allselpars[str(node-1)]).keys() ): 

278 # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) ) 

279 # self.PH.checkJobs(joblist); 

280 

281 # --------------------------------------- 

282 # 5. Call defineImage() on all nodes. This sets up the FTMs. 

283 # 

284 # joblist=[]; 

285 # for node in self.listOfNodes: 

286 # ## For each image-field, define imaging parameters 

287 # nimpars = copy.deepcopy(self.allimpars) 

288 # #casalog.post("nimpars = "+str(nimpars)) 

289 # ngridpars = copy.deepcopy(self.allgridpars) 

290 # for fld in range(0,self.NF): 

291 # if self.NN>1: 

292 # nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node) 

293 # ### nimpars[str(fld)]['imagename'] = self.allnormpars[str(fld)]['workdir'] + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node) 

294 # ### nimpars[str(fld)]['imagename'] = nimpars[str(fld)]['imagename']+'.n'+str(node) 

295 

296 # # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']+'.n'+str(node) 

297 # # # Give the same CFCache name to all nodes 

298 # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']; 

299 

300 # joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) ) 

301 # self.PH.checkJobs(joblist); 

302 

303 # --------------------------------------- 

304 # 6. If cfcache does not exist, call fillCFCache() 

305 # This will fill the "empty" CFCache in parallel 

306 # 7. Now call reloadCFCache() on all nodes. 

307 # This reloads the latest cfcahce. 

308 

309 # TRY: Start all over again! 

310 # self.deleteImagers(); 

311 

312 # joblist=[] 

313 

314 # for node in self.listOfNodes: 

315 # joblist.append( self.PH.runcmd("toolsi = casac.synthesisimager()", node) ); 

316 # self.PH.checkJobs(joblist); 

317 

318 # joblist=[]; 

319 # nodes=self.listOfNodes;#[1]; 

320 # for node in nodes: 

321 # for mss in sorted( (self.allselpars[str(node-1)]).keys() ): 

322 # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) ) 

323 # # for mss in sorted( self.selpars.keys() ): 

324 # # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.selpars[mss])+")", node) ) 

325 # self.PH.checkJobs(joblist); 

326 

327 # joblist=[]; 

328 # for node in self.listOfNodes: 

329 # nimpars = copy.deepcopy(self.allimpars) 

330 # ngridpars = copy.deepcopy(self.allgridpars) 

331 # for fld in range(0,self.NF): 

332 # if self.NN>1: 

333 # nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node) 

334 # # # Give the same CFCache name to all nodes 

335 # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']; 

336 

337 # joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) ) 

338 # self.PH.checkJobs(joblist); 

339 

340 ############################################# 

341 

342 ############################################# 

343 

344 def initializeNormalizers(self): 

345 for immod in range(0, self.NF): 

346 self.PStools.append(synthesisnormalizer()) 

347 self.localnormpars = copy.deepcopy(self.allnormpars[str(immod)]) 

348 partnames = [] 

349 if self.NN > 1: 

350 #### MPIInterface related changes 

351 # for node in range(0,self.NN): 

352 for node in self.listOfNodes: 

353 partnames.append( 

354 self.PH.getpartimagename( 

355 self.allimpars[str(immod)]["imagename"], node 

356 ) 

357 ) 

358 # onename = self.allimpars[str(immod)]['imagename']+'.n'+str(node) 

359 # partnames.append( self.PH.getpath(node) + '/' + onename ) 

360 # self.PH.deletepartimages( self.PH.getpath(node), onename ) # To ensure restarts work properly. 

361 self.PH.deletepartimages( 

362 self.allimpars[str(immod)]["imagename"], node 

363 ) # To ensure restarts work properly. 

364 self.localnormpars["partimagenames"] = partnames 

365 

366 self.PStools[immod].setupnormalizer(normpars=self.localnormpars) 

367 

368 ############################################# 

369 def setWeighting(self): 

370 ## Set weight parameters and accumulate weight density (natural) 

371 joblist = [] 

372 if ( 

373 ( 

374 self.weightpars["type"].count("briggs") 

375 or self.weightpars["type"].count("uniform") 

376 ) 

377 > 0 

378 ) and (self.weightpars["multifield"]): 

379 ###master created the weight density for all fields 

380 ##Should have been in initializeImagersBase_New but it is not being called ! 

381 self.toolsi = synthesisimager() 

382 for mss in sorted(self.selpars.keys()): 

383 self.toolsi.selectdata(self.selpars[mss]) 

384 for fld in range(0, self.NF): 

385 self.toolsi.defineimage( 

386 impars=self.allimpars[str(fld)], 

387 gridpars=self.allgridpars[str(fld)], 

388 ) 

389 self.toolsi.setweighting(**self.weightpars) 

390 ###master create the weight density for all fields 

391 weightimage = self.toolsi.getweightdensity() 

392 self.toolsi.done() 

393 self.toolsi = None 

394 destWgtim = weightimage + "_moswt" 

395 if os.path.exists(destWgtim): 

396 shutil.rmtree(destWgtim) 

397 shutil.move(weightimage, destWgtim) 

398 joblist = [] 

399 for node in self.listOfNodes: 

400 joblist.append( 

401 self.PH.runcmd( 

402 "toolsi.setweightdensity('" + str(destWgtim) + "')", 

403 node, 

404 ) 

405 ) 

406 self.PH.checkJobs(joblist) 

407 # for node in self.listOfNodes: 

408 # ## Set weighting pars 

409 # joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) ) 

410 # self.PH.checkJobs( joblist ) 

411 # joblist=[]; 

412 # for node in self.listOfNodes: 

413 # joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) ) 

414 # self.PH.checkJobs( joblist ) 

415 

416 # for immod in range(0,self.NF): 

417 # #self.PStools[immod].gatherweightdensity() 

418 # self.PStools[immod].scatterweightdensity() 

419 ## Set weight density for each nodel 

420 # joblist=[]; 

421 # for node in self.listOfNodes: 

422 # joblist.append( self.PH.runcmd("toolsi.setweightdensity()", node ) ) 

423 # self.PH.checkJobs( joblist ) 

424 #### end of multifield or mosweight 

425 else: 

426 joblist = [] 

427 for node in self.listOfNodes: 

428 ## Set weighting pars 

429 joblist.append( 

430 self.PH.runcmd( 

431 "toolsi.setweighting( **" + str(self.weightpars) + ")", 

432 node, 

433 ) 

434 ) 

435 self.PH.checkJobs(joblist) 

436 

437 ## If only one field, do the get/gather/set of the weight density. 

438 if ( 

439 self.NF == 1 

440 ): # and self.allimpars['0']['stokes']=="I": ## Remove after gridded wts appear for all fields correctly (i.e. new FTM). 

441 

442 if not ( 

443 (self.weightpars["type"] == "natural") 

444 or (self.weightpars["type"] == "radial") 

445 ): ## For natural and radial, this array isn't created at all. 

446 ## Remove when we switch to new FTM 

447 

448 casalog.post( 

449 "Gathering/Merging/Scattering Weight Density for PSF generation", 

450 "INFO", 

451 ) 

452 

453 # joblist = [] 

454 # for node in self.listOfNodes: 

455 # joblist.append( 

456 # self.PH.runcmd("toolsi.getweightdensity()", node) 

457 # ) 

458 # self.PH.checkJobs(joblist) 

459 

460 ## gather weightdensity and sum and scatter 

461 casalog.post( 

462 "******************************************************" 

463 ) 

464 casalog.post(" gather and scatter now ") 

465 casalog.post( 

466 "******************************************************" 

467 ) 

468 resname = self.localnormpars["imagename"] + ".residual" 

469 sumgridname = resname.replace(".residual", ".gridwt") 

470 ################ 

471 

472 if os.path.exists(sumgridname + "_temp") and ( 

473 os.path.exists(resname) 

474 or os.path.exists(resname + ".tt0") 

475 ): # a restart 

476 shutil.rmtree(sumgridname, True) 

477 shutil.move(sumgridname + "_temp", sumgridname) 

478 ############################# 

479 

480 else: 

481 try: 

482 joblist = [] 

483 if ( 

484 self.localnormpars["calcres"] 

485 and self.localnormpars["calcpsf"] 

486 ): 

487 ##SIImageStore ..keeps comparing shape 

488 ##when we try to make a gridwt 

489 psfname = resname.replace(".residual", ".psf") 

490 pbname = resname.replace(".residual", ".pb") 

491 shutil.rmtree(resname, True) 

492 shutil.rmtree(resname + ".tt0", True) 

493 shutil.rmtree(psfname, True) 

494 shutil.rmtree(psfname + ".tt0", True) 

495 shutil.rmtree(pbname, True) 

496 shutil.rmtree(pbname + ".tt0", True) 

497 

498 for node in self.listOfNodes: 

499 joblist.append( 

500 self.PH.runcmd( 

501 "toolsi.getweightdensity()", node 

502 ) 

503 ) 

504 self.PH.checkJobs(joblist) 

505 locpstool = synthesisnormalizer() 

506 locpstool.setupnormalizer( 

507 normpars=self.localnormpars 

508 ) 

509 

510 locpstool.gatherweightdensity() 

511 sumgridname = locpstool.scatterweightdensity() 

512 # print("%%%%%%%%", sumgridname) 

513 except Exception as inst: 

514 if os.path.exists(resname) or os.path.exists( 

515 resname + ".tt0" 

516 ): 

517 raise Exception( 

518 f"restarting Briggs Style weighting for parallel mfs imaging without the {sumgridname}_temp file on disk try restarting with calcres and calcpsf=True" 

519 ) from inst 

520 else: 

521 raise Exception(inst) 

522 

523 ## Set weight density for each nodel 

524 joblist = [] 

525 for node in self.listOfNodes: 

526 joblist.append( 

527 self.PH.runcmd( 

528 "toolsi.setweightdensity('" 

529 + str(sumgridname) 

530 + "')", 

531 node, 

532 ) 

533 ) 

534 self.PH.checkJobs(joblist) 

535 ###For some reason we cannot stop psf being made along with gridwt image and 

536 ### and may have the wrong shape at this stage 

537 # shutil.rmtree(sumgridname) 

538 shutil.rmtree(sumgridname + "_temp", True) 

539 shutil.move(sumgridname, sumgridname + "_temp") 

540 

541 tmppsfname = sumgridname.replace(".gridwt", ".psf") 

542 resname = sumgridname.replace(".gridwt", ".residual") 

543 if not os.path.exists( 

544 resname 

545 ): # not a restart so psf shape may be different if full pol...delete it 

546 shutil.rmtree(tmppsfname, True) 

547 if not os.path.exists(resname + ".tt0"): 

548 shutil.rmtree(tmppsfname + ".tt0", True) 

549 

550 else: 

551 if not ( 

552 (self.weightpars["type"] == "natural") 

553 or (self.weightpars["type"] == "radial") 

554 ): 

555 casalog.post( 

556 "Parallel-Continuum-multifield with briggs weighting will give different weighting schemes with number of processes used", 

557 "WARN", 

558 ) 

559 

560 def deleteImagers(self): 

561 self.PH.runcmd("toolsi.done()") 

562 

563 def deleteWorkDir(self): 

564 ## Delete the contents of the .workdirectory 

565 for immod in range(0, self.NF): 

566 normpars = copy.deepcopy(self.allnormpars[str(immod)]) 

567 if self.NN > 1: 

568 for node in self.listOfNodes: 

569 self.PH.deletepartimages( 

570 self.allimpars[str(immod)]["imagename"], 

571 node, 

572 deldir=True, 

573 ) 

574 

575 # ## Delete the workdirectory 

576 # casalog.post("Deleting workdirectory : "+self.PH.getworkdir(imagename, node)) 

577 # shutil.rmtree( self.PH.getworkdir(imagename, node) ) 

578 

579 def deleteCluster(self): 

580 self.PH.takedownCluster() 

581 

582 # ############################################# 

583 def dryGridding(self): 

584 dummy = [""] 

585 self.toolsi.drygridding(dummy) 

586 

587 # def dryGridding_Old(self): 

588 # nodes=[1]; 

589 # joblist=[]; 

590 # for node in nodes: 

591 # dummy=['']; 

592 # cmd = "toolsi.drygridding("+str(dummy)+")"; 

593 # joblist.append(self.PH.runcmd(cmd,node)); 

594 # self.PH.checkJobs(joblist); 

595 

596 ############################################# 

597 def reloadCFCache(self): 

598 joblist = [] 

599 for node in self.listOfNodes: 

600 cmd = "toolsi.reloadcfcache()" 

601 casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd)) 

602 joblist.append(self.PH.runcmd(cmd, node)) 

603 self.PH.checkJobs(joblist) 

604 

605 ############################################# 

606 # def fillCFCache(self): 

607 # #casalog.post("-----------------------fillCFCache------------------------------------") 

608 # # cflist=[f for f in os.listdir(self.allgridpars['cfcache']) if re.match(r'CFS*', f)]; 

609 # # partCFList = 

610 # if(not str(self.allgridpars['0']['gridder']).startswith("awp")): 

611 # return 

612 # allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']); 

613 # cfcPath = "\""+str(self.allgridpars['0']['cfcache'])+"\""; 

614 # ftmname = "\""+str(self.allgridpars['0']['gridder'])+"\""; 

615 # psTermOn = str(self.allgridpars['0']['psterm']); 

616 # aTermOn = str(self.allgridpars['0']['aterm']); 

617 # conjBeams = str(self.allgridpars['0']['conjbeams']); 

618 # #aTermOn = str(True); 

619 # # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") 

620 # # casalog.post("AllCFList = ",allcflist) 

621 # m = len(allcflist); 

622 # # casalog.post("No. of nodes used: " + m,cfcPath,ftmname) 

623 # # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") 

624 

625 # joblist=[]; 

626 # for node in self.listOfNodes[:m]: 

627 # # casalog.post("#!$#!%#!$#@$#@$ " + allcflist) 

628 # cmd = "toolsi.fillcfcache("+str(allcflist[node])+","+str(ftmname)+","+str(cfcPath)+","+psTermOn+","+aTermOn+","+conjBeams+")"; 

629 # # casalog.post("CMD = " + str(node) +" " + cmd) 

630 # joblist.append(self.PH.runcmd(cmd,node)); 

631 # self.PH.checkJobs(joblist); 

632 

633 # # Linear code 

634 # cfcName = self.allgridpars['0']['cfcache']; 

635 # cflist=[f for f in os.listdir(cfcName) if re.match(r'CFS*', f)]; 

636 # self.cfcachepars['cflist']=cflist; 

637 # self.toolsi.fillcfcache(cflist, self.allgridpars['0']['gridder'], 

638 # cfcName, 

639 # self.allgridpars['0']['psterm'], 

640 # self.allgridpars['0']['aterm'], 

641 # self.allgridpars['0']['conjbeams']); 

642 # # self.SItool.fillcfcache(**(self.cfcachepars)) ; 

643 ############################################# 

644 def makePSFCore(self): 

645 ### Make PSFs 

646 joblist = [] 

647 #### MPIInterface related changes 

648 # for node in range(0,self.PH.NN): 

649 for node in self.listOfNodes: 

650 joblist.append(self.PH.runcmd("toolsi.makepsf()", node)) 

651 self.PH.checkJobs(joblist) # this call blocks until all are done. 

652 

653 ############################################# 

654 def makePBCore(self): 

655 joblist = [] 

656 # Only one node needs to make the PB. It reads the freq from the image coordsys 

657 joblist.append(self.PH.runcmd("toolsi.makepb()", self.listOfNodes[0])) 

658 self.PH.checkJobs(joblist) 

659 

660 ############################################# 

661 

662 def runMajorCycleCore(self, lastcycle): 

663 casalog.post( 

664 "----------------------------- Running Parallel Major Cycle ----------------------------", 

665 "INFO", 

666 ) 

667 ### Run major cycle 

668 joblist = [] 

669 #### MPIInterface related changes 

670 # for node in range(0,self.PH.NN): 

671 for node in self.listOfNodes: 

672 joblist.append( 

673 self.PH.runcmd( 

674 "toolsi.executemajorcycle(controls={'lastcycle':" 

675 + str(lastcycle) 

676 + "})", 

677 node, 

678 ) 

679 ) 

680 self.PH.checkJobs(joblist) # this call blocks until all are done. 

681 

682 ############################################# 

683 def predictModelCore(self): 

684 joblist = [] 

685 #### MPIInterface related changes 

686 # for node in range(0,self.PH.NN): 

687 for node in self.listOfNodes: 

688 joblist.append(self.PH.runcmd("toolsi.predictmodel()", node)) 

689 self.PH.checkJobs(joblist) # this call blocks until all are done. 

690 

691 def estimatememory(self): 

692 joblist = [] 

693 for node in self.listOfNodes: 

694 joblist.append(self.PH.runcmd("toolsi.estimatememory()", node)) 

695 self.PH.checkJobs(joblist)