Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/task_phaseshift.py: 8%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1from typing import Dict, Optional, Tuple, Union 

2 

3import numpy as np 

4from .mstools import write_history 

5from casatools import table, ms, mstransformer 

6from casatools import measures as me 

7from casatasks import casalog 

8from .parallel.parallel_data_helper import ParallelDataHelper 

9 

10 

11def phaseshift( 

12 vis: str, 

13 outputvis: str, 

14 keepmms: bool, 

15 field: Optional[str], 

16 spw: Optional[str], 

17 scan: Optional[str], 

18 intent: Optional[str], 

19 array: Optional[str], 

20 observation: Optional[str], 

21 datacolumn: Optional[str], 

22 phasecenter: Union[str, dict], 

23): 

24 """ 

25 Changes the phase center for either short or large 

26 offsets/angles w.r.t. the original 

27 """ 

28 casalog.origin("phaseshift") 

29 

30 if len(phasecenter) == 0: 

31 raise ValueError("phasecenter parameter must be specified") 

32 # Initiate the helper class 

33 pdh = ParallelDataHelper("phaseshift", locals()) 

34 

35 # Validate input and output parameters 

36 try: 

37 pdh.setupIO() 

38 except Exception as instance: 

39 casalog.post(str(instance), "ERROR") 

40 raise RuntimeError(str(instance)) 

41 

42 # Input vis is an MMS 

43 if pdh.isMMSAndNotServer(vis) and keepmms: 

44 if not pdh.validateInputParams(): 

45 raise RuntimeError("Unable to continue with MMS processing") 

46 

47 pdh.setupCluster("phaseshift") 

48 

49 # Execute the jobs 

50 try: 

51 pdh.go() 

52 except Exception as instance: 

53 casalog.post(str(instance), "ERROR") 

54 raise RuntimeError(str(instance)) 

55 return 

56 

57 # Actual task code starts here 

58 # Gather all the parameters in a dictionary. 

59 config = {} 

60 

61 config = pdh.setupParameters( 

62 inputms=vis, 

63 outputms=outputvis, 

64 field=field, 

65 spw=spw, 

66 array=array, 

67 scan=scan, 

68 intent=intent, 

69 observation=observation, 

70 ) 

71 

72 colnames = _get_col_names(vis) 

73 # Check if CORRECTED column exists, when requested 

74 datacolumn = datacolumn.upper() 

75 if datacolumn == "CORRECTED": 

76 if "CORRECTED_DATA" not in colnames: 

77 casalog.post( 

78 "Input data column CORRECTED_DATA does not exist. Will use DATA", "WARN" 

79 ) 

80 datacolumn = "DATA" 

81 

82 casalog.post(f"Will use datacolumn = {datacolumn}", "DEBUG") 

83 config["datacolumn"] = datacolumn 

84 

85 # Call MSTransform framework with tviphaseshift=True 

86 config["tviphaseshift"] = True 

87 config["reindex"] = False 

88 tviphaseshift_config = {"phasecenter": phasecenter} 

89 config["tviphaseshiftlib"] = tviphaseshift_config 

90 

91 # Configure the tool 

92 casalog.post(str(config), "DEBUG1") 

93 

94 mtlocal = mstransformer() 

95 try: 

96 mtlocal.config(config) 

97 

98 # Open the MS, select the data and configure the output 

99 mtlocal.open() 

100 

101 # Run the tool 

102 casalog.post("Shift phase center") 

103 mtlocal.run() 

104 finally: 

105 mtlocal.done() 

106 

107 # Write history to output MS, not the input ms. 

108 try: 

109 mslocal = ms() 

110 param_names = phaseshift.__code__.co_varnames[: phaseshift.__code__.co_argcount] 

111 vars = locals() 

112 param_vals = [vars[p] for p in param_names] 

113 casalog.post("Updating the history in the output", "DEBUG1") 

114 write_history( 

115 mslocal, outputvis, "phaseshift", param_names, param_vals, casalog 

116 ) 

117 except Exception as instance: 

118 casalog.post(f"*** Error {instance} updating HISTORY", "WARN") 

119 raise RuntimeError(str(instance)) 

120 finally: 

121 mslocal.done() 

122 

123 casalog.post( 

124 "Updating the FIELD subtable of the output MeasurementSet with shifted" 

125 " phase centers", 

126 "INFO", 

127 ) 

128 _update_field_subtable(outputvis, field, phasecenter) 

129 

130 

131def _get_col_names(vis: str) -> np.ndarray: 

132 tblocal = table() 

133 try: 

134 tblocal.open(vis) 

135 colnames = tblocal.colnames() 

136 finally: 

137 tblocal.done() 

138 return colnames 

139 

140 

141def _update_field_subtable(outputvis: str, field: str, phasecenter: Union[str, dict]): 

142 """Update MS/FIELD subtable with shifted center(s).""" 

143 

144 try: 

145 tblocal = table() 

146 # modify FIELD table 

147 tblocal.open(outputvis + "/FIELD", nomodify=False) 

148 pcol = tblocal.getcol("PHASE_DIR") 

149 

150 field_frames = _find_field_ref_frames(tblocal) 

151 if isinstance(phasecenter, str): 

152 if field: 

153 try: 

154 field_id = int(field) 

155 except ValueError as _exc: 

156 fnames = tblocal.getcol("NAME") 

157 field_id = np.where(fnames == field)[0][0] 

158 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[field_id]) 

159 pcol[0][0][field_id] = thenewra_rad 

160 pcol[1][0][field_id] = thenewdec_rad 

