Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/imager_parallel_cube.py: 15%
237 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
2import math
3import shutil
4import string
5import time
6import re
7import copy
9from casatools import synthesisutils, synthesisimager
10from casatools import image as imageanalysis
11from casatasks import casalog
13from .imager_base import PySynthesisImager
14from .parallel_imager_helper import PyParallelImagerHelper
16'''
17An implementation of parallel cube imaging, using synthesisxxxx tools.
19Major and minor cycles are parallelized across frequency, by running separate
20PySynthesisImagers independently per frequency chunk.
21Iteration control is not synchronized; interactive mask drawing can't be done.
22Reference concatenation of all the image products is done at the end.
24There are N PySynthesisImager objects, each with their own
25synthesisimager, deconvolvers, normalizers and iterbot.
27'''
29#############################################
30# Parallelize both the major and minor cycle for Cube imaging
31# Run a separate instance of PySynthesisImager on each node.
32#### ( later, use the live-object interface of ImStore to reference-break the cubes )
33#### For nprocesses > nnodes, run the whole 'clean' loop multiple times.
34#############################################
class PyParallelCubeSynthesisImager():
    """Parallel cube imager that drives one PySynthesisImager per node.

    The constructor partitions the data selection and the image
    parameters per field with ``PyParallelImagerHelper.partitionCubeSelection``
    and then creates an ``imager`` object on every node in use.  Most of
    the remaining methods forward a single call to the ``imager`` object
    of each node through ``PH.runcmd`` and wait for the submitted jobs
    with ``PH.checkJobs``.

    Attributes set by the constructor:
        params: the parameter container passed in.
        allinimagepars: deep copy of the image parameters as supplied.
        allgridpars, allnormpars, weightpars, decpars, iterpars:
            parameter records read from ``params``.
        PH: the PyParallelImagerHelper instance.
        NN: ``PH.NN``.
        NF: number of image fields.
        listOfNodes: node ids as returned by ``PH.getNodeList()``.
        modifiedListOfNodes: node ids actually used (never more than
            the number of channels of field '0').
        SItool: local synthesisimager tool used to define the image
            coordinates.
        allselpars, allimpars: selection / image parameters keyed by
            node id (as a string).
        exitflag: per-node flag (keyed by node id as a string); nodes
            whose flag is set are skipped by the major/minor cycle,
            convergence and mask-update calls.
    """

    def __init__(self,params=None):
        """Partition the problem and build an imager on every node.

        Args:
            params: parameter container providing getSelPars(),
                getImagePars(), getGridPars(), getNormPars(),
                getWeightPars(), getDecPars() and getIterPars().
        """
        self.params = params

        allselpars = params.getSelPars()
        allimagepars = params.getImagePars()
        self.allinimagepars = copy.deepcopy(allimagepars)
        self.allgridpars = params.getGridPars()
        self.allnormpars = params.getNormPars()
        self.weightpars = params.getWeightPars()
        self.decpars = params.getDecPars()
        self.iterpars = params.getIterPars()
        alldataimpars = {}

        self.PH = PyParallelImagerHelper()
        self.NN = self.PH.NN
        self.NF = len(allimagepars.keys())
        self.listOfNodes = self.PH.getNodeList()

        # To define the final image coordinates, run selectdata and
        # defineimage on a local synthesisimager tool.
        self.SItool = synthesisimager()
        origspw = {}
        for mss in sorted(allselpars.keys()):
            # Remember the original spw selection; it is restored per
            # node further down.
            origspw[mss] = {'spw': allselpars[mss]['spw']}
            self.SItool.selectdata(allselpars[mss])
        for fid in sorted(allimagepars.keys()):
            self.SItool.defineimage(allimagepars[fid], self.allgridpars[fid])
            # Insert the coordsys record in the image parameters;
            # partitionCubeSelection works per field.
            allimagepars[fid]['csys'] = self.SItool.getcsys()
            if allimagepars[fid]['nchan'] == -1:
                allimagepars[fid]['nchan'] = self.SItool.updatenchan()
            alldataimpars[fid] = self.PH.partitionCubeSelection(allselpars, allimagepars[fid])

        # If there are more nodes than channels, some nodes would not
        # get any subcube, which causes an error.  To avoid this, reduce
        # the number of nodes actually used.
        # Variable nchan among the fields is not supported yet in
        # parallel mode, so field '0' decides.
        fid0nchan = allimagepars['0']['nchan']
        nnodes = len(self.listOfNodes)
        if nnodes > fid0nchan:
            self.modifiedListOfNodes = self.listOfNodes[0:fid0nchan]
            casalog.post("Nchan = "+str(fid0nchan)+", Will use only "+str(len(self.modifiedListOfNodes))+" nodes out of "+str(nnodes), "WARN")
        else:
            self.modifiedListOfNodes = self.listOfNodes[:]

        # Repack the data/image parameters per node.  The partition
        # records are keyed by zero-based node index, while the node
        # ids in the node list are one higher.
        self.allselpars = {}
        self.allimpars = {}
        for ipart in self.modifiedListOfNodes:
            nodeidx = str(ipart - 1)
            tnode = str(ipart)
            selparsPerNode = {}
            imparsPerNode = {}
            for fid in allimagepars:
                partition = alldataimpars[fid][nodeidx]
                # Restore the original spw selection just to allow the
                # weight density to be the same (ultimately this should
                # be passed by MPI if done this way).
                for mss in origspw.keys():
                    partition[mss]['spw'] = origspw[mss]['spw']
                for ky in partition:
                    # Only the 'ms...' keys hold data selections; do not
                    # reset the per-node selection for the other keys.
                    if ky.startswith('ms'):
                        selparsPerNode[ky] = partition[ky].copy()
                        if partition[ky]['spw'] == '-1':
                            selparsPerNode[ky]['spw'] = ''

                imparsPerNode[fid] = allimagepars[fid].copy()
                imparsPerNode[fid]['csys'] = partition['coordsys'].copy()
                imparsPerNode[fid]['nchan'] = partition['nchan']
                imparsPerNode[fid]['imagename'] = self.PH.getpartimagename(imparsPerNode[fid]['imagename'], ipart)
                # synthesisutils.updateimpars is deliberately not applied
                # here: per the original note it is not working properly
                # and should not affect results.
            self.allselpars[tnode] = selparsPerNode
            self.allimpars[tnode] = imparsPerNode

        # Make the required classes available on every node in use.
        casa6_import_prefix = 'casatasks.private.'
        cmd_import_pars = ('from {0}imagerhelpers.input_parameters import ImagerParameters'.
                           format(casa6_import_prefix))
        cmd_import_synth = ('from {0}imagerhelpers.imager_base import PySynthesisImager'.
                            format(casa6_import_prefix))
        joblist = []
        for node in self.modifiedListOfNodes:
            joblist.append(self.PH.runcmd(cmd_import_pars, node))
            joblist.append(self.PH.runcmd(cmd_import_synth, node))
        self.PH.checkJobs(joblist)

        # Build the parameter object and the imager on every node.  The
        # commands are submitted in this exact order for each node.
        self.exitflag = {}
        joblist = []
        for node in self.modifiedListOfNodes:
            setupcmds = [
                "paramList = ImagerParameters()",
                "paramList.setSelPars("+str(self.allselpars[str(node)])+")",
                "paramList.setImagePars("+str(self.allimpars[str(node)])+")",
                "paramList.setGridPars("+str(self.allgridpars)+")",
                "paramList.setWeightPars("+str(self.weightpars)+")",
                "paramList.setDecPars("+str(self.decpars)+")",
                "paramList.setIterPars("+str(self.iterpars)+")",
                "paramList.setNormPars("+str(self.allnormpars)+")",
                "paramList.checkParameters()",
                "imager = PySynthesisImager(params=paramList)",
            ]
            for cmd in setupcmds:
                joblist.append(self.PH.runcmd(cmd, node))
            self.exitflag[str(node)] = False
        self.PH.checkJobs(joblist)

    def _activeNodes(self):
        """Return the nodes in use whose exit flag is not set."""
        return [node for node in self.modifiedListOfNodes
                if not self.exitflag[str(node)]]

    def _runOnNodes(self, cmd, nodes=None):
        """Submit ``cmd`` to each node and wait for all submitted jobs.

        Args:
            cmd: command string handed to ``PH.runcmd``.
            nodes: node ids to run on; defaults to every node in use.
        """
        if nodes is None:
            nodes = self.modifiedListOfNodes
        joblist = [self.PH.runcmd(cmd, node) for node in nodes]
        # checkJobs is called even when nothing was submitted.
        self.PH.checkJobs(joblist)

    def initializeImagers(self):
        """Call imager.initializeImagers() on every node in use."""
        self._runOnNodes("imager.initializeImagers()")

    def initializeDeconvolvers(self):
        """Call imager.initializeDeconvolvers() on every node in use."""
        self._runOnNodes("imager.initializeDeconvolvers()")

    def initializeNormalizers(self):
        """Call imager.initializeNormalizers() on every node in use."""
        self._runOnNodes("imager.initializeNormalizers()")

    def setWeighting(self):
        """Call imager.setWeighting() on every node in use."""
        self._runOnNodes("imager.setWeighting()")

    def initializeIterationControl(self):
        """Call imager.initializeIterationControl() on every node in use."""
        self._runOnNodes("imager.initializeIterationControl()")

    def makePSF(self):
        """Call imager.makePSF() on every node in use."""
        self._runOnNodes("imager.makePSF()")

    def runMajorMinorLoops(self):
        """Call imager.runMajorMinorLoops() on every node in use."""
        self._runOnNodes("imager.runMajorMinorLoops()")

    def runMajorCycle(self, isCleanCycle=True):
        """Call imager.runMajorCycle() on every node whose exit flag is unset.

        Args:
            isCleanCycle: value forwarded as the ``isCleanCycle`` keyword.
        """
        # str() is required: concatenating the bool directly onto the
        # command string raises TypeError.
        cmd = "imager.runMajorCycle(isCleanCycle=" + str(isCleanCycle) + ")"
        self._runOnNodes(cmd, self._activeNodes())

    def runMinorCycle(self):
        """Call imager.runMinorCycle() on every node whose exit flag is unset."""
        self._runOnNodes("imager.runMinorCycle()", self._activeNodes())

    ## Merge the results from all pieces. Maintain an 'active' list of nodes...
    def hasConverged(self):
        """Query convergence on the active nodes and update their exit flags.

        Returns:
            The ``and`` of the values reported by the nodes whose exit
            flag was unset; True when there was no such node.
        """
        active = self._activeNodes()
        self._runOnNodes("rest = imager.hasConverged()", active)

        retval = True
        for node in active:
            rest = self.PH.pullval("rest", node)
            retval = retval and rest[node]
            # A node that reports convergence is skipped from now on.
            self.exitflag[str(node)] = rest[node]
            casalog.post("Node " + str(node) + " converged : " + str(rest[node]) , "INFO")

        return retval

    def updateMask(self):
        """Update the mask on the active nodes.

        Returns:
            The ``or`` of the values reported by the nodes whose exit
            flag is unset; False when there is no such node.
        """
        active = self._activeNodes()
        self._runOnNodes("maskchanged = imager.updateMask()", active)

        retval = False
        for node in active:
            rest = self.PH.pullval("maskchanged", node)
            retval = retval or rest[node]
            casalog.post("Node " + str(node) + " maskchanged : " + str(rest[node]) , "INFO")

        return retval

    def predictModel(self):
        """Run the model prediction command on every node in use."""
        # NOTE(review): every other forwarded command matches the casing
        # of its wrapper method, but this one is all lower-case
        # ('predictmodel'); confirm the node-side method name.
        self._runOnNodes("imager.predictmodel()")

    def restoreImages(self):
        """Call imager.restoreImages() on every node in use."""
        self._runOnNodes("imager.restoreImages()")

    def pbcorImages(self):
        """Call imager.pbcorImages() on every node in use."""
        self._runOnNodes("imager.pbcorImages()")

    def makePB(self):
        """Call imager.makePB() on every node in use."""
        self._runOnNodes("imager.makePB()")

    def checkPB(self):
        """Call imager.checkPB() on every node in use."""
        self._runOnNodes("imager.checkPB()")

    def concatImages(self, type='copyvirtual'):
        """Concatenate the per-node images of every field and image type.

        For each field and each image extension, the per-node images
        that exist on disk are concatenated into
        ``<cwd>/<imagename>.<ext>``.  Nothing is written for an
        extension with no existing per-node image.

        Args:
            type: passed as ``mode`` to the image tool's imageconcat.
        """
        imtypes = ['image','psf','model','residual','mask','pb', 'image.pbcor', 'weight', 'sumwt']
        distpath = os.getcwd()
        for immod in range(0, self.NF):
            inpars = self.allinimagepars[str(immod)]
            for ext in imtypes:
                concatimname = inpars['imagename'] + '.' + ext
                fullconcatimname = distpath + '/' + concatimname

                # Collect the per-node images that actually exist.
                partnames = []
                for node in self.modifiedListOfNodes:
                    partname = self.PH.getpartimagename(inpars['imagename'], node) + '.' + ext
                    if os.path.exists(partname):
                        partnames.append(partname)
                if not partnames:
                    continue

                # Parent images need to be cleaned up for restart=T.
                if inpars['restart'] and os.path.exists(fullconcatimname):
                    try:
                        casalog.post("Cleaning up the existing "+fullconcatimname,"DEBUG")
                        shutil.rmtree(fullconcatimname)
                    except OSError:
                        # rmtree refuses anything that is not a real
                        # directory; fall back to removing a plain file.
                        casalog.post("Cleaning up the existing file named "+fullconcatimname,"DEBUG")
                        os.remove(fullconcatimname)

                # The input list is a single string with every name
                # followed by one space, as before.
                infiles = ''.join(name + ' ' for name in partnames)
                iatool = imageanalysis()
                # tempclose=False avoids a long accessing issue.
                concattool = iatool.imageconcat(outfile=fullconcatimname, mode=type, infiles=infiles, axis=-1, tempclose=False, overwrite=True)
                try:
                    if len(concattool.shape()) == 0:
                        casalog.post("concatenation of "+concatimname+" failed","WARN")
                finally:
                    # Release the concatenated image even if the shape
                    # query fails.
                    concattool.done()

    def getSummary(self):
        """Collect the summary of every node in use.

        Returns:
            dict mapping ``"node<id>"`` to the value pulled from that
            node's ``summ`` variable.
        """
        joblist = [self.PH.runcmd("summ = imager.getSummary("+str(node)+")", node)
                   for node in self.modifiedListOfNodes]
        self.PH.checkJobs(joblist)

        fullsumm = {}
        for node in self.modifiedListOfNodes:
            fullsumm["node"+str(node)] = self.PH.pullval("summ", node)

        return fullsumm

    def deleteTools(self):
        """Call imager.deleteTools() on every node in use."""
        self._runOnNodes("imager.deleteTools()")

    def estimatememory(self):
        """Call imager.estimatememory() on every node in use."""
        self._runOnNodes("imager.estimatememory()")
398#############################################