Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/imagerhelpers/summary_minor.py: 86%

136 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import copy 

2import numpy as np 

3import os 

4 

5# from casatasks import casalog 

6 

class SummaryMinor:
    """Gathers the information together from the tclean return value in a way that makes it easier to query for exactly what you want.

    The structure for this nested dictionary is:
    {
        multi-field id: {
            channel id: {
                stokes id: {
                    summary key: {
                        cycle: value
                    }
                }
            }
        }
    }

    Examples:

    1. To get the number of available channels, and the ids of those channels:
           nchans = len(summ_min[0].keys())
           avail_chans = summ_min[0].keys()

    2. To get the number of iterations done on the main field, fifth channel, first stokes plane, during the middle minor cycle:
           field0 = summ_min[0]                  # field 0 is the main field
           chan = list(field0.keys())[5]         # channel index doesn't necessarily start at 0
           stoke = list(field0[chan].keys())[0]  # stokes index doesn't necessarily start at 0
           ncycles = len(field0[chan][stoke]['iterDone'])
           itersDone = field0[chan][stoke]['iterDone'][ncycles // 2]

    3. To get the available minor cycle summary statistics:
           field0 = summ_min[0]
           chan0 = list(field0.keys())[0]
           stoke0 = list(field0[chan0].keys())[0]
           availSummStats = field0[chan0][stoke0].keys()
    """
    # Row order of the matrix emitted by the C++ layer (full summary).
    # Indices:                 0           1          2            3              4          5       6        7                  8                9               10                11 ("No Mask")    12           13         14           15              16
    rowDescriptionsOldOrder = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke", "cycleStartIters", "startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM", "peakResNM", "masksum", "mpiServer", "multifieldId", "stopCode"]
    # Reduced row set emitted while CAS-13683 remains unfixed (small summary minor).
    rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan"]
    # rowDescriptions does not include {"multifieldId", "chan", "stoke", "deconId"}, and so the returned dictionary will not include those values in the summary keys
    rowDescriptions = ["startIterDone", "iterDone", "startPeakRes", "peakRes", "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM", "cycleThresh", "cycleStartIters", "masksum", "mpiServer", "stopCode"]
    # Rows that record values at the beginning of a minor cycle (pre-deconvolution).
    rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"]

    @staticmethod
    def convertMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Convert the tclean minor cycle summary matrix into a nested dictionary.

        Parameters:
            summaryminor_matrix: 2D numpy matrix; one row per summary statistic (see
                rowDescriptionsOldOrder / rowDescriptions13683), one column per record.
            fullsummary: True when the full row set is available, False for the
                reduced CAS-13683 row set.
            calc_iterdone_deltas: accepted for backward compatibility; a no-op,
                since iterDone is already stored as non-cumulative values.
            keep_startvals: if False, drop the "start*" summary keys from the
                result. Defaults to True.

        Returns: {fieldId: {chan: {stoke: {summary key: [per-cycle values]}}}}

        Raises: RuntimeError if no field ids can be parsed from the matrix.
        """
        ret = {}

        # edge case: no iterations were done (eg threshold < model flux)
        if summaryminor_matrix.shape[1] == 0:
            return {0: {}}

        # get individual dictionaries for each field id
        field_ids = SummaryMinor._getFieldIds(summaryminor_matrix, fullsummary)
        if len(field_ids) > 1:
            for fieldId in field_ids:
                # Note: pass the field id itself, not field_ids[fieldId] — ids are
                # values, not list positions, and need not be contiguous from 0.
                singleFieldMatrix = SummaryMinor._getSingleFieldMatrix(summaryminor_matrix, fieldId, fullsummary)
                ret[fieldId] = SummaryMinor._convertSingleFieldMatrix(singleFieldMatrix, fullsummary, calc_iterdone_deltas, keep_startvals)
        elif len(field_ids) == 1:
            ret[field_ids[0]] = SummaryMinor._convertSingleFieldMatrix(summaryminor_matrix, fullsummary, calc_iterdone_deltas, keep_startvals)
        else:
            raise RuntimeError("No multifield ids were found. Failed to parse summary minor matrix after tclean finished running.")

        return ret

    @staticmethod
    def _convertSingleFieldMatrix(single_field_matrix, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Convert a single-field summary matrix to {chan: {stoke: {summary key: [values]}}}."""
        # edge case: no iterations were done (eg threshold < model flux)
        if single_field_matrix.shape[1] == 0:
            return {}

        summaryminor_dict = SummaryMinor.indexMinorCycleSummaryBySubimage(single_field_matrix, fullsummary)
        percycleiters_dict = SummaryMinor._getPerCycleDict(copy.deepcopy(summaryminor_dict), fullsummary, calc_iterdone_deltas, keep_startvals)
        return percycleiters_dict

    @staticmethod
    def _getFieldIds(matrix, fullsummary):
        """ Get a sorted list of available outlier field ids in the given matrix """

        # edge case: running with MPI and CAS-13683 hasn't been fixed yet
        availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
        if (fullsummary and "multifieldId" not in availRows) or (not fullsummary and "deconId" not in availRows):
            return [0]  # can't differentiate multiple fields from available data, assume one field

        # the reduced summary identifies fields by "deconId" instead of "multifieldId"
        if fullsummary:
            multifieldIdx = availRows.index("multifieldId")
        else:
            multifieldIdx = availRows.index("deconId")
        # np.unique returns sorted values; coerce the float matrix entries to ints
        return [int(x) for x in np.unique(matrix[multifieldIdx, :])]

    @staticmethod
    def _getSingleFieldMatrix(matrixIn, fieldId, fullsummary):
        """ Create a new matrix to hold all the values of the given matrix, but only for the given outlier field id """
        availRows = SummaryMinor.getRowDescriptionsOldOrder(fullsummary)
        if (fullsummary and "multifieldId" not in availRows) or (not fullsummary and "deconId" not in availRows):
            return matrixIn  # can't identify fields from the available data: return unfiltered
        multifieldIdx = availRows.index("multifieldId") if fullsummary else availRows.index("deconId")

        # Boolean-mask column selection: keeps all rows, only the columns whose
        # field-id entry matches fieldId (advanced indexing returns a new array).
        return matrixIn[:, matrixIn[multifieldIdx, :] == fieldId]

    @staticmethod
    def _getRowDescriptionsOldOrder(fullsummary):
        """Temporary CAS-13683 workaround: pick the full or reduced row set."""
        if not fullsummary:
            return SummaryMinor.rowDescriptions13683
        return SummaryMinor.rowDescriptionsOldOrder

    @staticmethod
    def getRowDescriptionsOldOrder(fullsummary):
        """ Retrieves brief descriptions of the available minor cycle summary rows, in the old (matrix) order. """
        return SummaryMinor._getRowDescriptionsOldOrder(fullsummary)

    @staticmethod
    def _getRowDescriptions(fullsummary):
        """Temporary CAS-13683 workaround: restrict rowDescriptions to rows actually present."""
        availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
        return [desc for desc in SummaryMinor.rowDescriptions if desc in availRows]

    @staticmethod
    def getRowDescriptions(fullsummary):
        """ Retrieves brief descriptions of the available minor cycle summary rows """
        return SummaryMinor._getRowDescriptions(fullsummary)

    @staticmethod
    def _getRowStartDescs(fullsummary):
        """Temporary CAS-13683 workaround: restrict rowStartDescs to rows actually present."""
        availRows = SummaryMinor._getRowDescriptionsOldOrder(fullsummary)
        return [desc for desc in SummaryMinor.rowStartDescs if desc in availRows]

    @staticmethod
    def getRowStartDescs(fullsummary):
        """ Retrieves abbreviated names of the available minor cycle summary "start" rows.

        These are the rows that catalog the values at the beginning of a minor cycle (pre-deconvolution). """
        return SummaryMinor._getRowStartDescs(fullsummary)

    @staticmethod
    def indexMinorCycleSummaryBySubimage(matrix, fullsummary):
        """ Re-indexes matrix from [row,column] to [channel][stokes][row][cycle].

        Param matrix: the original matrix to convert.
        Param fullsummary: True for the full row set; False for the reduced
            CAS-13683 set, which carries no stokes row (all values land on
            stokes id 0).
        """
        ncols = matrix.shape[1]
        oldChanIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("chan")
        if fullsummary:
            oldStokeIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index("stoke")

        # available channel/stokes ids (indices don't necessarily start at 0)
        chans = [int(x) for x in np.sort(np.unique(matrix[oldChanIdx]))]
        if not fullsummary:
            stokes = [0]  # reduced summary records no stokes ids
        else:
            stokes = [int(x) for x in np.sort(np.unique(matrix[oldStokeIdx]))]
        ncycles = 0
        if len(chans) > 0 and len(stokes) > 0:
            ncycles = int(ncols / (len(chans) * len(stokes)))

        if not fullsummary:
            try:
                from casampi.MPIEnvironment import MPIEnvironment
                mpi_available = True
            except ModuleNotFoundError:
                mpi_available = False

            if mpi_available and MPIEnvironment.is_mpi_enabled:
                # This is necessary because we may have an odd number of "channels" due to each process getting only a subchunk.
                # Example:
                #   Process 1 gets stokes 0-1, process 2 gets stokes 2
                #   Each of them assigns channel id = chan + stoke * nsubstokes
                #   Process 1 assigns channel ids [0,2], Process 2 assigns channel id 0.
                # This hack is not needed when not using a small summary minor because we have the extra knowledge of the stokes, instead of mapping stokes + channels onto chunks.
                chanslist = matrix[oldChanIdx].tolist()
                for chan in chans:
                    ncycles = max(ncycles, chanslist.count(chan))

        # ret is the return dictionary[chans][stokes][rows][cycles]
        # cummulativeCnt counts how many cols we've read for each channel/stokes/row
        ret = {desc: [0] * ncycles for desc in SummaryMinor.getRowDescriptions(fullsummary)}
        ret = {stoke: copy.deepcopy(ret) for stoke in stokes}
        ret = {chan: copy.deepcopy(ret) for chan in chans}
        cummulativeCnt = copy.deepcopy(ret)  # copy ret's structure

        # reindex based on subimage index (aka chan/stoke index)
        for desc in SummaryMinor.getRowDescriptions(fullsummary):
            oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder(fullsummary).index(desc)
            for colIdx in range(ncols):
                chan = int(matrix[oldChanIdx][colIdx])
                stoke = 0 if not fullsummary else int(matrix[oldStokeIdx][colIdx])
                val = matrix[oldRowIdx][colIdx]
                # const 0: cummulativeCnt doesn't make use of 'cycle' index from copied ret structure
                cummulativeCol = int(cummulativeCnt[chan][stoke][desc][0])
                ret[chan][stoke][desc][cummulativeCol] = val
                cummulativeCnt[chan][stoke][desc][0] += 1

        return ret

    @staticmethod
    def _getPerCycleDict(summaryminor_dict, fullsummary, calc_iterdone_deltas=None, keep_startvals=None):
        """Optionally strip the "start*" summary keys from the per-subimage dictionary.

        calc_iterdone_deltas is accepted for backward compatibility but is a
        no-op: summary_minor already stores iterDone as non-cumulative values.
        """
        keep_startvals = True if (keep_startvals is None) else keep_startvals
        ret = summaryminor_dict

        if not keep_startvals:
            for chan in ret:
                for stoke in ret[chan]:
                    for desc in SummaryMinor.getRowStartDescs(fullsummary):
                        del ret[chan][stoke][desc]

        return ret