Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/parallel/rflag_post_proc.py: 94%

51 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import numpy as np 

# As of Python 2.5 the `key` parameter was added to eventually replace the `cmp`
# parameter for sorting; the `cmp` parameter was removed in Python 3.
# functools.cmp_to_key is available in both Python 2.x and 3.x and converts a
# comparison function into an object that can be used with the `key` parameter.
# It would probably be better if this code was reworked to use the `key`
# parameter directly.

6from functools import cmp_to_key 

7 

8from casatasks import casalog 

9 

def is_rflag_report(item):
    """
    Is this an item from a flagdata report dictionary?

    Anything that is not a mapping (for example an int, a string or None)
    is reported as "not an RFlag report" instead of raising.

    :param item: an object, normally an item from a dictionary

    :returns: whether item looks like a report from Rflag (type and name = rflag).
    """
    # Imported locally so the function does not depend on extra module-level imports
    from collections.abc import Mapping

    # .get() yields None for a missing key, which never equals the expected strings
    return (isinstance(item, Mapping)
            and item.get('type') == 'rflag'
            and item.get('name') == 'Rflag')

20 

def combine_rflag_subreport(sub_dict, agg_dict):
    """ Fold one per-subMS RFlag return dictionary into an aggregated one.

    Intended to be called once per subMS report, always passing the same
    aggregated dictionary. Once every sub-report has been folded in, call
    finalize_agg_rflag_thresholds() to compute the overall timedev/freqdev
    thresholds: until then the threshold vectors are kept in an intermediate
    list-of-list-of-list layout.

    Example RFlag return dictionary:
    {'freqdev': array([[1, 0, 3.12e-02], [1, 3, 2.19e-02], [1, 4, 2.42e-02]]),
     'type': 'rflag', 'name': 'Rflag', 'timedev':
     array([[1, 0, 7.09e-03], [1, 3, 5.43e-03], [1, 4, 7.83e-03]]) }

    :param sub_dict: (sub-)report/dictionary returned by RFlag (from one subMS)
    :param agg_dict: aggregated report or dictionary to aggregate 'sub_dict' into

    :returns: agg_dict (the same object), after aggregating sub_dict into it
    """
    for name in sub_dict:
        # Every entry is merged individually; the helper decides how, per entry
        agg_dict[name] = _aggregate_rflag_item(name, sub_dict[name], agg_dict)

    return agg_dict

44 

def _aggregate_rflag_item(key, item, ret_dict):
    """
    Aggregates a key-item pair into ret_dict, both from RFlag return dictionaries.

    - If key is not yet in ret_dict, item is stored as is.
    - If key is present and holds a string (as 'type' and 'name' do), the
      existing value is kept.
    - Otherwise the entry is treated as a threshold vector ('freqdev' or
      'timedev') and item is merged into the accumulated structure.

    :param key: key of the entry in the RFlag return dictionary
    :param item: value for that key from a (sub-)report
    :param ret_dict: aggregated dictionary, updated in place

    :returns: the updated value of ret_dict[key]
    """

    def aggregate_rflag_thresholds(item, ret_item):
        """
        RFlag produces threshold vectors (freqdev or timedev vector) as a 2D numpy
        array with rows:
        [spw_id, field_id, value]
        Example:
        array([[1, 0, 3.12e-02], [1, 3, 2.19e-02], [1, 4, 2.42e-02]])
        In general there is a list of vectors like these for multiple spw_id-field_id pairs.

        This function aggregates such lists of vectors produced for different subMSs.
        In the aggregation stage, the structure used is a list-of-list-of-list:
        a list with one element for every spw-field pair, holding:
        [spw_id, field_id, [val1, val2, val3]] where val1, val2, ... are the thresholds
        for different subMSs. A finalize step is needed to average/median the innermost
        values.
        Accumulating the threshold values into a list is far from ideal, but no simpler
        solution was found given the data structure used for the rflag reports
        (a list of dictionaries structured as a dictionary).

        :param item: an RFlag list of threshold vectors to aggregate (2D array, or
                     anything np.asarray turns into one)
        :param ret_item: an RFlag threshold list-of-list-of-list to aggregate into,
                         or the 2D array stored from the first sub-report

        :returns: The result of aggregating item into ret_item
        """
        if isinstance(ret_item, np.ndarray):
            # Still the raw array of the first sub-report: switch to the list
            # layout and wrap every threshold so later values can be appended.
            # tolist() copies, so the first sub-report's array is not modified.
            ret_item = ret_item.tolist()
            for agg_row in ret_item:
                agg_row[2] = [agg_row[2]]

        # Find a place for every row of the sub-report to be added
        for new_row in np.asarray(item):
            spw_id, field_id, threshold = new_row[0], new_row[1], new_row[2]
            for agg_row in ret_item:
                if agg_row[0] == spw_id and agg_row[1] == field_id:
                    agg_row[2].append(threshold)
                    break
            else:
                # spw-field pair not seen in any previous sub-report
                ret_item.append([spw_id, field_id, [threshold]])

        return ret_item

    if key not in ret_dict:
        ret_dict[key] = item
    elif not isinstance(ret_dict[key], str):
        # must be either 'freqdev' or 'timedev'
        ret_dict[key] = aggregate_rflag_thresholds(item, ret_dict[key])

    return ret_dict[key]

107 

108 

def finalize_agg_rflag_thresholds(rflag_dict):
    """
    For the thresholds included in an RFlag return dictionary (timedev and freqdev):
    build a 2D numpy array from a list of lists of lists, calculating a median of
    thresholds throughout sub-MSs.

    String entries are left untouched. Every other entry is replaced by a 2D
    array with rows [spw_id, field_id, threshold], sorted by (spw_id, field_id);
    an empty entry becomes an empty (0, 3)-shaped array. The containers holding
    the un-finalized values are not modified, only the dictionary entries are
    rebound.

    :param rflag_dict: RFlag dictionary with the un-finalized list-of-list-of-list
                       structure produced by combine_rflag_subreport().

    :returns: the dictionary finalized (same object), that is, with the per-subMS
              thresholds combined, currently using the median of the subMS values.
    """
    # Only existing keys are rebound below, so iterating over items() is safe
    for key, val in rflag_dict.items():
        if isinstance(val, str):
            continue

        # If the list was empty, we need a dummy (0,3)-shaped array
        if len(val) == 0:
            rflag_dict[key] = np.empty(shape=[0, 3])
            continue

        # Choosing median for now. This is an open question from CAS-10202.
        rows = [[row[0], row[1], np.median(row[2])] for row in val]
        # Sort by (spw_id, field_id) to match better what is produced when not
        # using parallelization. Example ordering:
        #   [1, 0, 3.12e-02] < [1, 3, 2.19e-02]
        #   [1, 2, 3.12e-02] < [2, 0, 2.19e-02]
        rows.sort(key=lambda row: (row[0], row[1]))
        rflag_dict[key] = np.array(rows)

    return rflag_dict

149 

150 return rflag_dict