Coverage for / home / casatest / venv / lib / python3.12 / site-packages / casatasks / private / task_pccor.py: 6%
622 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 07:14 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 07:14 +0000
1import numpy as np
2import math
3from casatools import table, measures, calibrater
4from casatasks import fringefit, casalog
6#########################################################
7# Constants and ms table buffer
8#########################################################
9tb_obj = table()
10twopi = 2 * np.pi
11nanosec = 1e-9
12day = 86400
13rad2deg = 180 / np.pi
14lnbr = '\n'
15pol_str = np.array(
16 ["0", "I", "Q", "U", "V", "RR", "RL", "LR", "LL", "XX", "XY", "YX", "YY"]
17)
20#########################################################
21# Utilities (timing, printing, phase_wrapping, etc)
22#########################################################
23def print_dict(ledict, godeep=False, print_str=True, label=None, tab=3):
24 if label is None:
25 outstr = f'Dictionaire contains:{lnbr}'
26 else:
27 outstr = f'{label} contains:{lnbr}'
29 ident = tab * ' '
30 maxlen = 0
31 for key in ledict.keys():
32 if len(key) > maxlen:
33 maxlen = len(key)
35 for key, value in ledict.items():
36 if isinstance(value, dict):
37 if godeep:
38 outstr += ident + print_dict(value, godeep=godeep, print_str=False, label=key, tab=tab + 3)
39 else:
40 outstr += f'{ident}{key:{maxlen}s} -> (dict){lnbr}'
41 else:
42 outstr += f'{ident}{key:{maxlen}s} -> '
43 if isinstance(value, np.ndarray):
44 outstr += f'{value.shape}, ({value.dtype}){lnbr}'
45 elif isinstance(value, list):
46 outstr += f'{value[0]}, ..., {value[-1]}'
47 else:
48 outstr += f'{value}{lnbr}'
49 if print_str:
50 print(outstr)
51 return outstr
54def phase_wrapping(phase):
55 """
56 Wraps phase to the -pi to pi interval
57 Args:
58 phase: phase to be wrapped
60 Returns:
61 Phase wrapped to the -pi to pi interval
62 """
63 return (phase + np.pi) % twopi - np.pi
66def print_array(array, name):
67 print(f'{name}: {array.shape}, {array.dtype}')
68 print(array)
71def get_ant_key(ant_id, ant_names):
72 """
73 Return appropriate antenna key for the given antenna ID
74 :param ant_id: Antenna ID
75 :param ant_names: list of antenna names
76 :return: antenna key
77 """
78 return f'ant_{ant_names[ant_id]}'
81def get_spw_key(spw_id):
82 """
83 Return appropriate spw key for spw_id
84 :param spw_id: SPW ID (int)
85 :return: SPW key
86 """
87 return f'spw_{spw_id}'
90def get_ant_id(user_ant, ant_names, ant_ids=None):
91 """
92 Get antenna ID from name
93 :param user_ant: User provided antenna name
94 :param ant_names: List of antenna names
95 :param ant_ids: optional list of antenna IDs
96 :return: antenna ID for user antenna
97 """
98 if ant_ids is None:
99 ant_ids = np.arange(ant_names.shape[0])
100 try:
101 this_ant_id = ant_ids[ant_names == user_ant][0]
102 except IndexError:
103 raise ValueError(f'Antenna {user_ant} is not available in this dataset')
104 return this_ant_id
107#########################################################
108# Rough fringe fit routines, not currently used, but might be useful in the future
109#########################################################
110def get_ddi_for_spw(msname, spw):
111 """
112 From MSes metadata derive the DDI for a particular SPW
113 :param msname: MS name on disk
114 :param spw: Selected SPW
115 :return: DDIs for the desired SPW
116 """
117 tb_obj.open(msname + '/DATA_DESCRIPTION')
118 spwd_id = tb_obj.getcol('SPECTRAL_WINDOW_ID')
119 spw_selection = spwd_id == spw
120 ddis = np.arange(spwd_id.shape[0])[spw_selection]
121 return ddis
124def get_visibilities(msname, sol_times, antenna, refant, spw):
125 """
126 Get visibilities from the MS for the selected time, antenna&refant baseline and SPW
127 :param msname: Name of the MS on disk
128 :param sol_times: time interval over which to fetch data
129 :param antenna: Antenna for which to fetch data
130 :param refant: Reference antenna for selecting baseline
131 :param spw: SPW for which to fetch data
132 :return: Dictionary containg visibilities, weights and flags over the selected time, baseline and SPW.
133 """
134 valid_ddis = get_ddi_for_spw(msname, spw)
135 tb_obj.open(msname)
136 query = f'TIME >= {sol_times["start"]} && TIME <= {sol_times["stop"]}'
137 st = tb_obj.query(query=query)
138 raw_vis = st.getcol('DATA')
139 ant_1 = st.getcol('ANTENNA1')
140 ant_2 = st.getcol('ANTENNA2')
141 weight = st.getcol("WEIGHT")
142 flag = st.getcol("FLAG")
143 vis_time = st.getcol("TIME")
144 vis_ddi = st.getcol("DATA_DESC_ID")
145 st.close()
146 tb_obj.close()
148 ddi_selection = np.full_like(ant_1, False)
149 for ddi in valid_ddis:
150 ddi_selection = np.logical_or(vis_ddi == ddi, ddi_selection)
151 ant_1_selection = np.logical_or(ant_1 == antenna, ant_2 == antenna)
152 ant_2_selection = np.logical_or(ant_1 == refant, ant_2 == refant)
153 baseline_selection = np.logical_and(ant_1_selection, ant_2_selection)
154 final_selection = np.logical_and(baseline_selection, ddi_selection)
156 vis_dict = {
157 'time': vis_time[final_selection],
158 'vis': raw_vis[:, :, final_selection], # [n_corr, n_chan, n_time]
159 'weight': weight[:, final_selection], # [n_corr, n_time]
160 'flag': flag[:, :, final_selection]
161 }
163 return vis_dict
166def rough_one_spw_fringe_fit(vis, chan_width, padding_factor=10):
167 """
168 Based on AIPS's AFRFI.FOR
169 :param vis: complex visibilities array, Assumes single corr
170 :param chan_width: Chan width in Herz
171 :param padding_factor: Padding factor for zero padding of the FFT
172 :return: estimated delay rate, and estimate phase at first point in the spectrum
173 """
174 n_chan = vis.shape[0]
175 k_coeff = np.log(n_chan * padding_factor) / np.log(2)
176 k_integer = math.ceil(k_coeff)
177 padded_size = k_integer ** 2
178 padded_vis = np.zeros(padded_size, dtype=complex)
179 padded_vis[:n_chan] = vis
181 fft_vis = np.fft.ifft(padded_vis)
182 fft_amp = np.absolute(fft_vis)
183 fft_phase = np.angle(fft_vis)
185 i_max_amp = np.argmax(fft_amp)
186 max_amp = fft_amp[i_max_amp] / n_chan
187 max_amp_phase = fft_phase[i_max_amp]
188 phase_rate = i_max_amp / 2.0 / n_chan
189 if i_max_amp >= n_chan:
190 phase_rate -= 1.0
191 phase_rate *= 2 * n_chan
193 base_rate = phase_rate
194 nrate = 5
195 rate_step = 1.0 / nrate
196 coeff = (np.pi / 2) * (1 - 1.0 / n_chan)
197 twopp = twopi / (2.0 * n_chan)
198 min_sum_amp_diff = 1e10
199 min_data_diff_irate = -1
200 for irate in range(1, nrate + 4):
201 delta_rate = (irate - 2) * rate_step - 0.5
202 phase_rate = base_rate + delta_rate
203 estimated_phase = max_amp_phase - coeff * delta_rate
204 max_amp_propagated = max_amp * np.exp(1j * ((twopp * phase_rate * np.arange(n_chan)) + estimated_phase))
205 sum_amp_diff = np.sum(np.abs(vis - max_amp_propagated) ** 2)
207 if sum_amp_diff < min_sum_amp_diff:
208 min_data_diff_irate = irate
209 min_sum_amp_diff = sum_amp_diff
211 delta_rate = (min_data_diff_irate - 2) * rate_step - 0.5
212 phase_rate = base_rate + delta_rate
213 estimated_phase = max_amp_phase - coeff * delta_rate
215 # Numpy version
216 matrix = np.zeros([3, 3])
217 vector = np.zeros([3])
218 letwopp = np.arange(n_chan) * twopp
219 chan_arg = letwopp * phase_rate + estimated_phase
220 chan_sin_arg = np.sin(chan_arg)
221 chan_cos_arg = np.cos(chan_arg)
223 vector[0] = np.sum(vis.real * chan_cos_arg + vis.imag * chan_sin_arg)
224 vector[1] = max_amp * np.sum(letwopp * ((vis.imag * chan_cos_arg) - (vis.real * chan_sin_arg)))
225 vector[2] = max_amp * np.sum((vis.imag * chan_cos_arg) - (vis.real * chan_sin_arg))
227 matrix[0, 0] = np.sum(chan_cos_arg ** 2 + chan_sin_arg ** 2)
229 matrix[0, 1] = max_amp * np.sum(letwopp * ((chan_cos_arg * chan_sin_arg) - (chan_sin_arg * chan_cos_arg)))
230 matrix[1, 1] = max_amp ** 2 * np.sum(letwopp ** 2 * (chan_sin_arg ** 2 + chan_cos_arg ** 2))
232 matrix[0, 2] = max_amp * np.sum(chan_cos_arg * chan_sin_arg - chan_sin_arg * chan_cos_arg)
233 matrix[1, 2] = max_amp ** 2 * np.sum(letwopp * (chan_sin_arg ** 2 + chan_cos_arg ** 2))
234 matrix[2, 2] = max_amp ** 2 * np.sum(chan_sin_arg ** 2 + chan_cos_arg ** 2)
236 answer, residuals, rank, singulars = np.linalg.lstsq(matrix, vector)
238 fitted_amp = max_amp + answer[0]
239 fitted_phase = estimated_phase + answer[2]
240 fitted_delay = (phase_rate + answer[1]) / 2 / n_chan / chan_width
242 return fitted_amp, fitted_phase, fitted_delay
245def estimate_phase_and_delays_from_visibilities(metadata):
246 """
247 Perform a rough fringe to selected antennas and SPWs
248 :param metadata: parsed metadata
249 :return: Dictionary containing rough fringe fit derived phase and delay solutions
250 """
251 # Assumes we are working with VLBA data
252 accepted_correlations = ['RR', 'LL']
253 corr_axis = metadata['correlation_axis']
254 msname = metadata['msname']
255 sol_times = metadata['solution_times']
257 ff_data = {}
258 for i_ant in metadata['ant_list']:
259 ant_key = get_ant_key(i_ant, metadata['antenna_names'])
260 ant_dict = {}
261 for i_spw in metadata['spw_list']:
262 spw_key = get_spw_key(i_spw)
263 vis_data = get_visibilities(msname, sol_times, i_ant, metadata['refant_id'], i_spw)
264 if vis_data['weight'].ndim == 2:
265 weights = np.zeros_like(vis_data['vis'])
266 weights[:, :, :] = vis_data['weight'][:, np.newaxis, :]
267 elif vis_data['weight'].ndim == 3:
268 weights = vis_data['weight']
269 else:
270 vis_shape = vis_data["vis"].shape
271 wei_shape = [vis_shape[0], vis_shape[2]]
272 raise Exception(f'Unexpected weight shape, expected {vis_shape} or '
273 f'{wei_shape} got {vis_data["weight"].shape}')
274 time_avg_vis = np.average(vis_data['vis'], weights=weights, axis=2)
275 ref_freq = metadata[spw_key]['reference_frequency']
276 chan_width = metadata[spw_key]['channel_width']
277 phase = np.zeros(2)
278 delay = np.zeros(2)
279 for i_corr, correlation in enumerate(corr_axis):
280 if correlation not in accepted_correlations:
281 continue
282 if correlation[0] == 'R':
283 i_pol = 0
284 else:
285 i_pol = 1
286 _, phase[i_pol], delay[i_pol] = rough_one_spw_fringe_fit(time_avg_vis[i_corr, :], chan_width)
288 ant_dict[spw_key] = {
289 'frequency': ref_freq, # scalar
290 'time': sol_times['start'], # [n_time]
291 'phase': phase, # [n_pol]
292 'delay': delay, # [n_pol]
293 }
294 ff_data[ant_key] = ant_dict
296 # REFERENCE PHASE AND DELAYS TO REFANT
297 refant_key = metadata["refant_key"]
298 refant_dict = ff_data[refant_key]
300 for i_ant in metadata['ant_list']:
301 ant_key = get_ant_key(i_ant, metadata['antenna_names'])
302 for i_spw in metadata['spw_list']:
303 spw_key = get_spw_key(i_spw)
305 ff_data[ant_key][spw_key]['delay'] -= refant_dict[spw_key]['delay']
306 ff_data[ant_key][spw_key]['phase'] = phase_wrapping(ff_data[ant_key][spw_key]['phase'] -
307 refant_dict[spw_key]['phase'])
309 return ff_data
312def estimate_multiband_delay(phasor, tone_axis, padding_factor=2):
313 """
314 Estimate multiband delay across several spectral windows
315 :param phasor: Amplitude=1 complex array describing phases
316 :param tone_axis: Axis containing tone frequencies
317 :param padding_factor: FFT padding factor
318 :return: estimated multiband delay
319 """
320 fstep = 0.5 / tone_axis[-1]
322 n_tones = phasor.shape[0]
323 k_coeff = np.log(n_tones * padding_factor) / np.log(2)
324 k_integer = math.ceil(k_coeff)
325 padded_size = k_integer ** 2
326 padded_phasor = np.zeros(padded_size, dtype=complex)
327 padded_phasor[:n_tones] = phasor
329 fft_vis = np.fft.ifft(padded_phasor)
330 fft_amp = np.absolute(fft_vis)
331 fft_phase = np.angle(fft_vis)
333 i_max_amp = np.argmax(fft_amp)
334 max_amp = fft_amp[i_max_amp] / n_tones
335 max_amp_phase = fft_phase[i_max_amp]
336 phase_rate = i_max_amp * fstep
338 matrix = np.zeros([3, 3])
339 vector = np.zeros([3])
341 dtone_axis = twopi * (tone_axis - tone_axis[0])
342 arg = dtone_axis * phase_rate + max_amp_phase
344 tone_cos_arg = np.cos(arg)
345 tone_sin_arg = np.sin(arg)
347 vector[0] = np.sum(phasor.real * tone_cos_arg + phasor.imag * tone_sin_arg)
348 vector[1] = max_amp * np.sum(dtone_axis * ((phasor.imag * tone_cos_arg) - (phasor.real * tone_sin_arg)))
349 vector[2] = max_amp * np.sum((phasor.imag * tone_cos_arg) - (phasor.real * tone_sin_arg))
351 matrix[0, 0] = np.sum(tone_cos_arg ** 2 + tone_sin_arg ** 2)
353 matrix[0, 1] = max_amp * np.sum(dtone_axis * ((tone_cos_arg * tone_sin_arg) - (tone_sin_arg * tone_cos_arg)))
354 matrix[1, 1] = max_amp ** 2 * np.sum(dtone_axis ** 2 * (tone_sin_arg ** 2 + tone_cos_arg ** 2))
356 matrix[0, 2] = max_amp * np.sum(tone_cos_arg * tone_sin_arg - tone_sin_arg * tone_cos_arg)
357 matrix[1, 2] = max_amp ** 2 * np.sum(dtone_axis * (tone_sin_arg ** 2 + tone_cos_arg ** 2))
358 matrix[2, 2] = max_amp ** 2 * np.sum(tone_sin_arg ** 2 + tone_cos_arg ** 2)
360 answer, residuals, rank, singulars = np.linalg.lstsq(matrix, vector)
362 # fitted_amp = max_amp + answer[0]
363 # fitted_phase = max_amp_phase + answer[2]
364 fitted_delay = phase_rate + answer[1]
366 return fitted_delay
369#########################################################
370# Parsing and initialization
371#########################################################
372def parse_antenna(user_antenna, ant_names):
373 """
374 Parse user's selection of antennas
375 :param user_antenna: user typed antenna selection criteria
376 :param ant_names: MS fetched list of antenna names
377 :return: list with selected antennas
378 """
379 ant_ids = np.arange(ant_names.shape[0])
380 if user_antenna == "none":
381 ant_list = ant_ids
382 else:
383 if '!' in user_antenna:
384 ant_list = ant_ids.tolist()
385 user_ant_list = user_antenna.split(',')
386 for user_ant in user_ant_list:
387 if user_ant[0].strip() == '!':
388 rm_id = get_ant_id(user_ant[1:].strip(), ant_names, ant_ids)
389 ant_list.remove(rm_id)
390 else:
391 raise ValueError(
392 f"Mixing of antennas to include and antennas to exclude in the same list is not allowed")
393 else:
394 user_ant_list = user_antenna.split(',')
395 ant_list = []
396 for user_ant in user_ant_list:
397 ant_list.append(get_ant_id(user_ant.strip(), ant_names, ant_ids))
398 return np.array(ant_list)
401def parse_spw(user_spw, spw_ids):
402 """
403 Parse user's selection of spectral windows
404 :param user_spw: user typed spectral window selection criteria
405 :param spw_ids: MSes Spectral windows
406 :return: list with selected spectral windows
407 """
408 if user_spw == "none":
409 spw_list = spw_ids
410 else:
411 if '!' in user_spw:
412 spw_list = spw_ids.tolist()
413 user_spw_list = user_spw.split(',')
414 for user_spw in user_spw_list:
415 if user_spw[0].strip() == '!':
416 rm_id = int(user_spw[1:].strip())
417 spw_list.remove(rm_id)
418 else:
419 raise ValueError(
420 f"Mixing of SPWs to include and SPWs to exclude in the same list is not allowed")
421 elif '~' in user_spw:
422 spw_wrds = user_spw.split("~")
423 fi_spw = int(spw_wrds[0])
424 la_spw = int(spw_wrds[1])
425 if fi_spw < 0:
426 fi_spw = 0
427 casalog.post('First SPW set to a negative number setting it to 0', 'WARN')
428 if la_spw > spw_ids[-1]:
429 la_spw = spw_ids[-1]
430 casalog.post(f'last SPW set to a number larger than the last SPW in MS, setting it to {la_spw}', 'WARN')
431 spw_list = np.arange(fi_spw, la_spw+1)
432 else:
433 spw_list = []
434 wrong_spw = False
435 user_spw_list = user_spw.split(',')
436 for user_spw in user_spw_list:
437 try:
438 int_spw = int(user_spw.strip())
439 if int_spw in spw_ids:
440 spw_list.append(int_spw)
441 else:
442 wrong_spw = True
443 casalog.post(f'{user_spw.strip()} is not a valid SPW')
444 except ValueError:
445 wrong_spw = True
446 casalog.post(f'{user_spw.strip()} is not a valid SPW')
447 if wrong_spw:
448 raise ValueError(f'{user_spw} is not a valid SPW specification')
450 return np.array(spw_list)
453def parse_scan(user_scan, scan_times):
454 """
455 Parse user typed scan parameter
456 :param user_scan: user typed scan parameter
457 :param scan_times: Dictionary containing scan times
458 :return: start and stop times for user scan in MJD in seconds
459 """
460 try:
461 int_scan = int(user_scan)
462 except ValueError:
463 raise ValueError(f'{user_scan} is not a valid single scan number')
464 if int_scan in scan_times["scans"]:
465 start, stop = scan_times["times"][int_scan == scan_times["scans"]][0]
466 else:
467 raise ValueError(f'Selected scan {int_scan} is not present in the ms')
468 return start, stop
471def parse_timerange(user_timerange):
472 """
473 Parse user typed timerange parameter
474 :param user_timerange: user typed time range parameter
475 :return: start and stop times in MJD in seconds
476 """
477 me = measures()
478 split_user_range = user_timerange.split('~')
479 if len(split_user_range) != 2:
480 raise ValueError('timerange must contain one and only one <~> to separate start and finish times')
482 try:
483 start = me.epoch('UTC', split_user_range[0])['m0']['value']*day
484 except KeyError:
485 raise ValueError('Start time in timerange is invalid')
487 if '/' in split_user_range[1]:
488 stop_str = split_user_range[1]
489 else:
490 slash_split_date = split_user_range[0].split('/')[0:3]
491 slash_split_date.append(split_user_range[1])
492 stop_str = '/'.join(slash_split_date)
494 try:
495 stop = me.epoch('UTC', stop_str)['m0']['value'] * day
496 except KeyError:
497 raise ValueError('Stop time in timerange is invalid')
498 return start, stop
501def parse_solution_times_from_scan_and_timerange(user_scan, user_timerange, scan_times):
502 """
503 parse user's time selection parameters
504 :param user_scan: User typed scan
505 :param user_timerange: user typed timerange
506 :param scan_times: Dictionary containing scan times
507 :return: Dictionary with solution start and stop times in MJD in seconds
508 """
509 if user_scan == "none" and user_timerange == "none":
510 raise ValueError('Scan and timerange cannot be both undefined')
511 elif user_scan != "none" and user_timerange == "none":
512 start, stop = parse_scan(user_scan, scan_times)
513 elif user_scan == "none" and user_timerange != "none":
514 start, stop = parse_timerange(user_timerange)
515 else:
516 scan_start, scan_stop = parse_scan(user_scan, scan_times)
517 start, stop = parse_timerange(user_timerange)
518 if stop < scan_start or start > scan_stop:
519 raise ValueError('timerange is incompatible with scan times')
521 duration = stop-start
522 if duration > 180:
523 casalog.post('WARNING: time range for solution might be too long')
525 if stop < scan_times['times'][0, 0] or start > scan_times['times'][-1, -1]:
526 raise ValueError('Timerange is outside observation times')
528 sol_times = {
529 'start': start,
530 'stop': stop,
531 'duration': duration
532 }
533 return sol_times
536#########################################################
537# Fetching data from MS and Cal table
538#########################################################
539def parse_and_get_metadata(user_inp_par):
540 """
541 Call parsing functions and produce a execution metadata dictionary
542 :param user_inp_par: Dictionary with user inputs
543 :return: Parsed metadata dictionary
544 """
545 msname = user_inp_par['vis']
547 # fetch metadata from ms
548 tb_obj.open(msname + '/ANTENNA')
549 ant_names = tb_obj.getcol('NAME')
550 tb_obj.close()
552 tb_obj.open(msname + '/SPECTRAL_WINDOW')
553 freq_axes = tb_obj.getcol('CHAN_FREQ')
554 chan_width = tb_obj.getcol('CHAN_WIDTH')
555 ref_frequencies = tb_obj.getcol('REF_FREQUENCY')
556 tot_bandwidth = tb_obj.getcol('TOTAL_BANDWIDTH')
557 spw_ids = np.arange(ref_frequencies.shape[0])
558 tb_obj.close()
560 tb_obj.open(msname + '/POLARIZATION')
561 pol_ids = tb_obj.getcol('CORR_TYPE')
562 tb_obj.close()
564 # Antenna parsing
565 ant_list = parse_antenna(user_inp_par['antenna'], ant_names)
566 refant_id = get_ant_id(user_inp_par['refant'], ant_names)
567 if refant_id not in ant_list:
568 raise RuntimeError('Reference antenna must be included in processing antennas in PCCOR')
570 # SPW parsing
571 spw_list = parse_spw(user_inp_par['spw'], spw_ids)
573 # Get scan times
574 scan_times = get_scan_times(msname)
576 sol_times = parse_solution_times_from_scan_and_timerange(user_inp_par['scan'], user_inp_par['timerange'],
577 scan_times)
579 metadata_dict = {
580 'ant_list': ant_list,
581 'spw_list': spw_list,
582 'n_spw': spw_list.shape[0],
583 'n_ant': ant_list.shape[0],
584 'antenna_names': ant_names,
585 'refant_id': refant_id,
586 'refant_key': f'ant_{ant_names[refant_id]}',
587 'correlation_axis': np.squeeze(pol_str[pol_ids]),
588 'msname': msname,
589 'user_inputs': user_inp_par,
590 'scan_times': scan_times,
591 'solution_times': sol_times,
592 'fallback_to_fringefit': user_inp_par['fallback_to_fringefit']
593 }
595 for i_spw in spw_list:
596 freq_axis = freq_axes[:, i_spw]
597 ref_freq = ref_frequencies[i_spw]
598 chanbw = chan_width[0, i_spw] # This assumes all channels are equal within an IF, reasonable?
599 closest_chan = np.argmin(np.abs(freq_axis - ref_freq))
600 ref_chan = closest_chan + (freq_axis[closest_chan] - ref_freq) / chanbw
602 spw_key = get_spw_key(i_spw)
603 metadata_dict[spw_key] = {
604 'frequency_axis': freq_axis,
605 'reference_frequency': ref_freq,
606 'reference_channel': ref_chan,
607 'total_bandwidth': tot_bandwidth[i_spw],
608 'channel_width': chanbw
609 }
610 return metadata_dict
613def get_pulse_cal_data(metadata):
614 """
615 Extract pulse cal data from the MS::PHASE_CAL table paying attention to missing data.
616 :param metadata: Parsed metadata
617 :return: Dictionary containing time resolved phases and 2 tone delays, average phase and delays at solution times
618 as well as relevant tone metadata.
619 """
620 msname = metadata['msname']
621 sol_times = metadata['solution_times']
623 try:
624 tb_obj.open(msname + '/PHASE_CAL')
625 ant_id = tb_obj.getcol('ANTENNA_ID')
626 spw_id = tb_obj.getcol('SPECTRAL_WINDOW_ID')
627 pc_times = tb_obj.getcol('TIME')
628 tone_freq = tb_obj.getcol('TONE_FREQUENCY') # [n_pol, n_tone, n_row]
629 phase_cal = tb_obj.getcol('PHASE_CAL') # [n_pol, n_tone, n_row]
630 cable_cal = tb_obj.getcol('CABLE_CAL') # [n_row]
631 tb_obj.close()
632 except RuntimeError:
633 msg = f'{msname} does not contain a PHASE_CAL subtable'
634 raise RuntimeError(msg)
636 n_pol = tone_freq.shape[0]
637 n_tones = tone_freq.shape[1]
638 pc_dict = {
639 'n_pol': n_pol,
640 'n_tones': n_tones
641 }
643 error = False
645 for i_ant in metadata['ant_list']:
646 ant_dict = {}
647 ant_key = get_ant_key(i_ant, metadata['antenna_names'])
648 ant_selection = ant_id == i_ant
649 is_refant = ant_key == metadata['refant_key']
650 for i_spw in metadata['spw_list']:
651 spw_key = get_spw_key(i_spw)
652 spw_selection = spw_id == i_spw
653 full_selection = np.logical_and(spw_selection, ant_selection)
655 n_data = np.sum(full_selection)
656 if n_data == 0 and is_refant:
657 msg = (f'No pulse cal data for {metadata["antenna_names"][i_ant]} antenna, spw {i_spw}, '
658 f'which is the reference antenna no solutions possible for this SPW')
659 casalog.post(msg, 'SEVERE')
660 error = True
661 skip = True
662 times = []
663 phases = []
664 these_tones = []
665 delays = []
666 complex_data = []
668 elif n_data == 0 and not metadata['fallback_to_fringefit']:
669 msg = (f'No pulse cal data for {metadata["antenna_names"][i_ant]} antenna, spw {i_spw}, no '
670 'solutions will be provided for this antenna and SPW')
671 casalog.post(msg, 'WARN')
673 times = []
674 phases = []
675 these_tones = []
676 delays = []
677 complex_data = []
678 skip = True
679 elif n_data == 0 and metadata['fallback_to_fringefit']:
680 msg = (f'No pulse cal data for {metadata["antenna_names"][i_ant]} antenna, spw {i_spw}, using '
681 f'fringe fit solution as a backup')
682 casalog.post(msg, 'WARN')
684 # Copying first time stamp to provide a solution time
685 times = np.full(1, pc_times[0])
686 # Filling data with blank info
687 complex_data = np.zeros((n_pol, n_tones, 1), dtype=complex)
688 these_tones = np.ones((n_pol, n_tones, 1))
689 these_tones[:, -1, 0] = 2
690 skip = False
691 else:
692 times = pc_times[full_selection]
693 complex_data = phase_cal[:, :, full_selection]
694 these_tones = tone_freq[:, :, full_selection]
695 skip = False
697 # Compute average phase and delay during solution interval
698 if skip:
699 sol_delay = np.zeros(n_pol)
700 sol_phases = np.zeros(n_pol)
701 else:
702 phases = np.angle(complex_data)
703 # assumes largest and smallest frequency are last and first tone respectively
704 delta_phase = phases[:, -1, :] - phases[:, 0, :]
705 delta_freq = these_tones[:, -1, :] - these_tones[:, 0, :]
706 delays = delta_phase / delta_freq / twopi
708 time_selection = np.logical_and(times >= sol_times['start'], times <= sol_times["stop"])
709 n_sol_data = np.sum(time_selection)
710 if n_sol_data == 0 and ant_key == metadata['refant_key']:
711 msg = (f'Missing pulse cal data during solution interval for reference antenna in spw '
712 f'{i_spw}')
713 casalog.post(msg, 'SEVERE')
714 error = True
715 elif n_sol_data == 0 and not skip:
716 if not metadata['fallback_to_fringefit']:
717 msg = (f'No pulse data available for {metadata["antenna_names"][i_ant]} antenna, spw '
718 f'{i_spw}, during selected time range for fringe fit solution, no solutions will be '
719 f'provided for this antenna and SPW')
720 casalog.post(msg, 'WARN')
721 skip = True
722 else:
723 pass
724 sol_delay = np.zeros(n_pol)
725 sol_phases = np.zeros((n_pol, 1))
726 else:
727 sol_delay = np.mean(delays[:, time_selection], axis=1)
728 sol_phases = np.angle(np.mean(complex_data[:, :, time_selection], axis=2))
730 ant_dict[spw_key] = {
731 'skip': skip,
732 'time': times, # [n_times]
733 'phase': phases, # [n_pol, n_tone, n_times]
734 'amplitude': np.absolute(complex_data), # [n_pol, n_tone, n_times]
735 'tone': these_tones, # [n_pol, n_tone, n_times]
736 'cable_cal': cable_cal[full_selection], # [n_times]
737 'delay': delays, # [n_pol, n_times]
738 'solution_phase': sol_phases, # [n_pol, n_tone]
739 'solution_delay': sol_delay # [n_pol]
740 }
741 pc_dict[ant_key] = ant_dict
743 if error:
744 msg = (f'Missing pulse cal data for the reference antenna ({metadata["refant_key"].split("_")[1]}) '
745 f'nothing to do.')
746 raise RuntimeError(msg)
747 return pc_dict
750def get_delay_and_phase_from_ff_table(ff_table, metadata, pc_data):
751 """
752 Get delay and phase solutions from a fringe fit cal table.
753 :param ff_table: Fringe fit cal table
754 :param metadata: Parsed metadata
755 :param pc_data: Dictionary containing pulse cal data
756 :return: Dictionary containing fringe fit phase and delay solutions
757 """
758 sol_times = metadata['solution_times']
759 tb_obj.open(ff_table)
760 ant1 = tb_obj.getcol('ANTENNA1')
761 ant2 = tb_obj.getcol('ANTENNA2') # should be =refant
762 times = tb_obj.getcol('TIME')
763 spw_id = tb_obj.getcol('SPECTRAL_WINDOW_ID')
764 ff_solution = tb_obj.getcol('FPARAM')
765 tb_obj.close()
766 tb_obj.open(f'{ff_table}/SPECTRAL_WINDOW')
767 freqs = tb_obj.getcol('CHAN_FREQ')[0, :] # mpc solution frequencies (1 per spw)
768 tb_obj.close()
770 ant_names = metadata["antenna_names"]
772 # Ref ant checks
773 ff_refant = np.unique(ant2)
774 if ff_refant.shape[0] > 1:
775 raise Exception('Multiple reference antennas not supported in fringe fit table')
776 ff_refant = ff_refant[0]
777 if ff_refant != metadata['refant_id']:
778 raise Exception(f'Different reference antenna between pccor ({ant_names[metadata["refant_id"]]}) and '
779 f'fringe fit data ({ant_names[ff_refant]})')
780 # Time check
781 unq_times = np.unique(times)
782 valid_times = np.logical_and(unq_times > sol_times['start'], unq_times <= sol_times["stop"])
783 if np.sum(valid_times) <= 0:
784 raise RuntimeError('Fringe fit solutions not available for specified times')
786 ff_data = {}
787 for i_ant in metadata['ant_list']:
788 ant_key = get_ant_key(i_ant, ant_names)
789 ant_dict = {}
790 for i_spw in metadata['spw_list']:
791 spw_key = get_spw_key(i_spw)
792 if pc_data[ant_key][spw_key]['skip']:
793 continue
794 selection = np.logical_and(ant1 == i_ant, spw_id == i_spw)
795 selected_time = times[selection]
796 this_solution = ff_solution[:, 0, selection]
797 if selected_time.shape[0] != 1:
798 raise Exception('Expected a single fringe fit solution for the time interval')
800 # Delay in Fringe fit table is in nanosec
801 ff_delay = 1e-9 * this_solution[1::4, 0]
802 ff_phase = this_solution[0::4, 0]
804 ant_dict[spw_key] = {
805 'frequency': freqs[i_spw], # scalar
806 'time': selected_time, # [n_time]
807 'phase': ff_phase, # [n_pol]
808 'delay': ff_delay, # [n_pol]
809 }
811 ff_data[ant_key] = ant_dict
813 return ff_data
816def get_scan_and_field_for_time(time_value, ms_scan_times):
817 """
818 Get scan and field id for a given time value
819 :param time_value: MJD time in seconds.
820 :param ms_scan_times: Dictionnary with the scans, associated time ranges and field Ids.
821 :return: Scan number and Field_id for time.
822 """
823 sel = time_value >= ms_scan_times['times'][:, 0]
824 if np.sum(sel) > 0:
825 return ms_scan_times['scans'][sel][-1], ms_scan_times['fields'][sel][-1]
826 else:
827 return np.nan, np.nan
830def get_scan_times(msname):
831 """
832 Goes through the MS building a list of time ranges and field Ids for all scans
833 :param msname: name of the MS.
834 :return: Dictionnary with the scans, associated time ranges and field Ids.
835 """
836 tb_obj.open(msname)
837 times = tb_obj.getcol('TIME')
838 scans = tb_obj.getcol('SCAN_NUMBER')
839 field_ids = tb_obj.getcol('FIELD_ID')
840 tb_obj.close()
842 scan = 0
843 ltimes = []
844 lfields = []
845 lscans = []
846 for i_vis in range(times.shape[0]):
847 if scans[i_vis] != scan:
848 scan = scans[i_vis]
849 ltimes.append(times[i_vis])
850 lfields.append(field_ids[i_vis])
851 lscans.append(scan)
853 fields = np.array(lfields)
854 unq_scans = np.array(lscans)
855 scan_times = np.zeros((unq_scans.shape[0], 2))
857 scan_times[:, 0] = ltimes
858 scan_times[0:-1, 1] = scan_times[1:, 0]
859 scan_times[-1, -1] = times[-1]
861 ms_scan_times = {
862 "scans": unq_scans,
863 "fields": np.array(fields),
864 "times": np.array(scan_times)
865 }
866 return ms_scan_times
869#########################################################
870# Actual computations
871#########################################################
872def execute_casa_fringefit(metadata, pc_data):
873 """
874 Execute CASA's fringe fit to produce a fringe fit cal table with solutions for the selected timerange
875 :param metadata: Parsed metadata
876 :param pc_data: Dictionary containing pulse cal data
877 :return: Dictionnary containing the Fringe fit solutions extracted from the fringe fit cal table.
878 """
879 def adapt_parameter_for_fringefit_call(parameter):
880 if parameter == 'none':
881 return ''
882 else:
883 return parameter
885 msname = metadata['msname']
886 user_inp = metadata['user_inputs']
887 ff_tbl_name = msname+'.mpc'
889 ff_args = (
890 msname, # msname parameter
891 ff_tbl_name, # Output cal table name
892 '', # Field
893 '', # spw
894 '', # intent
895 True, # selectdata
896 adapt_parameter_for_fringefit_call(user_inp['timerange']), # timerange
897 '', # uvrange
898 '', # antenna
899 adapt_parameter_for_fringefit_call(user_inp['scan']), # scan
900 '', # observation
901 '', # msselect
902 'inf', # solint
903 '', # combine
904 adapt_parameter_for_fringefit_call(user_inp['refant']), # refant
905 3.0, # minsnr
906 True, # zerorates
907 True, # globalsolve
908 100, # niter
909 [-1e6, 1e6], # delaywindow
910 [-1e6, 1e6], # ratewindow
911 False, # append
912 False, # corrdepflags
913 'none', # corrcomb
914 False, # docallib
915 '', # callib
916 '', # gaintable
917 '', # gainfield
918 '', # interp
919 '', # spwmap
920 [True, True, False], # paramactive
921 True, # concatspws
922 False, # parang
923 )
924 fringefit(*ff_args)
925 casalog.origin('pccor')
926 return get_delay_and_phase_from_ff_table(ff_tbl_name, metadata, pc_data)
929def align_all_pc_delays_to_ff_solution(metadata, pc_data, ff_data, cablecal_correction):
930 """
931 Call the function for aligning pulse cal derived phase and delays for all antennas and SPWs
932 :param metadata: Parse metadata
933 :param pc_data: pulse cal data dict with all antennas and spws
934 :param ff_data: Extracted fringe fit solutions dict with all antennas and spws
935 :param cablecal_correction: Apply cable cal delay to solution
936 :return: Dictionary containing solution for all antennas and spws
937 """
938 exit_dict = {}
939 refant_key = metadata['refant_key']
940 for i_ant in metadata['ant_list']:
941 ant_key = get_ant_key(i_ant, metadata['antenna_names'])
942 ant_dict = {}
943 # lowest_freq_tone = pc_data[ant_key][get_spw_key(np.min(metadata['spw_list']))]['tone'][:, 0, :]
944 for i_spw in metadata['spw_list']:
945 spw_key = get_spw_key(i_spw)
946 if not pc_data[ant_key][spw_key]['skip'] and not pc_data[refant_key][spw_key]['skip']:
947 ant_dict[spw_key] = align_one_ant_spw_to_ff_selection(pc_data[ant_key][spw_key],
948 ff_data[ant_key][spw_key],
949 metadata['scan_times'],
950 pc_data[refant_key][spw_key],
951 cablecal_correction)
952 exit_dict[ant_key] = ant_dict
953 return exit_dict
956def align_one_ant_spw_to_ff_selection(pc_data, ff_data, scan_times, refant_pc_data,
957 cablecal_correction):
958 """
959 Align the phase and delays derived from the pulse cal table to a reference fringe solution taking into account the
960 refant phases and delays for a single antenna and SPW
961 :param pc_data: Extracted pulse cal data dict for this antenna and spw
962 :param ff_data: Extracted fringe fit solutions dict for this antenna and spw
963 :param scan_times: Scan times to determine to which scan and fields solutions belong
964 :param refant_pc_data: Reference antenna pulse cal data
965 :param cablecal_correction: Apply cable cal delay to solution
966 :return: Dictionary containing solution for this antenna and SPW.
967 """
969 # This means pc_data is missing, hence we need to make the change to the times so that they reflect the time of the
970 # fringefit solution which will be the actual solution
972 if pc_data['time'].shape == tuple([1]) and pc_data['delay'][0, 0] == 0:
973 times = ff_data['time']
974 final_delay = np.zeros_like(pc_data['delay'])
975 final_phase = np.zeros_like(pc_data['delay'])
976 for i_pol in range(2):
977 final_delay[i_pol, 0] = ff_data['delay'][i_pol]
978 final_phase[i_pol, 0] = ff_data['phase'][i_pol]
980 else:
981 times = pc_data['time']
982 final_delay = (pc_data['delay'] + ff_data['delay'][:, np.newaxis] -
983 (pc_data['solution_delay'][:, np.newaxis] - refant_pc_data['solution_delay'][:, np.newaxis]))
984 if cablecal_correction:
985 addel = pc_data['cable_cal']
986 else:
987 addel = 0
988 final_delay += addel
990 final_phase = (ff_data['phase'][:, np.newaxis] + pc_data['phase'][:, 0, :] -
991 pc_data['solution_phase'][:, 0, np.newaxis])
993 final_phase = phase_wrapping(final_phase)
995 # Scans and field Ids need to be properly filled
996 scans = np.zeros_like(times)
997 field_id = np.zeros_like(times)
998 for i_time, atime in enumerate(times):
999 scans[i_time], field_id[i_time] = get_scan_and_field_for_time(atime, scan_times)
1001 out_dict = {
1002 'time': times, # [n_time]
1003 'scan': scans, # [n_time]
1004 'field_id': field_id, # [n_time]
1005 'phase': final_phase, # [n_pol, n_time]
1006 'delay': final_delay, # [n_pol, n_time]
1007 'ff_phase': phase_wrapping(ff_data['phase']), # [n_pol]
1008 'ff_delay': ff_data['delay'], # [n_pol]
1009 'pc_mean_phase': pc_data['solution_phase'], # [n_pol, n_tone]
1010 'pc_mean_delay': pc_data['solution_delay'], # [n_pol]
1011 'pc_refant_phase': refant_pc_data['solution_phase'], # [n_pol, n_tone]
1012 'pc_refant_delay': refant_pc_data['solution_delay'], # [n_pol]
1013 }
1014 return out_dict
1017def create_pccor_caltable(metadata, output_dict, cal_table_name):
1018 """
1019 Take solutions from a dictionary and save them to a fringe fit like cal table
1020 :param metadata: Parsed metadata
1021 :param output_dict: Output dictionary with all the pccor solutions
1022 :param cal_table_name: Name of the call to put the pccor solutions
1023 :return: None
1024 """
1025 cb = calibrater()
1026 cb.open(metadata['msname'], False, False, False)
1027 cb.createcaltable(cal_table_name, 'Float', 'Fringe Jones', True)
1028 cb.close()
1030 key_list = ['time', 'delay', 'phase', 'scan', 'field_id', 'antenna', 'spw_id']
1031 unordered_dict = {}
1032 for key in key_list:
1033 unordered_dict[key] = []
1035 for i_ant in metadata['ant_list']:
1036 ant_key = get_ant_key(i_ant, metadata['antenna_names'])
1037 for i_spw in metadata['spw_list']:
1038 spw_key = get_spw_key(i_spw)
1039 try:
1040 out_data = output_dict[ant_key][spw_key]
1041 for key in key_list:
1042 if key == 'antenna':
1043 unordered_dict[key].append(np.full_like(out_data['time'], i_ant, dtype=int))
1044 elif key == 'spw_id':
1045 unordered_dict[key].append(np.full_like(out_data['time'], i_spw, dtype=int))
1046 else:
1047 unordered_dict[key].append(out_data[key])
1048 except KeyError: # No data for antenna and spw combination
1049 pass
1051 for key in key_list:
1052 if unordered_dict[key][0].ndim == 2:
1053 unordered_dict[key] = np.concatenate(unordered_dict[key], axis=1)
1054 else:
1055 unordered_dict[key] = np.concatenate(unordered_dict[key])
1057 sorted_order = np.lexsort((unordered_dict['antenna'], unordered_dict['spw_id'], unordered_dict['time']))
1058 sorted_dict = {}
1059 for key, item in unordered_dict.items():
1060 if item.ndim == 2:
1061 sorted_dict[key] = item[:, sorted_order]
1062 else:
1063 sorted_dict[key] = item[sorted_order]
1065 valid_solutions = np.isfinite(sorted_dict['scan'])
1066 for key, item in sorted_dict.items():
1067 if item.ndim == 2:
1068 sorted_dict[key] = item[:, valid_solutions]
1069 else:
1070 sorted_dict[key] = item[valid_solutions]
1072 refant_id = metadata['refant_id']
1073 n_pol = 2
1074 n_times = sorted_dict['time'].shape[0]
1076 fparam = np.zeros((8, 1, n_times))
1077 for i_pol in range(n_pol):
1078 fparam[4 * i_pol, 0, :] = sorted_dict['phase'][i_pol, :]
1079 # delay in table needs to be in nano sec
1080 fparam[4 * i_pol + 1, 0, :] = sorted_dict['delay'][i_pol, :] * 1e9
1082 snr = np.full_like(fparam, 999.0)
1083 weight = np.ones_like(fparam)
1084 flags = np.zeros_like(fparam)
1086 tb_obj.open(cal_table_name, nomodify=False)
1087 tb_obj.addrows(n_times)
1088 tb_obj.putcol('TIME', sorted_dict['time'])
1089 tb_obj.putcol('SPECTRAL_WINDOW_ID', sorted_dict['spw_id'])
1090 tb_obj.putcol('ANTENNA1', sorted_dict['antenna'])
1091 tb_obj.putcol('ANTENNA2', [refant_id] * n_times)
1092 tb_obj.putcol('FIELD_ID', sorted_dict['field_id'])
1093 tb_obj.putcol('OBSERVATION_ID', [0] * n_times)
1094 tb_obj.putcol('SCAN_NUMBER', sorted_dict['scan'])
1095 tb_obj.putcol('SNR', snr)
1096 tb_obj.putcol('WEIGHT', weight)
1097 tb_obj.putcol('FLAG', flags)
1098 tb_obj.putcol('FPARAM', fparam)
1099 tb_obj.close()
1102#########################################################
1103# Main routine
1104#########################################################
1105def pccor(vis, pccor_caltable, refant, timerange, scan, spw, antenna, cablecal_correction, ff_table,
1106 fallback_to_fringefit):
1107 """
1108 Produce time resolved Pulse Cal CORrections (pccor) for phase and delay for VLBA data, at least one between
1109 timerange and scan must be defined.
1110 :param vis: MS name
1111 :param pccor_caltable: Output fringe fit like cal table
1112 :param refant: Reference antenna to be used
1113 :param timerange: Time range over which to compute fringe fit solution
1114 :param scan: Scan over which to compute fringe fit solution
1115 :param spw: Select spectral windows for which to compute pccor solutions
1116 :param antenna: Select antenns for which to compute pccor solutions
1117 :param cablecal_correction: Apply cable cal corrections?
1118 :param ff_table: (Option for experts) Use a precomputed fringe fit solution
1119 :param fallback_to_fringefit: Fallback to fringefit solution when PC data is missing
1120 :return: Output dictionary containing the fringe fit solutions
1121 """
1122 casalog.origin('pccor')
1123 metadata = parse_and_get_metadata(locals())
1124 pc_data = get_pulse_cal_data(metadata)
1126 if ff_table == 'none':
1127 # This is the route to execute the local rough fringe fit
1128 # ff_data = estimate_phase_and_delays_from_visibilities(metadata)
1129 ff_data = execute_casa_fringefit(metadata, pc_data)
1130 else:
1131 ff_data = get_delay_and_phase_from_ff_table(ff_table, metadata, pc_data)
1133 output_dict = align_all_pc_delays_to_ff_solution(metadata, pc_data, ff_data, cablecal_correction)
1135 create_pccor_caltable(metadata, output_dict, pccor_caltable)
1136 return output_dict