Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/summary_minor.py: 86%
136 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import copy
2import numpy as np
3import os
5# from casatasks import casalog
class SummaryMinor:
    """Gathers the information together from the tclean return value in a way that makes it easier to query for exactly what you want.

    The structure for this nested dictionary is:
        {
            multi-field id: {
                channel id: {
                    stokes id: {
                        summary key: {
                            cycle: value
                        }
                    }
                }
            }
        }

    Examples:

    1. To get the number of available channels, and the ids of those channels:
        nchans = len(summ_min[0].keys())
        avail_chans = summ_min[0].keys()

    2. To get the number of iterations done on the main field, fifth channel, first stokes plane, during the middle minor cycle:
        field0 = summ_min[0] # field 0 is the main field
        chan = list(field0.keys())[5] # channel index doesn't necessarily start at 0
        stoke = list(field0[chan].keys())[0] # stokes index doesn't necessarily start at 0
        ncycles = len(field0[chan][stoke]['iterDone'])
        itersDone = field0[chan][stoke]['iterDone'][ncycles//2]

    3. To get the available minor cycle summary statistics:
        field0 = summ_min[0]
        chan0 = list(field0.keys())[0]
        stoke0 = list(field0[chan0].keys())[0]
        availSummStats = field0[chan0][stoke0].keys()
    """
    # Row layout of the full summary matrix, in the original (old) row order:
    # 0          1          2            3              4          5       6        7                  8                9               10                11 "No Mask"    12           13         14           15              16
    rowDescriptionsOldOrder = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke", "cycleStartIters", "startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM", "peakResNM", "masksum", "mpiServer", "multifieldId", "stopCode"]
    # Reduced row layout used while CAS-13683 is unfixed (small summaryminor).
    rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan"]
    # rowDescriptions does not include {"multifieldId", "chan", "stoke", "deconId"}, and so the returned dictionary will not include those values in the summary keys
    rowDescriptions = ["startIterDone", "iterDone", "startPeakRes", "peakRes", "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM", "cycleThresh", "cycleStartIters", "masksum", "mpiServer", "stopCode"]
    # Rows that record the values at the start of a minor cycle (pre-deconvolution).
    rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"]
