Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/imagerhelpers/imager_parallel_continuum.py: 14%
223 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
1import os
2import math
3import shutil
4import string
5import time
6import re
7import copy
8import pdb
10from casatools import synthesisimager, synthesisnormalizer
11from casatasks import casalog
13from .imager_base import PySynthesisImager
14from .parallel_imager_helper import PyParallelImagerHelper
16synth_imager_name = "synthesisimager"
17synth_imager_import = "from casatools import synthesisimager"
20"""
21An implementation of parallel continuum imaging, using synthesisxxxx tools
23Datasets are partitioned by row and major cycles are parallelized.
24Gathers and normalization are done before passing the images to a
25 non-parallel minor cycle. The output model image is then scattered to
26all the nodes for the next parallel major cycle.
28There are N synthesisimager objects.
29There is 1 instance per image field, of the normalizer and deconvolver.
30There is 1 iterbot.
32"""
35#############################################
36#############################################
37## Parallelize only major cycle.
38#############################################
39class PyParallelContSynthesisImager(PySynthesisImager):
41 def __init__(self, params=None):
43 PySynthesisImager.__init__(self, params)
45 self.PH = PyParallelImagerHelper()
46 self.NN = self.PH.NN
47 self.selpars = self.allselpars
48 self.allselpars = self.PH.partitionContDataSelection(self.allselpars)
49 # self.allcflist = self.PH.partitionCFCacheList(self.cfcachepars['cflist']);
50 # self.allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']);
51 self.listOfNodes = self.PH.getNodeList()
52 self.coordsyspars = {}
53 self.toolsi = None
55 #############################################
56 def resetSaveModelParams(self, params=None):
57 mainparams = params.getSelPars()
58 for n in self.allselpars: # for all nodes
59 for v in self.allselpars[n]: # for all MSes
60 self.allselpars[n][v]["readonly"] = mainparams[v]["readonly"]
61 self.allselpars[n][v]["usescratch"] = mainparams[v][
62 "usescratch"
63 ]
65 #############################################
66 # def initializeImagers(self):
67 # ### Drygridding, and Coordsys comes from a single imager on MAIN node.
68 # ### No startmodel confusion. It's created only once and then scattered.
69 # self.initializeImagers()
70 # pdb.set_trace()
71 # super().initializeImagers()
72 # ### Note : Leftover from CAS-9977
73 # ### There is a coord system mismatch at scatter/gather, if the MAIN version already
74 # ### exists on disk. With startmodel, it's xxx.model. With aproject, it's xxx.residual.
75 # ### There is an exception in SIImageStore::openImage to handle this.
76 # ### Turn on casalog.filter('DEBUG1') to see the warning message.
78 #############################################
79 def initializeImagersBase(self, thisSelPars, partialSelPars):
81 if partialSelPars == False: ## Init only on the zero'th node
83 #
84 # Use the already-created imager on MAIN node
85 #
86 ##self.toolsi = synthesisimager()
88 #
89 # Select data.
90 #
91 for mss in sorted(self.selpars.keys()):
92 self.toolsi.selectdata(thisSelPars[mss])
94 # Defineimage.
95 # This makes the global csys. Get csys to distribute to other nodes
96 # It also sets 'startmodel' if available (this is later scattered to nodes)
97 for fld in range(0, self.NF):
98 tmpimpars = copy.deepcopy(self.allimpars[str(fld)])
99 # if tmpimpars.has_key('startmodel'):
100 # tmpimpars.pop('startmodel')
101 self.toolsi.defineimage(
102 impars=tmpimpars, gridpars=self.allgridpars[str(fld)]
103 )
104 fullcoords = self.toolsi.getcsys()
105 self.coordsyspars[str(fld)] = fullcoords
107 # Modify the coordsys inputs
108 for fld in range(0, self.NF):
109 self.allimpars[str(fld)]["csys"] = self.coordsyspars[str(fld)][
110 "coordsys"
111 ].copy()
113 # Call the global defineimage again
114 # (to get around later error of different coordsys latpoles! (CAs-9977)
115 # for fld in range(0,self.NF):
116 # self.toolsi.defineimage( impars=self.allimpars[str(fld)], gridpars = self.allgridpars[str(fld)] )
118 else: ## partialSelPars==True , The actual initialization on all nodes.
120 #
121 # Start the imagers on all nodes.
122 #
123 joblist = []
124 for node in self.listOfNodes:
125 joblist.append(
126 self.PH.runcmd(
127 "{0}; toolsi = {1}()".format(
128 synth_imager_import, synth_imager_name
129 ),
130 node,
131 )
132 )
133 self.PH.checkJobs(joblist)
135 #
136 # Select data. If partialSelPars is True, use the thisSelPars
137 # data structure as a list of partitioned selections.
138 #
139 joblist = []
140 nodes = self.listOfNodes
141 # [1];
142 for node in nodes:
143 for mss in sorted(self.selpars.keys()):
144 selStr = str(thisSelPars[str(node - 1)][mss])
145 joblist.append(
146 self.PH.runcmd(
147 "toolsi.selectdata( " + selStr + ")", node
148 )
149 )
150 self.PH.checkJobs(joblist)
152 #
153 # Call defineimage at each node.
154 #
155 joblist = []
156 for node in nodes:
157 ## For each image-field, define imaging parameters
158 nimpars = copy.deepcopy(self.allimpars)
159 # casalog.post("nimpars = "+str(nimpars))
160 ngridpars = copy.deepcopy(self.allgridpars)
161 for fld in range(0, self.NF):
162 if self.NN > 1:
163 nimpars[str(fld)]["imagename"] = (
164 self.PH.getpartimagename(
165 nimpars[str(fld)]["imagename"], node
166 )
167 )
169 ## Pop out the startmodel, as it would already have been created on the main node,.
170 tmpimpars = nimpars[str(fld)]
171 if "startmodel" in tmpimpars:
172 tmpimpars.pop("startmodel")
174 joblist.append(
175 self.PH.runcmd(
176 "toolsi.defineimage( impars="
177 + str(nimpars[str(fld)])
178 + ", gridpars="
179 + str(ngridpars[str(fld)])
180 + ")",
181 node,
182 )
183 )
184 self.PH.checkJobs(joblist)
186 #############################################
188 def initializeImagers(self):
189 # ---------------------------------------
190 # Check if cfcache exists.
191 #
192 cfCacheName = ""
193 cfcExists = False
194 if self.allgridpars["0"]["gridder"].startswith("awpr"):
195 cfCacheName = self.allgridpars["0"]["cfcache"]
196 else:
197 self.allgridpars["0"]["cfcache"] = ""
198 cfcExists = True
199 if (
200 self.allgridpars["0"]["gridder"] == "awproject"
201 or self.allgridpars["0"]["gridder"] == "awprojectft"
202 ):
203 if cfCacheName == "":
204 cfCacheName = self.allimpars["0"]["imagename"] + ".cf"
205 cfCacheName = self.allgridpars["0"]["cfcache"] = cfCacheName
206 self.allgridpars["0"]["cfcache"] = cfCacheName
207 cfcExists = os.path.exists(cfCacheName) and os.path.isdir(
208 cfCacheName
209 )
210 if cfcExists:
211 nCFs = len(os.listdir(cfCacheName))
212 if nCFs == 0:
213 casalog.post(
214 cfCacheName
215 + " exists, but is empty. Attempt is being made to fill it now.",
216 "WARN",
217 )
218 cfcExists = False
219 # casalog.post("##########################################")
220 # casalog.post("CFCACHE = "+cfCacheName,cfcExists)
221 # casalog.post("##########################################")
223 # Start one imager on MAIN node
224 self.toolsi = synthesisimager()
226 # Init one SI tool ( it records the csys per field in self.coordsyspars )
227 self.initializeImagersBase(self.selpars, False)
229 # Modify the coordsys inputs
230 # for fld in range(0, self.NF):
231 # self.allimpars[str(fld)]['csys']=self.coordsyspars[str(fld)]['coordsys'].copy()
233 # Dry Gridding on the MAIN node ( i.e. on self.toolsi)
234 # if (not cfcExists):
235 # self.dryGridding();
237 ##weighting with mosfield=True
238 if (
239 (
240 self.weightpars["type"].count("briggs")
241 or self.weightpars["type"].count("uniform")
242 )
243 > 0
244 ) and (self.weightpars["multifield"]):
245 self.toolsi.setweighting(**self.weightpars)
246 ###master create the weight density for all fields
247 self.toolsi.getweightdensity()
249 # Clean up the single imager (MAIN node)
250 # self.toolsi.done()
251 # self.toolsi = None
253 # Do the second round, initializing imagers on ALL nodes
254 self.initializeImagersBase(self.allselpars, True)
256 # Fill CFCache - it uses all nodes.
257 if not cfcExists:
258 self.SItool = self.toolsi
259 # super().initializeImagers()
260 ###Doing this serially as in parallel it randomly has race condition
261 ###about table.dat not available
262 super().makeCFCache(False)
263 # self.fillCFCache()
264 self.reloadCFCache()
265 self.SItool = None
266 self.toolsi.done()
267 self.toolsi = None
269 ######################################################################################################################################
270 # ---------------------------------------
271 # 4. call setdata() for images on all nodes
272 #
273 # joblist=[];
274 # for node in self.listOfNodes:
275 # ## Send in Selection parameters for all MSs in the list
276 # #### MPIInterface related changes (the -1 in the expression below)
277 # for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
278 # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
279 # self.PH.checkJobs(joblist);
281 # ---------------------------------------
282 # 5. Call defineImage() on all nodes. This sets up the FTMs.
283 #
284 # joblist=[];
285 # for node in self.listOfNodes:
286 # ## For each image-field, define imaging parameters
287 # nimpars = copy.deepcopy(self.allimpars)
288 # #casalog.post("nimpars = "+str(nimpars))
289 # ngridpars = copy.deepcopy(self.allgridpars)
290 # for fld in range(0,self.NF):
291 # if self.NN>1:
292 # nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
293 # ### nimpars[str(fld)]['imagename'] = self.allnormpars[str(fld)]['workdir'] + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
294 # ### nimpars[str(fld)]['imagename'] = nimpars[str(fld)]['imagename']+'.n'+str(node)
296 # # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache']+'.n'+str(node)
297 # # # Give the same CFCache name to all nodes
298 # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];
300 # joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
301 # self.PH.checkJobs(joblist);
303 # ---------------------------------------
304 # 6. If cfcache does not exist, call fillCFCache()
305 # This will fill the "empty" CFCache in parallel
306 # 7. Now call reloadCFCache() on all nodes.
307 # This reloads the latest cfcahce.
309 # TRY: Start all over again!
310 # self.deleteImagers();
312 # joblist=[]
314 # for node in self.listOfNodes:
315 # joblist.append( self.PH.runcmd("toolsi = casac.synthesisimager()", node) );
316 # self.PH.checkJobs(joblist);
318 # joblist=[];
319 # nodes=self.listOfNodes;#[1];
320 # for node in nodes:
321 # for mss in sorted( (self.allselpars[str(node-1)]).keys() ):
322 # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.allselpars[str(node-1)][mss])+")", node) )
323 # # for mss in sorted( self.selpars.keys() ):
324 # # joblist.append( self.PH.runcmd("toolsi.selectdata( "+str(self.selpars[mss])+")", node) )
325 # self.PH.checkJobs(joblist);
327 # joblist=[];
328 # for node in self.listOfNodes:
329 # nimpars = copy.deepcopy(self.allimpars)
330 # ngridpars = copy.deepcopy(self.allgridpars)
331 # for fld in range(0,self.NF):
332 # if self.NN>1:
333 # nimpars[str(fld)]['imagename'] = self.PH.getpath(node) + '/' + nimpars[str(fld)]['imagename']+'.n'+str(node)
334 # # # Give the same CFCache name to all nodes
335 # ngridpars[str(fld)]['cfcache'] = ngridpars[str(fld)]['cfcache'];
337 # joblist.append( self.PH.runcmd("toolsi.defineimage( impars=" + str( nimpars[str(fld)] ) + ", gridpars=" + str( ngridpars[str(fld)] ) + ")", node ) )
338 # self.PH.checkJobs(joblist);
340 #############################################
342 #############################################
344 def initializeNormalizers(self):
345 for immod in range(0, self.NF):
346 self.PStools.append(synthesisnormalizer())
347 self.localnormpars = copy.deepcopy(self.allnormpars[str(immod)])
348 partnames = []
349 if self.NN > 1:
350 #### MPIInterface related changes
351 # for node in range(0,self.NN):
352 for node in self.listOfNodes:
353 partnames.append(
354 self.PH.getpartimagename(
355 self.allimpars[str(immod)]["imagename"], node
356 )
357 )
358 # onename = self.allimpars[str(immod)]['imagename']+'.n'+str(node)
359 # partnames.append( self.PH.getpath(node) + '/' + onename )
360 # self.PH.deletepartimages( self.PH.getpath(node), onename ) # To ensure restarts work properly.
361 self.PH.deletepartimages(
362 self.allimpars[str(immod)]["imagename"], node
363 ) # To ensure restarts work properly.
364 self.localnormpars["partimagenames"] = partnames
366 self.PStools[immod].setupnormalizer(normpars=self.localnormpars)
368 #############################################
369 def setWeighting(self):
370 ## Set weight parameters and accumulate weight density (natural)
371 joblist = []
372 if (
373 (
374 self.weightpars["type"].count("briggs")
375 or self.weightpars["type"].count("uniform")
376 )
377 > 0
378 ) and (self.weightpars["multifield"]):
379 ###master created the weight density for all fields
380 ##Should have been in initializeImagersBase_New but it is not being called !
381 self.toolsi = synthesisimager()
382 for mss in sorted(self.selpars.keys()):
383 self.toolsi.selectdata(self.selpars[mss])
384 for fld in range(0, self.NF):
385 self.toolsi.defineimage(
386 impars=self.allimpars[str(fld)],
387 gridpars=self.allgridpars[str(fld)],
388 )
389 self.toolsi.setweighting(**self.weightpars)
390 ###master create the weight density for all fields
391 weightimage = self.toolsi.getweightdensity()
392 self.toolsi.done()
393 self.toolsi = None
394 destWgtim = weightimage + "_moswt"
395 if os.path.exists(destWgtim):
396 shutil.rmtree(destWgtim)
397 shutil.move(weightimage, destWgtim)
398 joblist = []
399 for node in self.listOfNodes:
400 joblist.append(
401 self.PH.runcmd(
402 "toolsi.setweightdensity('" + str(destWgtim) + "')",
403 node,
404 )
405 )
406 self.PH.checkJobs(joblist)
407 # for node in self.listOfNodes:
408 # ## Set weighting pars
409 # joblist.append( self.PH.runcmd("toolsi.setweighting( **" + str(self.weightpars) + ")", node ) )
410 # self.PH.checkJobs( joblist )
411 # joblist=[];
412 # for node in self.listOfNodes:
413 # joblist.append( self.PH.runcmd("toolsi.getweightdensity()", node ) )
414 # self.PH.checkJobs( joblist )
416 # for immod in range(0,self.NF):
417 # #self.PStools[immod].gatherweightdensity()
418 # self.PStools[immod].scatterweightdensity()
419 ## Set weight density for each nodel
420 # joblist=[];
421 # for node in self.listOfNodes:
422 # joblist.append( self.PH.runcmd("toolsi.setweightdensity()", node ) )
423 # self.PH.checkJobs( joblist )
424 #### end of multifield or mosweight
425 else:
426 joblist = []
427 for node in self.listOfNodes:
428 ## Set weighting pars
429 joblist.append(
430 self.PH.runcmd(
431 "toolsi.setweighting( **" + str(self.weightpars) + ")",
432 node,
433 )
434 )
435 self.PH.checkJobs(joblist)
437 ## If only one field, do the get/gather/set of the weight density.
438 if (
439 self.NF == 1
440 ): # and self.allimpars['0']['stokes']=="I": ## Remove after gridded wts appear for all fields correctly (i.e. new FTM).
442 if not (
443 (self.weightpars["type"] == "natural")
444 or (self.weightpars["type"] == "radial")
445 ): ## For natural and radial, this array isn't created at all.
446 ## Remove when we switch to new FTM
448 casalog.post(
449 "Gathering/Merging/Scattering Weight Density for PSF generation",
450 "INFO",
451 )
453 # joblist = []
454 # for node in self.listOfNodes:
455 # joblist.append(
456 # self.PH.runcmd("toolsi.getweightdensity()", node)
457 # )
458 # self.PH.checkJobs(joblist)
460 ## gather weightdensity and sum and scatter
461 casalog.post(
462 "******************************************************"
463 )
464 casalog.post(" gather and scatter now ")
465 casalog.post(
466 "******************************************************"
467 )
468 resname = self.localnormpars["imagename"] + ".residual"
469 sumgridname = resname.replace(".residual", ".gridwt")
470 ################
472 if os.path.exists(sumgridname + "_temp") and (
473 os.path.exists(resname)
474 or os.path.exists(resname + ".tt0")
475 ): # a restart
476 shutil.rmtree(sumgridname, True)
477 shutil.move(sumgridname + "_temp", sumgridname)
478 #############################
480 else:
481 try:
482 joblist = []
483 if (
484 self.localnormpars["calcres"]
485 and self.localnormpars["calcpsf"]
486 ):
487 ##SIImageStore ..keeps comparing shape
488 ##when we try to make a gridwt
489 psfname = resname.replace(".residual", ".psf")
490 pbname = resname.replace(".residual", ".pb")
491 shutil.rmtree(resname, True)
492 shutil.rmtree(resname + ".tt0", True)
493 shutil.rmtree(psfname, True)
494 shutil.rmtree(psfname + ".tt0", True)
495 shutil.rmtree(pbname, True)
496 shutil.rmtree(pbname + ".tt0", True)
498 for node in self.listOfNodes:
499 joblist.append(
500 self.PH.runcmd(
501 "toolsi.getweightdensity()", node
502 )
503 )
504 self.PH.checkJobs(joblist)
505 locpstool = synthesisnormalizer()
506 locpstool.setupnormalizer(
507 normpars=self.localnormpars
508 )
510 locpstool.gatherweightdensity()
511 sumgridname = locpstool.scatterweightdensity()
512 # print("%%%%%%%%", sumgridname)
513 except Exception as inst:
514 if os.path.exists(resname) or os.path.exists(
515 resname + ".tt0"
516 ):
517 raise Exception(
518 f"restarting Briggs Style weighting for parallel mfs imaging without the {sumgridname}_temp file on disk try restarting with calcres and calcpsf=True"
519 ) from inst
520 else:
521 raise Exception(inst)
523 ## Set weight density for each nodel
524 joblist = []
525 for node in self.listOfNodes:
526 joblist.append(
527 self.PH.runcmd(
528 "toolsi.setweightdensity('"
529 + str(sumgridname)
530 + "')",
531 node,
532 )
533 )
534 self.PH.checkJobs(joblist)
535 ###For some reason we cannot stop psf being made along with gridwt image and
536 ### and may have the wrong shape at this stage
537 # shutil.rmtree(sumgridname)
538 shutil.rmtree(sumgridname + "_temp", True)
539 shutil.move(sumgridname, sumgridname + "_temp")
541 tmppsfname = sumgridname.replace(".gridwt", ".psf")
542 resname = sumgridname.replace(".gridwt", ".residual")
543 if not os.path.exists(
544 resname
545 ): # not a restart so psf shape may be different if full pol...delete it
546 shutil.rmtree(tmppsfname, True)
547 if not os.path.exists(resname + ".tt0"):
548 shutil.rmtree(tmppsfname + ".tt0", True)
550 else:
551 if not (
552 (self.weightpars["type"] == "natural")
553 or (self.weightpars["type"] == "radial")
554 ):
555 casalog.post(
556 "Parallel-Continuum-multifield with briggs weighting will give different weighting schemes with number of processes used",
557 "WARN",
558 )
560 def deleteImagers(self):
561 self.PH.runcmd("toolsi.done()")
563 def deleteWorkDir(self):
564 ## Delete the contents of the .workdirectory
565 for immod in range(0, self.NF):
566 normpars = copy.deepcopy(self.allnormpars[str(immod)])
567 if self.NN > 1:
568 for node in self.listOfNodes:
569 self.PH.deletepartimages(
570 self.allimpars[str(immod)]["imagename"],
571 node,
572 deldir=True,
573 )
575 # ## Delete the workdirectory
576 # casalog.post("Deleting workdirectory : "+self.PH.getworkdir(imagename, node))
577 # shutil.rmtree( self.PH.getworkdir(imagename, node) )
579 def deleteCluster(self):
580 self.PH.takedownCluster()
582 # #############################################
583 def dryGridding(self):
584 dummy = [""]
585 self.toolsi.drygridding(dummy)
587 # def dryGridding_Old(self):
588 # nodes=[1];
589 # joblist=[];
590 # for node in nodes:
591 # dummy=[''];
592 # cmd = "toolsi.drygridding("+str(dummy)+")";
593 # joblist.append(self.PH.runcmd(cmd,node));
594 # self.PH.checkJobs(joblist);
596 #############################################
597 def reloadCFCache(self):
598 joblist = []
599 for node in self.listOfNodes:
600 cmd = "toolsi.reloadcfcache()"
601 casalog.post("reloadCFCache, CMD = {} {}".format(node, cmd))
602 joblist.append(self.PH.runcmd(cmd, node))
603 self.PH.checkJobs(joblist)
605 #############################################
606 # def fillCFCache(self):
607 # #casalog.post("-----------------------fillCFCache------------------------------------")
608 # # cflist=[f for f in os.listdir(self.allgridpars['cfcache']) if re.match(r'CFS*', f)];
609 # # partCFList =
610 # if(not str(self.allgridpars['0']['gridder']).startswith("awp")):
611 # return
612 # allcflist = self.PH.partitionCFCacheList(self.allgridpars['0']);
613 # cfcPath = "\""+str(self.allgridpars['0']['cfcache'])+"\"";
614 # ftmname = "\""+str(self.allgridpars['0']['gridder'])+"\"";
615 # psTermOn = str(self.allgridpars['0']['psterm']);
616 # aTermOn = str(self.allgridpars['0']['aterm']);
617 # conjBeams = str(self.allgridpars['0']['conjbeams']);
618 # #aTermOn = str(True);
619 # # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
620 # # casalog.post("AllCFList = ",allcflist)
621 # m = len(allcflist);
622 # # casalog.post("No. of nodes used: " + m,cfcPath,ftmname)
623 # # casalog.post("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
625 # joblist=[];
626 # for node in self.listOfNodes[:m]:
627 # # casalog.post("#!$#!%#!$#@$#@$ " + allcflist)
628 # cmd = "toolsi.fillcfcache("+str(allcflist[node])+","+str(ftmname)+","+str(cfcPath)+","+psTermOn+","+aTermOn+","+conjBeams+")";
629 # # casalog.post("CMD = " + str(node) +" " + cmd)
630 # joblist.append(self.PH.runcmd(cmd,node));
631 # self.PH.checkJobs(joblist);
633 # # Linear code
634 # cfcName = self.allgridpars['0']['cfcache'];
635 # cflist=[f for f in os.listdir(cfcName) if re.match(r'CFS*', f)];
636 # self.cfcachepars['cflist']=cflist;
637 # self.toolsi.fillcfcache(cflist, self.allgridpars['0']['gridder'],
638 # cfcName,
639 # self.allgridpars['0']['psterm'],
640 # self.allgridpars['0']['aterm'],
641 # self.allgridpars['0']['conjbeams']);
642 # # self.SItool.fillcfcache(**(self.cfcachepars)) ;
643 #############################################
644 def makePSFCore(self):
645 ### Make PSFs
646 joblist = []
647 #### MPIInterface related changes
648 # for node in range(0,self.PH.NN):
649 for node in self.listOfNodes:
650 joblist.append(self.PH.runcmd("toolsi.makepsf()", node))
651 self.PH.checkJobs(joblist) # this call blocks until all are done.
653 #############################################
654 def makePBCore(self):
655 joblist = []
656 # Only one node needs to make the PB. It reads the freq from the image coordsys
657 joblist.append(self.PH.runcmd("toolsi.makepb()", self.listOfNodes[0]))
658 self.PH.checkJobs(joblist)
660 #############################################
662 def runMajorCycleCore(self, lastcycle):
663 casalog.post(
664 "----------------------------- Running Parallel Major Cycle ----------------------------",
665 "INFO",
666 )
667 ### Run major cycle
668 joblist = []
669 #### MPIInterface related changes
670 # for node in range(0,self.PH.NN):
671 for node in self.listOfNodes:
672 joblist.append(
673 self.PH.runcmd(
674 "toolsi.executemajorcycle(controls={'lastcycle':"
675 + str(lastcycle)
676 + "})",
677 node,
678 )
679 )
680 self.PH.checkJobs(joblist) # this call blocks until all are done.
682 #############################################
683 def predictModelCore(self):
684 joblist = []
685 #### MPIInterface related changes
686 # for node in range(0,self.PH.NN):
687 for node in self.listOfNodes:
688 joblist.append(self.PH.runcmd("toolsi.predictmodel()", node))
689 self.PH.checkJobs(joblist) # this call blocks until all are done.
691 def estimatememory(self):
692 joblist = []
693 for node in self.listOfNodes:
694 joblist.append(self.PH.runcmd("toolsi.estimatememory()", node))
695 self.PH.checkJobs(joblist)