# Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_sdcal.py: 91%
# 171 statements
# Report generated by coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
2import shutil
4import numpy
5import numpy.random as random
7from casatasks import casalog
8from casatools import calibrater, ms, table
10from . import casaxmlutil, sdutil
11from .mstools import write_history
# Module-level tool instances shared by the functions in this file.
tb = table()        # table tool: used by sdcal to count SPECTRAL_WINDOW rows
cb = calibrater()   # calibrater tool: opened, configured and closed by sdcal
myms = ms()         # MS tool: handed to write_history by sdcal
def _as_table_list(applytable):
    """Return ``applytable`` as a sequence of caltable names.

    A single string becomes a one-element list; a list or numpy array is
    returned unchanged. Any other type raises TypeError instead of leaving
    the caller with an unbound variable.
    """
    if isinstance(applytable, str):
        return [applytable]
    if isinstance(applytable, (list, numpy.ndarray)):
        return applytable
    raise TypeError('Applytable must be a string or a list of strings.')


def _spwmap_dict_to_list(spwmap_dict, num_spw):
    """Convert a ``{source_spw: [target_spw, ...]}`` dict into a list.

    The result has ``num_spw`` entries and starts as the identity mapping.
    For every ``target_spw`` that is a valid spw id, the entry at that
    position is set to ``int(source_spw)``. Targets outside the valid range
    are ignored.
    """
    # Look targets up in an untouched identity list. Searching the list that
    # is being modified would pick the wrong slot once an earlier
    # substitution has written the same value at a lower index.
    identity = list(range(num_spw))
    spwmap_list = list(identity)
    for key, targets in spwmap_dict.items():
        for target in targets:
            if target in identity:
                spwmap_list[identity.index(target)] = int(key)
    return spwmap_list


