Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_uvcontsub_old.py: 8%

242 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-31 19:10 +0000

1import os 

2import re 

3import shutil 

4import time 

5import tempfile 

6import numpy as np 

7 

8# shutil.copytree is useless with directories created by tempfile 

9# (or any directories that already exist). 

10from distutils.dir_util import copy_tree 

11 

12from casatools import calibrater, ms, table 

13from casatasks import casalog, virtualconcat 

14 

15from .mstools import write_history 

16from .update_spw import * 

17from .parallel.parallel_data_helper import ParallelDataHelper 

18from .parallel.parallel_task_helper import ParallelTaskHelper 

19 

20mytb = table( ) 

21mycb = calibrater( ) 

22myms = ms() 

23_ms = ms() # task also referenced the old global ms object 

24 

25def uvcontsub_old(vis, field, fitspw, excludechans, combine, solint, fitorder, spw, want_cont): 

26 

27 if ParallelDataHelper.isMMSAndNotServer(vis): 

28 helper = ParallelTaskHelper('uvcontsub_old', locals()) 

29 helper._consolidateOutput = False 

30 retVar = helper.go() 

31 

32 # Gather the list of continuum subtraction-SubMSs  

33 cont_subMS_list = [] 

34 contsub_subMS_list = [] 

35 for subMS in retVar: 

36 if retVar[subMS]: 

37 cont_subMS_list.append(subMS + ".cont") 

38 contsub_subMS_list.append(subMS + ".contsub") 

39 

40 if len(cont_subMS_list) <= 0: 

41 raise ValueError("No continuum-subtracted sub-MSs for concatenation","SEVERE") 

42 

43 # We have to sort the list because otherwise it  

44 # depends on the time the engines dispatches their sub-MSs 

45 cont_subMS_list.sort() 

46 contsub_subMS_list.sort() 

47 

48 # deal with the pointing table 

49 auxfile = "uvcontsub_aux2_"+str(time.time()) 

50 pnrows = 0 

51 try: 

52 mytb.open(vis+'/POINTING') 

53 pnrows = mytb.nrows() 

54 mytb.close() 

55 if(pnrows>0): 

56 shutil.copytree(os.path.realpath(vis+'/POINTING'), auxfile) 

57 except Exception as instance: 

58 raise RuntimeError("Error handling POINTING table %s: %s" % 

59 (vis+'/POINTING',str(instance))) 

60 

61 if want_cont: 

62 try: 

63 virtualconcat(concatvis=helper._arg['vis'] + ".cont",vis=cont_subMS_list, 

64 copypointing=False) 

65 except Exception as instance: 

66 raise RuntimeError("Error concatenating continuum sub-MSs %s: %s" % 

67 (str(cont_subMS_list),str(instance))) 

68 try: 

69 virtualconcat(concatvis=helper._arg['vis'] + ".contsub",vis=contsub_subMS_list, 

70 copypointing=False) 

71 except Exception as instance: 

72 raise RuntimeError("Error concatenating continuum-subtracted sub-MSs %s: %s" % 

73 (str(contsub_subMS_list),str(instance))) 

74 

75 # Remove continuum subtraction-SubMSs 

76 if want_cont: 

77 for subMS in cont_subMS_list: 

78 if (os.system("rm -rf " + subMS ) != 0): 

79 casalog.post("Problem removing continuum sub-MS %s into working directory" % (subMS),'SEVERE') 

80 for subMS in contsub_subMS_list: 

81 if (os.system("rm -rf " + subMS ) != 0): 

82 casalog.post("Problem removing continuum-subtracted sub-MS %s into working directory" % (subMS),'SEVERE') 

83 

84 if(pnrows>0): # put back the POINTING table 

85 casalog.post("Restoring the POINTING table ...",'NORMAL') 

86 try: 

87 if want_cont: 

88 theptab = os.path.realpath(helper._arg['vis'] + ".cont/POINTING") 

89 os.system('rm -rf '+theptab) 

90 shutil.copytree(auxfile, theptab) 

91 theptab = os.path.realpath(helper._arg['vis'] + ".contsub/POINTING") 

92 os.system('rm -rf '+theptab) 

93 os.system('mv '+auxfile+' '+theptab) 

94 except Exception as instance: 

95 raise RuntimeError("Error restoring pointing table from %s: %s" % 

96 (auxfile,str(instance))) 

97 