49 def convertMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
50 # casalog.post(summaryminor_matrix, "SEVERE")
51 ret = {}
53 # edge case: no iterations were done (eg threshold < model flux)
54 if summaryminor_matrix.shape[1] == 0:
55 return { 0: {} }
57 # get individual dictionaries for each field id
58 field_ids = SummaryMinor._getFieldIds(summaryminor_matrix, fullsummary)
59 if len(field_ids) > 1:
60 for fieldId in field_ids:
61 singleFieldMatrix = SummaryMinor._getSingleFieldMatrix(summaryminor_matrix, field_ids[fieldId], fullsummary)
62 ret[fieldId] = SummaryMinor._convertSingleFieldMatrix(singleFieldMatrix, fullsummary, calc_iterdone_deltas, keep_startvals)
63 elif len(field_ids) == 1:
64 ret[field_ids[0]] = SummaryMinor._convertSingleFieldMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas, keep_startvals)
65 else:
66 raise RuntimeError("No multifield ids were found. Failed to parse summary minor matrix after tclean finished running.")
68 return ret
70 def _convertSingleFieldMatrix(single_field_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
71 # edge case: no iterations were done (eg threshold < model flux)
72 if single_field_matrix.shape[1] == 0:
73 return {}
75 summaryminor_dict = SummaryMinor.indexMinorCycleSummaryBySubimage(single_field_matrix, fullsummary)
76 percycleiters_dict = SummaryMinor._getPerCycleDict(copy.deepcopy(summaryminor_dict), fullsummary, calc_iterdone_deltas, keep_startvals)
77 return percycleiters_dict
79 def _getFieldIds(matrix, fullsummary):
80 """ Get a sorted list of available outlier field ids in the given matrix """
82 # edge case: running with MPI and CAS-13683 hasn't been fixed yet
83 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
84 if (fullsummary and not "multifieldId" in availRows) or (not fullsummary and not "deconId" in availRows):
85 return [0] # can't differentiate multiple fields from available data, assume one field
87 if fullsummary:
88 multifieldIdx = availRows.index("multifieldId")
89 else:
90 multifieldIdx = availRows.index("deconId")
91 nrows = matrix.shape[0]
92 ncols = matrix.shape[1]
93 fieldIds = sorted(np.unique(matrix[multifieldIdx,:]).tolist())
94 fieldIds = list(map(lambda x: int(x), fieldIds))
95 return fieldIds
97 def _getSingleFieldMatrix(matrixIn, fieldId, fullsummary):
98 """ Create a new matrix to hold all the values of the given matrix, but only for the given outlier field id """
99 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
100 #if not "multifieldId" in availRows:
101 if (fullsummary and not "multifieldId" in availRows) or (not fullsummary and not "deconId" in availRows):
102 return matrixIn
103 #multifieldIdx = availRows.index("multifieldId")
104 multifieldIdx = availRows.index("multifieldId") if fullsummary else availRows.index("deconId")
105 nrowsIn = matrixIn.shape[0]
106 ncolsIn = matrixIn.shape[1]
107 nrowsOut = nrowsIn
108 ncolsOut = matrixIn[multifieldIdx,:].tolist().count(fieldId)
110 matrixOut = np.zeros((nrowsOut, ncolsOut))
111 colOut = 0
112 maxColOut = 0
113 maxRowOut = 0
114 for colIn in range(ncolsIn):
115 if matrixIn[multifieldIdx,colIn] != fieldId:
116 continue
117 for rowIn in range(nrowsIn):
118 rowOut = rowIn
119 matrixOut[rowOut,colOut] = matrixIn[rowIn,colIn]
120 maxRowOut = max(rowOut, maxRowOut)
121 maxColOut = colOut
122 colOut += 1
124 return matrixOut
126 #def useSmallSummaryminor(ignored_parameter=None):
127 # """Temporary CAS-13683 workaround"""
128 # if ('USE_SMALL_SUMMARYMINOR' in os.environ):
129 # uss = os.environ['USE_SMALL_SUMMARYMINOR'].lower()
130 # if (uss == "true"):
131 # return True
132 # return False
134 #def _getRowDescriptionsOldOrder(useSmallSummaryminor):
135 def _getRowDescriptionsOldOrder(fullsummary):
136 """Temporary CAS-13683 workaround"""
137 #if (useSmallSummaryminor):
138 if (not fullsummary):
139 return SummaryMinor.rowDescriptions13683
140 return SummaryMinor.rowDescriptionsOldOrder
142 #def getRowDescriptionsOldOrder():
143 def getRowDescriptionsOldOrder(fullsummary):
144 """ Retrieves brief descriptions of the available minor cycle summary rows, in the old (matrix) order. """
145 #return SummaryMinor._getRowDescriptionsOldOrder(SummaryMinor.useSmallSummaryminor())
146 return SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
148 #def _getRowDescriptions(useSmallSummaryminor):
149 def _getRowDescriptions(fullsummary):
150 """Temporary CAS-13683 workaround"""
151 ret = SummaryMinor.rowDescriptions
152 #availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor)
153 availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
154 ret = list(filter(lambda x: x in availRows, ret))
155 return ret
157 #def getRowDescriptions():
158 def getRowDescriptions(fullsummary):
159 """ Retrieves brief descriptions of the available minor cycle summary rows """
160 #return SummaryMinor._getRowDescriptions(SummaryMinor.useSmallSummaryminor())
161 return SummaryMinor._getRowDescriptions(fullsummary)
163 #def _getRowStartDescs(useSmallSummaryminor):
164 def _getRowStartDescs(fullsummary):
165 """Temporary CAS-13683 workaround"""
166 ret = SummaryMinor.rowStartDescs
167 #availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor)
168 availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
169 ret = list(filter(lambda x: x in availRows, ret))
170 return ret
172 #def getRowStartDescs():
173 def getRowStartDescs(fullsummary):
174 """ Retrieves abreviated names of the available minor cycle summary "start" rows.
176 These are the rows that catalog the values at the beggining of a minor cycle (pre-deconvolution). """
177 #return SummaryMinor._getRowStartDescs(SummaryMinor.useSmallSummaryminor())
178 return SummaryMinor._getRowStartDescs(fullsummary)
180 def indexMinorCycleSummaryBySubimage(matrix,fullsummary):
181 """ Re-indexes matrix from [row,column] to [channel,stokes,row,cycle].
183 Param matrix: the original matrix to convert.
184 """
185 # get some properties of the summary_minor matrix
186 nrows = matrix.shape[0]
187 ncols = matrix.shape[1]
188 #uss = SummaryMinor.useSmallSummaryminor() # Temporary CAS-13683 workaround
189 import sys
190 oldChanIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("chan")
191 #if not uss:
192 if fullsummary:
193 oldStokeIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("stoke")
194 chans = list(np.sort(np.unique(matrix[oldChanIdx])))
195 chans = [int(x) for x in chans]
196 #if uss:
197 if not fullsummary:
198 stokes = [0]
199 else:
200 stokes = list(np.sort(np.unique(matrix[oldStokeIdx])))
201 stokes = [int(x) for x in stokes]
202 ncycles = 0
203 if len(chans) > 0 and len(stokes) > 0:
204 ncycles = int( ncols / (len(chans)*len(stokes)) )
205 #if uss:
206 if not fullsummary:
207 try:
208 from casampi.MPIEnvironment import MPIEnvironment
209 mpi_available = True
210 except ModuleNotFoundError:
211 mpi_available = False
213 if mpi_available and MPIEnvironment.is_mpi_enabled:
214 # This is necessary because we may have an odd number of "channels" due to each process getting only a subchunk.
215 # Example:
216 # Process 1 gets stokes 0-1, process 2 gets stokes 2
217 # Each of them assigns channel id = chan + stoke * nsubstokes
218 # Process 1 assigns channel ids [0,2], Process 2 assigns channel id 0.
219 # This hack is not needed when not using a small summary minor because we have the extra knowledge of the stokes, instead of mapping stokes + channels onto chunks.
220 chanslist = matrix[oldChanIdx].tolist()
221 for chan in chans:
222 singlechan_occurances = list(filter(lambda x: x == chan, chanslist))
223 ncycles = max(ncycles, len(singlechan_occurances))
225 # ret is the return dictionary[chans][stokes][rows][cycles]
226 # cummulativeCnt counts how many cols we've read for each channel/stokes/row
227 #ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions()}
228 ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions(fullsummary)}
229 ret = {stoke:copy.deepcopy(ret) for stoke in stokes}
230 ret = {chan:copy.deepcopy(ret) for chan in chans}
231 cummulativeCnt = copy.deepcopy(ret) # copy ret's structure
232 #print('ncycle=', ncycles)
233 #print('ret2=',ret)
234 #print('cummCnt=',cummulativeCnt)
235 # reindex based on subimage index (aka chan/stoke index)
236 #for desc in SummaryMinor.getRowDescriptions():
237 for desc in SummaryMinor.getRowDescriptions(fullsummary):
238 #oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder().index(desc)
239 oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index(desc)
240 for colIdx in range(ncols):
241 chan = int(matrix[oldChanIdx][colIdx])
242 #if uss:
243 if not fullsummary:
244 stoke = 0
245 else:
246 stoke = int(matrix[oldStokeIdx][colIdx])
247 val = matrix[oldRowIdx][colIdx]
248 cummulativeCol = int(cummulativeCnt[chan][stoke][desc][0]) # const 0: cummulativeCnt doesn't make use of 'cycle' index from copied ret structure
249 ret[chan][stoke][desc][cummulativeCol] = val
250 #print('ret[{}][{}][{}][{}] = {}'.format(chan,stoke,desc,cummulativeCol,ret[chan][stoke][desc][cummulativeCol]))
251 cummulativeCnt[chan][stoke][desc][0] += 1
252 #print('cummulativeCnt now==', cummulativeCnt)
254 return ret
256 def _getPerCycleDict(summaryminor_dict, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
257 calc_iterdone_deltas = True if (calc_iterdone_deltas == None) else calc_iterdone_deltas
258 keep_startvals = True if (keep_startvals == None) else keep_startvals
259 ret = summaryminor_dict
260 #availRows = SummaryMinor.getRowDescriptionsOldOrder()
261 availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
263 # This block is not needed as summary_minor iterDone is stored as non-cumulative
264 #if (calc_iterdone_deltas) and ("startIterDone" in availRows):
265 # for chan in ret:
266 # for stoke in ret[chan]:
267 # for cyc in range(len(ret[chan][stoke]["startIterDone"])):
268 # ret[chan][stoke]["iterDone"][cyc] -= ret[chan][stoke]["startIterDone"][cyc]
269 if not keep_startvals:
270 for chan in ret:
271 for stoke in ret[chan]:
272 for desc in SummaryMinor.getRowStartDescs(fullsummary):
273 del ret[chan][stoke][desc]
275 return ret