Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_sdcal.py: 91%

171 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import shutil 

3 

4import numpy 

5import numpy.random as random 

6 

7from casatasks import casalog 

8from casatools import calibrater, ms, table 

9 

10from . import casaxmlutil, sdutil 

11from .mstools import write_history 

12 

# Module-level casatools instances, created once when this module is imported
# and reused by the functions below.

# Table tool: sdcal() opens the input MS's SPECTRAL_WINDOW subtable with it
# to count rows when a dictionary spwmap has to be converted to a list.
tb = table()
# Calibrator tool: sdcal() drives the whole open / select / solve-or-apply /
# close sequence through this instance.
cb = calibrater()
# MS tool: handed to write_history() when sdcal() records its parameters
# after applying calibration.
myms = ms()

19 

20 

def _spwmap_dict_to_list(spwmap_dict, num_spw):
    """Flatten a dictionary spwmap into a list of length *num_spw*.

    The result starts as the identity list ``[0, 1, ..., num_spw - 1]``.
    For every ``key: values`` item, each id in ``values`` that lies in
    ``range(num_spw)`` gets ``int(key)`` stored at that id's position.
    Ids outside the range (or of a type that does not compare equal to any
    valid id) are ignored.
    """
    valid_ids = range(num_spw)
    spwmap_list = list(valid_ids)
    for key, value in spwmap_dict.items():
        for v in value:
            # Index by the spw id itself rather than searching the list being
            # modified: a value written by an earlier item must not be
            # mistaken for the position of a later id.
            if v in valid_ids:
                spwmap_list[int(v)] = int(key)
    return spwmap_list


