Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/imagerhelpers/summary_minor.py: 16%
128 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
1import copy
2import numpy as np
3import os
5# from casatasks import casalog
7class SummaryMinor:
8 """Gathers the information together from the tclean return value in a way that makes it easier to query for exactly what you want.
10 The structure for this nested dictionary is:
11 {
12 multi-field id: {
13 channel id: {
14 stokes id: {
15 summary key: {
16 cycle: value
17 }
18 }
19 }
20 }
21 }
23 Examples:
25 1. To get the number of available channels, and the ids of those channels:
26 nchans = len(summ_min[0].keys())
27 avail_chans = summ_min[0].keys()
29 2. To get the number of iterations done on the main field, fifth channel, first stokes plane, during the middle minor cycle:
30 field0 = summ_min[0] # field 0 is the main field
31 chan = field0.keys()[5] # channel index doesn't necessarily start at 0
32 stoke = field0[chan].keys()[0] # stokes index doesn't necessarily start at 0
33 ncycles = len(field0[chan][stoke]['iterDone'])
34 itersDone = field0[chan][stoke]['iterDone'][ncycles/2]
36 3. To get the available minor cycle summary statistics:
37 field0 = summ_min[0]
38 chan0 = field0.keys()[0]
39 stoke0 = field0[chan0].keys()[0]
40 availSummStats = field0[field0][stoke0].keys()
41 """
42 # 0 1 2 3 4 5 6 7 8 9 10 11 "No Mask" 12 13 14 15 16
43 rowDescriptionsOldOrder = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke", "cycleStartIters", "startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM", "peakResNM", "masksum", "mpiServer", "multifieldId", "stopCode"]
44 #rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan"]
45 rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke"]
46 # rowDescriptions does not include {"multifieldId", "chan", "stoke", "deconId"}, and so the returned dictionary will not include those values in the summary keys
47 rowDescriptions = ["startIterDone", "iterDone", "startPeakRes", "peakRes", "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM", "cycleThresh", "cycleStartIters", "masksum", "mpiServer", "stopCode"]
48 rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"]
50 def convertMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
51 # casalog.post(summaryminor_matrix, "SEVERE")
52 ret = {}
54 # edge case: no iterations were done (eg threshold < model flux)
55 if summaryminor_matrix.shape[1] == 0:
56 return { 0: {} }
58 # get individual dictionaries for each field id
59 field_ids = SummaryMinor._getFieldIds(summaryminor_matrix, fullsummary)
60 if len(field_ids) > 1:
61 for fieldId in field_ids:
62 singleFieldMatrix = SummaryMinor._getSingleFieldMatrix(summaryminor_matrix, field_ids[fieldId], fullsummary)
63 ret[fieldId] = SummaryMinor._convertSingleFieldMatrix(singleFieldMatrix, fullsummary, calc_iterdone_deltas, keep_startvals)
64 elif len(field_ids) == 1:
65 ret[field_ids[0]] = SummaryMinor._convertSingleFieldMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas, keep_startvals)
66 else:
67 raise RuntimeError("No multifield ids were found. Failed to parse summary minor matrix after tclean finished running.")
69 return ret
71 def _convertSingleFieldMatrix(single_field_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
72 # edge case: no iterations were done (eg threshold < model flux)
73 if single_field_matrix.shape[1] == 0:
74 return {}
76 summaryminor_dict = SummaryMinor.indexMinorCycleSummaryBySubimage(single_field_matrix, fullsummary)
77 percycleiters_dict = SummaryMinor._getPerCycleDict(copy.deepcopy(summaryminor_dict), fullsummary, calc_iterdone_deltas, keep_startvals)
78 return percycleiters_dict
80 def _getFieldIds(matrix, fullsummary):
81 """ Get a sorted list of available outlier field ids in the given matrix """
83 # edge case: running with MPI and CAS-13683 hasn't been fixed yet
84 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
85 if (fullsummary and not "multifieldId" in availRows) or (not fullsummary and not "deconId" in availRows):
86 return [0] # can't differentiate multiple fields from available data, assume one field
88 if fullsummary:
89 multifieldIdx = availRows.index("multifieldId")
90 else:
91 multifieldIdx = availRows.index("deconId")
92 nrows = matrix.shape[0]
93 ncols = matrix.shape[1]
94 fieldIds = sorted(np.unique(matrix[multifieldIdx,:]).tolist())
95 fieldIds = list(map(lambda x: int(x), fieldIds))
96 return fieldIds
98 def _getSingleFieldMatrix(matrixIn, fieldId, fullsummary):
99 """ Create a new matrix to hold all the values of the given matrix, but only for the given outlier field id """
100 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
101 #if not "multifieldId" in availRows:
102 if (fullsummary and not "multifieldId" in availRows) or (not fullsummary and not "deconId" in availRows):
103 return matrixIn
104 #multifieldIdx = availRows.index("multifieldId")
105 multifieldIdx = availRows.index("multifieldId") if fullsummary else availRows.index("deconId")
106 nrowsIn = matrixIn.shape[0]
107 ncolsIn = matrixIn.shape[1]
108 nrowsOut = nrowsIn
109 ncolsOut = matrixIn[multifieldIdx,:].tolist().count(fieldId)
111 matrixOut = np.zeros((nrowsOut, ncolsOut))
112 colOut = 0
113 maxColOut = 0
114 maxRowOut = 0
115 for colIn in range(ncolsIn):
116 if matrixIn[multifieldIdx,colIn] != fieldId:
117 continue
118 for rowIn in range(nrowsIn):
119 rowOut = rowIn
120 matrixOut[rowOut,colOut] = matrixIn[rowIn,colIn]
121 maxRowOut = max(rowOut, maxRowOut)
122 maxColOut = colOut
123 colOut += 1
125 return matrixOut
127 #def useSmallSummaryminor(ignored_parameter=None):
128 # """Temporary CAS-13683 workaround"""
129 # if ('USE_SMALL_SUMMARYMINOR' in os.environ):
130 # uss = os.environ['USE_SMALL_SUMMARYMINOR'].lower()
131 # if (uss == "true"):
132 # return True
133 # return False
135 #def _getRowDescriptionsOldOrder(useSmallSummaryminor):
136 def _getRowDescriptionsOldOrder(fullsummary):
137 """Temporary CAS-13683 workaround"""
138 #if (useSmallSummaryminor):
139 if (not fullsummary):
140 return SummaryMinor.rowDescriptions13683
141 return SummaryMinor.rowDescriptionsOldOrder
143 #def getRowDescriptionsOldOrder():
144 def getRowDescriptionsOldOrder(fullsummary):
145 """ Retrieves brief descriptions of the available minor cycle summary rows, in the old (matrix) order. """
146 #return SummaryMinor._getRowDescriptionsOldOrder(SummaryMinor.useSmallSummaryminor())
147 return SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
149 #def _getRowDescriptions(useSmallSummaryminor):
150 def _getRowDescriptions(fullsummary):
151 """Temporary CAS-13683 workaround"""
152 ret = SummaryMinor.rowDescriptions
153 #availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor)
154 availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
155 ret = list(filter(lambda x: x in availRows, ret))
156 return ret
158 #def getRowDescriptions():
159 def getRowDescriptions(fullsummary):
160 """ Retrieves brief descriptions of the available minor cycle summary rows """
161 #return SummaryMinor._getRowDescriptions(SummaryMinor.useSmallSummaryminor())
162 return SummaryMinor._getRowDescriptions(fullsummary)
164 #def _getRowStartDescs(useSmallSummaryminor):
165 def _getRowStartDescs(fullsummary):
166 """Temporary CAS-13683 workaround"""
167 ret = SummaryMinor.rowStartDescs
168 #availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor)
169 availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
170 ret = list(filter(lambda x: x in availRows, ret))
171 return ret
173 #def getRowStartDescs():
174 def getRowStartDescs(fullsummary):
175 """ Retrieves abreviated names of the available minor cycle summary "start" rows.
177 These are the rows that catalog the values at the beggining of a minor cycle (pre-deconvolution). """
178 #return SummaryMinor._getRowStartDescs(SummaryMinor.useSmallSummaryminor())
179 return SummaryMinor._getRowStartDescs(fullsummary)
181 def indexMinorCycleSummaryBySubimage(matrix,fullsummary):
182 """ Re-indexes matrix from [row,column] to [channel,stokes,row,cycle].
184 Param matrix: the original matrix to convert.
185 """
186 # get some properties of the summary_minor matrix
187 nrows = matrix.shape[0]
188 ncols = matrix.shape[1]
189 #uss = SummaryMinor.useSmallSummaryminor() # Temporary CAS-13683 workaround
190 import sys
191 oldChanIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("chan")
192 oldStokeIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("stoke")
193 chans = list(np.sort(np.unique(matrix[oldChanIdx])))
194 chans = [int(x) for x in chans]
195 stokes = list(np.sort(np.unique(matrix[oldStokeIdx])))
196 stokes = [int(x) for x in stokes]
197 ncycles = 0
198 if len(chans) > 0 and len(stokes) > 0:
199 ncycles = int( ncols / (len(chans)*len(stokes)) )
200 #if uss:
201 if not fullsummary:
202 try:
203 from casampi.MPIEnvironment import MPIEnvironment
204 mpi_available = True
205 except ModuleNotFoundError:
206 mpi_available = False
208 if mpi_available and MPIEnvironment.is_mpi_enabled:
209 # This is necessary because we may have an odd number of "channels" due to each process getting only a subchunk.
210 # Example:
211 # Process 1 gets stokes 0-1, process 2 gets stokes 2
212 # Each of them assigns channel id = chan + stoke * nsubstokes
213 # Process 1 assigns channel ids [0,2], Process 2 assigns channel id 0.
214 # This hack is not needed when not using a small summary minor because we have the extra knowledge of the stokes, instead of mapping stokes + channels onto chunks.
215 #chanslist = matrix[oldChanIdx].tolist()
216 #for chan in chans:
217 # singlechan_occurances = list(filter(lambda x: x == chan, chanslist))
218 # ncycles = max(ncycles, len(singlechan_occurances))
219 pass
221 # ret is the return dictionary[chans][stokes][rows][cycles]
222 # cummulativeCnt counts how many cols we've read for each channel/stokes/row
223 #ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions()}
224 ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions(fullsummary)}
225 ret = {stoke:copy.deepcopy(ret) for stoke in stokes}
226 ret = {chan:copy.deepcopy(ret) for chan in chans}
227 cummulativeCnt = copy.deepcopy(ret) # copy ret's structure
228 # reindex based on subimage index (aka chan/stoke index)
229 #for desc in SummaryMinor.getRowDescriptions():
230 for desc in SummaryMinor.getRowDescriptions(fullsummary):
231 #oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder().index(desc)
232 oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index(desc)
233 for colIdx in range(ncols):
234 chan = int(matrix[oldChanIdx][colIdx])
235 stoke = int(matrix[oldStokeIdx][colIdx])
236 val = matrix[oldRowIdx][colIdx]
237 cummulativeCol = int(cummulativeCnt[chan][stoke][desc][0]) # const 0: cummulativeCnt doesn't make use of 'cycle' index from copied ret structure
238 ret[chan][stoke][desc][cummulativeCol] = val
239 #print('ret[{}][{}][{}][{}] = {}'.format(chan,stoke,desc,cummulativeCol,ret[chan][stoke][desc][cummulativeCol]))
240 cummulativeCnt[chan][stoke][desc][0] += 1
241 #print('cummulativeCnt now==', cummulativeCnt)
243 return ret
245 def _getPerCycleDict(summaryminor_dict, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
246 calc_iterdone_deltas = True if (calc_iterdone_deltas == None) else calc_iterdone_deltas
247 keep_startvals = True if (keep_startvals == None) else keep_startvals
248 ret = summaryminor_dict
249 #availRows = SummaryMinor.getRowDescriptionsOldOrder()
250 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
252 # This block is not needed as summary_minor iterDone is stored as non-cumulative
253 #if (calc_iterdone_deltas) and ("startIterDone" in availRows):
254 # for chan in ret:
255 # for stoke in ret[chan]:
256 # for cyc in range(len(ret[chan][stoke]["startIterDone"])):
257 # ret[chan][stoke]["iterDone"][cyc] -= ret[chan][stoke]["startIterDone"][cyc]
258 if not keep_startvals:
259 for chan in ret:
260 for stoke in ret[chan]:
261 for desc in SummaryMinor.getRowStartDescs(fullsummary):
262 del ret[chan][stoke][desc]
264 return ret