Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/imagerhelpers/summary_minor.py: 16%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1import copy 

2import numpy as np 

3import os 

4 

5# from casatasks import casalog 

6 

class SummaryMinor:
    """Re-index the tclean minor cycle summary matrix into a nested dictionary.

    The summary matrix has one row per summary quantity and one column per
    (field, channel, stokes, minor cycle) record.  The methods of this class
    convert it into a dictionary that is easier to query for exactly what you
    want.  All methods are static; the class is only used as a namespace.

    The structure of the nested dictionary is:
        {
            multi-field id: {
                channel id: {
                    stokes id: {
                        summary key: {
                            cycle: value
                        }
                    }
                }
            }
        }

    Examples:

    1. To get the number of available channels, and the ids of those channels:
        nchans = len(summ_min[0])
        avail_chans = list(summ_min[0].keys())

    2. To get the number of iterations done on the main field, fifth channel,
       first stokes plane, during the middle minor cycle:
        field0 = summ_min[0]                  # field 0 is the main field
        chan = list(field0.keys())[5]         # channel index doesn't necessarily start at 0
        stoke = list(field0[chan].keys())[0]  # stokes index doesn't necessarily start at 0
        ncycles = len(field0[chan][stoke]['iterDone'])
        itersDone = field0[chan][stoke]['iterDone'][ncycles // 2]

    3. To get the available minor cycle summary statistics:
        field0 = summ_min[0]
        chan0 = list(field0.keys())[0]
        stoke0 = list(field0[chan0].keys())[0]
        availSummStats = field0[chan0][stoke0].keys()
    """

    # Row order of the matrix when fullsummary is true ("NM" == "No Mask").
    rowDescriptionsOldOrder = [
        "iterDone",         # 0
        "peakRes",          # 1
        "modelFlux",        # 2
        "cycleThresh",      # 3
        "deconId",          # 4
        "chan",             # 5
        "stoke",            # 6
        "cycleStartIters",  # 7
        "startIterDone",    # 8
        "startPeakRes",     # 9
        "startModelFlux",   # 10
        "startPeakResNM",   # 11
        "peakResNM",        # 12
        "masksum",          # 13
        "mpiServer",        # 14
        "multifieldId",     # 15
        "stopCode",         # 16
    ]
    # Row order of the matrix when fullsummary is false (CAS-13683 workaround).
    rowDescriptions13683 = [
        "iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke",
    ]
    # rowDescriptions does not include {"multifieldId", "chan", "stoke", "deconId"},
    # and so the returned dictionary will not include those values in the summary keys
    rowDescriptions = [
        "startIterDone", "iterDone", "startPeakRes", "peakRes",
        "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM",
        "cycleThresh", "cycleStartIters", "masksum", "mpiServer", "stopCode",
    ]
    # Rows that hold the values at the start of a minor cycle.
    rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"]

    @staticmethod
    def convertMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Convert a summary minor matrix into {field: {chan: {stoke: {key: [values per cycle]}}}}.

        Param summaryminor_matrix: 2D numpy array, rows ordered as given by
            getRowDescriptionsOldOrder(fullsummary), one column per record.
        Param fullsummary: selects which row layout the matrix uses.
        Param calc_iterdone_deltas: forwarded to _getPerCycleDict (currently unused there).
        Param keep_startvals: if False, the "start*" summary keys are dropped
            from the result. None means True.
        Returns { 0: {} } when the matrix has no columns.
        Raises RuntimeError if no field ids can be found in a non-empty matrix.
        """
        # edge case: no iterations were done (eg threshold < model flux)
        if summaryminor_matrix.shape[1] == 0:
            return {0: {}}

        field_ids = SummaryMinor._getFieldIds(summaryminor_matrix, fullsummary)
        if not field_ids:
            raise RuntimeError("No multifield ids were found. Failed to parse summary minor matrix after tclean finished running.")

        if len(field_ids) == 1:
            # every column belongs to the same field, no need to split the matrix
            return {
                field_ids[0]: SummaryMinor._convertSingleFieldMatrix(
                    summaryminor_matrix, fullsummary, calc_iterdone_deltas, keep_startvals)
            }

        # get individual dictionaries for each field id
        ret = {}
        for field_id in field_ids:
            # field_ids holds the id values themselves, so the id is passed
            # directly (it must not be used as an index into field_ids)
            single_field_matrix = SummaryMinor._getSingleFieldMatrix(summaryminor_matrix, field_id, fullsummary)
            ret[field_id] = SummaryMinor._convertSingleFieldMatrix(
                single_field_matrix, fullsummary, calc_iterdone_deltas, keep_startvals)
        return ret

    @staticmethod
    def _convertSingleFieldMatrix(single_field_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Convert the matrix of a single field into {chan: {stoke: {key: [values per cycle]}}}.

        Returns {} when the matrix has no columns.
        """
        # edge case: no iterations were done (eg threshold < model flux)
        if single_field_matrix.shape[1] == 0:
            return {}

        # The dictionary is freshly built here, so it can be post-processed in place.
        summaryminor_dict = SummaryMinor.indexMinorCycleSummaryBySubimage(single_field_matrix, fullsummary)
        return SummaryMinor._getPerCycleDict(summaryminor_dict, fullsummary, calc_iterdone_deltas, keep_startvals)

    @staticmethod
    def _getFieldIdRowIdx(fullsummary):
        """Return the index of the matrix row that identifies the field, or None if there is none.

        The field is identified by the "multifieldId" row when fullsummary is
        true and by the "deconId" row otherwise.
        """
        avail_rows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
        id_row = "multifieldId" if fullsummary else "deconId"
        if id_row not in avail_rows:
            return None
        return avail_rows.index(id_row)

    @staticmethod
    def _getFieldIds(matrix, fullsummary):
        """ Get a sorted list of available outlier field ids in the given matrix """
        id_row_idx = SummaryMinor._getFieldIdRowIdx(fullsummary)
        if id_row_idx is None:
            return [0]  # can't differentiate multiple fields from available data, assume one field

        # np.unique returns the distinct values in ascending order
        return [int(field_id) for field_id in np.unique(matrix[id_row_idx, :])]

    @staticmethod
    def _getSingleFieldMatrix(matrixIn, fieldId, fullsummary):
        """ Create a new matrix to hold all the values of the given matrix, but only for the given outlier field id

        The returned matrix has the same rows as matrixIn, only the columns
        whose field id row equals fieldId, and float dtype.  If the field id
        row is not available, matrixIn itself is returned.
        """
        id_row_idx = SummaryMinor._getFieldIdRowIdx(fullsummary)
        if id_row_idx is None:
            return matrixIn

        keep_cols = matrixIn[id_row_idx, :] == fieldId
        # boolean indexing already produces a copy; astype only converts if needed
        return matrixIn[:, keep_cols].astype(float, copy=False)

    @staticmethod
    def _getRowDescriptionsOldOrder(fullsummary):
        """Temporary CAS-13683 workaround"""
        if not fullsummary:
            return SummaryMinor.rowDescriptions13683
        return SummaryMinor.rowDescriptionsOldOrder

    @staticmethod
    def getRowDescriptionsOldOrder(fullsummary):
        """ Retrieves brief descriptions of the available minor cycle summary rows, in the old (matrix) order. """
        return SummaryMinor._getRowDescriptionsOldOrder(fullsummary)

    @staticmethod
    def _getRowDescriptions(fullsummary):
        """Temporary CAS-13683 workaround"""
        avail_rows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
        return [desc for desc in SummaryMinor.rowDescriptions if desc in avail_rows]

    @staticmethod
    def getRowDescriptions(fullsummary):
        """ Retrieves brief descriptions of the available minor cycle summary rows """
        return SummaryMinor._getRowDescriptions(fullsummary)

    @staticmethod
    def _getRowStartDescs(fullsummary):
        """Temporary CAS-13683 workaround"""
        avail_rows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
        return [desc for desc in SummaryMinor.rowStartDescs if desc in avail_rows]

    @staticmethod
    def getRowStartDescs(fullsummary):
        """ Retrieves abbreviated names of the available minor cycle summary "start" rows.

        These are the rows that catalog the values at the beginning of a minor cycle (pre-deconvolution). """
        return SummaryMinor._getRowStartDescs(fullsummary)

    @staticmethod
    def indexMinorCycleSummaryBySubimage(matrix, fullsummary):
        """ Re-indexes matrix from [row,column] to [channel,stokes,row,cycle].

        Param matrix: the original matrix to convert.
        Param fullsummary: selects which row layout the matrix uses.
        Returns a dictionary ret[chan][stoke][desc] = list of values, one per
            cycle, in column order.  Every list has the same length: the
            largest number of columns found for any (chan, stoke) pair.
            Pairs with fewer columns are padded with trailing zeros.
        """
        old_order = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
        descs = SummaryMinor.getRowDescriptions(fullsummary)
        chan_row = matrix[old_order.index("chan")]
        stoke_row = matrix[old_order.index("stoke")]
        chans = [int(chan) for chan in np.unique(chan_row)]
        stokes = [int(stoke) for stoke in np.unique(stoke_row)]

        # the subimage (chan, stoke) that each column belongs to
        subimages = [(int(chan), int(stoke)) for chan, stoke in zip(chan_row, stoke_row)]

        # The number of cycles is the largest number of columns of any
        # subimage.  Dividing ncols by len(chans)*len(stokes) is only correct
        # when every chan/stoke combination has the same number of columns,
        # which the original comment notes may not hold when running with MPI
        # (each process may get only a subchunk of the channels/stokes).
        ncols_per_subimage = {}
        for subimage in subimages:
            ncols_per_subimage[subimage] = ncols_per_subimage.get(subimage, 0) + 1
        ncycles = max(ncols_per_subimage.values(), default=0)

        # ret is the return dictionary[chans][stokes][rows][cycles]
        ret = {
            chan: {stoke: {desc: [0] * ncycles for desc in descs} for stoke in stokes}
            for chan in chans
        }

        # reindex based on subimage index (aka chan/stoke index)
        for desc in descs:
            old_row = matrix[old_order.index(desc)]
            # next free cycle slot for each subimage, for this row
            next_cycle = dict.fromkeys(ncols_per_subimage, 0)
            for subimage, val in zip(subimages, old_row):
                chan, stoke = subimage
                ret[chan][stoke][desc][next_cycle[subimage]] = val
                next_cycle[subimage] += 1

        return ret

    @staticmethod
    def _getPerCycleDict(summaryminor_dict, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Apply the post-processing options to a [chan][stoke][desc][cycle] dictionary.

        Param summaryminor_dict: dictionary as built by indexMinorCycleSummaryBySubimage.
            It is modified in place and returned.
        Param calc_iterdone_deltas: accepted for backward compatibility and
            ignored; no delta computation is done (the summary_minor iterDone
            is stored as non-cumulative).
        Param keep_startvals: if False, the keys listed by
            getRowStartDescs(fullsummary) are deleted. None means True.
        """
        if keep_startvals is None:
            keep_startvals = True

        if not keep_startvals:
            start_descs = SummaryMinor.getRowStartDescs(fullsummary)
            for stokes_dict in summaryminor_dict.values():
                for rows_dict in stokes_dict.values():
                    for desc in start_descs:
                        del rows_dict[desc]

        return summaryminor_dict

265