Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/imager_parallel_cube.py: 15%

237 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import math 

3import shutil 

4import string 

5import time 

6import re 

7import copy 

8 

9from casatools import synthesisutils, synthesisimager 

10from casatools import image as imageanalysis 

11from casatasks import casalog 

12 

13from .imager_base import PySynthesisImager 

14from .parallel_imager_helper import PyParallelImagerHelper 

15 

16''' 

17An implementation of parallel cube imaging, using synthesisxxxx tools. 

18 

19Major and minor cycles are parallelized across frequency, by running separate 

20PySynthesisImagers independently per frequency chunk. 

21Iteration control is not synchronized, interactive mask drawing can't be done. 

22Reference concatenation of all the image products is done at the end. 

23 

24There are N PySynthesisImager objects, each with their own  

25synthesisimager, deconvolvers, normalizers and iterbot.  

26  

27''' 

28 

29############################################# 

30# Parallelize both the major and minor cycle for Cube imaging 

31# Run a separate instance of PySynthesisImager on each node. 

32#### ( later, use the live-object interface of ImStore to reference-break the cubes ) 

33#### For nprocesses > nnodes, run the whole 'clean' loop multiple times.  

34############################################# 

35class PyParallelCubeSynthesisImager(): 

36 

37 def __init__(self,params=None): 

38 

39 self.params=params 

40 

41 allselpars = params.getSelPars() 

42 allimagepars = params.getImagePars() 

43 self.allinimagepars = copy.deepcopy(allimagepars) 

44 self.allgridpars = params.getGridPars() 

45 self.allnormpars = params.getNormPars() 

46 self.weightpars = params.getWeightPars() 

47 self.decpars = params.getDecPars() 

48 self.iterpars = params.getIterPars() 

49 alldataimpars={} 

50 

51 self.PH = PyParallelImagerHelper() 

52 self.NN = self.PH.NN 

53 self.NF = len(allimagepars.keys()) 

54 self.listOfNodes = self.PH.getNodeList(); 

55 ## Partition both data and image coords the same way. 

56 #self.allselpars = self.PH.partitionCubeDataSelection(allselpars) 

57 #self.allimpars = self.PH.partitionCubeDeconvolution(allimagepars) 

58 

59 # to define final image coordinates, run selecdata and definemage 

60 self.SItool = synthesisimager() 

61 # casalog.post("allselpars="+allselpars) 

62 origspw={} 

63 for mss in sorted( allselpars.keys() ): 

64# if(self.allimpars['0']['specmode']=='cubedata'): 

65# self.allselpars[mss]['outframe']='Undefined' 

66 origspw[mss]={'spw':allselpars[mss]['spw']} 

67 self.SItool.selectdata( allselpars[mss] ) 

68 for fid in sorted( allimagepars.keys() ): 

69 self.SItool.defineimage( allimagepars[fid], self.allgridpars[fid] ) 

70 # insert coordsys record in imagepars  

71 # partionCubeSelection works per field ... 

72 allimagepars[fid]['csys'] = self.SItool.getcsys() 

73 if allimagepars[fid]['nchan'] == -1: 

74 allimagepars[fid]['nchan'] = self.SItool.updatenchan() 

75 alldataimpars[fid] = self.PH.partitionCubeSelection(allselpars,allimagepars[fid]) 

76 

77 # if there are more nodes than nchan, there would be node(s) 

78 # that do not get any subcubes and causes an error. So 

79 # to avoid this, reduces the number of nodes actually uses. 

80 # Variable nchans among the fields are not supported yet in parallel mode 

81 fid0nchan = allimagepars['0']['nchan'] 

82 nnodes = len(self.listOfNodes) 

83 if nnodes > fid0nchan: 

84 self.modifiedListOfNodes = self.listOfNodes[0:fid0nchan] 

85 casalog.post("Nchan = "+str(fid0nchan)+", Will use only "+str(len(self.modifiedListOfNodes))+" nodes out of "+str(nnodes), "WARN"); 

