Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_gencal.py: 89%

112 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import warnings 

3import numpy as np 

4 

5from casatasks import casalog 

6from casatools import calibrater 

7from . import correct_ant_posns as getantposns 

8from .jyperk import gen_factor_via_web_api, JyPerKReader4File 

9from .eop import generate_eop 

10 

11_cb = calibrater() 

12 

13 

def gencal(vis=None, caltable=None, caltype=None, infile='None',
           endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
           spw=None, antenna=None, pol=None,
           parameter=None, uniform=None):
    """Externally specify calibration solutions of various types.

    Validates the arguments, selects the implementation class registered in
    ``__gencal_factory`` for the requested caltype and forwards every
    argument, unchanged, to that class's ``gencal`` classmethod.

    Arguments:
        vis {str} -- The file path of the visibility data; it must exist.
        caltable {str} -- The file name in which the caltable is stored;
            it must not be an empty string.
        caltype {str} -- The calibration type. 'antpos', 'jyperk' and 'eop'
            are dispatched to their dedicated implementations; any other
            value is dispatched to the general implementation.
        infile {str} -- Specifies the name of the file to read.
            When caltype='tecim' it must name an existing path.
        Subparameters of caltype='jyperk':
            endpoint {str} -- The endpoint of the Jy/K DB Web API to access.
                Options are 'asdm' (default), 'model-fit', 'interpolation'.
            timeout {int} -- Maximum waiting time [sec] for the Web API access,
                defaults to 180 sec.
            retry {int} -- Number of retries when the Web API access fails,
                defaults to 3 times.
            retry_wait_time {int} -- Waiting time [sec] until the next query
                when the Web API access fails, defaults to 5 sec.
        ant_pos_time_limit -- Defaults to 0. Passed by the 'antpos'
            implementation to ``correct_ant_posns``; its exact meaning is
            defined there.
        spw {str} -- The spectral windows.
        antenna {str} -- Select data based on antenna/baseline.
        pol {str} -- Polarization selection for specified parameters.
        parameter {doubleVec} -- The calibration values.
        uniform {bool} -- Assume uniform calibration values across the array.

    Raises:
        ValueError -- If caltable is '', if caltype is 'tecim' and infile is
            not an existing path, if caltype is 'jyperk' and endpoint is not
            one of the supported values, or if vis is not an existing path.
    """
    # validate arguments
    if caltable == '':
        raise ValueError('A caltable name must be specified')

    # isinstance (rather than an exact type comparison) so that str
    # subclasses, e.g. numpy.str_, are accepted as path names.
    if caltype == 'tecim' and not (isinstance(infile, str) and os.path.exists(infile)):
        raise ValueError('An existing tec map must be specified in infile')

    if caltype == 'jyperk' and endpoint not in ('asdm', 'interpolation', 'model-fit'):
        raise ValueError('When the caltype is jyperk, endpoint must be one of asdm, interpolation or model-fit')

    if not (isinstance(vis, str) and os.path.exists(vis)):
        raise ValueError('Visibility data set not found - please verify the name')

    # Only these caltypes have a dedicated implementation; everything else
    # goes through the general one.
    gencal_type = caltype if caltype in ('antpos', 'jyperk', 'eop') else 'general'

    # If developer want to add new gencal task, developer should develop a new gencal class and
    # add the class to __gencal_factory dictionary.
    implementation = __gencal_factory[gencal_type]
    implementation.gencal(vis=vis, caltable=caltable, caltype=caltype, infile=infile,
                          endpoint=endpoint, timeout=timeout, retry=retry,
                          retry_wait_time=retry_wait_time,
                          ant_pos_time_limit=ant_pos_time_limit, spw=spw, antenna=antenna,
                          pol=pol, parameter=parameter, uniform=uniform)

66 

67 