98 # Restore origin (otherwise gcwrap shows virtualconcat) 

99 casalog.origin('uvcontsub_old') 

100 

101 return 

102 

103 # Run normal code 

104 try: 

105 casalog.origin('uvcontsub_old') 

106 

107 casalog.post('This task is deprecated and will be removed in an upcoming release.', 

108 'WARN') 

109 

110 # Get these checks done and out of the way. 

111 # This one is redundant - it is already checked at the XML level. 

112 if not os.path.isdir(vis): 

113 raise ValueError('Visibility data set not found - please verify the name') 

114 

115 # 

116 if excludechans: 

117 locfitspw=_quantityRangesToChannels(vis,field,fitspw,excludechans) 

118 casalog.post("Exclude channels in fitspw: spws:channels will be used in the fit are:%s" % locfitspw) 

119 elif fitspw.count('Hz'): 

120 locfitspw=_quantityRangesToChannels(vis,field,fitspw,excludechans) 

121 else: 

122 locfitspw=fitspw 

123 # casalog.post("locfitspw=",locfitspw) 

124 mytb.open(vis + '/SPECTRAL_WINDOW') 

125 allspw = '0~' + str(mytb.nrows() - 1) 

126 mytb.close() 

127 

128 # Verify fitspw and spw as plausible 

129 if len(locfitspw)>0: 

130 fitspwerr=subtract_spws(locfitspw,allspw) 

131 if fitspwerr: 

132 badfitspw='' 

133 for ispw in range(len(fitspwerr)): 

134 badfitspw+=str(fitspwerr.pop()) 

135 raise ValueError("fitspw contains non-existent spw(s): "+badfitspw) 

136 if len(spw)>0: 

137 spwerr=subtract_spws(spw,allspw) 

138 if spwerr: 

139 badspw='' 

140 for ispw in range(len(spwerr)): 

141 badspw+=str(spwerr.pop()) 

142 raise ValueError("spw contains non-existent spw(s): "+badspw) 

143 

144 if 'spw' not in combine: 

145 #spwmfitspw = subtract_spws(spw, fitspw) 

146 spwmfitspw = subtract_spws(spw, locfitspw) 

147 if spwmfitspw == 'UNKNOWN': 

148 #spwmfitspw = subtract_spws(allspw, fitspw) 

149 spwmfitspw = subtract_spws(allspw, locfitspw) 

150 if spwmfitspw: 

151 raise ValueError("combine must include 'spw' when the fit is being applied to spws outside fitspw.") 

152 

153 # cb will put the continuum-subtracted data in CORRECTED_DATA, so 

154 # protect vis by splitting out its relevant parts to a working copy. 

155 csvis = vis.rstrip('/') + '.contsub' 

156 

157 # The working copy will need all of the channels in fitspw + spw, so we 

158 # may or may not need to filter it down to spw at the end. 

159 #myfitspw = fitspw 

160 myfitspw = locfitspw 

161 myspw = spw 

162 

163 # We only need the spws in the union of fitspw and spw, but keep all 

164 # the channels or the weights (as used by calibrater) will be messed up. 

165 #tempspw = re.sub(r':[^,]+', '', join_spws(fitspw, spw)) 

166 tempspw = re.sub(r':[^,]+', '', join_spws(locfitspw, spw)) 

167 if tempspw == allspw: 

168 tempspw = '' 

169 if tempspw: 

170 # The split will reindex the spws. Update spw and fitspw. 

171 # Do fitspw first because the spws in spw are supposed to be 

172 # a subset of the ones in fitspw. 

173 casalog.post('split is being run internally, and the selected spws') 

174 casalog.post('will be renumbered to start from 0 in the output!') 

175 

176 # Now get myfitspw. 

177 #myfitspw = update_spwchan(vis, tempspw, fitspw) 

178 myfitspw = update_spwchan(vis, tempspw, locfitspw) 

179 myspw = update_spwchan(vis, tempspw, spw) 

180 

181 final_csvis = csvis 

182 workingdir = os.path.abspath(os.path.dirname(vis.rstrip('/'))) 

183 csvis = tempfile.mkdtemp(prefix=csvis.split('/')[-1], dir=workingdir) 

184 

185 # ms does not have a colnames method, so open vis with tb even though 

186 # it is already open with myms. Note that both use nomodify=True, 

187 # however, and no problem was revealed in testing. 

