Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_gencal.py: 89%
112 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
2import warnings
3import numpy as np
5from casatasks import casalog
6from casatools import calibrater
7from . import correct_ant_posns as getantposns
8from .jyperk import gen_factor_via_web_api, JyPerKReader4File
9from .eop import generate_eop
# Module-level calibrater tool instance shared by every gencal implementation
# in this file; each implementation opens it on the MS and closes it again in
# a ``finally`` clause.
_cb = calibrater()
def gencal(vis=None, caltable=None, caltype=None, infile='None',
           endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
           spw=None, antenna=None, pol=None,
           parameter=None, uniform=None):
    """Externally specify calibration solutions of various types.

    Validates the arguments and then hands all of them to the implementation
    class registered in ``__gencal_factory`` for the requested caltype.

    Arguments:
        vis {str} -- The file path of the visibility data; it must exist.
        caltable {str} -- The file name in which the caltable is stored.
        caltype {str} -- The calibration type. 'antpos', 'jyperk' and 'eop'
            have dedicated implementations; any other value is handled by
            the general implementation.
        Subparameter of caltype='gc|gceff|tecim|jyperk':
            infile {str} -- Specifies the name of the file to read.
                For caltype='tecim' it must be an existing path.
        Subparameter of caltype='jyperk':
            endpoint {str} -- The endpoint of the Jy/K DB Web API to access.
                Options are 'asdm' (default), 'model-fit', 'interpolation'.
            timeout {int} -- Maximum waiting time [sec] for the Web API access,
                defaults to 180 sec.
            retry {int} -- Number of retries when the Web API access fails,
                defaults to 3 times.
            retry_wait_time {int} -- Waiting time [sec] until the next query
                when the Web API access fails, defaults to 5 sec.
        ant_pos_time_limit {int} -- Passed through unchanged; the 'antpos'
            implementation forwards it to correct_ant_posns for the automatic
            offset lookup, which defines its meaning.
        spw {str} -- The spectral windows.
        antenna {str} -- Select data based on antenna/baseline.
        pol {str} -- Polarization selection for specified parameters.
        parameter {doubleVec} -- The calibration values.
        uniform {bool} -- Assume uniform calibration values across the array.

    Raises:
        ValueError -- If caltable is empty or None, if caltype is 'tecim' and
            infile is not an existing path, if caltype is 'jyperk' and
            endpoint is not a known one, or if vis is not an existing path.
    """
    # validate arguments
    # None is the declared default, so it must be rejected just like ''.
    if not caltable:
        raise ValueError('A caltable name must be specified')

    if caltype == 'tecim' and not (isinstance(infile, str) and os.path.exists(infile)):
        raise ValueError('An existing tec map must be specified in infile')

    if caltype == 'jyperk' and endpoint not in ('asdm', 'interpolation', 'model-fit'):
        raise ValueError('When the caltype is jyperk, endpoint must be one of asdm, interpolation or model-fit')

    if not (isinstance(vis, str) and os.path.exists(vis)):
        raise ValueError('Visibility data set not found - please verify the name')

    gencal_type = caltype if caltype in ('antpos', 'jyperk', 'eop') else 'general'

    # To add a new kind of gencal, develop a new gencal class and register it
    # in the __gencal_factory dictionary.
    gencal_impl = __gencal_factory[gencal_type]
    gencal_impl.gencal(vis=vis, caltable=caltable, caltype=caltype, infile=infile,
                       endpoint=endpoint, timeout=timeout, retry=retry,
                       retry_wait_time=retry_wait_time,
                       ant_pos_time_limit=ant_pos_time_limit, spw=spw, antenna=antenna,
                       pol=pol, parameter=parameter, uniform=uniform)