161 else: 

162 for row in range(0, tblocal.nrows()): 

163 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[row]) 

164 pcol[0][0][row] = thenewra_rad 

165 pcol[1][0][row] = thenewdec_rad 

166 

167 elif isinstance(phasecenter, dict): 

168 for field_id, field_center in phasecenter.items(): 

169 field_iidx = int(field_id) 

170 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(field_center, field_frames[field_iidx]) 

171 pcol[0][0][field_iidx] = thenewra_rad 

172 pcol[1][0][field_iidx] = thenewdec_rad 

173 

174 tblocal.putcol("PHASE_DIR", pcol) 

175 

176 except Exception as instance: 

177 casalog.post(f"*** Error '%s' updating FIELD subtable {instance}", "WARN") 

178 raise RuntimeError(str(instance)) 

179 finally: 

180 tblocal.done() 

181 

182 

183def _find_field_ref_frames(tblocal: table) -> Dict[int, str]: 

184 """ 

185 Given an open FIELD subtable, returns a dict of {field: reference_frame} for the PHASE_DIR 

186 column, where field is of type int, and reference_frame is of type str. 

187 

188 This handles: 

189 - simple metadata where the reference frame is in the keywords of the PHASE_DIR column (same 

190 ref frame for all fields) 

191 - variable (per-field) reference frame metadata, usually in a PhaseDir_Ref additional column 

192 """ 

193 

194 dir_col = "PHASE_DIR" 

195 metainfo = tblocal.getcolkeyword(dir_col, "MEASINFO") 

196 nrows = tblocal.nrows() 

197 if "Ref" in metainfo: 

198 ref_frame = metainfo["Ref"] 

199 field_ref_frames = {field_id: ref_frame for field_id in range(0, nrows)} 

200 elif "Ref" not in metainfo and ("VarRefCol" in metainfo and "TabRefTypes" in metainfo and 

201 "TabRefCodes" in metainfo): 

202 col = metainfo["VarRefCol"] # presumably PhaseDir_Ref 

203 ref_frame_codes = tblocal.getcol(col) 

204 ref_codes_to_frame_idx = {code: idx for idx, code in enumerate(metainfo["TabRefCodes"])} 

205 ref_frame_names = [metainfo["TabRefTypes"][ref_codes_to_frame_idx[code]] for code in 

206 ref_frame_codes] 

207 field_ref_frames = {field_id: ref_frame_names[field_id] for field_id in np.arange(0, nrows)} 

208 else: 

209 raise RuntimeError("Error when retrieving reference frames from the metadata of column " 

210 f"{dir_col}. The field 'Ref' is not present but could not find the " 

211 "fields 'VarRefCol', 'TabRefTypes', and 'TabRefCodes'") 

212 

213 return field_ref_frames 

214 

215 

216def _convert_to_ref_frame(phasecenter: str, output_ref_frame: str) -> Tuple[float, float]: 

217 """ 

218 Converts one phasecenter (presumably given as input to this task) to another reference 

219 frame (presumably the frame used in the (output) MS for the relevant field). 

220 

221 When applying phaseshift, the output MS has the same reference frames as the input MS, as 

222 propagated by mstransform. 

223 

224 Returns the v0,v1 (RA,Dec) values in units of radians and using the requested frame, ready to be 

225 written to a FIELD subtable. 

226 """ 

227 def parse_phasecenter(center: str) -> Tuple[str, str, str]: 

228 """ 

229 Parse phase center string to obtain ra/dec (in rad). Splits the: 

230 - (optional) frame, 

231 - v0 (typically RA), 

232 - v1 (typically Dec) 

233 in a phasecenter string. 

234 """ 

235 dirstr = center.split(" ") 

236 if 3 == len(dirstr): 

237 dir_frame, dir_v0, dir_v1 = dirstr[0], dirstr[1], dirstr[2] 

238 elif 2 == len(dirstr): 

239 dir_frame = "" # J2000 will be default in me.direction, etc. 

240 dir_v0, dir_v1 = dirstr[0], dirstr[1] 

241 else: 

242 raise AttributeError(f"Wrong phasecenter string: '{center}'. It must have 3 or 2 " 

243 "items separated by space(s): 'reference_frame RA Dec' or 'RA Dec'") 

244 

245 return dir_frame, dir_v0, dir_v1 

246 

247 try: 

248 melocal = me() 

249 dir_frame, dir_v0, dir_v1 = parse_phasecenter(phasecenter) 

250 

251 # Note: even if the input frame is the same as output_ref_frame, we need to ensure units of 

252 # radians for the FIELD subtable 

253 if dir_frame: 

254 thedir = melocal.direction(dir_frame, dir_v0, dir_v1) 

255 else: 

256 thedir = melocal.direction(v0=dir_v0, v1=dir_v1) 

257 if not thedir: 

258 raise RuntimeError( 

259 f"measures.direction() failed for phasecenter string: {phasecenter}" 

260 ) 

261 

262 if dir_frame != output_ref_frame: 

263 thedir = melocal.measure(thedir, output_ref_frame) 

264 thenewra_rad = thedir["m0"]["value"] 

265 thenewdec_rad = thedir["m1"]["value"] 

266 except Exception as instance: 

267 casalog.post( 

268 f"*** Error {instance} when interpreting parameter 'phasecenter': ", 

269 "SEVERE", 

270 ) 

271 raise RuntimeError(str(instance)) 

272 finally: 

273 melocal.done() 

274 

275 return thenewra_rad, thenewdec_rad