Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/task_phaseshift.py: 8%
142 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
1from typing import Dict, Optional, Tuple, Union
3import numpy as np
4from .mstools import write_history
5from casatools import table, ms, mstransformer
6from casatools import measures as me
7from casatasks import casalog
8from .parallel.parallel_data_helper import ParallelDataHelper
11def phaseshift(
12 vis: str,
13 outputvis: str,
14 keepmms: bool,
15 field: Optional[str],
16 spw: Optional[str],
17 scan: Optional[str],
18 intent: Optional[str],
19 array: Optional[str],
20 observation: Optional[str],
21 datacolumn: Optional[str],
22 phasecenter: Union[str, dict],
23):
24 """
25 Changes the phase center for either short or large
26 offsets/angles w.r.t. the original
27 """
28 casalog.origin("phaseshift")
30 if len(phasecenter) == 0:
31 raise ValueError("phasecenter parameter must be specified")
32 # Initiate the helper class
33 pdh = ParallelDataHelper("phaseshift", locals())
35 # Validate input and output parameters
36 try:
37 pdh.setupIO()
38 except Exception as instance:
39 casalog.post(str(instance), "ERROR")
40 raise RuntimeError(str(instance))
42 # Input vis is an MMS
43 if pdh.isMMSAndNotServer(vis) and keepmms:
44 if not pdh.validateInputParams():
45 raise RuntimeError("Unable to continue with MMS processing")
47 pdh.setupCluster("phaseshift")
49 # Execute the jobs
50 try:
51 pdh.go()
52 except Exception as instance:
53 casalog.post(str(instance), "ERROR")
54 raise RuntimeError(str(instance))
55 return
57 # Actual task code starts here
58 # Gather all the parameters in a dictionary.
59 config = {}
61 config = pdh.setupParameters(
62 inputms=vis,
63 outputms=outputvis,
64 field=field,
65 spw=spw,
66 array=array,
67 scan=scan,
68 intent=intent,
69 observation=observation,
70 )
72 colnames = _get_col_names(vis)
73 # Check if CORRECTED column exists, when requested
74 datacolumn = datacolumn.upper()
75 if datacolumn == "CORRECTED":
76 if "CORRECTED_DATA" not in colnames:
77 casalog.post(
78 "Input data column CORRECTED_DATA does not exist. Will use DATA", "WARN"
79 )
80 datacolumn = "DATA"
82 casalog.post(f"Will use datacolumn = {datacolumn}", "DEBUG")
83 config["datacolumn"] = datacolumn
85 # Call MSTransform framework with tviphaseshift=True
86 config["tviphaseshift"] = True
87 config["reindex"] = False
88 tviphaseshift_config = {"phasecenter": phasecenter}
89 config["tviphaseshiftlib"] = tviphaseshift_config
91 # Configure the tool
92 casalog.post(str(config), "DEBUG1")
94 mtlocal = mstransformer()
95 try:
96 mtlocal.config(config)
98 # Open the MS, select the data and configure the output
99 mtlocal.open()
101 # Run the tool
102 casalog.post("Shift phase center")
103 mtlocal.run()
104 finally:
105 mtlocal.done()
107 # Write history to output MS, not the input ms.
108 try:
109 mslocal = ms()
110 param_names = phaseshift.__code__.co_varnames[: phaseshift.__code__.co_argcount]
111 vars = locals()
112 param_vals = [vars[p] for p in param_names]
113 casalog.post("Updating the history in the output", "DEBUG1")
114 write_history(
115 mslocal, outputvis, "phaseshift", param_names, param_vals, casalog
116 )
117 except Exception as instance:
118 casalog.post(f"*** Error {instance} updating HISTORY", "WARN")
119 raise RuntimeError(str(instance))
120 finally:
121 mslocal.done()
123 casalog.post(
124 "Updating the FIELD subtable of the output MeasurementSet with shifted"
125 " phase centers",
126 "INFO",
127 )
128 _update_field_subtable(outputvis, field, phasecenter)
131def _get_col_names(vis: str) -> np.ndarray:
132 tblocal = table()
133 try:
134 tblocal.open(vis)
135 colnames = tblocal.colnames()
136 finally:
137 tblocal.done()
138 return colnames
141def _update_field_subtable(outputvis: str, field: str, phasecenter: Union[str, dict]):
142 """Update MS/FIELD subtable with shifted center(s)."""
144 try:
145 tblocal = table()
146 # modify FIELD table
147 tblocal.open(outputvis + "/FIELD", nomodify=False)
148 pcol = tblocal.getcol("PHASE_DIR")
150 field_frames = _find_field_ref_frames(tblocal)
151 if isinstance(phasecenter, str):
152 if field:
153 try:
154 field_id = int(field)
155 except ValueError as _exc:
156 fnames = tblocal.getcol("NAME")
157 field_id = np.where(fnames == field)[0][0]
158 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[field_id])
159 pcol[0][0][field_id] = thenewra_rad
160 pcol[1][0][field_id] = thenewdec_rad
161 else:
162 for row in range(0, tblocal.nrows()):
163 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[row])
164 pcol[0][0][row] = thenewra_rad
165 pcol[1][0][row] = thenewdec_rad
167 elif isinstance(phasecenter, dict):
168 for field_id, field_center in phasecenter.items():
169 field_iidx = int(field_id)
170 thenewra_rad, thenewdec_rad = _convert_to_ref_frame(field_center, field_frames[field_iidx])
171 pcol[0][0][field_iidx] = thenewra_rad
172 pcol[1][0][field_iidx] = thenewdec_rad
174 tblocal.putcol("PHASE_DIR", pcol)
176 except Exception as instance:
177 casalog.post(f"*** Error '%s' updating FIELD subtable {instance}", "WARN")
178 raise RuntimeError(str(instance))
179 finally:
180 tblocal.done()
183def _find_field_ref_frames(tblocal: table) -> Dict[int, str]:
184 """
185 Given an open FIELD subtable, returns a dict of {field: reference_frame} for the PHASE_DIR
186 column, where field is of type int, and reference_frame is of type str.
188 This handles:
189 - simple metadata where the reference frame is in the keywords of the PHASE_DIR column (same
190 ref frame for all fields)
191 - variable (per-field) reference frame metadata, usually in a PhaseDir_Ref additional column
192 """
194 dir_col = "PHASE_DIR"
195 metainfo = tblocal.getcolkeyword(dir_col, "MEASINFO")
196 nrows = tblocal.nrows()
197 if "Ref" in metainfo:
198 ref_frame = metainfo["Ref"]
199 field_ref_frames = {field_id: ref_frame for field_id in range(0, nrows)}
200 elif "Ref" not in metainfo and ("VarRefCol" in metainfo and "TabRefTypes" in metainfo and
201 "TabRefCodes" in metainfo):
202 col = metainfo["VarRefCol"] # presumably PhaseDir_Ref
203 ref_frame_codes = tblocal.getcol(col)
204 ref_codes_to_frame_idx = {code: idx for idx, code in enumerate(metainfo["TabRefCodes"])}
205 ref_frame_names = [metainfo["TabRefTypes"][ref_codes_to_frame_idx[code]] for code in
206 ref_frame_codes]
207 field_ref_frames = {field_id: ref_frame_names[field_id] for field_id in np.arange(0, nrows)}
208 else:
209 raise RuntimeError("Error when retrieving reference frames from the metadata of column "
210 f"{dir_col}. The field 'Ref' is not present but could not find the "
211 "fields 'VarRefCol', 'TabRefTypes', and 'TabRefCodes'")
213 return field_ref_frames
216def _convert_to_ref_frame(phasecenter: str, output_ref_frame: str) -> Tuple[float, float]:
217 """
218 Converts one phasecenter (presumably given as input to this task) to another reference
219 frame (presumably the frame used in the (output) MS for the relevant field).
221 When applying phaseshift, the output MS has the same reference frames as the input MS, as
222 propagated by mstransform.
224 Returns the v0,v1 (RA,Dec) values in units of radians and using the requested frame, ready to be
225 written to a FIELD subtable.
226 """
227 def parse_phasecenter(center: str) -> Tuple[str, str, str]:
228 """
229 Parse phase center string to obtain ra/dec (in rad). Splits the:
230 - (optional) frame,
231 - v0 (typically RA),
232 - v1 (typically Dec)
233 in a phasecenter string.
234 """
235 dirstr = center.split(" ")
236 if 3 == len(dirstr):
237 dir_frame, dir_v0, dir_v1 = dirstr[0], dirstr[1], dirstr[2]
238 elif 2 == len(dirstr):
239 dir_frame = "" # J2000 will be default in me.direction, etc.
240 dir_v0, dir_v1 = dirstr[0], dirstr[1]
241 else:
242 raise AttributeError(f"Wrong phasecenter string: '{center}'. It must have 3 or 2 "
243 "items separated by space(s): 'reference_frame RA Dec' or 'RA Dec'")
245 return dir_frame, dir_v0, dir_v1
247 try:
248 melocal = me()
249 dir_frame, dir_v0, dir_v1 = parse_phasecenter(phasecenter)
251 # Note: even if the input frame is the same as output_ref_frame, we need to ensure units of
252 # radians for the FIELD subtable
253 if dir_frame:
254 thedir = melocal.direction(dir_frame, dir_v0, dir_v1)
255 else:
256 thedir = melocal.direction(v0=dir_v0, v1=dir_v1)
257 if not thedir:
258 raise RuntimeError(
259 f"measures.direction() failed for phasecenter string: {phasecenter}"
260 )
262 if dir_frame != output_ref_frame:
263 thedir = melocal.measure(thedir, output_ref_frame)
264 thenewra_rad = thedir["m0"]["value"]
265 thenewdec_rad = thedir["m1"]["value"]
266 except Exception as instance:
267 casalog.post(
268 f"*** Error {instance} when interpreting parameter 'phasecenter': ",
269 "SEVERE",
270 )
271 raise RuntimeError(str(instance))
272 finally:
273 melocal.done()
275 return thenewra_rad, thenewdec_rad