class GeneralGencal():
    """A class to generate a caltable for caltypes without a dedicated class.

    The selection and calibration values are handed to the calibrater tool's
    specifycal as they are.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        endpoint, timeout, retry, retry_wait_time and ant_pos_time_limit are
        accepted for signature compatibility and are not used here.
        A UserWarning raised while the tool runs is logged at WARN level
        instead of being propagated; the tool is always closed.
        """
        specifycal_args = dict(
            caltable=caltable, time='', spw=spw, antenna=antenna, pol=pol,
            caltype=caltype, parameter=parameter, infile=infile,
            uniform=uniform,
        )
        try:
            # the scratch columns are not needed for this
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)
            _cb.specifycal(**specifycal_args)
        except UserWarning as warning:
            casalog.post('*** UserWarning *** %s' % warning, 'WARN')
        finally:
            _cb.close()
class AntposGencal():
    """A class to generate a caltable of antenna position corrections.

    This class will be called if the caltype is 'antpos'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        The antenna list and offsets are obtained in one of three ways:

        * infile is given (neither 'None' nor ''): both are read with
          correct_ant_posns_alma_json; antenna, pol and parameter must then
          be empty.
        * antenna is '': both are looked up with correct_ant_posns, which
          also receives ant_pos_time_limit.
        * otherwise: the caller's antenna and parameter are used as they are.

        endpoint, timeout, retry and retry_wait_time are accepted for
        signature compatibility and are not used here.

        Raises:
            ValueError -- If infile is given together with a non-empty
                antenna, pol or parameter.

        A UserWarning (including the one raised when the automatic lookup
        finds no offsets) is logged at WARN level instead of being
        propagated, and no caltable is written in that case. The tool is
        always closed.
        """
        try:
            # the scratch columns are not needed for this
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)

            # use the corrected antenna positions from a JSON file
            # (compare by value: identity checks against string literals are
            # unreliable for strings created at run time)
            if infile != 'None' and infile != '':
                if (antenna != '' or pol != ''
                        or (parameter is not None and len(parameter) != 0)):
                    raise ValueError('When using infile for ALMA the caltype is '
                                     'antpos, antenna, pol and parameter must be empty')
                antenna, parameter = getantposns.correct_ant_posns_alma_json(vis, infile)

            # call a Python function to retrieve ant position offsets automatically (EVLA only)
            elif antenna == '':
                casalog.post(" Determine antenna position offsets from the baseline correction database")
                # correct_ant_posns returns a list, [return_code, antennas, offsets]
                antenna_offsets = getantposns.correct_ant_posns(vis, False, ant_pos_time_limit)

                if ((len(antenna_offsets) == 3) and
                        (int(antenna_offsets[0]) == 0) and
                        (len(antenna_offsets[1]) > 0)):
                    antenna = antenna_offsets[1]
                    parameter = antenna_offsets[2]
                else:
                    # Raised directly so that it reaches the handler below
                    # without altering the process-wide warning filters.
                    raise UserWarning('No offsets found. No caltable created.')

            _cb.specifycal(caltable=caltable, time='', spw=spw, antenna=antenna, pol=pol,
                           caltype=caltype, parameter=parameter, infile=infile,
                           uniform=uniform)

        except UserWarning as instance:
            casalog.post('*** UserWarning *** %s' % instance, 'WARN')

        finally:
            _cb.close()