class GeneralGencal():
    """Generate a caltable by handing the caller's inputs straight to specifycal.

    The module-level ``gencal`` function selects this class for every caltype
    other than 'antpos', 'jyperk' and 'eop'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Open vis with the shared calibrater tool and call specifycal once.

        endpoint, timeout, retry, retry_wait_time and ant_pos_time_limit are
        accepted for signature compatibility and are not used here.
        A UserWarning raised while working is logged at WARN level instead of
        propagating; the calibrater tool is closed in every case.
        """
        specifycal_args = dict(
            caltable=caltable,
            time='',
            spw=spw,
            antenna=antenna,
            pol=pol,
            caltype=caltype,
            parameter=parameter,
            infile=infile,
            uniform=uniform,
        )

        try:
            # The scratch columns are not needed, hence addcorr/addmodel are off.
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)
            _cb.specifycal(**specifycal_args)
        except UserWarning as warning:
            casalog.post('*** UserWarning *** %s' % warning, 'WARN')
        finally:
            _cb.close()

86 

87 

class AntposGencal():
    """Generate a caltable of antenna position corrections.

    The module-level ``gencal`` function selects this class when the caltype
    is 'antpos'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        The antenna list and offsets handed to specifycal come from one of
        three places:

        * infile is neither 'None' nor '': they are read with
          ``correct_ant_posns_alma_json(vis, infile)``; antenna, pol and
          parameter must then be empty.
        * otherwise, antenna == '': they are looked up with
          ``correct_ant_posns(vis, False, ant_pos_time_limit)``.
        * otherwise: the caller's antenna and parameter are used as given.

        endpoint, timeout, retry and retry_wait_time are accepted for
        signature compatibility and are not used here.

        Raises:
            ValueError -- If infile is given together with a non-empty
                antenna, pol or parameter.

        A UserWarning (including the "no offsets found" case, in which no
        caltable is created) is logged at WARN level instead of propagating.
        The calibrater tool is closed in every case.
        """
        try:
            # don't need scr col for this
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)

            # use the corrected antenna positions from a JSON file
            # Strings are compared by value: identity comparison against a
            # literal depends on interning and is not reliable.
            if infile != 'None' and infile != '':
                # A parameter left at its None default counts as empty.
                has_parameter = parameter is not None and len(parameter) != 0
                if antenna != '' or pol != '' or has_parameter:
                    raise ValueError('When using infile for ALMA the caltype is '
                                     'antpos, antenna, pol and parameter must be empty')
                antenna, parameter = getantposns.correct_ant_posns_alma_json(vis, infile)

            # call a Python function to retrieve ant position offsets automatically (EVLA only)
            elif antenna == '':
                casalog.post(" Determine antenna position offsets from the baseline correction database")
                # correct_ant_posns returns a list , [return_code, antennas, offsets]
                antenna_offsets = getantposns.correct_ant_posns(vis, False, ant_pos_time_limit)

                if ((len(antenna_offsets) == 3) and
                        (int(antenna_offsets[0]) == 0) and
                        (len(antenna_offsets[1]) > 0)):
                    antenna = antenna_offsets[1]
                    parameter = antenna_offsets[2]
                else:
                    # Raised directly rather than by switching the global
                    # warning filter to 'error', which would leak into every
                    # later UserWarning in the process. The handler below
                    # logs it, so no caltable is created.
                    raise UserWarning('No offsets found. No caltable created.')

            _cb.specifycal(caltable=caltable, time='', spw=spw, antenna=antenna, pol=pol,
                           caltype=caltype, parameter=parameter, infile=infile,
                           uniform=uniform)

        except UserWarning as instance:
            casalog.post('*** UserWarning *** %s' % instance, 'WARN')

        finally:
            _cb.close()

130 

131 