86 else: 

87 self.modifiedListOfNodes = self.listOfNodes[:] 

88 

89 #casalog.post("********************** " + alldataimpars.keys()) 

90 #for kk in alldataimpars.keys(): 

91 # casalog.post("KEY : ", kk , " --->", alldataimpars[kk].keys()) 

92 

93 # reorganize allselpars and allimpars for partitioned data  

94 synu = synthesisutils() 

95 self.allselpars={} 

96 self.allimpars={} 

97 ### casalog.post("self.listOfNodes=",self.listOfNodes) 

98 # Repack the data/image parameters per node 

99 # - internally it stores zero-based node ids 

100 #  

101 #for ipart in self.listOfNodes: 

102 for ipart in self.modifiedListOfNodes: 

103 # convert to zero-based indexing for nodes 

104 nodeidx = str(ipart-1) 

105 tnode = str(ipart) 

106 selparsPerNode= {tnode:{}} 

107 imparsPerNode= {tnode:{}} 

108 for fid in allimagepars: 

109 ###restoring original spw selection just to allow weight density to be the same 

110 ###ultimately should be passed by MPI if done this way 

111 for mss in origspw.keys(): 

112 alldataimpars[fid][nodeidx][mss]['spw']=origspw[mss]['spw'] 

113 for ky in alldataimpars[fid][nodeidx]: 

114### commenting this as it is resetting the selpars when key is not "msxxx"  

115## selparsPerNode[tnode]={} 

116 if ky.find('ms')==0: 

117 # data sel per field 

118 selparsPerNode[tnode][ky] = alldataimpars[fid][nodeidx][ky].copy(); 

119 if alldataimpars[fid][nodeidx][ky]['spw']=='-1': 

120 selparsPerNode[tnode][ky]['spw']='' 

121 #else: 

122 ####using original spw selection for weight calculation 

123 # # remove chan selections (will be adjusted by tuneSelectData) 

124 # newspw=selparsPerNode[tnode][ky]['spw'] 

125 # newspwlist = newspw.split(',') 

126 # spwsOnly = '' 

127 # for sp in newspwlist: 

128 # if spwsOnly!='': spwsOnly+=',' 

129 # spwsOnly+=sp.split(':')[0]  

130 # selparsPerNode[tnode][ky]['spw']=spwsOnly 

131 

132 imparsPerNode[tnode][fid] = allimagepars[fid].copy() 

133 imparsPerNode[tnode][fid]['csys'] = alldataimpars[fid][nodeidx]['coordsys'].copy() 

134 imparsPerNode[tnode][fid]['nchan'] = alldataimpars[fid][nodeidx]['nchan'] 

135## imparsPerNode[tnode][fid]['imagename'] = imparsPerNode[tnode][fid]['imagename'] + '.n'+str(tnode)  

136 imparsPerNode[tnode][fid]['imagename'] = self.PH.getpartimagename( imparsPerNode[tnode][fid]['imagename'], ipart ) 

137 

138 # skip this for now (it is not working properly, but should not affect results without this) 

139 #imparsPerNode[tnode][fid]=synu.updateimpars(imparsPerNode[tnode][fid]) 

140 self.allselpars.update(selparsPerNode) 

141 self.allimpars.update(imparsPerNode) 

142 

143 

144 #casalog.post("****** SELPARS in init **********" + self.allselpars) 

145 #casalog.post("****** SELIMPARS in init **********" + self.allimpars) 

146 

147 joblist=[] 

148 casa6_import_prefix = 'casatasks.private.' 

149 cmd_import_pars = ('from {0}imagerhelpers.input_parameters import ImagerParameters'. 

150 format(casa6_import_prefix)) 

151 cmd_import_synth = ('from {0}imagerhelpers.imager_base import PySynthesisImager'. 

152 format(casa6_import_prefix)) 