188 mytb.open(vis, nomodify=True) 

189 if 'CORRECTED_DATA' in mytb.colnames(): 

190 whichcol = 'CORRECTED_DATA' 

191 else: 

192 # DON'T remind the user that split before uvcontsub wastes time - 

193 # scratch columns will eventually go away. 

194 whichcol = 'DATA' 

195 mytb.close() 

196 

197 casalog.post('Preparing to add scratch columns.') 

198 myfield = field 

199 if field == '': 

200 myfield = '*' 

201 if myfield != '*' and set(_ms.msseltoindex(vis, 

202 field=myfield)['field']) == set(_ms.msseltoindex(vis, 

203 field='*')['field']): 

204 myfield = '*' 

205 if whichcol != 'DATA' or tempspw != '' or myfield != '*': 

206 casalog.post('splitting to ' + csvis + ' with spw="' 

207 + tempspw + '"') 

208 myms.open(vis, nomodify=True) 

209 split_result = myms.split(csvis, spw=tempspw, field=myfield, whichcol=whichcol) 

210 myms.close() 

211 # jagonzal (CAS-4113): Take care of the trivial parallelization 

212 if not split_result: 

213 msg = "MSSelectionNullSelection: %s does not have data for spw=%s, field=%s, bailing out!" % (vis,tempspw,field) 

214 shutil.rmtree(csvis) 

215 raise RuntimeError(msg) 

216 else: 

217 # This takes almost 30s/GB. (lustre, 8/2011) 

218 casalog.post('Copying ' + vis + ' to ' + csvis + ' with cp.') 

219 copy_tree(vis, csvis, preserve_symlinks=True) 

220 

221 # It is less confusing if we write the history now that the "root" MS 

222 # is made, but before cb adds its messages. 

223 # 

224 param_names = uvcontsub_old.__code__.co_varnames[:uvcontsub_old.__code__.co_argcount] 

225 local_vars = locals( ) 

226 param_vals = [local_vars[p] for p in param_names] 

227 

228 write_history(myms, csvis, 'uvcontsub_old', param_names, param_vals, 

229 casalog) 

230 

231 if (type(csvis) == str) and os.path.isdir(csvis): 

232 mycb.setvi(old=True,quiet=False); # old VI, for now 

233 mycb.open(csvis) 

234 else: 

235 raise RuntimeError('Visibility data set not found - please verify the name') 

236 

237 # select the data for continuum subtraction 

238 mycb.reset() 

239 mycb.selectvis(spw=myfitspw) 

240 

241 # Set up the solve 

242 amuellertab = tempfile.mkdtemp(prefix='Temp_contsub.tab', 

243 dir=workingdir) 

244 

245 mycb.setsolve(type='A', t=solint, table=amuellertab, combine=combine, 

246 fitorder=fitorder) 

247 

248 # solve for the continuum 

249 mycb.solve() 

250 

251 # subtract the continuum 

252 mycb.selectvis(spw=myspw) 

253 aspwmap=-1 

254 # if we combined on spw in solve, fanout the result with spwmap=-999; 

255 if 'spw' in combine: 

256 aspwmap = -999 

257 mycb.setapply(table=amuellertab, spwmap=aspwmap) 

258 

259 # Generate CORRECTED_DATA without continuum 

260 mycb.correct() 

261 

262 if want_cont: 

263 # Generate MODEL_DATA with only the continuum model 

264 mycb.corrupt() 

265 

266 mycb.close() 

267 

268 # Delete the temporary caltable 

269 shutil.rmtree(amuellertab) 

270 

271 # Move the line data from CORRECTED_DATA to DATA, and do any 

272 # final filtering by spw. 

273 myms.open(csvis) 

274 # Using ^ in spw is untested here! 

275 myms.split(final_csvis, spw=myspw, whichcol='corrected') 

276 myms.close() 

277 

278 #casalog.post("\"want_cont\" = \"%s\"" % want_cont, 'WARN') 

279 #casalog.post("\"csvis\" = \"%s\"" % csvis, 'WARN') 

280 if want_cont: 

281 myms.open(csvis) 

282 # No channel averaging (== skipping for fitorder == 0) is done 

283 # here, but it would be a reasonable option. The user can always 

284 # do it by running split again. 

285 myms.split(final_csvis[:-3], # .contsub -> .cont 

286 whichcol='MODEL_DATA', 

287 spw=myspw) 