class JyperkGencal():
    """A class to generate caltable using the Jy/K DB or the factor CSV file.

    This class will be called if the caltype is 'jyperk'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        One specifycal call with caltype='amp' is issued per Jy/K factor that
        belongs to this vis. The caltype, antenna, pol, parameter and
        ant_pos_time_limit arguments are accepted for signature compatibility
        and are not used; selections come from the factors themselves.

        A UserWarning is logged at WARN level instead of being propagated;
        other exceptions propagate. The tool is always closed.
        """
        try:
            # the scratch columns are not needed for this
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)

            specifycal_input_list = cls.__gen_specifycal_input(
                vis=vis,
                spw=spw,
                endpoint=endpoint,
                infile=infile,
                timeout=timeout,
                retry=retry,
                retry_wait_time=retry_wait_time
            )
            for selection, param in specifycal_input_list:
                pol_selection = cls.__convert_to_pol_selection(polspec=selection['pol'])
                _cb.specifycal(caltable=caltable, time='', spw=selection['spw'],
                               caltype='amp', antenna=selection['antenna'], pol=pol_selection,
                               parameter=param, infile='', uniform=uniform)

        except UserWarning as instance:
            casalog.post('*** UserWarning *** %s' % instance, 'WARN')

        finally:
            _cb.close()

    @classmethod
    def __normalize_infile(cls, infile):
        """Return None when infile is one of the "not given" spellings.

        Both '' and the string 'None' (the default of the gencal signatures
        in this file) mean that no file was given; any other value is
        returned unchanged.
        """
        if isinstance(infile, str) and infile in ('', 'None'):
            return None
        return infile

    @classmethod
    def __gen_specifycal_input(cls, vis=None, spw='*',
                               endpoint='asdm', infile=None,
                               timeout=180, retry=3, retry_wait_time=5):
        """Yield (selection, parameter) pairs for specifycal.

        The factors are read from infile when one is given, otherwise they
        are requested through gen_factor_via_web_api. Only factors whose
        first field equals the base name of vis are used.

        This is a generator: nothing is read or requested until iteration
        starts.

        Yields:
            tuple -- (selection, parameter), where selection is a dict with
                the keys 'vis', 'antenna', 'spw' and 'pol' and parameter is
                1/sqrt(factor value).

        Raises:
            Exception -- If infile is neither a str nor None, or if no factor
                matches vis.
        """
        infile = cls.__normalize_infile(infile)

        if isinstance(infile, str):
            factors = JyPerKReader4File(infile).get()
        elif infile is None:
            factors = gen_factor_via_web_api(vis, spw=spw,
                                             endpoint=endpoint,
                                             timeout=timeout, retry=retry,
                                             retry_wait_time=retry_wait_time)
        else:
            raise Exception('The infile argument should be str or None.')

        factors = cls.__extract_valid_factor(factors, os.path.basename(vis))
        if len(factors) == 0:
            raise Exception('There is no factor.')

        for factor in factors:
            selection = {
                'vis': factor[0],
                'antenna': factor[1],
                'spw': factor[2],
                'pol': factor[3],
            }
            # NOTE(review): presumably the inverse square root converts the
            # Jy/K factor into the amplitude value expected by the 'amp'
            # caltype - confirm against the specifycal documentation.
            yield selection, 1/np.sqrt(float(factor[4]))

    @classmethod
    def __extract_valid_factor(cls, factors, vis_key):
        """Return the factors whose first field equals vis_key, as a list."""
        return [factor for factor in factors if factor[0] == vis_key]

    @classmethod
    def __convert_to_pol_selection(cls, polspec: str) -> str:
        """Convert the polarization field of a factor to a pol selection.

        'XX', 'YY', 'RR' and 'LL' map to their first letter. 'I', '' and any
        unexpected value map to '', i.e. the value is applied to all
        polarizations.
        """
        if polspec in ('XX', 'YY', 'RR', 'LL'):
            return polspec[0]
        return ''
class EOPGencal():
    """A class to generate caltable from updated EOP values.

    This class will be called if the caltype is 'eop'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        The calibrater tool creates the caltable and is closed again before
        generate_eop(vis, caltable, infile) is called. Only vis, caltable and
        infile are used; the other arguments are accepted for signature
        compatibility.

        A UserWarning is logged at WARN level instead of being propagated.
        The tool is closed once more in the ``finally`` clause, which also
        covers the case where an earlier step failed.
        """
        open_options = dict(filename=vis, compress=False, addcorr=False, addmodel=False)
        table_options = dict(caltable=caltable, partype='Real',
                             caltype='Fringe Jones', singlechan=True)
        try:
            # the scratch columns are not needed for this
            _cb.open(**open_options)
            _cb.createcaltable(**table_options)
            _cb.close()
            generate_eop(vis, caltable, infile)
        except UserWarning as warning:
            casalog.post('*** UserWarning *** %s' % warning, 'WARN')
        finally:
            _cb.close()
# Registry used by gencal() to pick the implementation class: 'antpos',
# 'jyperk' and 'eop' have dedicated classes, every other caltype is looked up
# under 'general'.
__gencal_factory = dict(
    general=GeneralGencal,
    antpos=AntposGencal,
    jyperk=JyperkGencal,
    eop=EOPGencal,
)