@casaxmlutil.xml_constraints_injector
@sdutil.sdtask_decorator
def sdcal(infile=None, calmode='tsys', fraction='10%', noff=-1,
          width=0.5, elongated=False, applytable='', interp='', spwmap={},
          outfile='', overwrite=False, field='', spw='', scan='', intent=''):
    """Externally specify calibration solutions of various types.

    If ``calmode`` contains a comma, the call is delegated to
    ``handle_composite_mode``. Otherwise exactly one of the following runs:

    * ``'apply'``: the caltables named in ``applytable`` are registered on
      the calibrater tool and applied to ``infile``; a HISTORY entry is
      written afterwards. ``outfile`` is not created in this mode.
    * ``'tsys'``: a caltable is written to ``outfile`` via ``specifycal``.
      ``scan`` and ``spw`` must be empty strings.
    * ``'ps'``, ``'otfraster'``, ``'otf'``: a sky caltable is solved and
      written to ``outfile``. ``fraction`` and ``noff`` are forwarded to
      the solver.

    ``calmode`` is matched case-insensitively. ``spwmap`` may be a list or a
    ``{source_spw: [target_spw, ...]}`` dict; it is only used in apply mode,
    and only for caltables whose type is ``'B TSYS'``. ``width`` and
    ``elongated`` are accepted but not referenced in this function body.

    Raises:
        Exception: ``infile`` is missing, ``calmode`` is invalid, an
            applytable entry is empty or missing, or ``spwmap`` has the
            wrong type.
        TypeError: ``applytable`` is neither a string nor a list/array.
        RuntimeError: the output exists and ``overwrite`` is False, no
            output name was given, or an applytable is not a caltable.
        ValueError: ``noff <= 0`` and the numeric fraction is >= 0.5.

    A ``UserWarning`` raised by the parameter check is logged at WARN level
    and not propagated.
    """
    # Composite mode: compute calibration table and calibrate.
    # locals() must be taken before any other local name is bound, because
    # the dict is forwarded as keyword arguments to nested sdcal calls.
    if ',' in calmode:
        handle_composite_mode(locals())
        return

    # Single mode: either calibrate or compute calibration table
    try:
        # Validation below is case-insensitive, so dispatch must be as well;
        # otherwise e.g. 'PS' passes the check and then fails with KeyError.
        mode = calmode.lower() if isinstance(calmode, str) else calmode

        # Parameters check
        if mode == 'tsys':
            if scan != '':
                raise UserWarning("Scan input must be ''(=all) in calmode='tsys'.")
            if spw != '':
                raise UserWarning("Spw input must be ''(=all) in calmode='tsys'.")

        if not (isinstance(infile, str) and os.path.exists(infile)):
            raise Exception('Infile data set not found - please verify the name')

        # CORRECTED_DATA is only requested when applying calibration
        addcorr = mode == 'apply'
        cb.setvi(old=True)
        cb.open(filename=infile, compress=False, addcorr=addcorr, addmodel=False)
        cb.selectvis(spw=spw, scan=scan, field=field)

        if not isinstance(calmode, str):
            raise Exception("Calmode must be a string")

        if mode not in ['tsys', 'ps', 'otfraster', 'otf', 'apply']:
            raise Exception(
                "Calmode must be either 'ps' or 'otfraster' or 'otf' or 'tsys' or 'apply'.")

        if (not overwrite) and os.path.exists(outfile):
            raise RuntimeError("overwrite is False and output file exists: {}".format(outfile))

        # Calibration
        if mode == 'apply':  # Calibrate using existing tables
            _table_list = _as_table_list(applytable)

            # no calibration table
            if len(_table_list) == 0:
                raise Exception('Applytable name must be specified.')

            # check calibration table files
            for _table_name in _table_list:
                # empty string
                if len(_table_name) == 0:
                    raise Exception('Applytable name must be specified.')
                # unexisting table
                if not os.path.exists(_table_name):
                    raise Exception("Table doesn't exist: {}".format(_table_name))

            # warning on usage difference with asap.sdcal2
            if outfile != '':
                warning_msg = '\n'.join([
                    'The outfile you specified will NOT be created.',
                    "Calibrated data will be stored in a new 'CORRECTED_DATA' column",
                    'inserted in the main table of the input MS file.'
                ])
                casalog.post(warning_msg, priority="WARN")

            if not isinstance(spwmap, (list, dict)):
                raise Exception('Spwmap type must be list or dictionary.')

            if isinstance(spwmap, dict):
                tb.open(os.path.join(infile, 'SPECTRAL_WINDOW'))
                try:
                    total_spwID = tb.nrows()
                finally:
                    # release the table even if the row count fails
                    tb.close()
                # spwmap is rebound (not mutated), so the shared default
                # dict is never modified and the HISTORY entry below
                # records the converted list.
                spwmap = _spwmap_dict_to_list(spwmap, total_spwID)

            # Setup calibrator
            for _table in _table_list:
                caltype = inspect_caltype(_table)
                if caltype == 'UNKNOWN':
                    raise RuntimeError('Applytable \'%s\' is not a caltable format' % (_table))
                elif caltype == 'B TSYS':
                    cb.setapply(table=_table, spwmap=spwmap, interp=interp, calwt=True)
                else:
                    # no spw mapping is needed for sky calibration
                    cb.setapply(table=_table, interp=interp, calwt=True)

            # Calibrate
            cb.correct(applymode='calflag')

            # Write to HISTORY table of MS
            # NOTE(review): ``sdcal`` here is the decorated object; confirm
            # that its __code__ still exposes the original parameter names.
            param_names = sdcal.__code__.co_varnames[:sdcal.__code__.co_argcount]
            local_values = locals()
            param_vals = [local_values[p] for p in param_names]

            write_history(myms, infile, 'sdcal', param_names,
                          param_vals, casalog)

        else:  # Compute calibration table
            # Reconciliating 'Python world' calmode with 'C++ world' calmode
            # cpp_calmode[python_calmode] = c++_calmode
            cpp_calmode = {'tsys': 'tsys',
                           'ps': 'sdsky_ps',
                           'otfraster': 'sdsky_raster',
                           'otf': 'sdsky_otf'
                           }

            if len(outfile) == 0:
                raise RuntimeError('Output file name must be specified.')

            if mode == 'tsys':
                cb.specifycal(caltable=outfile, time="", spw=spw, caltype=cpp_calmode[mode])
            else:
                fraction_numeric = to_numeric_fraction(fraction)
                if noff <= 0 and fraction_numeric >= 0.5:
                    raise ValueError('Too many edge points. fraction must be < 0.5.')
                # Setup calibrator
                cb.selectvis(spw=spw, scan=scan, field=field, intent=intent)
                cb.setsolve(type=cpp_calmode[mode], table=outfile,
                            fraction=fraction_numeric, numedge=noff)
                # Compute calibration table
                cb.solve()

    except UserWarning as instance:
        casalog.post('*** UserWarning *** %s' % instance, 'WARN')

    finally:
        cb.close()
