1import os
2import math
3import shutil
4import string
5import time
6import re;
7import copy
8import pdb
9
10from casatools import synthesisimager, synthesisnormalizer
11from casatasks import casalog
12
13from .imager_base import PySynthesisImager
14from .parallel_imager_helper import PyParallelImagerHelper
# Tool class name and its import statement, kept as plain strings because
# they are formatted into the command text that starts an imager on each
# node (see their use in initializeImagersBase).
synth_imager_name = 'synthesisimager'
synth_imager_import = 'from casatools import synthesisimager'
17
18
19'''
20An implementation of parallel continuum imaging, using synthesisxxxx tools
21
22Datasets are partitioned by row and major cycles are parallelized.
23Gathers and normalization are done before passing the images to a
non-parallel minor cycle. The output model image is then scattered to
25all the nodes for the next parallel major cycle.
26
27There are N synthesisimager objects.
28There is 1 instance per image field, of the normalizer and deconvolver.
29There is 1 iterbot.
30
31'''
32
33#############################################
34#############################################
35## Parallelize only major cycle.
36#############################################
class PyParallelContSynthesisImager(PySynthesisImager):
    """Continuum imager that parallelizes only the major cycle.

    The data selection is partitioned across the nodes reported by
    PyParallelImagerHelper.  Each node runs its own imager, reachable in
    that node's namespace as ``toolsi``, while one normalizer per image
    field is kept on the calling process.

    NOTE(review): the listing this class was restored from had lost its
    indentation.  Nesting was reconstructed from the data flow; the few
    places where more than one reading was possible carry their own
    NOTE(review) comment.
    """

    def __init__(self, params=None):
        """Set up the base imager, the parallel helper and the partitioning.

        params is handed unchanged to PySynthesisImager.__init__.
        """
        PySynthesisImager.__init__(self, params)

        self.PH = PyParallelImagerHelper()
        self.NN = self.PH.NN
        # Keep the unpartitioned selection: the imager on the MAIN node
        # uses it, the per-node imagers use the partitioned one.
        self.selpars = self.allselpars
        self.allselpars = self.PH.partitionContDataSelection(self.allselpars)
        self.listOfNodes = self.PH.getNodeList()
        self.coordsyspars = {}
        self.toolsi = None

    #############################################
    # Private helpers
    #############################################
    def _runOnAllNodes(self, cmd):
        """Submit the command string cmd to every node, then check the jobs.

        The checkJobs call blocks until all submitted jobs are done.
        """
        joblist = [self.PH.runcmd(cmd, node) for node in self.listOfNodes]
        self.PH.checkJobs(joblist)

    def _usesMosaicWeightDensity(self):
        """Return True for briggs/uniform weighting with 'multifield' set.

        In that case the weight density for all fields is made by a single
        imager on the MAIN node instead of being gathered from the nodes.
        """
        wtype = self.weightpars['type']
        densityBased = (wtype.count('briggs') or wtype.count('uniform')) > 0
        return bool(densityBased and self.weightpars['multifield'])

    #############################################
    def resetSaveModelParams(self, params=None):
        """Copy 'readonly' and 'usescratch' from params into every partition.

        params must provide getSelPars(), returning a mapping keyed by MS
        name.  Despite the None default, calling this without params fails
        on the getSelPars() lookup.
        """
        mainparams = params.getSelPars()
        for node in self.allselpars:            # for all nodes
            for ms in self.allselpars[node]:    # for all MSes
                partsel = self.allselpars[node][ms]
                partsel['readonly'] = mainparams[ms]['readonly']
                partsel['usescratch'] = mainparams[ms]['usescratch']

    #############################################
    def initializeImagersBase(self, thisSelPars, partialSelPars):
        """Initialize either the MAIN-node imager or the per-node imagers.

        partialSelPars == False : thisSelPars is the full selection and is
            applied to the already created self.toolsi.  The coordinate
            system of each field is recorded and written back into
            self.allimpars as 'csys'.
        otherwise : thisSelPars holds the partitioned selections and an
            imager is started, given its data and its images on every node.
        """
        # Equality test kept as in the original so that only a value equal
        # to False selects the MAIN-node branch.
        if partialSelPars == False:
            self._initMainImager(thisSelPars)
        else:
            self._initNodeImagers(thisSelPars)

    def _initMainImager(self, fullSelPars):
        """Select data and define images on self.toolsi; record the csys."""
        # Uses the imager the caller already created on the MAIN node.
        for mss in sorted(self.selpars.keys()):
            self.toolsi.selectdata(fullSelPars[mss])

        # defineimage makes the global csys, which is later distributed to
        # the other nodes.  It also sets 'startmodel' if available (this is
        # later scattered to the nodes).
        for fld in range(self.NF):
            key = str(fld)
            tmpimpars = copy.deepcopy(self.allimpars[key])
            self.toolsi.defineimage(impars=tmpimpars,
                                    gridpars=self.allgridpars[key])
            self.coordsyspars[key] = self.toolsi.getcsys()

        # Modify the coordsys inputs used by the later per-node defineimage.
        for fld in range(self.NF):
            key = str(fld)
            self.allimpars[key]['csys'] = self.coordsyspars[key]['coordsys'].copy()

    def _initNodeImagers(self, partSelPars):
        """Start an imager on each node, then select data and define images.

        partSelPars is indexed by str(node-1) and then by MS name.
        """
        # Start the imagers on all nodes.
        self._runOnAllNodes('{0}; toolsi = {1}()'.format(
            synth_imager_import, synth_imager_name))

        # Select data: each node gets its own partition of every MS.
        joblist = []
        for node in self.listOfNodes:
            for mss in sorted(self.selpars.keys()):
                selStr = str(partSelPars[str(node - 1)][mss])
                joblist.append(
                    self.PH.runcmd("toolsi.selectdata( " + selStr + ")", node))
        self.PH.checkJobs(joblist)

        # Call defineimage at each node, once per image field.
        joblist = []
        for node in self.listOfNodes:
            # Private copy per node: imagename and startmodel are edited.
            nimpars = copy.deepcopy(self.allimpars)
            for fld in range(self.NF):
                key = str(fld)
                fldimpars = nimpars[key]
                if self.NN > 1:
                    fldimpars['imagename'] = self.PH.getpartimagename(
                        fldimpars['imagename'], node)

                # Drop the startmodel: it has already been created on the
                # main node.
                fldimpars.pop('startmodel', None)

                # Grid parameters are only read, so no copy is needed.
                joblist.append(self.PH.runcmd(
                    "toolsi.defineimage( impars=" + str(fldimpars)
                    + ", gridpars=" + str(self.allgridpars[key]) + ")", node))
        self.PH.checkJobs(joblist)

    #############################################
    def initializeImagers(self):
        """Initialize one imager on the MAIN node and then one on every node.

        The MAIN-node imager provides the coordinate systems and, for
        multifield briggs/uniform weighting, the weight density.  It is
        also used to make the CF cache when that is missing or empty.

        Note (leftover from CAS-9977): there is a coord system mismatch at
        scatter/gather if the MAIN version already exists on disk (with
        startmodel it is xxx.model, with aproject it is xxx.residual).
        SIImageStore::openImage has an exception to handle this; turn on
        casalog.filter('DEBUG1') to see the warning message.
        """
        # ---------------------------------------
        #  Check if cfcache exists.
        #
        gridpars0 = self.allgridpars['0']
        gridder = gridpars0['gridder']
        cfCacheName = ''
        cfcExists = False
        if gridder.startswith('awpr'):
            cfCacheName = gridpars0['cfcache']
        else:
            # No CF cache is used: treat it as present so none is made.
            gridpars0['cfcache'] = ''
            cfcExists = True

        if gridder == 'awproject' or gridder == 'awprojectft':
            if cfCacheName == '':
                cfCacheName = self.allimpars['0']['imagename'] + '.cf'
            gridpars0['cfcache'] = cfCacheName
            cfcExists = os.path.isdir(cfCacheName)
            if cfcExists and len(os.listdir(cfCacheName)) == 0:
                casalog.post(cfCacheName + " exists, but is empty. Attempt is being made to fill it now.", "WARN")
                cfcExists = False

        # Start one imager on MAIN node
        self.toolsi = synthesisimager()

        # Init one SI tool (it records the csys per field in
        # self.coordsyspars and in self.allimpars)
        self.initializeImagersBase(self.selpars, False)

        # Weighting with multifield: the master creates the weight density
        # for all fields.
        if self._usesMosaicWeightDensity():
            self.toolsi.setweighting(**self.weightpars)
            self.toolsi.getweightdensity()

        # Do the second round, initializing imagers on ALL nodes
        self.initializeImagersBase(self.allselpars, True)

        # Fill CFCache.
        if not cfcExists:
            self.SItool = self.toolsi
            # Done serially: in parallel it randomly hits a race condition
            # about table.dat not being available.
            super().makeCFCache(False)
            self.reloadCFCache()
            self.SItool = None

        # Clean up the single imager (MAIN node).
        # NOTE(review): placed after the 'if' above, so that the imager is
        # released whether or not the CF cache had to be made.
        self.toolsi.done()
        self.toolsi = None

    #############################################
    def initializeNormalizers(self):
        """Create and set up one normalizer per image field.

        With more than one node, the per-node part image names are passed
        to the normalizer as 'partimagenames' and existing part images are
        deleted first.  self.localnormpars keeps the parameters of the last
        field processed.
        """
        for immod in range(self.NF):
            self.PStools.append(synthesisnormalizer())
            self.localnormpars = copy.deepcopy(self.allnormpars[str(immod)])
            if self.NN > 1:
                imagename = self.allimpars[str(immod)]['imagename']
                partnames = []
                for node in self.listOfNodes:
                    partnames.append(self.PH.getpartimagename(imagename, node))
                    # To ensure restarts work properly.
                    self.PH.deletepartimages(imagename, node)
                # NOTE(review): assigned only when NN > 1, as reconstructed.
                self.localnormpars['partimagenames'] = partnames

            self.PStools[immod].setupnormalizer(normpars=self.localnormpars)

    #############################################
    def setWeighting(self):
        """Set the weighting on all nodes and share one weight density.

        * briggs/uniform with 'multifield': one imager on the MAIN node
          makes the weight density, which is then set on every node.
        * otherwise: setweighting is run on every node.  For a single image
          field and any type other than natural/radial, the per-node weight
          densities are gathered, merged and scattered back.  For several
          fields only a warning is posted.
        """
        if self._usesMosaicWeightDensity():
            self._setMosaicWeightDensity()
            return

        # Set weighting pars on every node
        self._runOnAllNodes(
            "toolsi.setweighting( **" + str(self.weightpars) + ")")

        # For natural and radial, the weight density array isn't created
        # at all.
        wtype = self.weightpars['type']
        if wtype == 'natural' or wtype == 'radial':
            return

        if self.NF == 1:
            # If only one field, do the get/gather/set of the weight density.
            self._gatherScatterWeightDensity()
        else:
            casalog.post("Parallel-Continuum-multifield with briggs weighting will give different weighting schemes with number of processes used", "WARN")

    def _setMosaicWeightDensity(self):
        """Make the weight density on the MAIN node and set it on all nodes."""
        self.toolsi = synthesisimager()
        for mss in sorted(self.selpars.keys()):
            self.toolsi.selectdata(self.selpars[mss])
        for fld in range(self.NF):
            self.toolsi.defineimage(impars=self.allimpars[str(fld)],
                                    gridpars=self.allgridpars[str(fld)])
        self.toolsi.setweighting(**self.weightpars)
        # Master creates the weight density for all fields
        weightimage = self.toolsi.getweightdensity()
        self.toolsi.done()
        self.toolsi = None

        # Move the image aside under a '_moswt' name, replacing any
        # previous one.
        destWgtim = weightimage + '_moswt'
        if os.path.exists(destWgtim):
            shutil.rmtree(destWgtim)
        shutil.move(weightimage, destWgtim)

        self._runOnAllNodes(
            "toolsi.setweightdensity('" + str(destWgtim) + "')")

    def _gatherScatterWeightDensity(self):
        """Gather the per-node weight densities, merge and scatter them."""
        casalog.post("Gathering/Merging/Scattering Weight Density for PSF generation", "INFO")
        self._runOnAllNodes("toolsi.getweightdensity()")

        # Gather weightdensity, sum and scatter
        casalog.post("******************************************************")
        casalog.post(" gather and scatter now ")
        casalog.post("******************************************************")
        locpstool = synthesisnormalizer()
        locpstool.setupnormalizer(normpars=self.localnormpars)
        locpstool.gatherweightdensity()
        sumgridname = locpstool.scatterweightdensity()

        tempgridname = sumgridname + "_temp"
        resname = sumgridname.replace(".gridwt", ".residual")
        hasResidual = os.path.exists(resname) or os.path.exists(resname + ".tt0")
        if os.path.exists(tempgridname) and hasResidual:
            # A restart: bring back the density that was set aside earlier.
            shutil.rmtree(sumgridname, True)
            shutil.move(tempgridname, sumgridname)

        # Set weight density for each node
        self._runOnAllNodes(
            "toolsi.setweightdensity('" + str(sumgridname) + "')")

        # For some reason we cannot stop the psf being made along with the
        # gridwt image, and it may have the wrong shape at this stage.
        shutil.rmtree(tempgridname, True)
        shutil.move(sumgridname, tempgridname)

        # Not a restart, so the psf shape may be different if full pol:
        # delete it.
        # NOTE(review): the two checks are treated as independent siblings.
        tmppsfname = sumgridname.replace(".gridwt", ".psf")
        if not os.path.exists(resname):
            shutil.rmtree(tmppsfname, True)
        if not os.path.exists(resname + ".tt0"):
            shutil.rmtree(tmppsfname + ".tt0", True)

    #############################################
    def deleteImagers(self):
        """Send toolsi.done() through the helper.

        No node is given to runcmd; presumably that targets every node --
        confirm against PyParallelImagerHelper.runcmd.
        """
        self.PH.runcmd("toolsi.done()")

    def deleteWorkDir(self):
        """Delete the part images of every field on every node.

        deletepartimages is called with deldir=True.  Nothing is done when
        there is only one node.
        """
        if self.NN > 1:
            for immod in range(self.NF):
                imagename = self.allimpars[str(immod)]['imagename']
                for node in self.listOfNodes:
                    self.PH.deletepartimages(imagename, node, deldir=True)

    def deleteCluster(self):
        """Take down the cluster through the parallel helper."""
        self.PH.takedownCluster()

    #############################################
    def dryGridding(self):
        """Run drygridding on the MAIN-node imager with a dummy CF list."""
        self.toolsi.drygridding([''])

    #############################################
    def reloadCFCache(self):
        """Ask the imager on every node to reload its CF cache."""
        cmd = "toolsi.reloadcfcache()"
        joblist = []
        for node in self.listOfNodes:
            casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd))
            joblist.append(self.PH.runcmd(cmd, node))
        self.PH.checkJobs(joblist)

    #############################################
    def makePSFCore(self):
        """Make PSFs on all nodes; returns once all nodes are done."""
        self._runOnAllNodes("toolsi.makepsf()")

    #############################################
    def makePBCore(self):
        """Make the primary beam on the first node only.

        Only one node needs to make the PB. It reads the freq from the
        image coordsys.
        """
        joblist = [self.PH.runcmd("toolsi.makepb()", self.listOfNodes[0])]
        self.PH.checkJobs(joblist)

    #############################################
    def runMajorCycleCore(self, lastcycle):
        """Run one major cycle on all nodes; returns once all are done.

        lastcycle is interpolated with str() into the 'lastcycle' entry of
        the controls dict passed to executemajorcycle on each node.
        """
        casalog.post("----------------------------- Running Parallel Major Cycle ----------------------------", "INFO")
        self._runOnAllNodes(
            "toolsi.executemajorcycle(controls={'lastcycle':" + str(lastcycle) + "})")

    #############################################
    def predictModelCore(self):
        """Run predictmodel on all nodes; returns once all are done."""
        self._runOnAllNodes("toolsi.predictmodel()")

    def estimatememory(self):
        """Run estimatememory on the imager of every node."""
        self._runOnAllNodes("toolsi.estimatememory()")