Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/parallel_imager_helper.py: 24%
140 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-31 19:10 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-31 19:10 +0000
1import os
2import math
3import shutil
4import string
5import time
6import re;
7import copy
9from casatools import synthesisutils
10from casatasks import casalog
'''
A set of helper functions for the tclean task.

Summary...
'''
19#############################################
20### Parallel Imager Helper.
21#############################################
22#casalog.post('Using clustermanager from MPIInterface', 'WARN')
# Optional dependency: the MPI cluster manager is only present when casampi is
# installed.  mpi_available records whether the import succeeded; when it did
# not, the name mpi_clustermanager is left undefined.
try:
    from casampi.MPIInterface import MPIInterface as mpi_clustermanager
except ImportError:
    mpi_available = False
else:
    mpi_available = True
class PyParallelImagerHelper():
    """Client-side helper that drives a set of MPI engines for parallel imaging.

    Attributes:
        sc: cluster manager object returned by ``mpi_clustermanager.getCluster()``.
        CL: the ``_cluster`` attribute of ``sc``; used to run commands on engines.
        nodeList: engine ids as returned by ``CL.get_engines()``.
        NN: number of engines (``len(nodeList)``), set once in ``__init__``.
    """

    def __init__(self):
        """Attach to the cluster and record how many engines are available."""
        ############### Cluster Info
        self.CL = None
        self.sc = None
        self.nodeList = None
        ############### Number of nodes to parallelize on
        # self.nodeList gets filled by setupCluster()
        self.NN = self.setupCluster()

    def getNodeList(self):
        """Return the list of engine ids filled in by setupCluster()."""
        return self.nodeList

    #############################################
    def chunkify(self, lst, n):
        """Split ``lst`` into ``n`` round-robin chunks.

        Chunk ``i`` holds elements ``i, i+n, i+2n, ...`` so chunk sizes differ
        by at most one.  Returns an empty list when ``n`` is 0.
        """
        return [lst[i::n] for i in range(n)]

    def partitionCFCacheList(self, gridPars):
        """Distribute the entries of a CF cache directory over the engines.

        :param gridPars: dict with a ``'cfcache'`` key naming the cache
            directory; an empty string means "no cache".

        :returns: dict mapping 1-based partition index to a list of file names.
            The number of partitions is ``min(#files, #engines)``, so the dict
            is empty when no matching files are found.

        Round-robin chunking is used because it gives better balanced chunks
        than cutting the list into contiguous slices.
        """
        cfcName = gridPars['cfcache']
        cflist = []
        if cfcName != '':
            # NOTE(review): r'CFS*' matches any name starting with "CF" (the
            # 'S' is optional and repeatable).  If only names starting with
            # "CFS" are wanted, this should be r'CFS' - confirm before changing.
            cflist = [f for f in os.listdir(cfcName) if re.match(r'CFS*', f)]
        nCF = len(cflist)

        # Never create more partitions than there are files or engines.
        n = min(nCF, len(self.nodeList))
        if nCF > 0:
            casalog.post("########################################################")
            casalog.post("nCF = " + str(nCF) + " nProcs = " + str(n) + " NodeList=" + str(self.nodeList))
            casalog.post("########################################################")

        return {i + 1: chunk for i, chunk in enumerate(self.chunkify(cflist, n))}

    #############################################
    ## Very rudimentary partitioning - only for tests. The actual code needs to go here.
    def partitionContDataSelection(self, oneselpars=None):
        """Partition a continuum data selection into ``self.NN`` parts.

        :param oneselpars: selection parameters (defaults to an empty dict).
        :returns: whatever ``synthesisutils.contdatapartition`` returns.
        """
        if oneselpars is None:
            oneselpars = {}
        synu = synthesisutils()
        allselpars = synu.contdatapartition(oneselpars, self.NN)
        synu.done()

        casalog.post('Partitioned Selection : {}'.format(allselpars))
        return allselpars

    #############################################
    ## Very rudimentary partitioning - only for tests. The actual code needs to go here.
    def partitionCubeDataSelection(self, oneselpars=None):
        """Partition a cube data selection into ``self.NN`` parts.

        :param oneselpars: selection parameters (defaults to an empty dict).
        :returns: whatever ``synthesisutils.cubedatapartition`` returns.
        """
        if oneselpars is None:
            oneselpars = {}
        synu = synthesisutils()
        allselpars = synu.cubedatapartition(oneselpars, self.NN)
        synu.done()

        casalog.post('Partitioned Selection : {}'.format(allselpars))
        return allselpars

    #############################################
    def partitionCubeDeconvolution(self, impars=None):
        """Partition cube image parameters into ``self.NN`` parts.

        :param impars: image parameters (defaults to an empty dict).
        :returns: whatever ``synthesisutils.cubeimagepartition`` returns.
        """
        if impars is None:
            impars = {}
        synu = synthesisutils()
        allimpars = synu.cubeimagepartition(impars, self.NN)
        synu.done()

        casalog.post('ImSplit : {}'.format(allimpars))
        return allimpars

    #############################################
    def partitionCubeSelection(self, oneselpars=None, oneimpars=None):
        """Jointly partition cube data selection and image parameters.

        :param oneselpars: selection parameters (defaults to an empty dict).
        :param oneimpars: image parameters; must contain ``'csys'`` and
            ``'nchan'`` (a missing key raises KeyError).
        :returns: whatever ``synthesisutils.cubedataimagepartition`` returns.
        """
        if oneselpars is None:
            oneselpars = {}
        if oneimpars is None:
            oneimpars = {}
        incsys = oneimpars['csys']
        nchan = oneimpars['nchan']
        synu = synthesisutils()
        allpars = synu.cubedataimagepartition(oneselpars, incsys, self.NN, nchan)
        synu.done()

        # casalog.post("Cube Data/Im partitioned selection : {}".format(allpars))
        return allpars

    #############################################
    def setupCluster(self):
        """Attach to the MPI cluster and prepare every engine.

        Fills ``self.sc``, ``self.CL`` and ``self.nodeList``, makes each engine
        import ``os`` and a couple of numpy names, and moves each engine into
        the client's current working directory.

        Relies on the module-level ``mpi_clustermanager``, which is only bound
        when the casampi import succeeded.

        :returns: number of engines.
        """
        # Initialize cluster
        # * Terminal: Client logs + Server logs
        # * casapy-<timestamp>.log: Client logs
        # * casapy-<timestamp>.log-server-<rank>-host-<hostname>-pid-<pid>: Server logs
        mpi_clustermanager.set_log_mode('redirect')

        self.sc = mpi_clustermanager.getCluster()
        self.sc.set_log_level('DEBUG')

        self.CL = self.sc._cluster
        self.nodeList = self.CL.get_engines()
        numproc = len(self.nodeList)

        owd = os.getcwd()
        self.CL.pgc('import os')
        self.CL.pgc('from numpy import array,int32')
        # repr() yields a correctly quoted/escaped literal, so paths containing
        # quotes or backslashes do not break the command sent to the engines.
        self.CL.pgc('os.chdir({!r})'.format(owd))
        # Kept from the original; looks like a no-op on the client side.
        os.chdir(owd)
        casalog.post("setupCluster, Setting up {} engines.".format(numproc))
        return numproc

    #############################################
    def takedownCluster(self):
        """Wait for all engines to go idle, then drop the cluster references.

        The cluster itself is not stopped.
        """
        # Check that all nodes have returned, before stopping the cluster
        self.checkJobs()
        casalog.post('Ending use of cluster, but not closing it. Call clustermanager.stop_cluster() to close it if needed.')
        # self.sc.stop_cluster()
        self.CL = None
        self.sc = None

    #############################################
    # This is a blocking call that will wait until jobs are done.
    def checkJobs(self, joblist=None):
        """Block until every job in ``joblist`` has finished.

        :param joblist: job handles as returned by ``self.CL.odo``.  When
            omitted or empty, a trivial logging job is submitted to every
            engine in ``self.nodeList`` and those jobs are waited for instead.

        Exceptions raised by ``self.CL.check_job`` propagate to the caller.
        """
        #### MPIInterface related changes
        if not joblist:
            # Build the list straight from the engine ids, so ids need not be
            # the contiguous range 1..N.
            joblist = [
                self.CL.odo('casalog.post("node ' + str(k) + ' has completed its job")', k)
                for k in self.nodeList
            ]

        casalog.post('checkJobs. Blocking for nodes to finish')
        while True:
            time.sleep(1)
            # A list (not a generator) so that every job is polled on every
            # pass, even after one of them reports "not finished".
            if all([self.CL.check_job(job, False) for job in joblist]):
                break
        casalog.post('...done')

    #############################################
    def runcmd(self, cmdstr="", node=-1):
        """Run ``cmdstr`` on one engine, or on all of them.

        :param cmdstr: command string to execute.
        :param node: engine id; a negative value means "all engines".
        :returns: the result of ``self.CL.odo`` for a single engine, or None
            when the command is sent to all engines.
        """
        if node >= 0:
            return self.CL.odo(cmdstr, node)
        self.CL.pgc(cmdstr)
        return None

    #############################################
    def runcmdcheck(self, cmdstr):
        """Submit ``cmdstr`` to every engine and block until all have finished."""
        #### MPIInterface related changes
        joblist = [self.CL.odo(cmdstr, node) for node in self.nodeList]
        self.checkJobs(joblist)

    #############################################
    def pullval(self, varname="", node=0):
        """Return the result of ``self.CL.pull(varname, node)``."""
        return self.CL.pull(varname, node)

    #############################################
    def pushval(self, varname="", val=None, node=0):
        """Return the result of ``self.CL.push(varname, val, node)``."""
        return self.CL.push(varname, val, node)

    #############################################
    def getpath(self, node=0):
        """Return the engine store path for ``node``, or "" when there is none."""
        enginepath = self.sc.get_engine_store(node)
        if enginepath is None:
            return ""
        return enginepath

    #############################################
    def deletepartimages(self, imagename, node, deldir=False):
        """Remove everything inside the work directory of ``imagename`` on ``node``.

        :param imagename: imagename as passed to the tclean task
        :param node: id of MPI node
        :param deldir: when true, the work directory itself is removed as well.

        The work directory is obtained through getworkdir(), which creates it
        when it does not exist yet.
        """
        workdir = self.getworkdir(imagename, node)
        for aname in os.listdir(workdir):
            target = os.path.join(workdir, aname)
            # rmtree only works on real directories; plain files and symlinks
            # must be unlinked instead.
            if os.path.isdir(target) and not os.path.islink(target):
                shutil.rmtree(target)
            else:
                os.remove(target)
        if deldir:
            shutil.rmtree(workdir)

    #############################################
    def getworkdir(self, imagename, nodeid):
        """Return ``<engine path>/<imagename>.workdirectory``, creating it if needed.

        :param imagename: imagename as passed to the tclean task
        :param nodeid: id of MPI node

        Only the last path component is created; a missing parent directory
        raises FileNotFoundError.
        """
        workdir = os.path.join(self.getpath(nodeid), imagename + '.workdirectory')
        try:
            os.mkdir(workdir)
        except FileExistsError:
            # Already there (possibly created concurrently by another process).
            pass
        return workdir

    #############################################
    def getpartimagename(self, imagename, nodeid):
        """
        For imagename = 'imaging_subdir/foo_img', it produces something like:
        'imaging_subdir/foo_img.workdirectory/foo_img.n5' (where 5 is the node id)

        :param imagename: imagename as passed to the tclean task
        :param nodeid: id of MPI node

        :returns: (full path) name of a part/sub-image for nodeid, produced by concatenating
        the working directory, the image basename and the node id as a string.
        """
        # don't include subdirs again here - the workdir is already inside the subdir(s)
        image_basename = os.path.basename(imagename)
        return os.path.join(self.getworkdir(imagename, nodeid),
                            image_basename + '.n' + str(nodeid))
286#############################################