288 myms.close() 

289 

290 #casalog.post("rming \"%s\"" % csvis, 'WARN') 

291 shutil.rmtree(csvis) 

292 

293 finally: 

294 mycb.close() # Harmless if cb is closed. 

295 

296 

297def _quantityRangesToChannels(vis,field,infitspw,excludechans): 

298 """ 

299 private function to convert frequnecy (in the future, velocity 

300 as well) ranges in fitspw to channel indexes. 

301 For excludeechans=True, it will select channels outside given by infitspw 

302 returns: list containing new channel ranges 

303 """ 

304 mytb.open(vis+'/SPECTRAL_WINDOW') 

305 nspw=mytb.nrows() 

306 mytb.close() 

307 

308 fullspwids=str(list(range(nspw))).strip('[,]') 

309 tql={'field':field,'spw':fullspwids} 

310 myms.open(vis) 

311 myms.msselect(tql,True) 

312 allsels=myms.msselectedindices() 

313 myms.reset() 

314 # input fitspw selection 

315 tql['spw']=infitspw 

316 myms.msselect(tql,True) 

317 usersels=myms.msselectedindices()['channel'] 

318 myms.close() 

319 # sort the arrays so that chan ranges are 

320 # in order 

321 usersels=usersels[np.lexsort((usersels[:,1],usersels[:,0]))] 

322 spwsels='' 

323 spwid=-1 

324 prevspwid=None 

325 newchanlist=[] 

326 nsels=len(usersels) 

327 # casalog.post("Usersels=",usersels) 

328 if excludechans: 

329 for isel in range(nsels): 

330 prevspwid = spwid 

331 spwid=usersels[isel][0] 

332 lochan=usersels[isel][1] 

333 hichan=usersels[isel][2] 

334 stp=usersels[isel][3] 

335 maxchanid=allsels['channel'][spwid][2] 

336 # find left and right side ranges of the selected range 

337 if spwid != prevspwid: 

338 # first line in the selected spw 

339 if lochan > 0: 

340 outloL=0 

341 outhiL=lochan-1 

342 outloR= (0 if hichan+1>=maxchanid else hichan+1) 

343 if outloR: 

344 if isel<nsels-1 and usersels[isel+1][0]==spwid: 

345 outhiR=usersels[isel+1][1]-1 

346 else: 

347 outhiR=maxchanid 

348 else: 

349 outhiR=0 # higher end of the user selected range reaches maxchanid 

350 # so no right hand side range 

351 # casalog.post("outloL,outhiL,outloR,outhiR==", outloL,outhiL,outloR,outhiR) 

352 else: 

353 # no left hand side range 

354 outloL=0 

355 outhiL=0 

356 outloR=hichan+1 

357 if isel<nsels-1 and usersels[isel+1][0]==spwid: 

358 outhiR=usersels[isel+1][1]-1 

359 else: 

360 outhiR=maxchanid 

361 else: 

362 #expect the left side range is already taken care of 

363 outloL=0 

364 outhiL=0 

365 outloR=hichan+1 

366 if outloR>=maxchanid: 

367 #No more boundaries to consider 

368 outloR=0 

369 outhiR=0 

370 else: 

371 if isel<nsels-1 and usersels[isel+1][0]==spwid: 

372 outhiR=min(usersels[isel+1][1]-1,maxchanid) 

373 else: 

374 outhiR=maxchanid 

375 if outloR > outhiR: 

376 outloR = 0 

377 outhiR = 0 

378 

379 # 

380 if (not(outloL == 0 and outhiL == 0)) and outloL <= outhiL: 

381 newchanlist.append([spwid,outloL,outhiL,stp]) 

382 if (not(outloR == 0 and outhiR == 0)) and outloR <= outhiR: 

383 newchanlist.append([spwid,outloR,outhiR,stp]) 

384 # casalog.post("newchanlist=",newchanlist) 

385 else: 

386 # excludechans=False 

387 newchanlist=usersels 

388 

389 #return newchanlist 

390 # create spw selection string from newchanlist 

391 spwstr='' 

392 for irange in range(len(newchanlist)): 

393 chanrange=newchanlist[irange] 

394 spwstr+=str(chanrange[0])+':'+str(chanrange[1])+'~'+str(chanrange[2]) 

395 if irange != len(newchanlist)-1: 

396 spwstr+=',' 

397 return spwstr