Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/imager_parallel_continuum.py: 17%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-31 19:10 +0000

1import os 

2import math 

3import shutil 

4import string 

5import time 

6import re; 

7import copy 

8import pdb 

9 

10from casatools import synthesisimager, synthesisnormalizer 

11from casatasks import casalog 

12 

13from .imager_base import PySynthesisImager 

14from .parallel_imager_helper import PyParallelImagerHelper 

# Class name of the imager tool instantiated as `toolsi` on every node; it is
# formatted into the command string sent through PyParallelImagerHelper.runcmd.
15synth_imager_name = 'synthesisimager' 

# Import statement executed on a node, in the same command string, right
# before the tool named above is instantiated there.
16synth_imager_import = 'from casatools import synthesisimager' 

17 

18 

19''' 

20An implementation of parallel continuum imaging, using synthesisxxxx tools 

21 

22Datasets are partitioned by row and major cycles are parallelized.  

23Gathers and normalization are done before passing the images to a 

24non-parallel minor cycle. The output model image is then scattered to 

25all the nodes for the next parallel major cycle. 

26 

27There are N synthesisimager objects. 

28There is 1 instance per image field, of the normalizer and deconvolver. 

29There is 1 iterbot.  

30  

31''' 

32 

33############################################# 

34############################################# 

35## Parallelize only major cycle. 

36############################################# 