153 #### MPIInterface related changes 

154 #for node in range(0,self.NN): 

155 #for node in self.listOfNodes: 

156 for node in self.modifiedListOfNodes: 

157 joblist.append( self.PH.runcmd(cmd_import_pars, node) ) 

158 joblist.append( self.PH.runcmd(cmd_import_synth, node) ) 

159 self.PH.checkJobs( joblist ) 

160 

161 self.exitflag={} 

162 joblist=[] 

163 #### MPIInterface related changes 

164 #for node in range(0,self.NN): 

165 #for node in self.listOfNodes: 

166 for node in self.modifiedListOfNodes: 

167 joblist.append( self.PH.runcmd("paramList = ImagerParameters()", node) ) 

168 joblist.append( self.PH.runcmd("paramList.setSelPars("+str(self.allselpars[str(node)])+")", node) ) 

169 joblist.append( self.PH.runcmd("paramList.setImagePars("+str(self.allimpars[str(node)])+")", node) ) 

170 

171 joblist.append( self.PH.runcmd("paramList.setGridPars("+str(self.allgridpars)+")", node) ) 

172 joblist.append( self.PH.runcmd("paramList.setWeightPars("+str(self.weightpars)+")", node) ) 

173 joblist.append( self.PH.runcmd("paramList.setDecPars("+str(self.decpars)+")", node) ) 

174 joblist.append( self.PH.runcmd("paramList.setIterPars("+str(self.iterpars)+")", node) ) 

175 joblist.append( self.PH.runcmd("paramList.setNormPars("+str(self.allnormpars)+")", node) ) 

176 

177 joblist.append( self.PH.runcmd("paramList.checkParameters()", node) ) 

178 

179 joblist.append( self.PH.runcmd("imager = PySynthesisImager(params=paramList)", node) ) 

180 

181 self.exitflag[str(node)] = False 

182 

183 self.PH.checkJobs( joblist ) 

184 

185 def initializeImagers(self): 

186 joblist=[] 

187 #for node in self.listOfNodes: 

188 for node in self.modifiedListOfNodes: 

189 joblist.append( self.PH.runcmd("imager.initializeImagers()", node) ) 

190 self.PH.checkJobs( joblist ) 

191 

192 def initializeDeconvolvers(self): 

193 joblist=[] 

194 #for node in self.listOfNodes: 

195 for node in self.modifiedListOfNodes: 

196 joblist.append( self.PH.runcmd("imager.initializeDeconvolvers()", node) ) 

197 self.PH.checkJobs( joblist ) 

198 

199 def initializeNormalizers(self): 

200 joblist=[] 

201 #for node in self.listOfNodes: 

202 for node in self.modifiedListOfNodes: 

203 joblist.append( self.PH.runcmd("imager.initializeNormalizers()", node) ) 

204 self.PH.checkJobs( joblist ) 

205 

206 def setWeighting(self): 

207 ## Set weight parameters and accumulate weight density (natural) 

208 joblist=[]; 

209 #for node in self.listOfNodes: 

210 for node in self.modifiedListOfNodes: 

211 ## Set weighting pars 

212 joblist.append( self.PH.runcmd("imager.setWeighting()", node ) ) 

213 self.PH.checkJobs( joblist ) 

214 

215 

216 def initializeIterationControl(self): 

217 joblist=[] 

218 #for node in self.listOfNodes: 

219 for node in self.modifiedListOfNodes: 

220 joblist.append( self.PH.runcmd("imager.initializeIterationControl()", node) ) 

221 self.PH.checkJobs( joblist ) 

222 

223 def makePSF(self): 

224 joblist=[] 

225 #for node in self.listOfNodes: 

226 for node in self.modifiedListOfNodes: 

227 joblist.append( self.PH.runcmd("imager.makePSF()", node) ) 

228 self.PH.checkJobs( joblist ) 

229 

230 def runMajorMinorLoops(self): 