def inspect_caltype(table):
    """Return the value of the ``VisCal`` keyword of ``table``.

    The table is opened through ``sdutil.table_manager``. When the table has
    no ``VisCal`` keyword, the string ``'UNKNOWN'`` is returned.
    """
    viscal_type = 'UNKNOWN'
    with sdutil.table_manager(table) as caltable:
        keyword_names = caltable.keywordnames()
        if 'VisCal' in keyword_names:
            viscal_type = caltable.getkeyword('VisCal')
    return viscal_type
def to_numeric_fraction(fraction):
    """Convert ``fraction`` to a float.

    Accepted inputs:

    * a blank string (empty or whitespace only): returns the default 0.1;
    * a string containing ``'%'``: the text before the first ``'%'`` is
      parsed as a percentage and scaled by 0.01;
    * any other string or value: passed to ``float()``.

    Surrounding whitespace in string input is ignored.

    Raises:
        RuntimeError: the value cannot be converted. The underlying error
            message is logged at SEVERE level and embedded in the message.
    """
    try:
        if isinstance(fraction, str):
            text = fraction.strip()
            if len(text) == 0:
                # invalid, use default
                fraction_numeric = 0.1
            elif '%' in text:
                # percentage: slice the stripped text, so that leading
                # whitespace cannot shift the position of the '%' sign
                fraction_numeric = float(text[:text.find('%')]) * 0.01
            else:
                # direct value
                fraction_numeric = float(text)
        else:
            fraction_numeric = float(fraction)
    except Exception as e:
        casalog.post(str(e), priority='SEVERE', origin='sdcal')
        raise RuntimeError(
            'Invalid fraction value (original error message: "%s")' % (str(e))) from e

    return fraction_numeric
def temporary_name(calmode):
    """Return a caltable name for ``calmode`` that does not exist yet.

    Candidate names have the form
    ``__sdcal_composite_mode_<calmode>_<NNN>.tab``, where ``NNN`` is a
    random integer between 1 and 100 inclusive, zero-padded to three
    characters. Up to 100 candidates are tried.

    Raises:
        RuntimeError: every candidate tried already exists.
    """
    num_trial = 100
    for _ in range(num_trial):
        # randint excludes the upper bound, hence the +1. This draws from
        # the same inclusive range as random_integers(num_trial), which is
        # deprecated and no longer available in NumPy 2.
        number = random.randint(1, num_trial + 1)
        name = ('__sdcal_composite_mode_%s_%3s.tab' % (calmode, number)).replace(' ', '0')
        if not os.path.exists(name):
            return name
    raise RuntimeError('Failed to configure temporary caltable name.')