class PyParallelContSynthesisImager(PySynthesisImager):
    """Continuum imager in which only the major cycle is parallelized.

    One imager tool, bound to the name ``toolsi``, is started on every node
    returned by ``PyParallelImagerHelper.getNodeList()`` and is driven by
    command strings sent through ``PyParallelImagerHelper.runcmd``.  Each
    node works on its own partition of the data selection.  One
    ``synthesisnormalizer`` per image field is created in this process.

    Attributes set by ``__init__``:
        PH: the ``PyParallelImagerHelper`` used to reach the nodes.
        NN: copy of ``PH.NN``.
        selpars: the unpartitioned data selection (used on the MAIN node).
        allselpars: the partitioned selection returned by
            ``PH.partitionContDataSelection``; indexed by ``str(node - 1)``
            and then by MS key.
        listOfNodes: node ids returned by ``PH.getNodeList()``.
        coordsyspars: per-field result of ``toolsi.getcsys()``.
        toolsi: the temporary MAIN-node imager tool, or ``None``.
    """

    def __init__(self, params=None):
        """Build the helper, partition the data selection and list the nodes.

        Args:
            params: forwarded unchanged to ``PySynthesisImager.__init__``.
        """
        PySynthesisImager.__init__(self, params)

        self.PH = PyParallelImagerHelper()
        self.NN = self.PH.NN
        # Keep the full selection before allselpars is replaced by the
        # partitioned version; the MAIN-node imager needs the full one.
        self.selpars = self.allselpars
        self.allselpars = self.PH.partitionContDataSelection(self.allselpars)
        self.listOfNodes = self.PH.getNodeList()
        self.coordsyspars = {}
        self.toolsi = None

    #############################################
    def resetSaveModelParams(self, params=None):
        """Copy 'readonly' and 'usescratch' into every partitioned selection.

        Args:
            params: object providing ``getSelPars()``, which must return a
                mapping from MS key to a dict holding 'readonly' and
                'usescratch'.  Despite the default, ``None`` is not usable:
                the method calls ``params.getSelPars()`` unconditionally.
        """
        mainparams = params.getSelPars()
        for nodekey in self.allselpars:  # for all nodes
            nodesel = self.allselpars[nodekey]
            for mskey in nodesel:  # for all MSes
                nodesel[mskey]['readonly'] = mainparams[mskey]['readonly']
                nodesel[mskey]['usescratch'] = mainparams[mskey]['usescratch']

    #############################################
    def _runOnAllNodes(self, cmd):
        """Submit the same command string to every node and check the jobs.

        Args:
            cmd: Python source text passed to ``PH.runcmd`` for each node in
                ``self.listOfNodes``.
        """
        joblist = [self.PH.runcmd(cmd, node) for node in self.listOfNodes]
        # Per the original author's notes, checkJobs blocks until all are done.
        self.PH.checkJobs(joblist)

    #############################################
    def initializeImagersBase(self, thisSelPars, partialSelPars):
        """Run one of the two imager initialization rounds.

        Args:
            thisSelPars: when ``partialSelPars`` is false, a mapping from MS
                key to selection dict; when true, the partitioned structure
                indexed by ``str(node - 1)`` and then by MS key.
            partialSelPars: false initializes only the already-created
                MAIN-node imager (``self.toolsi``); true starts and
                initializes the imagers on all nodes.
        """
        if not partialSelPars:
            self._initMainImager(thisSelPars)
        else:
            self._initNodeImagers(thisSelPars)

    def _initMainImager(self, thisSelPars):
        """Select data and define images on ``self.toolsi``; record the csys."""
        # Select data.
        for mss in sorted(self.selpars.keys()):
            self.toolsi.selectdata(thisSelPars[mss])

        # Defineimage.
        # Original author's note: this makes the global csys, which is then
        # distributed to the other nodes.  It also sets 'startmodel' if
        # available (this is later scattered to nodes).
        for fld in range(self.NF):
            fldkey = str(fld)
            tmpimpars = copy.deepcopy(self.allimpars[fldkey])
            self.toolsi.defineimage(impars=tmpimpars,
                                    gridpars=self.allgridpars[fldkey])
            self.coordsyspars[fldkey] = self.toolsi.getcsys()

        # Modify the coordsys inputs used by every later defineimage call.
        for fld in range(self.NF):
            fldkey = str(fld)
            self.allimpars[fldkey]['csys'] = \
                self.coordsyspars[fldkey]['coordsys'].copy()

    def _initNodeImagers(self, thisSelPars):
        """Start ``toolsi`` on every node, then select data and define images."""
        # Start the imagers on all nodes.
        self._runOnAllNodes('{0}; toolsi = {1}()'.format(
            synth_imager_import, synth_imager_name))

        # Select data.  thisSelPars is the partitioned structure here; the
        # partition of a node is stored under the key str(node - 1).
        joblist = []
        for node in self.listOfNodes:
            for mss in sorted(self.selpars.keys()):
                selStr = str(thisSelPars[str(node - 1)][mss])
                joblist.append(
                    self.PH.runcmd("toolsi.selectdata( " + selStr + ")", node))
        self.PH.checkJobs(joblist)

        # Call defineimage at each node.
        joblist = []
        for node in self.listOfNodes:
            for fld in range(self.NF):
                fldkey = str(fld)
                # Work on a private copy: the image name is node specific and
                # self.allimpars must stay untouched.
                fldimpars = copy.deepcopy(self.allimpars[fldkey])
                if self.NN > 1:
                    fldimpars['imagename'] = self.PH.getpartimagename(
                        fldimpars['imagename'], node)

                # Pop out the startmodel, as it would already have been
                # created on the main node.
                fldimpars.pop('startmodel', None)

                joblist.append(self.PH.runcmd(
                    "toolsi.defineimage( impars=" + str(fldimpars)
                    + ", gridpars=" + str(self.allgridpars[fldkey]) + ")",
                    node))
        self.PH.checkJobs(joblist)

    #############################################
    def initializeImagers(self):
        """Initialize the MAIN-node imager, then the imagers on all nodes.

        Steps, in order:
          1. Work out the CF cache name for field '0' and whether a
             non-empty cache directory already exists.
          2. Create a ``synthesisimager`` on the MAIN node and run the first
             initialization round on it (this records the csys per field).
          3. For 'briggs' weighting with 'multifield' set, set the weighting
             on that imager and ask it for the weight density.
          4. Run the second round, initializing the imagers on all nodes.
          5. If the CF cache is considered missing, build it through the
             base class and make every node reload it.
          6. Release the MAIN-node imager, also when one of the steps raises.
        """
        #---------------------------------------
        # Check if cfcache exists.
        #
        gridpars0 = self.allgridpars['0']
        gridder = gridpars0['gridder']

        cfCacheName = ''
        cfcExists = False
        if gridder.startswith('awp'):
            cfCacheName = gridpars0['cfcache']
        else:
            # No CF cache for this gridder: clear the name and treat the
            # cache as present so that nothing gets filled below.
            gridpars0['cfcache'] = ''
            cfcExists = True

        if gridder in ('awproject', 'awprojectft'):
            if cfCacheName == '':
                cfCacheName = self.allimpars['0']['imagename'] + '.cf'
            gridpars0['cfcache'] = cfCacheName
            cfcExists = os.path.isdir(cfCacheName)
            if cfcExists and not os.listdir(cfCacheName):
                casalog.post(cfCacheName + " exists, but is empty. Attempt is being made to fill it now.", "WARN")
                cfcExists = False

        # Start one imager on MAIN node
        self.toolsi = synthesisimager()
        try:
            # Init one SI tool ( it records the csys per field in
            # self.coordsyspars )
            self.initializeImagersBase(self.selpars, False)

            ## weighting with mosfield=True
            if self.weightpars['type'] == 'briggs' and self.weightpars['multifield']:
                self.toolsi.setweighting(**self.weightpars)
                ### master creates the weight density for all fields
                self.toolsi.getweightdensity()

            # Do the second round, initializing imagers on ALL nodes
            self.initializeImagersBase(self.allselpars, True)

            # Fill CFCache.
            if not cfcExists:
                # NOTE(review): presumably the base-class makeCFCache() works
                # on self.SItool, hence the temporary alias - confirm.
                self.SItool = self.toolsi
                try:
                    ### Doing this serially as in parallel it randomly has a
                    ### race condition about table.dat not being available.
                    super().makeCFCache(False)
                    self.reloadCFCache()
                finally:
                    self.SItool = None
        finally:
            # Clean up the single imager (MAIN node) even on failure, so the
            # tool is not left open behind a propagating exception.
            self.toolsi.done()
            self.toolsi = None

    #############################################
    def initializeNormalizers(self):
        """Create and set up one ``synthesisnormalizer`` per image field.

        With more than one node (``self.NN > 1``) the per-node partial image
        names are collected into ``normpars['partimagenames']`` and the
        helper is asked to delete the partial images of each node.
        """
        for immod in range(self.NF):
            fldkey = str(immod)
            self.PStools.append(synthesisnormalizer())
            normpars = copy.deepcopy(self.allnormpars[fldkey])
            if self.NN > 1:
                partnames = []
                #### MPIInterface related changes
                for node in self.listOfNodes:
                    partnames.append(self.PH.getpartimagename(
                        self.allimpars[fldkey]['imagename'], node))
                    # To ensure restarts work properly.
                    self.PH.deletepartimages(
                        self.allimpars[fldkey]['imagename'], node)
                # NOTE(review): nesting reconstructed from a flattened
                # listing; this assignment is taken to belong to the NN>1
                # branch - confirm against the repository copy.
                normpars['partimagenames'] = partnames
            self.PStools[immod].setupnormalizer(normpars=normpars)

    #############################################
    def setWeighting(self):
        """Set the weighting on all nodes and distribute the weight density.

        For 'briggs' weighting with 'multifield' set, a temporary MAIN-node
        imager computes the weight density over the full selection; the
        resulting image is moved to ``<name>_moswt`` and every node is told
        to use it.  Otherwise each node sets its own weighting and, for a
        single Stokes-I field with a weighting type other than 'natural' or
        'radial', the weight densities are gathered, merged and scattered.
        """
        if self.weightpars['type'] == 'briggs' and self.weightpars['multifield']:
            ### master creates the weight density for all fields
            self.toolsi = synthesisimager()
            try:
                for mss in sorted(self.selpars.keys()):
                    self.toolsi.selectdata(self.selpars[mss])
                for fld in range(self.NF):
                    self.toolsi.defineimage(impars=self.allimpars[str(fld)],
                                            gridpars=self.allgridpars[str(fld)])
                self.toolsi.setweighting(**self.weightpars)
                weightimage = self.toolsi.getweightdensity()
            finally:
                # Release the temporary imager even if a step above raised.
                self.toolsi.done()
                self.toolsi = None

            destWgtim = weightimage + '_moswt'
            if os.path.exists(destWgtim):
                shutil.rmtree(destWgtim)
            shutil.move(weightimage, destWgtim)

            # repr() yields a correctly escaped string literal, so a path
            # containing a quote or a backslash cannot break the command.
            self._runOnAllNodes(
                "toolsi.setweightdensity(" + repr(str(destWgtim)) + ")")
            #### end of multifield or mosweight
            return

        # NOTE(review): nesting reconstructed from a flattened listing; the
        # gather/scatter part below is taken to belong to the non-multifield
        # branch only - confirm against the repository copy.

        ## Set weighting pars
        self._runOnAllNodes(
            "toolsi.setweighting( **" + str(self.weightpars) + ")")

        ## If only one field, do the get/gather/set of the weight density.
        ## Remove after gridded wts appear for all fields correctly
        ## (i.e. new FTM).
        if not (self.NF == 1 and self.allimpars['0']['stokes'] == "I"):
            return

        ## For natural and radial, this array isn't created at all.
        ## Remove when we switch to new FTM
        if self.weightpars['type'] in ('natural', 'radial'):
            return

        casalog.post("Gathering/Merging/Scattering Weight Density for PSF generation", "INFO")

        self._runOnAllNodes("toolsi.getweightdensity()")

        ## gather weightdensity and sum and scatter
        casalog.post("******************************************************")
        casalog.post(" gather and scatter now ")
        casalog.post("******************************************************")
        for immod in range(self.NF):
            self.PStools[immod].gatherweightdensity()
            self.PStools[immod].scatterweightdensity()

        ## Set weight density for each node
        self._runOnAllNodes("toolsi.setweightdensity()")

    #############################################
    def deleteImagers(self):
        """Send ``toolsi.done()`` through the helper.

        No node is passed, so the targets are whatever
        ``PyParallelImagerHelper.runcmd`` uses by default (presumably all
        nodes - verify against the helper).
        """
        self.PH.runcmd("toolsi.done()")

    def deleteWorkDir(self):
        """Delete the per-node partial images, including their directory.

        Nothing is done unless more than one node is in use (``NN > 1``).
        """
        ## Delete the contents of the .workdirectory
        if self.NN <= 1:
            return
        for immod in range(self.NF):
            for node in self.listOfNodes:
                self.PH.deletepartimages(
                    self.allimpars[str(immod)]['imagename'], node, deldir=True)

    def deleteCluster(self):
        """Ask the helper to take down the cluster."""
        self.PH.takedownCluster()

    #############################################
    def dryGridding(self):
        """Call ``drygridding`` on the MAIN-node imager with ``['']``.

        Requires ``self.toolsi`` to be a live imager tool.
        """
        dummy = ['']
        self.toolsi.drygridding(dummy)

    #############################################
    def reloadCFCache(self):
        """Make the imager on every node reload its CF cache."""
        cmd = "toolsi.reloadcfcache()"
        joblist = []
        for node in self.listOfNodes:
            casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd))
            joblist.append(self.PH.runcmd(cmd, node))
        self.PH.checkJobs(joblist)

    #############################################
    def makePSFCore(self):
        """Make PSFs on all nodes."""
        self._runOnAllNodes("toolsi.makepsf()")

    #############################################
    def makePBCore(self):
        """Make the primary beam on the first node only.

        Original author's note: only one node needs to make the PB; it reads
        the frequency from the image coordsys.
        """
        joblist = [self.PH.runcmd("toolsi.makepb()", self.listOfNodes[0])]
        self.PH.checkJobs(joblist)

    #############################################
    def runMajorCycleCore(self, lastcycle):
        """Run one major cycle on all nodes.

        Args:
            lastcycle: value whose ``str()`` is inserted as the 'lastcycle'
                entry of the ``controls`` dict in the remote command.
        """
        casalog.post("----------------------------- Running Parallel Major Cycle ----------------------------", "INFO")
        self._runOnAllNodes(
            "toolsi.executemajorcycle(controls={'lastcycle':"
            + str(lastcycle) + "})")

    #############################################
    def predictModelCore(self):
        """Run ``predictmodel`` on all nodes."""
        self._runOnAllNodes("toolsi.predictmodel()")

    def estimatememory(self):
        """Run ``estimatememory`` on all nodes."""
        self._runOnAllNodes("toolsi.estimatememory()")