Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/parallel/rflag_post_proc.py: 94%
51 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import numpy as np
2# as of python 2.5 the key parameter was added to eventually replace the cmp parameter for sorting
3# the cmp parameter was removed in python 3
4# this function is available in both python 2.x and 3.x to convert to something to be used with the key parameter
5# it would probably be better if this code was reworked to use the key parameter directly
6from functools import cmp_to_key
8from casatasks import casalog
def is_rflag_report(item):
    """
    Is this an item from a flagdata report dictionary an RFlag report?

    Objects that are not mappings (e.g. strings, numbers, None) cannot be
    RFlag reports; for those the answer is False rather than an exception.

    :param item: an object, normally an item from a dictionary

    :returns: whether item looks like a report from Rflag (type and name = rflag).
    """
    # Function-scope import: keeps the check self-contained without touching
    # the module-level imports.
    from collections.abc import Mapping

    # Guard first: `'type' in 3` raises TypeError, and `'type' in 'a type'` is
    # True but the subsequent `item['type']` would raise.
    if not isinstance(item, Mapping):
        return False

    return ('type' in item and item['type'] == 'rflag'
            and 'name' in item and item['name'] == 'Rflag')
def combine_rflag_subreport(sub_dict, agg_dict):
    """ Folds one RFlag sub-report into an aggregated RFlag return dictionary.

    Intended to be called once per subMS, passing each per-subMS RFlag return
    dictionary in turn together with the same aggregated dictionary. Once every
    sub-report has been added, call finalize_agg_rflag_thresholds() to compute
    the overall timedev/freqdev thresholds: the dictionary returned here keeps
    the threshold vectors in an intermediate list-of-list-of-list format that
    must be finalized with finalize_agg_rflag_thresholds().

    Example RFlag return dictionary:
    {'freqdev': array([[1, 0, 3.12e-02], [1, 3, 2.19e-02], [1, 4, 2.42e-02]]),
     'type': 'rflag', 'name': 'Rflag', 'timedev':
     array([[1, 0, 7.09e-03], [1, 3, 5.43e-03], [1, 4, 7.83e-03]]) }

    :param sub_dict: (sub-)report/dictionary returned by RFlag (from one subMS)
    :param agg_dict: aggregated report or dictionary to aggregate 'sub_dict' into

    :returns: RFlag dictionary after aggregating sub_dict into agg_dict
    """
    # Every entry of the sub-report is merged under the same name in agg_dict.
    for name in sub_dict:
        agg_dict[name] = _aggregate_rflag_item(name, sub_dict[name], agg_dict)

    return agg_dict
def _accumulate_rflag_thresholds(item, ret_item):
    """
    Accumulates the threshold rows of one sub-report into the aggregated rows.

    RFlag produces threshold vectors (freqdev or timedev vector) as a 2D numpy
    array with rows:
    [spw_id, field_id, value]
    Example:
    array([[1, 0, 3.12e-02], [1, 3, 2.19e-02], [1, 4, 2.42e-02]])

    While aggregating, the structure used is a list-of-list-of-list: one element
    for every spw-field pair, holding
    [spw_id, field_id, [val1, val2, val3]]
    where val1, val2, ... are the thresholds coming from the different subMSs.
    A finalize step is needed afterwards to reduce the innermost lists.

    :param item: an RFlag set of threshold rows ([spw_id, field_id, value]) to
                 aggregate
    :param ret_item: either a 2D numpy array (nothing accumulated into it yet) or
                     an RFlag threshold list-of-list-of-list to aggregate into

    :returns: The result of aggregating item into ret_item (a list of lists)
    """
    if isinstance(ret_item, np.ndarray):
        # Still the raw array stored from the first sub-report: switch to the
        # list form and wrap every value so further values can be appended.
        ret_item = ret_item.tolist()
        for row in ret_item:
            row[2] = [row[2]]

    # Index the accumulated rows by (spw_id, field_id) so each incoming row is
    # placed with a single lookup instead of a scan over all rows. setdefault
    # keeps the first row of a pair, which is the one a front-to-back scan
    # would have matched.
    row_by_id = {}
    for row in ret_item:
        row_by_id.setdefault((row[0], row[1]), row)

    for sub_row in item:
        pair = (sub_row[0], sub_row[1])
        target = row_by_id.get(pair)
        if target is None:
            # spw-field pair not seen so far: open a new accumulation row
            target = [sub_row[0], sub_row[1], []]
            ret_item.append(target)
            row_by_id[pair] = target
        target[2].append(sub_row[2])

    return ret_item


def _aggregate_rflag_item(key, item, ret_dict):
    """
    Aggregates a key-item pair into ret_dict, both from RFlag return dictionaries.

    :param key: name of the entry in the RFlag return dictionary
    :param item: value of that entry in the sub-report being added
    :param ret_dict: aggregated dictionary, updated in place

    :returns: the value held by ret_dict[key] after the aggregation
    """
    if key not in ret_dict:
        # First time this key is seen: take the sub-report value as it is
        ret_dict[key] = item
    elif not isinstance(ret_dict[key], str):
        # must be either 'freqdev' or 'timedev'
        ret_dict[key] = _accumulate_rflag_thresholds(item, ret_dict[key])
    # String entries already present are left untouched.

    return ret_dict[key]
def finalize_agg_rflag_thresholds(rflag_dict):
    """
    For the thresholds included in an RFlag return dictionary (timedev and freqdev):
    build a 2D numpy array from a list of lists of lists, calculating a median of
    thresholds throughout sub-MSs

    Rows of the resulting arrays are ordered by (spw_id, field_id), i.e. by the
    first and second coords of the RFlag threshold vectors (example):
      [1, 0, 3.12e-02] < [1, 3, 2.19e-02]
      [1, 2, 3.12e-02] < [2, 0, 2.19e-02]

    :param rflag_dict: RFlag dictionary with the un-finalized list-of-list-of-list
                       structure produced by combine_rflag_subreport().

    :returns: the dictionary finalized, that is, with the per-subMS thresholds
              combined, currently using the median of the subMS values.
    """
    # Only values of existing keys are replaced, so updating the dictionary
    # while iterating over it is safe.
    for key, val in rflag_dict.items():
        if isinstance(val, str):
            continue

        # If the list was empty, we need a dummy (0,3)-shaped array.
        # len() rather than truthiness: val may be a numpy array.
        if len(val) == 0:
            rflag_dict[key] = np.empty(shape=[0, 3])
            continue

        # Choosing median for now. This is an open question from CAS-10202.
        rows = [[row[0], row[1], np.median(row[2])] for row in val]
        # Sort to match better what is produced when not using parallelization
        rows.sort(key=lambda row: (row[0], row[1]))
        rflag_dict[key] = np.array(rows)

    return rflag_dict