@casaxmlutil.xml_constraints_injector
@sdutil.sdtask_decorator
def sdcal(infile=None, calmode='tsys', fraction='10%', noff=-1,
          width=0.5, elongated=False, applytable='', interp='', spwmap={},
          outfile='', overwrite=False, field='', spw='', scan='', intent=''):
    """Externally specify calibration solutions of various types.

    A comma in ``calmode`` selects composite mode, which is delegated to
    ``handle_composite_mode``. Otherwise the function either applies existing
    calibration tables (``calmode='apply'``) or computes one calibration
    table (``'tsys'``, ``'ps'``, ``'otfraster'``, ``'otf'``).

    Parameters:
        infile: path of the input MS; must be an existing path.
        calmode: calibration mode, see above.
        fraction: edge fraction for sky calibration; parsed by
            ``to_numeric_fraction``.
        noff: passed to the calibrater as ``numedge``.
        width, elongated: not referenced in this function body.
        applytable: caltable name, or list/tuple/array of names, to apply.
        interp: interpolation specification forwarded to ``cb.setapply``.
        spwmap: list, or dictionary that is converted to a list; used only
            for caltables whose 'VisCal' keyword is 'B TSYS'.
        outfile: name of the caltable to create (ignored with a warning in
            apply mode).
        overwrite: when False, an existing ``outfile`` is an error.
        field, spw, scan, intent: data selection.

    Raises:
        Exception: missing infile, invalid calmode, invalid/missing
            applytable, or invalid spwmap type.
        RuntimeError: existing outfile without overwrite, empty outfile when
            computing a table, or an applytable that is not a caltable.
        ValueError: ``noff <= 0`` and the numeric fraction is >= 0.5.

    A ``UserWarning`` raised by the tsys parameter check is caught here and
    only logged. The calibrater tool is closed on every exit path of the
    single-mode branch.
    """
    # NOTE: the dict default of ``spwmap`` is kept for interface
    # compatibility; it is only rebound below, never mutated in place.

    # Composite mode: compute calibration table and calibrate
    if ',' in calmode:
        # locals() holds exactly the call parameters at this point.
        handle_composite_mode(locals())
        return

    # Single mode: either calibrate or compute calibration table
    try:
        # Parameters check
        if calmode == 'tsys':
            if scan != '':
                raise UserWarning("Scan input must be ''(=all) in calmode='tsys'.")
            if spw != '':
                raise UserWarning("Spw input must be ''(=all) in calmode='tsys'.")

        if isinstance(infile, str) and os.path.exists(infile):
            # check if CORRECTED_DATA is necessary
            addcorr = calmode == 'apply'
            cb.setvi(old=True)
            cb.open(filename=infile, compress=False, addcorr=addcorr, addmodel=False)
            cb.selectvis(spw=spw, scan=scan, field=field)
        else:
            raise Exception('Infile data set not found - please verify the name')

        if not isinstance(calmode, str):
            raise Exception("Calmode must be a string")

        # NOTE(review): validation is case-insensitive but the dispatch below
        # compares calmode case-sensitively, so e.g. 'PS' passes this check
        # and then fails on the cpp_calmode lookup. Left as is - confirm the
        # intended behaviour before normalising.
        if calmode.lower() not in ['tsys', 'ps', 'otfraster', 'otf', 'apply']:
            raise Exception(
                "Calmode must be either 'ps' or 'otfraster' or 'otf' or 'tsys' or 'apply'.")

        if (not overwrite) and os.path.exists(outfile):
            raise RuntimeError("overwrite is False and output file exists: {}".format(outfile))

        # Calibration
        if calmode == 'apply':  # Calibrate using existing tables
            if isinstance(applytable, str):
                # single calibration table
                _table_list = [applytable]
            elif isinstance(applytable, (list, tuple, numpy.ndarray)):
                # multiple calibration tables
                _table_list = applytable
            else:
                # Previously this fell through and failed later with an
                # UnboundLocalError on _table_list.
                raise Exception('Applytable must be a string or a list of strings.')

            # no calibration table
            if len(_table_list) == 0:
                raise Exception('Applytable name must be specified.')

            # check calibration table files
            for _table_name in _table_list:
                # empty string
                if len(_table_name) == 0:
                    raise Exception('Applytable name must be specified.')
                # unexisting table
                if not os.path.exists(_table_name):
                    raise Exception("Table doesn't exist: {}".format(_table_name))

            # warning on usage difference with asap.sdcal2
            if outfile != '':
                warning_msg = '\n'.join([
                    'The outfile you specified will NOT be created.',
                    "Calibrated data will be stored in a new 'CORRECTED_DATA' column",
                    'inserted in the main table of the input MS file.'
                ])
                casalog.post(warning_msg, priority="WARN")

            if not isinstance(spwmap, (list, dict)):
                raise Exception('Spwmap type must be list or dictionary.')

            if isinstance(spwmap, dict):
                # The list form needs one entry per row of SPECTRAL_WINDOW.
                tb.open(infile + '/SPECTRAL_WINDOW')
                try:
                    total_spwID = tb.nrows()
                finally:
                    tb.close()

                # Rebind (not mutate) so the HISTORY entry written below
                # records the list actually handed to the calibrater.
                spwmap = _spwmap_dict_to_list(spwmap, total_spwID)

            # Setup calibrator
            for _table in _table_list:
                caltype = inspect_caltype(_table)
                if caltype == 'UNKNOWN':
                    raise RuntimeError('Applytable \'%s\' is not a caltable format' % (_table))
                elif caltype == 'B TSYS':
                    cb.setapply(table=_table, spwmap=spwmap, interp=interp, calwt=True)
                else:
                    # no spw mapping is needed for sky calibration
                    cb.setapply(table=_table, interp=interp, calwt=True)

            # Calibrate
            cb.correct(applymode='calflag')

            # Write to HISTORY table of MS
            param_names = sdcal.__code__.co_varnames[:sdcal.__code__.co_argcount]
            local_vars = locals()
            param_vals = [local_vars[p] for p in param_names]

            write_history(myms, infile, 'sdcal', param_names,
                          param_vals, casalog)

        else:  # Compute calibration table
            # Reconciliating 'Python world' calmode with 'C++ world' calmode
            # cpp_calmode[python_calmode] = c++_calmode
            cpp_calmode = {'tsys': 'tsys',
                           'ps': 'sdsky_ps',
                           'otfraster': 'sdsky_raster',
                           'otf': 'sdsky_otf'
                           }

            if len(outfile) == 0:
                raise RuntimeError('Output file name must be specified.')

            if calmode == 'tsys':
                cb.specifycal(caltable=outfile, time="", spw=spw, caltype=cpp_calmode[calmode])
            else:
                fraction_numeric = to_numeric_fraction(fraction)
                if noff <= 0 and fraction_numeric >= 0.5:
                    raise ValueError('Too many edge points. fraction must be < 0.5.')
                # Setup calibrator
                cb.selectvis(spw=spw, scan=scan, field=field, intent=intent)
                cb.setsolve(type=cpp_calmode[calmode], table=outfile,
                            fraction=fraction_numeric, numedge=noff)
                # Compute calibration table
                cb.solve()

    except UserWarning as instance:
        # Deliberately downgraded to a log message instead of propagating.
        casalog.post('*** UserWarning *** %s' % instance, 'WARN')

    finally:
        cb.close()

