import os
import math
import shutil
import string
import time
import re
import copy
import pdb

from casatools import synthesisimager, synthesisnormalizer
from casatasks import casalog

from .imager_base import PySynthesisImager
from .parallel_imager_helper import PyParallelImagerHelper
synth_imager_name = 'synthesisimager'
synth_imager_import = 'from casatools import synthesisimager'


'''
An implementation of parallel continuum imaging, using synthesisxxxx tools.

Datasets are partitioned by row, and the major cycles are parallelized across
nodes. Gathers and normalization are done before passing the images to a
non-parallel minor cycle. The output model image is then scattered back to
all the nodes for the next parallel major cycle.

There are N synthesisimager objects (one per node).
There is 1 instance per image field of the normalizer and deconvolver.
There is 1 iterbot.

'''
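# A rough usage sketch (illustrative only, not executed here). The driver task
# normally builds an ImagerParameters object and runs the standard
# PySynthesisImager lifecycle on this class; the method names below come from
# the PySynthesisImager base class, while the 'params' object and the exact
# loop structure are assumptions for illustration.
#
#   imager = PyParallelContSynthesisImager(params=params)
#   imager.initializeImagers()
#   imager.initializeNormalizers()
#   imager.setWeighting()
#   imager.initializeDeconvolvers()
#   imager.initializeIterationControl()
#   imager.makePSF()
#   imager.runMajorCycle()                # parallel major cycle across nodes
#   while not imager.hasConverged():
#       imager.runMinorCycle()            # serial minor cycle on the main node
#       imager.runMajorCycle()
#   imager.restoreImages()
#   imager.deleteTools()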

#############################################
#############################################
## Parallelize only major cycle.
#############################################
class PyParallelContSynthesisImager(PySynthesisImager):

    def __init__(self, params=None):

        PySynthesisImager.__init__(self, params)

        self.PH = PyParallelImagerHelper()
        self.NN = self.PH.NN
        self.selpars = self.allselpars
        self.allselpars = self.PH.partitionContDataSelection(self.allselpars)
        # self.allcflist = self.PH.partitionCFCacheList(self.cfcachepars['cflist'])
        # self.allcflist = self.PH.partitionCFCacheList(self.allgridpars['0'])
        self.listOfNodes = self.PH.getNodeList()
        self.coordsyspars = {}
        self.toolsi = None
#############################################
    def resetSaveModelParams(self, params=None):
        mainparams = params.getSelPars()
        for n in self.allselpars:           # for all nodes
            for v in self.allselpars[n]:    # for all MSes
                self.allselpars[n][v]['readonly'] = mainparams[v]['readonly']
                self.allselpars[n][v]['usescratch'] = mainparams[v]['usescratch']

#############################################
#    def initializeImagers(self):
#        ### Drygridding, and Coordsys comes from a single imager on MAIN node.
#        ### No startmodel confusion. It's created only once and then scattered.
#        #self.initializeImagers()
#        # pdb.set_trace()
#        super().initializeImagers()
#        ### Note : Leftover from CAS-9977
#        ### There is a coord system mismatch at scatter/gather, if the MAIN version already
#        ### exists on disk. With startmodel, it's xxx.model. With aproject, it's xxx.residual.
#        ### There is an exception in SIImageStore::openImage to handle this.
#        ### Turn on casalog.filter('DEBUG1') to see the warning message.


#############################################
    def initializeImagersBase(self, thisSelPars, partialSelPars):

        if partialSelPars == False:   ## Init only on the zeroth (MAIN) node.

            #
            # Use the already-created imager on the MAIN node.
            #
            ##self.toolsi = synthesisimager()

            #
            # Select data.
            #
            for mss in sorted( self.selpars.keys() ):
                self.toolsi.selectdata( thisSelPars[mss] )

            # Defineimage.
            # This makes the global csys. Get the csys to distribute to the other nodes.
            # It also sets 'startmodel' if available (this is later scattered to the nodes).
            for fld in range(0, self.NF):
                tmpimpars = copy.deepcopy(self.allimpars[str(fld)])
                #if 'startmodel' in tmpimpars:
                #    tmpimpars.pop('startmodel')
                self.toolsi.defineimage( impars=tmpimpars, gridpars=self.allgridpars[str(fld)] )
                fullcoords = self.toolsi.getcsys()
                self.coordsyspars[str(fld)] = fullcoords

            # Modify the coordsys inputs.
            for fld in range(0, self.NF):
                self.allimpars[str(fld)]['csys'] = self.coordsyspars[str(fld)]['coordsys'].copy()

            # Call the global defineimage again
            # (to get around a later error of different coordsys latpoles (CAS-9977)).
            #for fld in range(0,self.NF):
            #    self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars=self.allgridpars[str(fld)] )


        else:   ## partialSelPars==True : the actual initialization, on all nodes.

            #
            # Start the imagers on all nodes.
            #
            joblist = []
            for node in self.listOfNodes:
                joblist.append( self.PH.runcmd('{0}; toolsi = {1}()'.format(
                                               synth_imager_import, synth_imager_name),
                                               node) )
            self.PH.checkJobs(joblist)

            #
            # Select data. When partialSelPars is True, the thisSelPars
            # data structure holds one partitioned selection per node.
            #
            joblist = []
            nodes = self.listOfNodes  #[1]
            for node in nodes:
                for mss in sorted( self.selpars.keys() ):
                    selStr = str(thisSelPars[str(node-1)][mss])
                    joblist.append( self.PH.runcmd("toolsi.selectdata( "+selStr+")", node) )
            self.PH.checkJobs(joblist)

            #
            # Call defineimage on each node.
            #
            joblist = []
            for node in nodes:
                ## For each image-field, define the imaging parameters.
                nimpars = copy.deepcopy(self.allimpars)
                # casalog.post("nimpars = "+str(nimpars))
                ngridpars = copy.deepcopy(self.allgridpars)
                for fld in range(0, self.NF):
                    if self.NN > 1:
                        nimpars[str(fld)]['imagename'] = self.PH.getpartimagename( nimpars[str(fld)]['imagename'], node )

                    ## Pop out the startmodel, as it has already been set up on the main node.
                    tmpimpars = nimpars[str(fld)]
                    if 'startmodel' in tmpimpars:
                        tmpimpars.pop('startmodel')

                    joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] )
                                                   + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
            self.PH.checkJobs(joblist)
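    # For reference, a rough sketch (with hypothetical values) of the two selection
    # structures this method consumes. With partialSelPars==False, 'thisSelPars' is
    # the un-partitioned per-MS dict (self.selpars); with partialSelPars==True it is
    # the node-partitioned dict produced by PH.partitionContDataSelection(), keyed
    # by node index as a string (hence the thisSelPars[str(node-1)][mss] lookup above).
    # The keys and selection contents below are illustrative only.
    #
    #   selpars    = { 'ms0' : { 'msname':'day1.ms', ... } }
    #   allselpars = { '0' : { 'ms0' : { 'msname':'day1.ms', ...rows for node 1... } },
    #                  '1' : { 'ms0' : { 'msname':'day1.ms', ...rows for node 2... } } }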

#############################################

    def initializeImagers(self):
        #---------------------------------------
        # Check whether the cfcache exists.
        #
        cfCacheName = ''
        cfcExists = False
        if self.allgridpars['0']['gridder'].startswith('awp'):
            cfCacheName = self.allgridpars['0']['cfcache']
        else:
            self.allgridpars['0']['cfcache'] = ''
            cfcExists = True
        if self.allgridpars['0']['gridder'] == 'awproject' or self.allgridpars['0']['gridder'] == 'awprojectft':
            if cfCacheName == '':
                cfCacheName = self.allimpars['0']['imagename'] + '.cf'
            self.allgridpars['0']['cfcache'] = cfCacheName
            cfcExists = os.path.exists(cfCacheName) and os.path.isdir(cfCacheName)
            if cfcExists:
                nCFs = len(os.listdir(cfCacheName))
                if nCFs == 0:
                    casalog.post(cfCacheName + " exists, but is empty. An attempt is being made to fill it now.", "WARN")
                    cfcExists = False
        # casalog.post("##########################################")
        # casalog.post("CFCACHE = "+cfCacheName,cfcExists)
        # casalog.post("##########################################")


        # Start one imager on the MAIN node.
        self.toolsi = synthesisimager()

        # Init one SI tool ( it records the csys per field in self.coordsyspars ).
        self.initializeImagersBase(self.selpars, False)

        # Modify the coordsys inputs
        # for fld in range(0, self.NF):
        #     self.allimpars[str(fld)]['csys'] = self.coordsyspars[str(fld)]['coordsys'].copy()

        # Dry gridding on the MAIN node ( i.e. on self.toolsi ).
        #if (not cfcExists):
        #    self.dryGridding()

        ## Weighting with mosfield=True.
        if (self.weightpars['type'] == 'briggs') and (self.weightpars['multifield']):
            self.toolsi.setweighting(**self.weightpars)
            ### The master creates the weight density for all fields.
            self.toolsi.getweightdensity()

        # Clean up the single imager (MAIN node)
        #self.toolsi.done()
        #self.toolsi = None

        # Do the second round, initializing imagers on ALL nodes.
        self.initializeImagersBase(self.allselpars, True)

        # Fill the CFCache - this uses all nodes.
        if not cfcExists:
            self.SItool = self.toolsi
            #super().initializeImagers()
            ### Doing this serially, as in parallel it randomly hits a race condition
            ### about table.dat not being available.
            super().makeCFCache(False)
            #self.fillCFCache()
            self.reloadCFCache()
            self.SItool = None
        self.toolsi.done()
        self.toolsi = None

######################################################################################################################################
        #---------------------------------------
        # 4. call setdata() for images on all nodes
        #
        # joblist=[];
        # for node in self.listOfNodes:
        #     ## Send in Selection parameters for all MSs in the list
        #     #### MPIInterface related changes (the -1 in the expression below)
        #     for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
        #         joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
        # self.PH.checkJobs(joblist);

        #---------------------------------------
        # 5. Call defineImage() on all nodes. This sets up the FTMs.
        #
        # joblist=[];
        # for node in self.listOfNodes:
        #     ## For each image-field, define imaging parameters
        #     nimpars = copy.deepcopy(self.allimpars)
        #     #casalog.post("nimpars = "+str(nimpars))
        #     ngridpars = copy.deepcopy(self.allgridpars)
        #     for fld in range(0,self.NF):
        #         if self.NN>1:
        #             nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
        #             ### nimpars[str(fld)]['imagename'] = self.allnormpars[str(fld)]['workdir'] + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
        #             ### nimpars[str(fld)]['imagename'] = nimpars[str(fld)]['imagename']+'.n'+str(node)

        #         # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']+'.n'+str(node)
        #         # # Give the same CFCache name to all nodes
        #         ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];

        #         joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
        # self.PH.checkJobs(joblist);

        #---------------------------------------
        # 6. If the cfcache does not exist, call fillCFCache().
        #    This will fill the "empty" CFCache in parallel.
        # 7. Then call reloadCFCache() on all nodes.
        #    This reloads the latest cfcache.



        # TRY: Start all over again!
        # self.deleteImagers();

        # joblist=[]

        # for node in self.listOfNodes:
        #     joblist.append( self.PH.runcmd("toolsi = casac.synthesisimager()", node) );
        # self.PH.checkJobs(joblist);

        # joblist=[];
        # nodes=self.listOfNodes;#[1];
        # for node in nodes:
        #     for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
        #         joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
        #     # for mss in sorted( self.selpars.keys() ):
        #     #     joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.selpars[mss])+")", node) )
        # self.PH.checkJobs(joblist);

        # joblist=[];
        # for node in self.listOfNodes:
        #     nimpars = copy.deepcopy(self.allimpars)
        #     ngridpars = copy.deepcopy(self.allgridpars)
        #     for fld in range(0,self.NF):
        #         if self.NN>1:
        #             nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
        #         # # Give the same CFCache name to all nodes
        #         ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];

        #         joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
        # self.PH.checkJobs(joblist);


#############################################


#############################################

    def initializeNormalizers(self):
        for immod in range(0, self.NF):
            self.PStools.append(synthesisnormalizer())
            normpars = copy.deepcopy( self.allnormpars[str(immod)] )
            partnames = []
            if self.NN > 1:
                #### MPIInterface related changes
                #for node in range(0,self.NN):
                for node in self.listOfNodes:
                    partnames.append( self.PH.getpartimagename( self.allimpars[str(immod)]['imagename'], node ) )
                    #onename = self.allimpars[str(immod)]['imagename']+'.n'+str(node)
                    #partnames.append( self.PH.getpath(node) + '/' + onename )
                    #self.PH.deletepartimages( self.PH.getpath(node), onename ) # To ensure restarts work properly.
                    self.PH.deletepartimages( self.allimpars[str(immod)]['imagename'], node ) # To ensure restarts work properly.
                normpars['partimagenames'] = partnames
            self.PStools[immod].setupnormalizer(normpars=normpars)
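    # Illustrative note (the naming below is an assumption based on the commented-out
    # older code above, not verified here): for imagename 'try' and nodes [1,2],
    # PH.getpartimagename() would produce per-node partial image prefixes such as
    #   [ 'try.workdirectory/try.n1', 'try.workdirectory/try.n2' ]
    # and setupnormalizer() uses this 'partimagenames' list to gather, sum and
    # normalize the per-node images into the single full image 'try'.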


#############################################
    def setWeighting(self):

        ## Set the weight parameters and accumulate the weight density (natural).
        joblist = []
        if (self.weightpars['type'] == 'briggs') and (self.weightpars['multifield']):
            ### The master creates the weight density for all fields.
            ## Should have been in initializeImagersBase_New but it is not being called !
            self.toolsi = synthesisimager()
            for mss in sorted( self.selpars.keys() ):
                self.toolsi.selectdata( self.selpars[mss] )
            for fld in range(0, self.NF):
                self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars=self.allgridpars[str(fld)] )
            self.toolsi.setweighting(**self.weightpars)
            ### The master creates the weight density for all fields.
            weightimage = self.toolsi.getweightdensity()
            self.toolsi.done()
            self.toolsi = None
            destWgtim = weightimage + '_moswt'
            if os.path.exists(destWgtim):
                shutil.rmtree(destWgtim)
            shutil.move(weightimage, destWgtim)
            joblist = []
            for node in self.listOfNodes:
                joblist.append( self.PH.runcmd("toolsi.setweightdensity('"+str(destWgtim)+"')", node ) )
            self.PH.checkJobs( joblist )
            #for node in self.listOfNodes:
            #    ## Set weighting pars
            #    joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) )
            #self.PH.checkJobs( joblist )
            #joblist=[];
            #for node in self.listOfNodes:
            #    joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) )
            #self.PH.checkJobs( joblist )

            #for immod in range(0,self.NF):
            #    #self.PStools[immod].gatherweightdensity()
            #    self.PStools[immod].scatterweightdensity()
            ## Set the weight density for each node.
            #joblist=[];
            #for node in self.listOfNodes:
            #    joblist.append( self.PH.runcmd("toolsi.setweightdensity()", node ) )
            #self.PH.checkJobs( joblist )
            #### end of multifield or mosweight
        else:
            joblist = []
            for node in self.listOfNodes:
                ## Set weighting pars.
                joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) )
            self.PH.checkJobs( joblist )

            ## If there is only one field, do the get/gather/set of the weight density.
            if self.NF == 1 and self.allimpars['0']['stokes'] == "I":   ## Remove after gridded wts appear for all fields correctly (i.e. new FTM).

                if not ( (self.weightpars['type'] == 'natural') or (self.weightpars['type'] == 'radial') ):   ## For natural and radial, this array isn't created at all.
                    ## Remove when we switch to the new FTM.

                    casalog.post("Gathering/Merging/Scattering Weight Density for PSF generation", "INFO")

                    joblist = []
                    for node in self.listOfNodes:
                        joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) )
                    self.PH.checkJobs( joblist )

                    ## Gather the weight density, sum, and scatter.
                    casalog.post("******************************************************")
                    casalog.post(" gather and scatter now ")
                    casalog.post("******************************************************")
                    for immod in range(0, self.NF):
                        self.PStools[immod].gatherweightdensity()
                        self.PStools[immod].scatterweightdensity()

                    ## Set the weight density for each node.
                    joblist = []
                    for node in self.listOfNodes:
                        joblist.append( self.PH.runcmd("toolsi.setweightdensity()", node ) )
                    self.PH.checkJobs( joblist )



    def deleteImagers(self):
        self.PH.runcmd("toolsi.done()")

    def deleteWorkDir(self):
        ## Delete the contents of the .workdirectory
        for immod in range(0, self.NF):
            normpars = copy.deepcopy( self.allnormpars[str(immod)] )
            if self.NN > 1:
                for node in self.listOfNodes:
                    self.PH.deletepartimages( self.allimpars[str(immod)]['imagename'], node, deldir=True )

#        ## Delete the workdirectory
#        casalog.post("Deleting workdirectory : "+self.PH.getworkdir(imagename, node))
#        shutil.rmtree( self.PH.getworkdir(imagename, node) )

    def deleteCluster(self):
        self.PH.takedownCluster()

# #############################################
    def dryGridding(self):
        dummy = ['']
        self.toolsi.drygridding(dummy)

#    def dryGridding_Old(self):
#        nodes=[1];
#        joblist=[];
#        for node in nodes:
#            dummy=[''];
#            cmd = "toolsi.drygridding("+str(dummy)+")";
#            joblist.append(self.PH.runcmd(cmd,node));
#        self.PH.checkJobs(joblist);

#############################################
    def reloadCFCache(self):
        joblist = []
        for node in self.listOfNodes:
            cmd = "toolsi.reloadcfcache()"
            casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd))
            joblist.append(self.PH.runcmd(cmd, node))
        self.PH.checkJobs(joblist)
#############################################
#    def fillCFCache(self):
#        #casalog.post("-----------------------fillCFCache------------------------------------")
#        # cflist=[f for f in os.listdir(self.allgridpars['cfcache']) if re.match(r'CFS*', f)];
#        # partCFList =
#        if(not str(self.allgridpars['0']['gridder']).startswith("awp")):
#            return
#        allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']);
#        cfcPath = "\""+str(self.allgridpars['0']['cfcache'])+"\"";
#        ftmname = "\""+str(self.allgridpars['0']['gridder'])+"\"";
#        psTermOn = str(self.allgridpars['0']['psterm']);
#        aTermOn = str(self.allgridpars['0']['aterm']);
#        conjBeams = str(self.allgridpars['0']['conjbeams']);
#        #aTermOn = str(True);
#        # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
#        # casalog.post("AllCFList = ",allcflist)
#        m = len(allcflist);
#        # casalog.post("No. of nodes used: " + m,cfcPath,ftmname)
#        # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

#        joblist=[];
#        for node in self.listOfNodes[:m]:
#            # casalog.post("#!$#!%#!$#@$#@$ " + allcflist)
#            cmd = "toolsi.fillcfcache("+str(allcflist[node])+","+str(ftmname)+","+str(cfcPath)+","+psTermOn+","+aTermOn+","+conjBeams+")";
#            # casalog.post("CMD = " + str(node) +" " + cmd)
#            joblist.append(self.PH.runcmd(cmd,node));
#        self.PH.checkJobs(joblist);

#        # Linear code
#        cfcName = self.allgridpars['0']['cfcache'];
#        cflist=[f for f in os.listdir(cfcName) if re.match(r'CFS*', f)];
#        self.cfcachepars['cflist']=cflist;
#        self.toolsi.fillcfcache(cflist, self.allgridpars['0']['gridder'],
#                                cfcName,
#                                self.allgridpars['0']['psterm'],
#                                self.allgridpars['0']['aterm'],
#                                self.allgridpars['0']['conjbeams']);
#        # self.SItool.fillcfcache(**(self.cfcachepars)) ;
#############################################
    def makePSFCore(self):
        ### Make PSFs
        joblist = []
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.makepsf()", node) )
        self.PH.checkJobs( joblist )   # This call blocks until all jobs are done.

#############################################
    def makePBCore(self):
        joblist = []
        # Only one node needs to make the PB. It reads the freq from the image coordsys.
        joblist.append( self.PH.runcmd("toolsi.makepb()", self.listOfNodes[0]) )
        self.PH.checkJobs( joblist )

#############################################

    def runMajorCycleCore(self, lastcycle):
        casalog.post("----------------------------- Running Parallel Major Cycle ----------------------------", "INFO")
        ### Run the major cycle.
        joblist = []
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.executemajorcycle(controls={'lastcycle':"+str(lastcycle)+"})", node) )
        self.PH.checkJobs( joblist )   # This call blocks until all jobs are done.

#############################################
    def predictModelCore(self):
        joblist = []
        #### MPIInterface related changes
        #for node in range(0,self.PH.NN):
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.predictmodel()", node) )
        self.PH.checkJobs( joblist )   # This call blocks until all jobs are done.

    def estimatememory(self):
        joblist = []
        for node in self.listOfNodes:
            joblist.append( self.PH.runcmd("toolsi.estimatememory()", node) )
        self.PH.checkJobs( joblist )