Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/parallel_imager_helper.py: 24%

140 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import math 

3import shutil 

4import string 

5import time 

6import re; 

7import copy 

8 

9from casatools import synthesisutils 

10from casatasks import casalog 

11 

12''' 

A set of helper functions for the tclean task

14 

15Summary... 

16  

17''' 

18 

19############################################# 

20### Parallel Imager Helper. 

21############################################# 

22#casalog.post('Using clustermanager from MPIInterface', 'WARN') 

# casampi is an optional dependency. Record whether it could be imported in
# mpi_available so that code using the cluster manager can check first.
try:
    from casampi.MPIInterface import MPIInterface as mpi_clustermanager
    mpi_available = True
except ImportError:
    # Bind the name anyway: without this, any later reference to
    # mpi_clustermanager fails with a NameError instead of a testable None.
    mpi_clustermanager = None
    mpi_available = False

28 

class PyParallelImagerHelper:
    """Helper used by tclean to partition work and talk to the MPI engines.

    Constructing an instance calls setupCluster(), which fetches the cluster
    from the MPI cluster manager and records the list of engine ids. The
    remaining methods partition data/image parameters across ``self.NN``
    engines, submit commands to the engines, and manage the per-node work
    directories that hold partial images.
    """

    def __init__(self):
        ############### Cluster Info
        # All three are filled in by setupCluster().
        self.CL = None        # self.sc._cluster, used to submit commands
        self.sc = None        # object returned by mpi_clustermanager.getCluster()
        self.nodeList = None  # engine ids returned by self.CL.get_engines()

        ############### Number of nodes to parallelize on
        self.NN = self.setupCluster()

    def getNodeList(self):
        """Return the engine id list recorded by setupCluster()."""
        return self.nodeList

    #############################################
    def chunkify(self, lst, n):
        """Split ``lst`` into ``n`` interleaved chunks.

        Chunk ``i`` holds elements ``i, i+n, i+2n, ...`` so chunk sizes differ
        by at most one. Returns an empty list when ``n`` is 0.
        """
        return [lst[i::n] for i in range(n)]

    def partitionCFCacheList(self, gridPars):
        """Distribute the entries of a CF cache directory across the engines.

        :param gridPars: dict whose 'cfcache' entry names the cache directory;
                         an empty string means there is no cache to partition.

        :returns: dict mapping a 1-based chunk index to the list of directory
                  entries assigned to it. The number of chunks is the smaller
                  of the engine count and the number of matching entries, so
                  the dict is empty when nothing matches.
        """
        cfcName = gridPars['cfcache']
        cflist = []
        if cfcName != '':
            # NOTE(review): as a regular expression 'CFS*' means "CF followed
            # by zero or more S", i.e. any name starting with 'CF'. It looks
            # like a glob for names starting with 'CFS' was intended - confirm
            # before tightening, since that would change which files are used.
            cflist = [f for f in os.listdir(cfcName) if re.match(r'CFS*', f)]

        nCF = len(cflist)
        # Never create more chunks than there are entries to hand out.
        n = min(len(self.nodeList), nCF)

        if nCF > 0:
            casalog.post("########################################################")
            casalog.post("nCF = " + str(nCF) + " nProcs = " + str(n) + " NodeList=" + str(self.nodeList))
            casalog.post("########################################################")

        chunks = self.chunkify(cflist, n)
        return {i + 1: chunk for i, chunk in enumerate(chunks)}

    #############################################
    ## Very rudimentary partitioning - only for tests. The actual code needs to go here.
    def partitionContDataSelection(self, oneselpars=None):
        """Partition a data selection for continuum imaging over self.NN engines.

        :param oneselpars: selection parameters dict (defaults to an empty dict)

        :returns: whatever synthesisutils.contdatapartition() returns for
                  ``oneselpars`` and ``self.NN``
        """
        if oneselpars is None:
            oneselpars = {}

        synu = synthesisutils()
        try:
            allselpars = synu.contdatapartition(oneselpars, self.NN)
        finally:
            # Release the tool even when the partition call raises.
            synu.done()

        casalog.post('Partitioned Selection : {}'.format(allselpars))
        return allselpars

    #############################################
    ## Very rudimentary partitioning - only for tests. The actual code needs to go here.
    def partitionCubeDataSelection(self, oneselpars=None):
        """Partition a data selection for cube imaging over self.NN engines.

        :param oneselpars: selection parameters dict (defaults to an empty dict)

        :returns: whatever synthesisutils.cubedatapartition() returns for
                  ``oneselpars`` and ``self.NN``
        """
        if oneselpars is None:
            oneselpars = {}

        synu = synthesisutils()
        try:
            allselpars = synu.cubedatapartition(oneselpars, self.NN)
        finally:
            # Release the tool even when the partition call raises.
            synu.done()

        casalog.post('Partitioned Selection : {}'.format(allselpars))
        return allselpars

    #############################################
    def partitionCubeDeconvolution(self, impars=None):
        """Partition cube image parameters over self.NN engines.

        :param impars: image parameters dict (defaults to an empty dict)

        :returns: whatever synthesisutils.cubeimagepartition() returns for
                  ``impars`` and ``self.NN``
        """
        if impars is None:
            impars = {}

        synu = synthesisutils()
        try:
            allimpars = synu.cubeimagepartition(impars, self.NN)
        finally:
            # Release the tool even when the partition call raises.
            synu.done()

        casalog.post('ImSplit : {}'.format(allimpars))
        return allimpars

    #############################################
    def partitionCubeSelection(self, oneselpars=None, oneimpars=None):
        """Jointly partition data selection and cube image over self.NN engines.

        :param oneselpars: selection parameters dict (defaults to an empty dict)
        :param oneimpars: image parameters dict; its 'csys' and 'nchan' entries
                          are required

        :returns: whatever synthesisutils.cubedataimagepartition() returns

        :raises KeyError: if ``oneimpars`` lacks 'csys' or 'nchan'
        """
        if oneselpars is None:
            oneselpars = {}
        if oneimpars is None:
            oneimpars = {}

        incsys = oneimpars['csys']
        nchan = oneimpars['nchan']

        synu = synthesisutils()
        try:
            allpars = synu.cubedataimagepartition(oneselpars, incsys, self.NN, nchan)
        finally:
            # Release the tool even when the partition call raises.
            synu.done()

        return allpars

    #############################################
    def setupCluster(self):
        """Fetch the MPI cluster, record its engines and sync their working dir.

        Fills in ``self.sc``, ``self.CL`` and ``self.nodeList``, then sends
        each engine the commands needed to import os/numpy names and change
        into the client's current working directory.

        :returns: number of engines in the cluster

        :raises RuntimeError: if casampi could not be imported
        """
        if not mpi_available:
            # Without this guard the failure shows up as an obscure error on
            # the first use of mpi_clustermanager.
            raise RuntimeError('casampi is not available: cannot set up the cluster '
                               'for parallel imaging')

        # * Terminal: Client logs + Server logs
        # * casapy-<timestamp>.log: Client logs
        # * casapy-<timestamp>.log-server-<rank>-host-<hostname>-pid-<pid>: Server logs
        mpi_clustermanager.set_log_mode('redirect')

        self.sc = mpi_clustermanager.getCluster()
        self.sc.set_log_level('DEBUG')

        self.CL = self.sc._cluster
        self.nodeList = self.CL.get_engines()
        numproc = len(self.nodeList)

        owd = os.getcwd()
        self.CL.pgc('import os')
        self.CL.pgc('from numpy import array,int32')
        # repr() yields a correctly quoted/escaped literal, so paths that
        # contain quotes or backslashes still form a valid command.
        self.CL.pgc('os.chdir({!r})'.format(owd))
        os.chdir(owd)
        casalog.post("setupCluster, Setting up {} engines.".format(numproc))
        return numproc

    #############################################
    def takedownCluster(self):
        """Wait for all engines to finish, then drop the cluster references.

        The cluster itself is deliberately left running (self.sc.stop_cluster()
        is not called); only this helper's handles are cleared.
        """
        # Check that all nodes have returned, before stopping the cluster
        self.checkJobs()
        casalog.post('Ending use of cluster, but not closing it. Call clustermanager.stop_cluster() to close it if needed.')
        self.CL = None
        self.sc = None

    #############################################
    # This is a blocking call that will wait until jobs are done.
    def checkJobs(self, joblist=None):
        """Block until every job in ``joblist`` is reported as finished.

        :param joblist: job handles as returned by ``self.CL.odo``. When empty
                        or omitted, a log-message job is submitted to each
                        engine in ``self.nodeList`` and those jobs are waited
                        on instead.

        Polls once per second. Any exception raised by ``self.CL.check_job``
        propagates to the caller.
        """
        if not joblist:
            # One job per engine, without assuming that engine ids are the
            # contiguous range 1..N.
            joblist = [
                self.CL.odo('casalog.post("node ' + str(k) + ' has completed its job")', k)
                for k in self.nodeList
            ]

        casalog.post('checkJobs. Blocking for nodes to finish')
        over = False
        while not over:
            time.sleep(1)
            # Build the full list (no generator) so every job is polled on
            # each pass rather than stopping at the first unfinished one.
            over = all([self.CL.check_job(job, False) for job in joblist])
        casalog.post('...done')

    #############################################
    def runcmd(self, cmdstr="", node=-1):
        """Submit ``cmdstr`` to one engine, or hand it to ``self.CL.pgc``.

        :param cmdstr: command string to run
        :param node: engine id; a negative value sends the command through
                     ``self.CL.pgc`` instead (presumably all engines - confirm
                     against the cluster API)

        :returns: the result of ``self.CL.odo`` when ``node`` >= 0, else None
        """
        if node >= 0:
            return self.CL.odo(cmdstr, node)
        self.CL.pgc(cmdstr)

    #############################################
    def runcmdcheck(self, cmdstr):
        """Submit ``cmdstr`` to every engine and block until all have finished."""
        joblist = [self.CL.odo(cmdstr, node) for node in self.nodeList]
        self.checkJobs(joblist)

    #############################################
    def pullval(self, varname="", node=0):
        """Return ``self.CL.pull(varname, node)``."""
        return self.CL.pull(varname, node)

    #############################################
    def pushval(self, varname="", val=None, node=0):
        """Return ``self.CL.push(varname, val, node)``."""
        return self.CL.push(varname, val, node)

    #############################################
    def getpath(self, node=0):
        """Return the engine store path of ``node``, or '' when it has none."""
        enginepath = self.sc.get_engine_store(node)
        return "" if enginepath is None else enginepath

    #############################################
    def deletepartimages(self, imagename, node, deldir=False):
        """Remove everything inside the work directory of ``node``.

        :param imagename: imagename as passed to the tclean task
        :param node: id of MPI node
        :param deldir: when true, also remove the work directory itself

        Note that getworkdir() creates the work directory if it is missing.
        """
        # Resolve once: getworkdir() has the side effect of creating the
        # directory, so it should not be re-evaluated for every entry.
        workdir = self.getworkdir(imagename, node)

        for aname in os.listdir(workdir):
            target = os.path.join(workdir, aname)
            if os.path.isdir(target) and not os.path.islink(target):
                shutil.rmtree(target)
            else:
                # rmtree refuses plain files and symlinks; remove them directly.
                os.remove(target)

        if deldir:
            shutil.rmtree(workdir)

    #############################################
    def getworkdir(self, imagename, nodeid):
        """Return the work directory for ``imagename`` on ``nodeid``.

        The directory is ``<engine path>/<imagename>.workdirectory`` and is
        created if it does not exist yet.

        :param imagename: imagename as passed to the tclean task
        :param nodeid: id of MPI node

        :returns: path of the work directory
        """
        workdir = os.path.join(self.getpath(nodeid), imagename + '.workdirectory')

        try:
            os.mkdir(workdir)
        except FileExistsError:
            # Already there (possibly created concurrently by another process).
            pass

        return workdir

    #############################################
    def getpartimagename(self, imagename, nodeid):
        """
        For imagename = 'imaging_subdir/foo_img', it produces something like:
        'imaging_subdir/foo_img.workdirectory/foo_img.n5.gridwt' (where n5 is the node idx)

        :param imagename: imagename as passed to the tclean task
        :param nodeid: id of MPI node

        :returns: (full path) name of a part/sub-image for nodeid, produced by concatenating
                  the working directory, the image basename and the node id as a string.
        """
        # don't include subdirs again here - the workdir is already inside the subdir(s)
        image_basename = os.path.basename(imagename)
        return os.path.join(self.getworkdir(imagename, nodeid),
                            image_basename + '.n' + str(nodeid))

284 

285 

286############################################# 

287 

288