164 

165 

def inspect_caltype(table):
    """Return the value of the 'VisCal' table keyword of *table*.

    The table is opened through ``sdutil.table_manager``. When the table has
    no 'VisCal' keyword, the string 'UNKNOWN' is returned instead.
    """
    with sdutil.table_manager(table) as caltable:
        if 'VisCal' not in caltable.keywordnames():
            return 'UNKNOWN'
        return caltable.getkeyword('VisCal')

172 

173 

def to_numeric_fraction(fraction):
    """Convert a fraction specification into a float.

    Parameters:
        fraction: either a string or anything ``float()`` accepts.
            - empty / whitespace-only string: the default 0.1 is returned;
            - string containing '%': the text before the first '%' is read
              as a percentage (``'10%'`` -> 0.1);
            - any other string or value: converted with ``float()``.

    Returns:
        The fraction as a float.

    Raises:
        RuntimeError: the value cannot be converted. The original error is
            logged with SEVERE priority and chained as the cause.
    """
    try:
        if isinstance(fraction, str):
            # Strip once and work on the stripped text throughout, so that
            # leading whitespace cannot shift the position of '%'.
            text = fraction.strip()
            if not text:
                # invalid, use default
                fraction_numeric = 0.1
            else:
                number, percent_sign, _ = text.partition('%')
                if percent_sign:
                    # percentage
                    fraction_numeric = float(number) * 0.01
                else:
                    # direct value
                    fraction_numeric = float(text)
        else:
            fraction_numeric = float(fraction)
    except Exception as e:
        casalog.post(str(e), priority='SEVERE', origin='sdcal')
        raise RuntimeError(
            'Invalid fraction value (original error message: "%s")' % (str(e))) from e

    return fraction_numeric

195 

196 

def temporary_name(calmode):
    """Return a caltable name for *calmode* that does not exist yet.

    Names have the form ``__sdcal_composite_mode_<calmode>_<NNN>.tab`` where
    ``NNN`` is a zero-padded random integer between 1 and 100 inclusive.
    Up to 100 candidates are tried.

    Raises:
        RuntimeError: every candidate tried already exists.
    """
    num_trial = 100
    for _ in range(num_trial):
        # randint's upper bound is exclusive, hence the +1; this replaces the
        # deprecated random_integers(num_trial), which drew from the same
        # inclusive range [1, num_trial].
        number = int(random.randint(1, num_trial + 1))
        name = '__sdcal_composite_mode_%s_%03d.tab' % (calmode, number)
        if not os.path.exists(name):
            return name
    raise RuntimeError('Failed to configure temporary caltable name.')

205 

206 