def temporary_calibration(calmode, arg_template, **kwargs):
    """Run sdcal to produce a temporary caltable and return its name.

    The call arguments are a copy of ``arg_template`` with ``calmode`` set
    to the given mode, ``outfile`` set to a name obtained from
    ``temporary_name`` and ``overwrite`` set to False. Entries of ``kwargs``
    whose key already exists in the argument set override the corresponding
    value; other keys are ignored.

    Raises:
        RuntimeError: the caltable does not exist after sdcal returns.
    """
    table_name = temporary_name(calmode)

    # overwrite=False: the output should never point to an existing file,
    # but if it does, keep that file rather than clobbering it
    call_args = arg_template.copy()
    call_args.update(calmode=calmode, outfile=table_name, overwrite=False)

    # optional overrides, restricted to names that sdcal already receives
    overrides = {key: val for key, val in kwargs.items() if key in call_args}
    call_args.update(overrides)

    sdcal(**call_args)

    if os.path.exists(table_name):
        return table_name
    raise RuntimeError('Failed to create temporary caltable.')
def fix_for_intent(calmodes, input_args):
    """Return the arguments to use for an 'otf'/'otfraster' calibration.

    When ``calmodes`` contains 'tsys' together with 'otfraster' or 'otf', a
    warning is logged and a copy of ``input_args`` is returned with
    ``intent`` set to ``'OBSERVE_TARGET#ON_SOURCE'``. In every other case
    ``input_args`` itself is returned, unmodified.
    """
    has_otf_mode = any(mode in calmodes for mode in ('otfraster', 'otf'))
    if not ('tsys' in calmodes and has_otf_mode):
        return input_args

    message = (
        "Intent selection for 'otf' or 'otfraster' should be 'OBSERVE_TARGET#ON_SOURCE'. \n"
        "However, the task is not allowed to set global intent selection "
        "since calmode contains 'tsys'. \n"
        "As a workaround, set intent selection locally when 'otf' or 'otfraster' calibration "
        "is performed."
    )
    casalog.post(message, priority='WARN')

    local_args = input_args.copy()
    local_args['intent'] = 'OBSERVE_TARGET#ON_SOURCE'
    return local_args
def handle_composite_mode(args):
    """Run a comma-separated ``calmode`` as a sequence of sdcal calls.

    ``args`` is the keyword-argument dict of the original sdcal call. The
    steps are:

    1. At most one sky calibration, chosen in the order 'ps', 'otfraster',
       'otf', written to a temporary caltable.
    2. A Tsys calibration if 'tsys' is requested, with ``field``, ``spw``
       and ``scan`` selections cleared.
    3. If 'apply' is requested, the tables named in ``applytable`` followed
       by the temporary tables are applied through sdcal.

    Temporary caltables are removed afterwards, whether or not a step
    failed.

    Raises:
        RuntimeError: 'apply' is requested but no temporary caltable was
            produced.
    """
    params = args.copy()
    requested = params['calmode'].split(',')

    # caltables supplied by the caller, normalised to a list
    supplied = params['applytable']
    if not isinstance(supplied, str):
        preexisting = list(supplied[:])
    elif len(supplied) > 0:
        preexisting = [supplied]
    else:
        preexisting = []

    generated = []
    try:
        # sky calibration: the first matching mode wins
        sky_mode = next(
            (mode for mode in ('ps', 'otfraster', 'otf') if mode in requested),
            None)
        if sky_mode is not None:
            if sky_mode == 'ps':
                sky_args = params
            else:
                sky_args = fix_for_intent(requested, params)
            generated.append(temporary_calibration(sky_mode, sky_args, spwmap={}))

        # Tsys calibration
        if 'tsys' in requested:
            tsys_table = temporary_calibration('tsys', params, field='', spw='', scan='')
            generated.append(tsys_table)

        # apply temporary caltables
        if 'apply' in requested:
            if len(generated) == 0:
                raise RuntimeError("No applytable has been created/registered.")
            apply_args = params.copy()
            apply_args['calmode'] = 'apply'
            apply_args['applytable'] = preexisting + generated
            sdcal(**apply_args)

    finally:
        # clean up temporary tables
        for table_path in generated:
            if not os.path.exists(table_path):
                continue
            casalog.post("removing '%s'" % (table_path), priority='DEBUG')
            shutil.rmtree(table_path)