231 joblist=[] 

232 #for node in self.listOfNodes: 

233 for node in self.modifiedListOfNodes: 

234 joblist.append( self.PH.runcmd("imager.runMajorMinorLoops()", node) ) 

235 self.PH.checkJobs( joblist ) 

236 

237 def runMajorCycle(self, isCleanCycle=True): 

238 joblist=[] 

239 #for node in self.listOfNodes: 

240 for node in self.modifiedListOfNodes: 

241 if self.exitflag[str(node)]==False: 

242 joblist.append( self.PH.runcmd("imager.runMajorCycle(isCleanCycle="+isCleanCycle+")", node) ) 

243 self.PH.checkJobs( joblist ) 

244 

245 def runMinorCycle(self): 

246 joblist=[] 

247 #for node in self.listOfNodes: 

248 for node in self.modifiedListOfNodes: 

249 if self.exitflag[str(node)]==False: 

250 joblist.append( self.PH.runcmd("imager.runMinorCycle()", node) ) 

251 self.PH.checkJobs( joblist ) 

252 

253 ## Merge the results from all pieces. Maintain an 'active' list of nodes... 

254 def hasConverged(self): 

255 

256 joblist=[] 

257 #for node in self.listOfNodes: 

258 for node in self.modifiedListOfNodes: 

259 if self.exitflag[str(node)]==False: 

260 joblist.append( self.PH.runcmd("rest = imager.hasConverged()", node) ) 

261 self.PH.checkJobs( joblist ) 

262 

263# self.PH.runcmdcheck("rest = imager.hasConverged()") 

264 

265 retval = True 

266 #for node in self.listOfNodes: 

267 for node in self.modifiedListOfNodes: 

268 if self.exitflag[str(node)]==False: 

269 rest = self.PH.pullval("rest", node ) 

270 retval = retval and rest[node] 

271 self.exitflag[str(node)] = rest[node] 

272 casalog.post("Node " + str(node) + " converged : " + str(rest[node]) , "INFO") 

273 

274 return retval 

275 

276 def updateMask(self): 

277 

278 joblist=[] 

279 #for node in self.listOfNodes: 

280 for node in self.modifiedListOfNodes: 

281 if self.exitflag[str(node)]==False: 

282 joblist.append( self.PH.runcmd("maskchanged = imager.updateMask()", node) ) 

283 self.PH.checkJobs( joblist ) 

284 

285# self.PH.runcmdcheck("maskchanged = imager.updateMask()") 

286 

287 retval = False 

288 #for node in self.listOfNodes: 

289 for node in self.modifiedListOfNodes: 

290 if self.exitflag[str(node)]==False: 

291 rest = self.PH.pullval("maskchanged", node ) 

292 retval = retval or rest[node] 

293 casalog.post("Node " + str(node) + " maskchanged : " + str(rest[node]) , "INFO") 

294 

295 return retval 

296 

297 def predictModel(self): 

298 joblist=[] 

299 #for node in self.listOfNodes: 

300 for node in self.modifiedListOfNodes: 

301 joblist.append( self.PH.runcmd("imager.predictmodel()", node) ) 

302 self.PH.checkJobs( joblist ) 

303 

304 def restoreImages(self): 

305 joblist=[] 

306 #for node in self.listOfNodes: 

307 for node in self.modifiedListOfNodes: 

308 joblist.append( self.PH.runcmd("imager.restoreImages()", node) ) 

309 self.PH.checkJobs( joblist ) 

310 

311 def pbcorImages(self): 

312 joblist=[] 

313 #for node in self.listOfNodes: 

314 for node in self.modifiedListOfNodes: 

315 joblist.append( self.PH.runcmd("imager.pbcorImages()", node) ) 

316 self.PH.checkJobs( joblist ) 

317 

318 def makePB(self): 

319 joblist=[] 

320 #for node in self.listOfNodes: 

321 for node in self.modifiedListOfNodes: 

