Line data Source code
1 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
2 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All
3 : //# rights reserved.
4 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
5 : //#
6 : //# This library is free software; you can redistribute it and/or
7 : //# modify it under the terms of the GNU Lesser General Public
8 : //# License as published by the Free Software Foundation; either
9 : //# version 2.1 of the License, or (at your option) any later version.
10 : //#
11 : //# This library is distributed in the hope that it will be useful,
12 : //# but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : //# Lesser General Public License for more details.
15 : //#
16 : //# You should have received a copy of the GNU Lesser General Public
17 : //# License along with this library; if not, write to the Free Software
18 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19 : //# MA 02111-1307 USA
20 :
21 : #include <mstransform/TVI/StatWtFloatingWindowDataAggregator.h>
22 :
23 : #include <casacore/scimath/StatsFramework/ClassicalStatistics.h>
24 :
25 : #include <msvis/MSVis/ViImplementation2.h>
26 : #include <mstransform/TVI/StatWtTypes.h>
27 :
28 : #ifdef _OPENMP
29 : #include <omp.h>
30 : #endif
31 :
32 : using namespace casacore;
33 : using namespace std;
34 :
35 : namespace casa {
36 :
37 : namespace vi {
38 :
39 0 : StatWtFloatingWindowDataAggregator::StatWtFloatingWindowDataAggregator(
40 : ViImplementation2 *const vii,
41 : const std::map<casacore::Int, std::vector<StatWtTypes::ChanBin>>& chanBins,
42 : std::shared_ptr<map<uInt, pair<uInt, uInt>>>& samples,
43 : StatWtTypes::Column column, Bool noModel,
44 : const map<uInt, Cube<Bool>>& chanSelFlags, Bool combineCorr,
45 : shared_ptr<
46 : ClassicalStatistics<
47 : Double, Array<Float>::const_iterator,
48 : Array<Bool>::const_iterator
49 : >
50 : >& wtStats,
51 : shared_ptr<const pair<Double, Double>> wtrange,
52 : shared_ptr<const Double> binWidthInSeconds,
53 : shared_ptr<const Int> nTimeStampsInBin, const Bool timeBlockProcessing,
54 : shared_ptr<
55 : StatisticsAlgorithm<
56 : Double, Array<Float>::const_iterator,
57 : Array<Bool>::const_iterator, Array<Double>::const_iterator
58 : >
59 : >& statAlg, Int minSamp
60 0 : ) : StatWtDataAggregator(
61 : vii, chanBins, samples, column, noModel, chanSelFlags, /* mustComputeWtSp,*/
62 : wtStats, wtrange, combineCorr, statAlg, minSamp
63 0 : ), _binWidthInSeconds(binWidthInSeconds),
64 0 : _nTimeStampsInBin(nTimeStampsInBin),
65 0 : _timeBlockProcessing(timeBlockProcessing) {
66 0 : ThrowIf(
67 : ! (_binWidthInSeconds || _nTimeStampsInBin ),
68 : "Logic error: neither binWidthInSeconds "
69 : "nor nTimeStampsInBin has been specified"
70 : );
71 0 : }
72 :
73 0 : StatWtFloatingWindowDataAggregator::~StatWtFloatingWindowDataAggregator() {}
74 :
75 0 : void StatWtFloatingWindowDataAggregator::aggregate() {
76 0 : auto* vb = _vii->getVisBuffer();
77 : // map of rowID (the index of the vector) to rowIDs that should be used to
78 : // compute the weight for the key rowID
79 0 : std::vector<std::set<uInt>> rowMap;
80 0 : auto firstTime = True;
81 : // each subchunk is guaranteed to represent exactly one time stamp
82 0 : std::vector<Double> subChunkToTimeStamp;
83 : // Baseline-subchunk number pair to row index in that chunk
84 0 : std::map<std::pair<StatWtTypes::Baseline, uInt>, uInt> baselineSubChunkToIndex;
85 : // we build up the chunk data in the chunkData and chunkFlags cubes
86 0 : Cube<Complex> chunkData;
87 0 : Cube<Bool> chunkFlags;
88 0 : std::vector<Double> exposures;
89 0 : rownr_t subchunkStartRowNum = 0;
90 0 : auto initChanSelTemplate = True;
91 0 : Cube<Bool> chanSelFlagTemplate, chanSelFlags;
92 : // we cannot know the spw until inside the subchunk loop
93 0 : Int spw = -1;
94 0 : _rowIDInMSToRowIndexInChunk.clear();
95 0 : Slicer sl(IPosition(3, 0), IPosition(3, 1));
96 0 : auto slStart = sl.start();
97 0 : auto slEnd = sl.end();
98 0 : std::vector<std::pair<uInt, uInt>> idToChunksNeededByIDMap,
99 0 : chunkNeededToIDsThatNeedChunkIDMap;
100 0 : _limits(idToChunksNeededByIDMap, chunkNeededToIDsThatNeedChunkIDMap);
101 0 : uInt subChunkID = 0;
102 0 : for (_vii->origin(); _vii->more(); _vii->next(), ++subChunkID) {
103 0 : if (_checkFirstSubChunk(spw, firstTime, vb)) {
104 0 : return;
105 : }
106 0 : if (! _mustComputeWtSp) {
107 0 : _mustComputeWtSp.reset(
108 : new Bool(
109 0 : vb->existsColumn(VisBufferComponent2::WeightSpectrum)
110 0 : )
111 : );
112 : }
113 0 : _rowIDInMSToRowIndexInChunk[*vb->rowIds().begin()] = subchunkStartRowNum;
114 0 : const auto& ant1 = vb->antenna1();
115 0 : const auto& ant2 = vb->antenna2();
116 :         // [nCorrs, nFreqs, nRows]
117 0 : const auto nrows = vb->nRows();
118 :         // There is no guarantee that a previous subchunk will be included,
119 :         // e.g. if the time width is small enough.
120 :         // This is the first subchunk ID whose data should be used when
121 :         // grouping data for the weight computation of the current subchunk ID.
122 : const auto firstChunkNeededByCurrentID
123 0 : = idToChunksNeededByIDMap[subChunkID].first;
124 : const auto lastChunkNeededByCurrentID
125 0 : = idToChunksNeededByIDMap[subChunkID].second;
126 : const auto firstChunkThatNeedsCurrentID
127 0 : = chunkNeededToIDsThatNeedChunkIDMap[subChunkID].first;
128 0 : auto subchunkTime = vb->time()[0];
129 0 : auto rowInChunk = subchunkStartRowNum;
130 0 : pair<StatWtTypes::Baseline, uInt> mypair;
131 0 : mypair.second = subChunkID;
132 0 : for (rownr_t row=0; row<nrows; ++row, ++rowInChunk) {
133 :             // loop over rows in the subchunk, grouping baseline-specific
134 :             // data together
135 0 : const auto baseline = _baseline(ant1[row], ant2[row]);
136 0 : mypair.first = baseline;
137 0 : baselineSubChunkToIndex[mypair] = rowInChunk;
138 0 : std::set<uInt> neededRowNums;
139 0 : neededRowNums.insert(rowInChunk);
140 0 : if (subChunkID > 0) {
141 0 : auto s = min(
142 : firstChunkNeededByCurrentID, firstChunkThatNeedsCurrentID
143 : );
144 0 : auto tpair = mypair;
145 0 : for (; s < subChunkID; ++s) {
146 0 : const auto myend = baselineSubChunkToIndex.end();
147 0 : tpair.second = s;
148 0 : const auto iter = baselineSubChunkToIndex.find(tpair);
149 0 : auto found = iter != myend;
150 0 : if (found) {
151 0 : const auto existingRowNum = iter->second;
152 0 : if (
153 : s >= firstChunkNeededByCurrentID
154 0 : && s <= lastChunkNeededByCurrentID
155 : ) {
156 : // The subchunk data is needed for computation
157 : // of the current subchunkID's weights
158 0 : neededRowNums.insert(existingRowNum);
159 : }
160 0 : if (
161 0 : idToChunksNeededByIDMap[s].first <= subChunkID
162 0 : && idToChunksNeededByIDMap[s].second >= subChunkID
163 : ) {
164 0 : rowMap[existingRowNum].insert(rowInChunk);
165 : }
166 : }
167 : }
168 : }
169 0 : rowMap.push_back(neededRowNums);
170 0 : }
171 0 : const auto dataCube = _dataCube(vb);
172 : const auto resultantFlags = _getResultantFlags(
173 : chanSelFlagTemplate, chanSelFlags, initChanSelTemplate,
174 0 : spw, vb->flagCube()
175 0 : );
176 0 : const auto myExposures = vb->exposure().tovector();
177 0 : exposures.insert(
178 0 : exposures.end(), myExposures.begin(), myExposures.end()
179 : );
180 0 : const auto cubeShape = dataCube.shape();
181 0 : IPosition sliceStart(3, 0);
182 0 : auto sliceEnd = cubeShape - 1;
183 : // build up chunkData and chunkFlags one subchunk at a time
184 0 : if (chunkData.empty()) {
185 0 : chunkData = dataCube;
186 0 : chunkFlags = resultantFlags;
187 : }
188 : else {
189 0 : auto newShape = chunkData.shape();
190 0 : newShape[2] += nrows;
191 0 : chunkData.resize(newShape, True);
192 0 : chunkFlags.resize(newShape, True);
193 0 : slStart[2] = subchunkStartRowNum;
194 0 : sl.setStart(slStart);
195 0 : slEnd = newShape - 1;
196 0 : sl.setEnd(slEnd);
197 0 : chunkData(sl) = dataCube;
198 0 : chunkFlags(sl) = resultantFlags;
199 0 : }
200 0 : subChunkToTimeStamp.push_back(subchunkTime);
201 0 : subchunkStartRowNum += nrows;
202 0 : }
203 0 : _computeWeights(
204 0 : chunkData, chunkFlags, Vector<Double>(exposures), rowMap, spw
205 : );
206 0 : }
207 :
208 0 : void StatWtFloatingWindowDataAggregator::weightSingleChanBin(
209 : Matrix<Float>& wtmat, Int nrows
210 : ) const {
211 0 : Vector<rownr_t> rowIDs;
212 0 : _vii->getRowIds(rowIDs);
213 0 : const auto start = _rowIDInMSToRowIndexInChunk.find(*rowIDs.begin());
214 0 : ThrowIf(
215 : start == _rowIDInMSToRowIndexInChunk.end(),
216 : "Logic Error: Cannot find requested subchunk in stored chunk"
217 : );
218 : // this is the row index in the chunk
219 0 : auto chunkRowIndex = start->second;
220 0 : auto ncorr = wtmat.nrow();
221 0 : for (Int i=0; i<nrows; ++i, ++chunkRowIndex) {
222 0 : if (_combineCorr) {
223 0 : wtmat.column(i) = _weights(0, 0, chunkRowIndex);
224 : }
225 : else {
226 0 : for (uInt corr=0; corr<ncorr; ++corr) {
227 0 : wtmat(corr, i) = _weights(
228 : corr, 0, chunkRowIndex
229 : );
230 : }
231 : }
232 : }
233 0 : }
234 :
235 0 : void StatWtFloatingWindowDataAggregator::_computeWeights(
236 : const Cube<Complex>& data, const Cube<Bool>& flags,
237 : const Vector<Double>& exposures, const std::vector<std::set<uInt>>& rowMap,
238 : uInt spw
239 : ) const {
240 0 : auto chunkShape = data.shape();
241 0 : const auto nActCorr = chunkShape[0];
242 0 : const auto ncorr = _combineCorr ? 1 : nActCorr;
243 0 : const auto& chanBins = _chanBins.find(spw)->second;
244 0 : _weights.resize(IPosition(3, ncorr, chanBins.size(), chunkShape[2]), False);
245 0 : const auto nRows = rowMap.size();
246 : #ifdef _OPENMP
247 0 : #pragma omp parallel for
248 : // cout << "DEBUG PARALLEL LOOPING IS OFF" << endl;
249 : #endif
250 :
251 : for (size_t iRow=0; iRow<nRows; ++iRow) {
252 : IPosition chunkSliceStart(3, 0);
253 : auto chunkSliceLength = chunkShape;
254 : chunkSliceLength[2] = 1;
255 : Slicer chunkSlice(
256 : chunkSliceStart, chunkSliceLength, Slicer::endIsLength
257 : );
258 : auto chunkSliceEnd = chunkSlice.end();
259 : auto appendingSlice = chunkSlice;
260 : auto appendingSliceStart = appendingSlice.start();
261 : auto appendingSliceEnd = appendingSlice.end();
262 : auto intraChunkSlice = appendingSlice;
263 : auto intraChunkSliceStart = intraChunkSlice.start();
264 : auto intraChunkSliceEnd = intraChunkSlice.end();
265 : intraChunkSliceEnd[0] = nActCorr - 1;
266 : const auto& rowsToInclude = rowMap[iRow];
267 : auto dataShape = chunkShape;
268 : dataShape[2] = rowsToInclude.size();
269 : Cube<Complex> dataArray(dataShape);
270 : Cube<Bool> flagArray(dataShape);
271 : auto siter = rowsToInclude.begin();
272 : auto send = rowsToInclude.end();
273 : Vector<Double> exposureVector(rowsToInclude.size(), 0);
274 : uInt n = 0;
275 : // create an array with only the rows that should
276 : // be used in the computation of weights for the
277 : // current row
278 : for (; siter!=send; ++siter, ++n) {
279 : exposureVector[n] = exposures[*siter];
280 : appendingSliceStart[2] = n;
281 : appendingSlice.setStart(appendingSliceStart);
282 : appendingSliceEnd[2] = n;
283 : appendingSlice.setEnd(appendingSliceEnd);
284 : chunkSliceStart[2] = *siter;
285 : chunkSlice.setStart(chunkSliceStart);
286 : chunkSliceEnd[2] = *siter;
287 : chunkSlice.setEnd(chunkSliceEnd);
288 : dataArray(appendingSlice) = data(chunkSlice);
289 : flagArray(appendingSlice) = flags(chunkSlice);
290 : }
291 : // slice up for correlations and channel binning
292 : intraChunkSliceEnd[2] = dataShape[2] - 1;
293 : for (uInt corr=0; corr<ncorr; ++corr) {
294 : if (! _combineCorr) {
295 : intraChunkSliceStart[0] = corr;
296 : intraChunkSliceEnd[0] = corr;
297 : }
298 : auto citer = chanBins.begin();
299 : auto cend = chanBins.end();
300 : auto iChanBin = 0;
301 : for (; citer!=cend; ++citer, ++iChanBin) {
302 : intraChunkSliceStart[1] = citer->start;
303 : intraChunkSliceEnd[1] = citer->end;
304 : intraChunkSlice.setStart(intraChunkSliceStart);
305 : intraChunkSlice.setEnd(intraChunkSliceEnd);
306 : _weights(corr, iChanBin, iRow)
307 : = _varianceComputer->computeWeight(
308 : dataArray(intraChunkSlice), flagArray(intraChunkSlice),
309 : exposureVector, spw, exposures[iRow]
310 : );
311 : }
312 : }
313 : }
314 0 : }
315 :
316 0 : void StatWtFloatingWindowDataAggregator::weightSpectrumFlags(
317 : Cube<Float>& wtsp, Cube<Bool>& flagCube, Bool& checkFlags,
318 : const Vector<Int>& ant1, const Vector<Int>&,
319 : const Vector<Int>& spws, const Vector<Double>&,
320 : const Vector<rownr_t>& rowIDs
321 : ) const {
322 : // fish out the rows relevant to this subchunk
323 0 : const auto start = _rowIDInMSToRowIndexInChunk.find(*rowIDs.begin());
324 0 : ThrowIf(
325 : start == _rowIDInMSToRowIndexInChunk.end(),
326 : "Logic Error: Cannot find requested subchunk in stored chunk"
327 : );
328 : // this is the row index in the chunk
329 0 : auto chunkRowIndex = start->second;
330 0 : auto chunkRowEnd = chunkRowIndex + ant1.size();
331 0 : Slicer slice(IPosition(3, 0), flagCube.shape(), Slicer::endIsLength);
332 0 : auto sliceStart = slice.start();
333 0 : auto sliceEnd = slice.end();
334 0 : auto nCorrBins = _combineCorr ? 1 : flagCube.shape()[0];
335 0 : auto spw = *spws.begin();
336 0 : const auto& chanBins = _chanBins.find(spw)->second;
337 0 : auto subChunkRowIndex = 0;
338 0 : for (; chunkRowIndex < chunkRowEnd; ++chunkRowIndex, ++subChunkRowIndex) {
339 0 : sliceStart[2] = subChunkRowIndex;
340 0 : sliceEnd[2] = subChunkRowIndex;
341 0 : auto iChanBin = 0;
342 0 : for (const auto& chanBin: chanBins) {
343 0 : sliceStart[1] = chanBin.start;
344 0 : sliceEnd[1] = chanBin.end;
345 0 : auto corr = 0;
346 0 : for (; corr < nCorrBins; ++corr) {
347 0 : if (! _combineCorr) {
348 0 : sliceStart[0] = corr;
349 0 : sliceEnd[0] = corr;
350 : }
351 0 : slice.setStart(sliceStart);
352 0 : slice.setEnd(sliceEnd);
353 0 : _updateWtSpFlags(
354 : wtsp, flagCube, checkFlags, slice,
355 0 : _weights(corr, iChanBin, chunkRowIndex)
356 : );
357 : }
358 0 : ++iChanBin;
359 : }
360 : }
361 0 : }
362 :
363 0 : void StatWtFloatingWindowDataAggregator::_limits(
364 : std::vector<std::pair<uInt, uInt>>& idToChunksNeededByIDMap,
365 : std::vector<std::pair<uInt, uInt>>& chunkNeededToIDsThatNeedChunkIDMap
366 : ) const {
367 0 : pair<uInt, uInt> p, q;
368 0 : const uInt nTimes = _vii->nTimes();
369 0 : if (_nTimeStampsInBin) {
370 : // fixed number of time stamps specified
371 0 : if (_timeBlockProcessing) {
372 : // integer division
373 0 : uInt nBlocks = nTimes/(*_nTimeStampsInBin);
374 0 : if (nTimes % *_nTimeStampsInBin > 0) {
375 0 : ++nBlocks;
376 : }
377 0 : uInt subChunkCount = 0;
378 0 : for (uInt blockCount = 0; blockCount < nBlocks; ++blockCount) {
379 0 : if ((subChunkCount + *_nTimeStampsInBin <= nTimes)) {
380 0 : p.first = subChunkCount;
381 0 : p.second = subChunkCount + *_nTimeStampsInBin - 1;
382 : }
383 : else {
384 : // chunk upper edge
385 0 : p.first = nTimes < (uInt)*_nTimeStampsInBin
386 0 : ? 0 : nTimes - *_nTimeStampsInBin;
387 0 : p.second = nTimes - 1;
388 : }
389 0 : q = p;
390 0 : for (uInt i=subChunkCount; i<=p.second; ++i, ++subChunkCount) {
391 0 : idToChunksNeededByIDMap.push_back(p);
392 0 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
393 : }
394 : }
395 : }
396 : else {
397 :             // sliding time window, fixed number of time stamps (timebin
398 :             // specified as an int)
399 0 : const auto isEven = *_nTimeStampsInBin % 2 == 0;
400 : // integer division
401 0 : const uInt halfTimeBin = *_nTimeStampsInBin/2;
402 : const auto nBefore = isEven
403 0 : ? (halfTimeBin - 1) : (*_nTimeStampsInBin - 1)/2;
404 0 : const auto nAfter = isEven ? halfTimeBin : nBefore;
405 : // integer division
406 : // p.first is the first sub chunk needed by the current index.
407 :             // p.second is the last sub chunk needed by the current index.
408 0 : for (uInt i=0; i<nTimes; ++i) {
409 0 : Int begin = i - nBefore;
410 0 : if (begin < 0) {
411 0 : begin = 0;
412 : }
413 0 : p.second = begin + *_nTimeStampsInBin - 1;
414 0 : if (p.second >= nTimes) {
415 0 : p.second = nTimes - 1;
416 0 : begin = p.second - *_nTimeStampsInBin + 1;
417 0 : if (begin < 0) {
418 0 : begin = 0;
419 : }
420 : }
421 0 : ThrowIf(begin < 0, "Logic Error: begin < 0");
422 0 : p.first = begin;
423 0 : if ((Int)nTimes <= *_nTimeStampsInBin) {
424 0 : q = p;
425 : }
426 : else {
427 0 : if ((Int)i < *_nTimeStampsInBin) {
428 0 : q.first = 0;
429 : }
430 0 : else if (i > nTimes - *_nTimeStampsInBin) {
431 0 : q.second = nTimes - 1;
432 : }
433 0 : else if (isEven) {
434 0 : begin = i - nAfter;
435 0 : q.first = begin;
436 0 : q.second = i + nBefore;
437 : }
438 : else {
439 : // isOdd
440 0 : q = p;
441 : }
442 : }
443 0 : idToChunksNeededByIDMap.push_back(p);
444 0 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
445 : }
446 : }
447 : }
448 : else {
449 0 : if (_timeBlockProcessing) {
450 : // shouldn't get in here
451 0 : ThrowCc("Logic error: shouldn't have gotten into this code block");
452 : }
453 : else {
454 0 : ThrowIf(
455 : ! _binWidthInSeconds,
456 : "Logic error: _binWidthInSeconds not defined"
457 : );
458 0 : const auto halfBinWidth = *_binWidthInSeconds/2;
459 0 : auto vb = _vii->getVisBuffer();
460 0 : vector<Double> times;
461 0 : for (_vii->origin(); _vii->more(); _vii->next()) {
462 0 : times.push_back(vb->time()[0]);
463 : }
464 0 : for (uInt i=0; i<nTimes; ++i) {
465 0 : auto mytime = times[i];
466 0 : auto loit = std::lower_bound(
467 0 : times.begin(), times.end(), mytime - halfBinWidth
468 : );
469 0 : ThrowIf(
470 : loit == times.end(),
471 : "Logic Error for std::lower_bound()"
472 : );
473 0 : p.first = std::distance(times.begin(), loit);
474 0 : auto upit = std::upper_bound(
475 0 : times.begin(), times.end(), mytime + halfBinWidth
476 : );
477 0 : p.second = upit == times.end()
478 0 : ? nTimes - 1 : std::distance(times.begin(), upit) - 1;
479 0 : q = p;
480 0 : idToChunksNeededByIDMap.push_back(p);
481 0 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
482 : }
483 0 : }
484 : }
485 0 : }
486 :
487 : }
488 :
489 : }