def temporary_calibration(calmode, arg_template, **kwargs):
    """Run sdcal once to create a temporary caltable and return its name.

    The call arguments are a copy of *arg_template* with ``calmode`` set to
    the given mode, ``outfile`` set to a fresh name from ``temporary_name``
    and ``overwrite`` forced to False. Keyword arguments then override
    entries of that copy, but only for keys it already contains.

    Raises:
        RuntimeError: the caltable does not exist after sdcal returned.
    """
    table_name = temporary_name(calmode)

    # overwrite=False: try to keep an existing file, although outfile should
    # never point to one.
    call_args = dict(arg_template, calmode=calmode, outfile=table_name, overwrite=False)

    # optional arguments for sdcal; unknown keys are dropped
    overrides = {key: val for key, val in kwargs.items() if key in call_args}
    call_args.update(overrides)

    sdcal(**call_args)

    if os.path.exists(table_name):
        return table_name
    raise RuntimeError('Failed to create temporary caltable.')

223 

224 

def fix_for_intent(calmodes, input_args):
    """Return the arguments to use for an 'otf'/'otfraster' sky calibration.

    When *calmodes* contains 'tsys' together with 'otf' or 'otfraster', a
    warning is logged and a copy of *input_args* with ``intent`` set to
    'OBSERVE_TARGET#ON_SOURCE' is returned. In every other case *input_args*
    itself is returned unchanged (same object, no copy).
    """
    wants_otf_sky = 'otfraster' in calmodes or 'otf' in calmodes
    if not ('tsys' in calmodes and wants_otf_sky):
        return input_args

    casalog.post(
        "Intent selection for 'otf' or 'otfraster' should be 'OBSERVE_TARGET#ON_SOURCE'. \n"
        "However, the task is not allowed to set global intent selection "
        "since calmode contains 'tsys'. \n"
        "As a workaround, set intent selection locally when 'otf' or 'otfraster' calibration "
        "is performed.",
        priority='WARN')

    # Work on a copy so the caller's arguments keep their own intent.
    local_args = input_args.copy()
    local_args['intent'] = 'OBSERVE_TARGET#ON_SOURCE'
    return local_args

239 

240 

def handle_composite_mode(args):
    """Run a comma-separated calmode as a sequence of single-mode sdcal calls.

    *args* is the dictionary of sdcal parameters. Its ``calmode`` is split on
    ','. At most one sky calibration is computed (the first of 'ps',
    'otfraster', 'otf' that is present), then a Tsys calibration if 'tsys'
    is present; each goes into a temporary caltable. If 'apply' is present,
    sdcal is called in apply mode with the user-supplied ``applytable``
    entries followed by the temporary tables.

    Temporary caltables are removed on exit, whether or not an error occurred.

    Raises:
        RuntimeError: 'apply' was requested but no temporary caltable was
            created.
    """
    kwargs = args.copy()
    calmodes = kwargs['calmode'].split(',')

    # Normalise the user-supplied applytable into a list.
    user_tables = kwargs['applytable']
    if not isinstance(user_tables, str):
        precalibrations = list(user_tables[:])
    elif len(user_tables) > 0:
        precalibrations = [user_tables]
    else:
        precalibrations = []

    applytable_list = []
    try:
        # sky calibration: only the first matching mode is executed
        for sky_mode in ('ps', 'otfraster', 'otf'):
            if sky_mode not in calmodes:
                continue
            if sky_mode == 'ps':
                sky_args = kwargs
            else:
                # otf / otfraster may need a local intent selection
                sky_args = fix_for_intent(calmodes, kwargs)
            sky_table = temporary_calibration(sky_mode, sky_args, spwmap={})
            applytable_list.append(sky_table)
            break

        # Tsys calibration (selection is reset to all fields/spws/scans)
        if 'tsys' in calmodes:
            tsys_table = temporary_calibration('tsys', kwargs, field='', spw='', scan='')
            applytable_list.append(tsys_table)

        # apply temporary caltables
        if 'apply' in calmodes:
            if len(applytable_list) == 0:
                raise RuntimeError("No applytable has been created/registered.")
            apply_args = kwargs.copy()
            apply_args['calmode'] = 'apply'
            apply_args['applytable'] = precalibrations + applytable_list
            sdcal(**apply_args)

    finally:
        # clean up temporary tables
        for temp_table in applytable_list:
            if not os.path.exists(temp_table):
                continue
            casalog.post('removing \'%s\'' % (temp_table), priority='DEBUG')
            shutil.rmtree(temp_table)