322 joblist.append( self.PH.runcmd("imager.makePB()", node) ) 

323 self.PH.checkJobs( joblist ) 

324 

325 def checkPB(self): 

326 joblist=[] 

327 #for node in self.listOfNodes: 

328 for node in self.modifiedListOfNodes: 

329 joblist.append( self.PH.runcmd("imager.checkPB()", node) ) 

330 self.PH.checkJobs( joblist ) 

331 

332 def concatImages(self, type='copyvirtual'): 

333 import subprocess 

334 imtypes=['image','psf','model','residual','mask','pb', 'image.pbcor', 'weight', 'sumwt'] 

335 for immod in range(0,self.NF): 

336 for ext in imtypes: 

337 subimliststr="'" 

338 concatimname=self.allinimagepars[str(immod)]['imagename']+'.'+ ext 

339 distpath = os.getcwd() 

340 fullconcatimname = distpath+'/'+concatimname 

341 #for node in self.listOfNodes: 

342 for node in self.modifiedListOfNodes: 

343 #rootimname=self.allinimagepars[str(immod)]['imagename']+'.n'+str(node) 

344 #fullimname = self.PH.getpath(node) + '/' + rootimname  

345 fullimname = self.PH.getpartimagename( self.allinimagepars[str(immod)]['imagename'] , node ) 

346 if (os.path.exists(fullimname+'.'+ext)): 

347 subimliststr+=fullimname+'.'+ext+' ' 

348 subimliststr+="'" 

349 if subimliststr!="''": 

350 # parent images need to be cleaned up for restart=T 

351 if self.allinimagepars[str(immod)]['restart'] and os.path.exists(fullconcatimname): 

352 try: 

353 casalog.post("Cleaning up the existing "+fullconcatimname,"DEBUG") 

354 shutil.rmtree(fullconcatimname) 

355 except: 

356 casalog.post("Cleaning up the existing file named "+fullconcatimname,"DEBUG") 

357 os.remove(fullconcatimname) 

358 # set tempclose = false to avoid a long accessing issue 

359 #cmd = 'imageconcat inimages='+subimliststr+' outimage='+"'"+fullconcatimname+"'"+' type='+type+' tempclose=false'  

360 #ret=os.system(cmd) 

361 #if ret!=0: 

362 # casalog.post("concatenation of "+concatimname+" failed","WARN") 

363 iatool=imageanalysis() 

364 concattool = iatool.imageconcat(outfile=fullconcatimname, mode=type, infiles=subimliststr.strip("'"), axis=-1, tempclose=False, overwrite=True) 

365 if(len(concattool.shape())==0): 

366 casalog.post("concatenation of "+concatimname+" failed","WARN") 

367 concattool.done() 

368 

369 

370 def getSummary(self): 

371 joblist=[] 

372 #for node in self.listOfNodes: 

373 for node in self.modifiedListOfNodes: 

374 joblist.append( self.PH.runcmd("summ = imager.getSummary("+str(node)+")", node) ) 

375 self.PH.checkJobs( joblist ) 

376 

377 fullsumm={} 

378 #for node in self.listOfNodes: 

379 for node in self.modifiedListOfNodes: 

380 summ = self.PH.pullval("summ", node ) 

381 fullsumm["node"+str(node)] = summ 

382 

383 return fullsumm 

384 

385 def deleteTools(self): 

386 joblist=[] 

387 #for node in self.listOfNodes: 

388 for node in self.modifiedListOfNodes: 

389 joblist.append( self.PH.runcmd("imager.deleteTools()", node) ) 

390 self.PH.checkJobs( joblist ) 

391 

392 def estimatememory(self): 

393 joblist=[] 

394 #for node in self.listOfNodes: 

395 for node in self.modifiedListOfNodes: 

396 joblist.append( self.PH.runcmd("imager.estimatememory()", node) ) 

397 self.PH.checkJobs( joblist ) 

398#############################################