class JyperkGencal():
    """A class to generate caltable using the Jy/K DB or the factor CSV file.

    This class will be called if the caltype is 'jyperk'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        Calls specifycal with caltype='amp' once for every factor produced
        for this vis. The caller's caltype, antenna, pol, parameter and
        ant_pos_time_limit are not used. A UserWarning is logged at WARN
        level instead of propagating; the calibrater tool is closed in every
        case.
        """
        try:
            # The scratch columns are not needed, hence addcorr/addmodel are off.
            _cb.open(filename=vis, compress=False, addcorr=False, addmodel=False)

            # The generator is lazy: reading the factors (and any error that
            # causes) happens on the first iteration, inside this try block.
            for selection, gain in cls.__gen_specifycal_input(
                    vis=vis, spw=spw, endpoint=endpoint, infile=infile,
                    timeout=timeout, retry=retry, retry_wait_time=retry_wait_time):
                pol_selection = cls.__convert_to_pol_selection(polspec=selection['pol'])
                _cb.specifycal(caltable=caltable, time='', spw=selection['spw'],
                               caltype='amp', antenna=selection['antenna'],
                               pol=pol_selection, parameter=gain, infile='',
                               uniform=uniform)

        except UserWarning as warning:
            casalog.post('*** UserWarning *** %s' % warning, 'WARN')

        finally:
            _cb.close()

    @classmethod
    def __gen_specifycal_input(cls, vis=None, spw='*',
                               endpoint='asdm', infile=None,
                               timeout=180, retry=3, retry_wait_time=5):
        """Yield one (selection, parameter) pair per factor that belongs to vis.

        The factors are read from infile when it is a str, or requested from
        the Web API when it is None or ''. selection is a dict with the keys
        'vis', 'antenna', 'spw' and 'pol' taken from factor[0..3]; parameter
        is 1/sqrt(float(factor[4])).

        Raises:
            Exception -- If infile is neither str nor None, or if no factor
                matches the base name of vis.
        """
        # The default infile is defined 'string' in gencal.xml, so an empty
        # string means "no file given".
        source = None if infile == '' else infile

        if source is None:
            factors = gen_factor_via_web_api(vis, spw=spw,
                                             endpoint=endpoint,
                                             timeout=timeout, retry=retry,
                                             retry_wait_time=retry_wait_time)
        elif type(source) is str:
            factors = JyPerKReader4File(source).get()
        else:
            raise Exception('The infile argument should be str or None.')

        usable = JyperkGencal.__extract_valid_factor(factors, os.path.basename(vis))
        if not usable:
            raise Exception('There is no factor.')

        for factor in usable:
            selection = {
                'vis': factor[0],
                'antenna': factor[1],
                'spw': factor[2],
                'pol': factor[3],
            }
            gain = 1/np.sqrt(float(factor[4]))
            yield selection, gain

    @classmethod
    def __extract_valid_factor(cls, factors, vis_key):
        """Return the list of factors whose first element equals vis_key."""
        return [factor for factor in factors if factor[0] == vis_key]

    @classmethod
    def __convert_to_pol_selection(cls, polspec: str) -> str:
        """Translate a factor's polarization label into a pol selection string.

        'XX', 'YY', 'RR' and 'LL' map to their first letter. 'I', '' and any
        unexpected value map to '', i.e. no pol selection, so the value is
        applied to all polarizations.
        """
        if polspec in ['XX', 'YY', 'RR', 'LL']:
            return polspec[0]
        return ''

222 

class EOPGencal():
    """A class to generate caltable from update EOP values.

    This class will be called if the caltype is 'eop'.
    """

    @classmethod
    def gencal(cls, vis=None, caltable=None, caltype=None, infile='None',
               endpoint='asdm', timeout=180, retry=3, retry_wait_time=5, ant_pos_time_limit=0,
               spw=None, antenna=None, pol=None,
               parameter=None, uniform=None):
        """Generate calibration table.

        Creates a 'Fringe Jones' caltable with the calibrater tool, then
        calls generate_eop(vis, caltable, infile). Only vis, caltable and
        infile are used; the remaining arguments are accepted for signature
        compatibility. A UserWarning is logged at WARN level instead of
        propagating.
        """
        open_options = dict(filename=vis, compress=False, addcorr=False, addmodel=False)
        table_options = dict(caltable=caltable, partype='Real',
                             caltype='Fringe Jones', singlechan=True)

        try:
            # The scratch columns are not needed, hence addcorr/addmodel are off.
            _cb.open(**open_options)
            _cb.createcaltable(**table_options)
            # The tool is closed before generate_eop runs.
            # NOTE(review): presumably so generate_eop can access the new
            # table itself - confirm.
            _cb.close()
            generate_eop(vis, caltable, infile)

        except UserWarning as warning:
            casalog.post('*** UserWarning *** %s' % warning, 'WARN')

        finally:
            # Also covers the paths on which open/createcaltable failed.
            _cb.close()

247 

248 

# Dispatch table used by the module-level gencal function: maps the gencal
# type ('general' for every caltype without a dedicated implementation) to
# the class whose gencal classmethod generates the caltable.
__gencal_factory = dict(
    general=GeneralGencal,
    antpos=AntposGencal,
    jyperk=JyperkGencal,
    eop=EOPGencal,
)