Line data Source code
1 : //# StatWtTVI.cc: This file contains the implementation of the StatWtTVI class.
2 : //#
3 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
4 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All
5 : //# rights reserved.
6 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
7 : //#
8 : //# This library is free software; you can redistribute it and/or
9 : //# modify it under the terms of the GNU Lesser General Public
10 : //# License as published by the Free software Foundation; either
11 : //# version 2.1 of the License, or (at your option) any later version.
12 : //#
13 : //# This library is distributed in the hope that it will be useful,
14 : //# but WITHOUT ANY WARRANTY, without even the implied warranty of
15 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 : //# Lesser General Public License for more details.
17 : //#
18 : //# You should have received a copy of the GNU Lesser General Public
19 : //# License along with this library; if not, write to the Free Software
20 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
21 : //# MA 02111-1307 USA
22 :
23 : #include <mstransform/TVI/StatWtTVI.h>
24 :
25 : #include <casacore/casa/Arrays/ArrayLogical.h>
26 : #include <casacore/casa/Quanta/QuantumHolder.h>
27 : #include <casacore/ms/MSOper/MSMetaData.h>
28 : #include <casacore/tables/Tables/ArrColDesc.h>
29 :
30 : #include <mstransform/TVI/StatWtClassicalDataAggregator.h>
31 : #include <mstransform/TVI/StatWtFloatingWindowDataAggregator.h>
32 : #include <mstransform/TVI/StatWtVarianceAndWeightCalculator.h>
33 :
34 : #ifdef _OPENMP
35 : #include <omp.h>
36 : #endif
37 :
38 : #include <iomanip>
39 :
40 : using namespace casacore;
41 : using namespace casac;
42 :
43 : namespace casa {
44 : namespace vi {
45 :
46 : const String StatWtTVI::CHANBIN = "stchanbin";
47 :
48 47 : StatWtTVI::StatWtTVI(ViImplementation2 * inputVii, const Record &configuration)
49 47 : : TransformingVi2 (inputVii) {
50 : // Parse and check configuration parameters
51 : // Note: if a constructor finishes by throwing an exception, the memory
52 : // associated with the object itself is cleaned up there is no memory leak.
53 47 : ThrowIf(
54 : ! _parseConfiguration(configuration),
55 : "Error parsing StatWtTVI configuration"
56 : );
57 92 : LogIO log(LogOrigin("StatWtTVI", __func__));
58 46 : log << LogIO::NORMAL << "Using " << StatWtTypes::asString(_column)
59 46 : << " to compute weights" << LogIO::POST;
60 : // FIXME when the TVI framework has methods to
61 : // check for metadata, like the existence of
62 : // columns, remove references to the original MS
63 46 : const auto& origMS = ms();
64 : // FIXME uses original MS explicitly
65 46 : ThrowIf(
66 : (_column == StatWtTypes::CORRECTED || _column == StatWtTypes::RESIDUAL)
67 : && ! origMS.isColumn(MSMainEnums::CORRECTED_DATA),
68 : "StatWtTVI requires the MS to have a CORRECTED_DATA column. This MS "
69 : "does not"
70 : );
71 : // FIXME uses original MS explicitly
72 46 : ThrowIf(
73 : (_column == StatWtTypes::DATA || _column == StatWtTypes::RESIDUAL_DATA)
74 : && ! origMS.isColumn(MSMainEnums::DATA),
75 : "StatWtTVI requires the MS to have a DATA column. This MS does not"
76 : );
77 46 : _mustComputeSigma = (
78 46 : _column == StatWtTypes::DATA || _column == StatWtTypes::RESIDUAL_DATA
79 : );
80 : // FIXME uses original MS explicitly
81 92 : _updateWeight = ! _mustComputeSigma
82 46 : || (_mustComputeSigma && ! origMS.isColumn(MSMainEnums::CORRECTED_DATA));
83 46 : _noModel = (
84 44 : _column == StatWtTypes::RESIDUAL || _column == StatWtTypes::RESIDUAL_DATA
85 8 : ) && ! origMS.isColumn(MSMainEnums::MODEL_DATA)
86 92 : && ! origMS.source().isColumn(MSSourceEnums::SOURCE_MODEL);
87 : // Initialize attached VisBuffer
88 46 : setVisBuffer(createAttachedVisBuffer(VbRekeyable));
89 61 : }
90 :
91 92 : StatWtTVI::~StatWtTVI() {}
92 :
93 47 : Bool StatWtTVI::_parseConfiguration(const Record& config) {
94 47 : String field = CHANBIN;
95 47 : if (config.isDefined(field)) {
96 : // channel binning
97 47 : auto fieldNum = config.fieldNumber(field);
98 47 : switch (config.type(fieldNum)) {
99 0 : case DataType::TpArrayBool:
100 : // because this is the actual default variant type, no matter
101 : // what is specified in the xml
102 0 : ThrowIf(
103 : ! config.asArrayBool(field).empty(),
104 : "Unsupported data type for " + field
105 : );
106 0 : _setDefaultChanBinMap();
107 0 : break;
108 6 : case DataType::TpInt:
109 : Int binWidth;
110 6 : config.get(CHANBIN, binWidth);
111 6 : _setChanBinMap(binWidth);
112 6 : break;
113 41 : case DataType::TpString:
114 : {
115 41 : auto chanbin = config.asString(field);
116 41 : if (chanbin == "spw") {
117 : // bin using entire spws
118 38 : _setDefaultChanBinMap();
119 38 : break;
120 : }
121 : else {
122 6 : QuantumHolder qh(casaQuantity(chanbin));
123 3 : _setChanBinMap(qh.asQuantity());
124 3 : }
125 3 : break;
126 41 : }
127 0 : default:
128 0 : ThrowCc("Unsupported data type for " + field);
129 : }
130 : }
131 : else {
132 0 : _setDefaultChanBinMap();
133 : }
134 47 : field = "minsamp";
135 47 : if (config.isDefined(field)) {
136 47 : config.get(field, _minSamp);
137 47 : ThrowIf(_minSamp < 2, "Minimum size of sample must be >= 2.");
138 : }
139 47 : field = "combine";
140 47 : if (config.isDefined(field)) {
141 47 : ThrowIf(
142 : config.type(config.fieldNumber(field)) != TpString,
143 : "Unsupported data type for combine"
144 : );
145 47 : _combineCorr = config.asString(field).contains("corr");
146 : }
147 47 : field = "wtrange";
148 47 : if (config.isDefined(field)) {
149 47 : ThrowIf(
150 : config.type(config.fieldNumber(field)) != TpArrayDouble,
151 : "Unsupported type for field '" + field + "'"
152 : );
153 47 : auto myrange = config.asArrayDouble(field);
154 47 : if (! myrange.empty()) {
155 3 : ThrowIf(
156 : myrange.size() != 2,
157 : "Array specified in '" + field
158 : + "' must have exactly two values"
159 : );
160 3 : ThrowIf(
161 : casacore::anyLT(myrange, 0.0),
162 : "Both values specified in '" + field
163 : + "' array must be non-negative"
164 : );
165 : // order the values in case they aren't ordered
166 6 : std::set<Double> rangeset(myrange.begin(), myrange.end());
167 3 : ThrowIf(
168 : rangeset.size() == 1, "Values specified in '" + field
169 : + "' array must be unique"
170 : );
171 3 : auto iter = rangeset.begin();
172 3 : auto first = *iter;
173 3 : auto second = *(++iter);
174 3 : _wtrange.reset(new std::pair<Double, Double>(first, second));
175 3 : }
176 47 : }
177 47 : auto excludeChans = False;
178 47 : field = "excludechans";
179 47 : if (config.isDefined(field)) {
180 47 : ThrowIf(
181 : config.type(config.fieldNumber(field)) != TpBool,
182 : "Unsupported type for field '" + field + "'"
183 : );
184 47 : excludeChans = config.asBool(field);
185 : }
186 47 : field = "fitspw";
187 47 : if (config.isDefined(field)) {
188 47 : ThrowIf(
189 : config.type(config.fieldNumber(field)) != TpString,
190 : "Unsupported type for field '" + field + "'"
191 : );
192 47 : auto val = config.asString(field);
193 47 : if (! val.empty()) {
194 : // FIXME references underlying MS
195 4 : const auto& myms = ms();
196 8 : MSSelection sel(myms);
197 4 : sel.setSpwExpr(val);
198 4 : auto chans = sel.getChanList();
199 4 : auto nrows = chans.nrow();
200 4 : MSMetaData md(&myms, 50);
201 4 : auto nchans = md.nChans();
202 4 : IPosition start(3, 0);
203 4 : IPosition stop(3, 0);
204 4 : IPosition step(3, 1);
205 10 : for (size_t i=0; i<nrows; ++i) {
206 6 : auto row = chans.row(i);
207 6 : const auto& spw = row[0];
208 6 : if (_chanSelFlags.find(spw) == _chanSelFlags.end()) {
209 4 : _chanSelFlags[spw]
210 8 : = Cube<Bool>(1, nchans[spw], 1, ! excludeChans);
211 : }
212 6 : start[1] = row[1];
213 6 : ThrowIf(
214 : start[1] < 0, "Invalid channel selection in spw "
215 : + String::toString(spw))
216 : ;
217 6 : stop[1] = row[2];
218 6 : step[1] = row[3];
219 6 : Slicer slice(start, stop, step, Slicer::endIsLast);
220 6 : _chanSelFlags[spw](slice) = excludeChans;
221 6 : }
222 4 : }
223 47 : }
224 47 : field = "datacolumn";
225 47 : if (config.isDefined(field)) {
226 47 : ThrowIf(
227 : config.type(config.fieldNumber(field)) != TpString,
228 : "Unsupported type for field '" + field + "'"
229 : );
230 47 : auto val = config.asString(field);
231 47 : if (! val.empty()) {
232 47 : val.downcase();
233 47 : ThrowIf (
234 : ! (
235 : val.startsWith("c") || val.startsWith("d")
236 : || val.startsWith("residual") || val.startsWith("residual_")
237 : ),
238 : "Unsupported value for " + field + ": " + val
239 : );
240 103 : _column = val.startsWith("c") ? StatWtTypes::CORRECTED
241 64 : : val.startsWith("d") ? StatWtTypes::DATA
242 55 : : val.startsWith("residual_") ? StatWtTypes::RESIDUAL_DATA
243 : : StatWtTypes::RESIDUAL;
244 :
245 : }
246 47 : }
247 47 : field = "slidetimebin";
248 47 : ThrowIf(
249 : ! config.isDefined(field), "Config param " + field + " must be defined"
250 : );
251 47 : ThrowIf(
252 : config.type(config.fieldNumber(field)) != TpBool,
253 : "Unsupported type for field '" + field + "'"
254 : );
255 47 : _timeBlockProcessing = ! config.asBool(field);
256 47 : field = "timebin";
257 47 : ThrowIf(
258 : ! config.isDefined(field), "Config param " + field + " must be defined"
259 : );
260 47 : auto mytype = config.type(config.fieldNumber(field));
261 47 : ThrowIf(
262 : ! (
263 : mytype == TpString || mytype == TpDouble
264 : || mytype == TpInt
265 : ),
266 : "Unsupported type for field '" + field + "'"
267 : );
268 47 : switch(mytype) {
269 0 : case TpDouble: {
270 0 : _binWidthInSeconds.reset(new Double(config.asDouble(field)));
271 0 : break;
272 : }
273 34 : case TpInt:
274 34 : _nTimeStampsInBin.reset(new Int(config.asInt(field)));
275 34 : ThrowIf(
276 : *_nTimeStampsInBin <= 0,
277 : "Logic Error: nTimeStamps must be positive"
278 : );
279 34 : break;
280 13 : case TpString: {
281 26 : QuantumHolder qh(casaQuantity(config.asString(field)));
282 26 : _binWidthInSeconds.reset(
283 13 : new Double(getTimeBinWidthInSec(qh.asQuantity()))
284 : );
285 13 : break;
286 13 : }
287 0 : default:
288 0 : ThrowCc("Logic Error: Unhandled type for timebin");
289 :
290 : }
291 47 : _doClassicVIVB = _binWidthInSeconds && _timeBlockProcessing;
292 47 : _configureStatAlg(config);
293 46 : if (_doClassicVIVB) {
294 12 : _dataAggregator.reset(
295 : new StatWtClassicalDataAggregator(
296 12 : getVii(), _chanBins, _samples, _column, _noModel, _chanSelFlags,
297 12 : _wtStats, _wtrange, _combineCorr, _statAlg, _minSamp
298 12 : )
299 : );
300 : }
301 : else {
302 34 : _dataAggregator.reset(
303 : new StatWtFloatingWindowDataAggregator(
304 34 : getVii(), _chanBins, _samples, _column, _noModel, _chanSelFlags,
305 34 : _combineCorr, _wtStats, _wtrange, _binWidthInSeconds,
306 34 : _nTimeStampsInBin, _timeBlockProcessing, _statAlg, _minSamp
307 34 : )
308 : );
309 : }
310 46 : _dataAggregator->setMustComputeWtSp(_mustComputeWtSp);
311 46 : return True;
312 47 : }
313 :
314 47 : void StatWtTVI::_configureStatAlg(const Record& config) {
315 47 : String field = "statalg";
316 47 : if (config.isDefined(field)) {
317 47 : ThrowIf(
318 : config.type(config.fieldNumber(field)) != TpString,
319 : "Unsupported type for field '" + field + "'"
320 : );
321 47 : auto alg = config.asString(field);
322 47 : alg.downcase();
323 47 : if (alg.startsWith("cl")) {
324 43 : _statAlg.reset(
325 : new ClassicalStatistics<
326 : Double, Array<Float>::const_iterator,
327 : Array<Bool>::const_iterator, Array<Double>::const_iterator
328 43 : >()
329 : );
330 : }
331 : else {
332 : casacore::StatisticsAlgorithmFactory<
333 : Double, Array<Float>::const_iterator,
334 : Array<Bool>::const_iterator, Array<Double>::const_iterator
335 4 : > saf;
336 4 : if (alg.startsWith("ch")) {
337 1 : Int maxiter = -1;
338 1 : field = "maxiter";
339 1 : if (config.isDefined(field)) {
340 1 : ThrowIf(
341 : config.type(config.fieldNumber(field)) != TpInt,
342 : "Unsupported type for field '" + field + "'"
343 : );
344 1 : maxiter = config.asInt(field);
345 : }
346 1 : Double zscore = -1;
347 1 : field = "zscore";
348 1 : if (config.isDefined(field)) {
349 1 : ThrowIf(
350 : config.type(config.fieldNumber(field)) != TpDouble,
351 : "Unsupported type for field '" + field + "'"
352 : );
353 1 : zscore = config.asDouble(field);
354 : }
355 1 : saf.configureChauvenet(zscore, maxiter);
356 : }
357 3 : else if (alg.startsWith("f")) {
358 1 : auto center = FitToHalfStatisticsData::CMEAN;
359 1 : field = "center";
360 1 : if (config.isDefined(field)) {
361 1 : ThrowIf(
362 : config.type(config.fieldNumber(field)) != TpString,
363 : "Unsupported type for field '" + field + "'"
364 : );
365 1 : auto cs = config.asString(field);
366 1 : cs.downcase();
367 1 : if (cs == "mean") {
368 0 : center = FitToHalfStatisticsData::CMEAN;
369 : }
370 1 : else if (cs == "median") {
371 1 : center = FitToHalfStatisticsData::CMEDIAN;
372 : }
373 0 : else if (cs == "zero") {
374 0 : center = FitToHalfStatisticsData::CVALUE;
375 : }
376 : else {
377 0 : ThrowCc("Unsupported value for '" + field + "'");
378 : }
379 1 : }
380 1 : field = "lside";
381 1 : auto ud = FitToHalfStatisticsData::LE_CENTER;
382 1 : if (config.isDefined(field)) {
383 1 : ThrowIf(
384 : config.type(config.fieldNumber(field)) != TpBool,
385 : "Unsupported type for field '" + field + "'"
386 : );
387 2 : ud = config.asBool(field)
388 1 : ? FitToHalfStatisticsData::LE_CENTER
389 : : FitToHalfStatisticsData::GE_CENTER;
390 : }
391 1 : saf.configureFitToHalf(center, ud, 0);
392 : }
393 2 : else if (alg.startsWith("h")) {
394 1 : Double fence = -1;
395 1 : field = "fence";
396 1 : if (config.isDefined(field)) {
397 1 : ThrowIf(
398 : config.type(config.fieldNumber(field)) != TpDouble,
399 : "Unsupported type for field '" + field + "'"
400 : );
401 1 : fence = config.asDouble(field);
402 : }
403 1 : saf.configureHingesFences(fence);
404 : }
405 : else {
406 1 : ThrowCc("Unsupported value for 'statalg'");
407 : }
408 : // clone needed for CountedPtr -> shared_ptr hand off
409 3 : _statAlg.reset(saf.createStatsAlgorithm()->clone());
410 4 : }
411 47 : }
412 : else {
413 0 : _statAlg.reset(
414 : new ClassicalStatistics<
415 : Double, Array<Float>::const_iterator,
416 : Array<Bool>::const_iterator, Array<Double>::const_iterator
417 0 : >());
418 : }
419 46 : std::set<StatisticsData::STATS> stats {StatisticsData::VARIANCE};
420 46 : _statAlg->setStatsToCalculate(stats);
421 : // also configure the _wtStats object here
422 : // FIXME? Does not include exposure weighting
423 46 : _wtStats.reset(
424 : new ClassicalStatistics<
425 : Double, Array<Float>::const_iterator,
426 : Array<Bool>::const_iterator
427 46 : >()
428 : );
429 46 : stats.insert(StatisticsData::MEAN);
430 46 : _wtStats->setStatsToCalculate(stats);
431 46 : _wtStats->setCalculateAsAdded(True);
432 47 : }
433 :
434 46 : void StatWtTVI::_logUsedChannels() const {
435 : // FIXME uses underlying MS
436 46 : MSMetaData msmd(&ms(), 100.0);
437 46 : const auto nchan = msmd.nChans();
438 92 : LogIO log(LogOrigin("StatWtTVI", __func__));
439 46 : log << LogIO::NORMAL << "Weights are being computed using ";
440 46 : const auto cend = _chanSelFlags.cend();
441 46 : const auto nspw = _samples->size();
442 46 : uInt spwCount = 0;
443 95 : for (const auto& kv: *_samples) {
444 49 : const auto spw = kv.first;
445 49 : log << "SPW " << spw << ", channels ";
446 49 : const auto flagCube = _chanSelFlags.find(spw);
447 49 : if (flagCube == cend) {
448 45 : log << "0~" << (nchan[spw] - 1);
449 : }
450 : else {
451 4 : vector<pair<uInt, uInt>> startEnd;
452 4 : const auto flags = flagCube->second.tovector();
453 4 : bool started = false;
454 4 : std::unique_ptr<pair<uInt, uInt>> curPair;
455 256 : for (uInt j=0; j<nchan[spw]; ++j) {
456 252 : if (started) {
457 204 : if (flags[j]) {
458 : // found a bad channel, end current range
459 4 : startEnd.push_back(*curPair);
460 4 : started = false;
461 : }
462 : else {
463 : // found a "good" channel, update end of current range
464 200 : curPair->second = j;
465 : }
466 : }
467 48 : else if (! flags[j]) {
468 : // found a good channel, start new range
469 8 : started = true;
470 8 : curPair.reset(new pair<uInt, uInt>(j, j));
471 : }
472 : }
473 4 : if (curPair) {
474 4 : if (started) {
475 : // The last pair won't get added inside the previous loop,
476 : // so add it here
477 4 : startEnd.push_back(*curPair);
478 : }
479 4 : auto nPairs = startEnd.size();
480 12 : for (uInt i=0; i<nPairs; ++i) {
481 8 : log << startEnd[i].first << "~" << startEnd[i].second;
482 8 : if (i < nPairs - 1) {
483 4 : log << ", ";
484 : }
485 : }
486 : }
487 : else {
488 : // if the pointer never got set, all the channels are bad
489 0 : log << "no channels";
490 : }
491 4 : }
492 49 : if (spwCount < (nspw - 1)) {
493 3 : log << ";";
494 : }
495 49 : ++spwCount;
496 : }
497 46 : log << LogIO::POST;
498 46 : }
499 :
500 3 : void StatWtTVI::_setChanBinMap(const casacore::Quantity& binWidth) {
501 3 : if (! binWidth.isConform(Unit("Hz"))) {
502 0 : ostringstream oss;
503 : oss << "If specified as a quantity, channel bin width must have "
504 0 : << "frequency units. " << binWidth << " does not.";
505 0 : ThrowCc(oss.str());
506 0 : }
507 3 : ThrowIf(binWidth.getValue() <= 0, "channel bin width must be positive");
508 3 : MSMetaData msmd(&ms(), 100.0);
509 3 : auto chanFreqs = msmd.getChanFreqs();
510 3 : auto nspw = chanFreqs.size();
511 3 : auto binWidthHz = binWidth.getValue("Hz");
512 6 : for (uInt i=0; i<nspw; ++i) {
513 3 : auto cfs = chanFreqs[i].getValue("Hz");
514 3 : auto citer = cfs.begin();
515 3 : auto cend = cfs.end();
516 3 : StatWtTypes::ChanBin bin;
517 3 : bin.start = 0;
518 3 : bin.end = 0;
519 3 : uInt chanNum = 0;
520 3 : auto startFreq = *citer;
521 3 : auto nchan = cfs.size();
522 192 : for (; citer!=cend; ++citer, ++chanNum) {
523 : // both could be true, in which case both conditionals
524 : // must be executed
525 189 : if (abs(*citer - startFreq) > binWidthHz) {
526 : // add bin to list
527 21 : bin.end = chanNum - 1;
528 21 : _chanBins[i].push_back(bin);
529 21 : bin.start = chanNum;
530 21 : startFreq = *citer;
531 : }
532 189 : if (chanNum + 1 == nchan) {
533 : // add last bin
534 3 : bin.end = chanNum;
535 3 : _chanBins[i].push_back(bin);
536 : }
537 : }
538 3 : }
539 : // weight spectrum must be computed
540 3 : _mustComputeWtSp.reset(new Bool(True));
541 3 : }
542 :
543 6 : void StatWtTVI::_setChanBinMap(Int binWidth) {
544 6 : ThrowIf(binWidth < 1, "Channel bin width must be positive");
545 6 : MSMetaData msmd(&ms(), 100.0);
546 6 : auto nchans = msmd.nChans();
547 6 : auto nspw = nchans.size();
548 6 : StatWtTypes::ChanBin bin;
549 16 : for (uInt i=0; i<nspw; ++i) {
550 10 : auto lastChan = nchans[i]-1;
551 138 : for (uInt j=0; j<nchans[i]; j += binWidth) {
552 128 : bin.start = j;
553 128 : bin.end = min(j+binWidth-1, lastChan);
554 128 : _chanBins[i].push_back(bin);
555 : }
556 : }
557 : // weight spectrum must be computed
558 6 : _mustComputeWtSp.reset(new Bool(True));
559 6 : }
560 :
561 38 : void StatWtTVI::_setDefaultChanBinMap() {
562 : // FIXME uses underlying MS
563 38 : MSMetaData msmd(&ms(), 0.0);
564 38 : auto nchans = msmd.nChans();
565 38 : auto niter = nchans.begin();
566 38 : auto nend = nchans.end();
567 38 : Int i = 0;
568 38 : StatWtTypes::ChanBin bin;
569 38 : bin.start = 0;
570 80 : for (; niter!=nend; ++niter, ++i) {
571 42 : bin.end = *niter - 1;
572 42 : _chanBins[i].push_back(bin);
573 : }
574 38 : }
575 :
576 25 : Double StatWtTVI::getTimeBinWidthInSec(const casacore::Quantity& binWidth) {
577 25 : ThrowIf(
578 : ! binWidth.isConform(Unit("s")),
579 : "Time bin width unit must be a unit of time"
580 : );
581 25 : auto v = binWidth.getValue("s");
582 25 : checkTimeBinWidth(v);
583 25 : return v;
584 : }
585 :
586 60 : void StatWtTVI::checkTimeBinWidth(Double binWidth) {
587 60 : ThrowIf(binWidth <= 0, "time bin width must be positive");
588 60 : }
589 :
590 672 : void StatWtTVI::sigmaSpectrum(Cube<Float>& sigmaSp) const {
591 672 : if (_mustComputeSigma) {
592 : {
593 672 : Cube<Float> wtsp;
594 : // this computes _newWtsp, ignore wtsp
595 672 : weightSpectrum(wtsp);
596 672 : }
597 672 : sigmaSp = Float(1.0)/sqrt(_newWtSp);
598 672 : if (anyEQ(_newWtSp, Float(0))) {
599 672 : auto iter = sigmaSp.begin();
600 672 : auto end = sigmaSp.end();
601 672 : auto witer = _newWtSp.cbegin();
602 1436232 : for ( ; iter != end; ++iter, ++witer) {
603 1435560 : if (*witer == 0) {
604 251566 : *iter = -1;
605 : }
606 : }
607 672 : }
608 : }
609 : else {
610 0 : TransformingVi2::sigmaSpectrum(sigmaSp);
611 : }
612 672 : }
613 :
614 6238 : void StatWtTVI::weightSpectrum(Cube<Float>& newWtsp) const {
615 6238 : ThrowIf(! _weightsComputed, "Weights have not been computed yet");
616 6238 : if (! _dataAggregator->mustComputeWtSp()) {
617 0 : newWtsp.resize(IPosition(3, 0));
618 0 : return;
619 : }
620 6238 : if (! _newWtSp.empty()) {
621 : // already calculated
622 3395 : if (_updateWeight) {
623 3275 : newWtsp = _newWtSp.copy();
624 : }
625 : else {
626 120 : TransformingVi2::weightSpectrum(newWtsp);
627 : }
628 3395 : return;
629 : }
630 2843 : _computeWeightSpectrumAndFlags();
631 2843 : if (_updateWeight) {
632 2723 : newWtsp = _newWtSp.copy();
633 : }
634 : else {
635 120 : TransformingVi2::weightSpectrum(newWtsp);
636 : }
637 : }
638 :
639 3559 : void StatWtTVI::_computeWeightSpectrumAndFlags() const {
640 : size_t nOrigFlagged;
641 3559 : auto mypair = _getLowerLayerWtSpFlags(nOrigFlagged);
642 3559 : auto& wtsp = mypair.first;
643 3559 : auto& flagCube = mypair.second;
644 3559 : if (_dataAggregator->mustComputeWtSp() && wtsp.empty()) {
645 : // This can happen in preview mode if
646 : // WEIGHT_SPECTRUM doesn't exist or is empty
647 0 : wtsp.resize(flagCube.shape());
648 : }
649 3559 : auto checkFlags = False;
650 3559 : Vector<Int> ant1, ant2, spws;
651 3559 : antenna1(ant1);
652 3559 : antenna2(ant2);
653 3559 : spectralWindows(spws);
654 3559 : Vector<rownr_t> rowIDs;
655 3559 : getRowIds(rowIDs);
656 3559 : Vector<Double> exposures;
657 3559 : exposure(exposures);
658 3559 : _dataAggregator->weightSpectrumFlags(
659 : wtsp, flagCube, checkFlags, ant1, ant2, spws, exposures, rowIDs
660 : );
661 3559 : if (checkFlags) {
662 2903 : _nNewFlaggedPts += ntrue(flagCube) - nOrigFlagged;
663 : }
664 3559 : _newWtSp = wtsp;
665 3559 : _newFlag = flagCube;
666 3559 : }
667 :
668 3559 : std::pair<Cube<Float>, Cube<Bool>> StatWtTVI::_getLowerLayerWtSpFlags(
669 : size_t& nOrigFlagged
670 : ) const {
671 7118 : auto mypair = std::make_pair(Cube<Float>(), Cube<Bool>());
672 3559 : if (_dataAggregator->mustComputeWtSp()) {
673 2903 : getVii()->weightSpectrum(mypair.first);
674 : }
675 3559 : getVii()->flag(mypair.second);
676 3559 : _nTotalPts += mypair.second.size();
677 3559 : nOrigFlagged = ntrue(mypair.second);
678 3559 : _nOrigFlaggedPts += nOrigFlagged;
679 3559 : return mypair;
680 0 : }
681 :
682 1328 : void StatWtTVI::sigma(Matrix<Float>& sigmaMat) const {
683 1328 : if (_mustComputeSigma) {
684 1328 : if (_newWt.empty()) {
685 120 : Matrix<Float> wtmat;
686 120 : weight(wtmat);
687 120 : }
688 1328 : sigmaMat = Float(1.0)/sqrt(_newWt);
689 1328 : if (anyEQ(_newWt, Float(0))) {
690 180 : Matrix<Float>::iterator iter = sigmaMat.begin();
691 180 : Matrix<Float>::iterator end = sigmaMat.end();
692 180 : Matrix<Float>::iterator witer = _newWt.begin();
693 19980 : for ( ; iter != end; ++iter, ++witer) {
694 19800 : if (*witer == 0) {
695 3602 : *iter = -1;
696 : }
697 : }
698 180 : }
699 : }
700 : else {
701 0 : TransformingVi2::sigma(sigmaMat);
702 : }
703 1328 : }
704 :
705 3499 : void StatWtTVI::weight(Matrix<Float> & wtmat) const {
706 3499 : ThrowIf(! _weightsComputed, "Weights have not been computed yet");
707 3499 : if (! _newWt.empty()) {
708 0 : if (_updateWeight) {
709 0 : wtmat = _newWt.copy();
710 : }
711 : else {
712 0 : TransformingVi2::weight(wtmat);
713 : }
714 0 : return;
715 : }
716 3499 : auto nrows = nRows();
717 3499 : getVii()->weight(wtmat);
718 3499 : if (_dataAggregator->mustComputeWtSp()) {
719 : // always use classical algorithm to get median for weights
720 : ClassicalStatistics<
721 : Double, Array<Float>::const_iterator, Array<Bool>::const_iterator
722 2843 : > cs;
723 2843 : Cube<Float> wtsp;
724 2843 : Cube<Bool> flagCube;
725 : // this computes _newWtsP which is what we will use, so
726 : // just ignore wtsp
727 2843 : weightSpectrum(wtsp);
728 2843 : flag(flagCube);
729 2843 : IPosition blc(3, 0);
730 2843 : IPosition trc = _newWtSp.shape() - 1;
731 2843 : const auto ncorr = _newWtSp.shape()[0];
732 135088 : for (rownr_t i=0; i<nrows; ++i) {
733 132245 : blc[2] = i;
734 132245 : trc[2] = i;
735 132245 : if (_combineCorr) {
736 66605 : auto flags = flagCube(blc, trc);
737 66605 : if (allTrue(flags)) {
738 16037 : wtmat.column(i) = 0;
739 : }
740 : else {
741 50568 : auto weights = _newWtSp(blc, trc);
742 50568 : auto mask = ! flags;
743 50568 : cs.setData(weights.begin(), mask.begin(), weights.size());
744 50568 : wtmat.column(i) = cs.getMedian();
745 50568 : }
746 66605 : }
747 : else {
748 202800 : for (uInt corr=0; corr<ncorr; ++corr) {
749 137160 : blc[0] = corr;
750 137160 : trc[0] = corr;
751 137160 : auto weights = _newWtSp(blc, trc);
752 137160 : auto flags = flagCube(blc, trc);
753 137160 : if (allTrue(flags)) {
754 22819 : wtmat(corr, i) = 0;
755 : }
756 : else {
757 114341 : auto mask = ! flags;
758 228682 : cs.setData(
759 343023 : weights.begin(), mask.begin(), weights.size()
760 : );
761 114341 : wtmat(corr, i) = cs.getMedian();
762 114341 : }
763 137160 : }
764 : }
765 : }
766 2843 : }
767 : else {
768 : // the only way this can happen is if there is a single channel bin
769 : // for each baseline/spw pair
770 656 : _dataAggregator->weightSingleChanBin(wtmat, nrows);
771 : }
772 3499 : _newWt = wtmat.copy();
773 3499 : if (! _updateWeight) {
774 120 : wtmat = Matrix<Float>(wtmat.shape());
775 120 : TransformingVi2::weight(wtmat);
776 : }
777 : }
778 :
779 9901 : void StatWtTVI::flag(Cube<Bool>& flagCube) const {
780 9901 : ThrowIf(! _weightsComputed, "Weights have not been computed yet");
781 9901 : if (! _newFlag.empty()) {
782 9185 : flagCube = _newFlag.copy();
783 9185 : return;
784 : }
785 716 : _computeWeightSpectrumAndFlags();
786 716 : flagCube = _newFlag.copy();
787 : }
788 :
789 3499 : void StatWtTVI::flagRow(Vector<Bool>& flagRow) const {
790 3499 : ThrowIf(! _weightsComputed, "Weights have not been computed yet");
791 3499 : if (! _newFlagRow.empty()) {
792 0 : flagRow = _newFlagRow.copy();
793 0 : return;
794 : }
795 3499 : Cube<Bool> flags;
796 3499 : flag(flags);
797 3499 : getVii()->flagRow(flagRow);
798 3499 : auto nrows = nRows();
799 139656 : for (rownr_t i=0; i<nrows; ++i) {
800 136157 : flagRow[i] = allTrue(flags.xyPlane(i));
801 : }
802 3499 : _newFlagRow = flagRow.copy();
803 3499 : }
804 :
805 46 : void StatWtTVI::originChunks(Bool forceRewind) {
806 : // Drive next lower layer
807 46 : getVii()->originChunks(forceRewind);
808 46 : _weightsComputed = False;
809 46 : _dataAggregator->aggregate();
810 46 : _weightsComputed = True;
811 46 : _clearCache();
812 : // re-origin this chunk in next layer
813 : // (ensures wider scopes see start of the this chunk)
814 46 : getVii()->origin();
815 46 : }
816 :
817 328 : void StatWtTVI::nextChunk() {
818 : // Drive next lower layer
819 328 : getVii()->nextChunk();
820 328 : _weightsComputed = False;
821 328 : _dataAggregator->aggregate();
822 328 : _weightsComputed = True;
823 328 : _clearCache();
824 : // re-origin this chunk next layer
825 : // (ensures wider scopes see start of the this chunk)
826 328 : getVii()->origin();
827 328 : }
828 :
829 4261 : void StatWtTVI::_clearCache() {
830 4261 : _newWtSp.resize(0, 0, 0);
831 4261 : _newWt.resize(0, 0);
832 4261 : _newFlag.resize(0, 0, 0);
833 4261 : _newFlagRow.resize(0);
834 4261 : }
835 :
836 0 : Cube<Bool> StatWtTVI::_getResultantFlags(
837 : Cube<Bool>& chanSelFlagTemplate, Cube<Bool>& chanSelFlags,
838 : Bool& initTemplate, Int spw, const Cube<Bool>& flagCube
839 : ) const {
840 0 : if (_chanSelFlags.find(spw) == _chanSelFlags.cend()) {
841 : // no selection of channels to ignore
842 0 : return flagCube;
843 : }
844 0 : if (initTemplate) {
845 : // this can be done just once per chunk because all the rows
846 : // in the chunk are guaranteed to have the same spw
847 : // because each subchunk is guaranteed to have a single
848 : // data description ID.
849 0 : chanSelFlagTemplate = _chanSelFlags.find(spw)->second;
850 0 : initTemplate = False;
851 : }
852 0 : auto dataShape = flagCube.shape();
853 0 : chanSelFlags.resize(dataShape, False);
854 0 : auto ncorr = dataShape[0];
855 0 : auto nrows = dataShape[2];
856 0 : IPosition start(3, 0);
857 0 : IPosition end = dataShape - 1;
858 0 : Slicer sl(start, end, Slicer::endIsLast);
859 0 : for (uInt corr=0; corr<ncorr; ++corr) {
860 0 : start[0] = corr;
861 0 : end[0] = corr;
862 0 : for (Int row=0; row<nrows; ++row) {
863 0 : start[2] = row;
864 0 : end[2] = row;
865 0 : sl.setStart(start);
866 0 : sl.setEnd(end);
867 0 : chanSelFlags(sl) = chanSelFlagTemplate;
868 : }
869 : }
870 0 : return flagCube || chanSelFlags;
871 0 : }
872 :
873 0 : void StatWtTVI::initWeightSpectrum (const Cube<Float>& wtspec) {
874 : // Pass to next layer down
875 0 : getVii()->initWeightSpectrum(wtspec);
876 0 : }
877 :
878 0 : void StatWtTVI::initSigmaSpectrum (const Cube<Float>& sigspec) {
879 : // Pass to next layer down
880 0 : getVii()->initSigmaSpectrum(sigspec);
881 0 : }
882 :
883 :
884 3499 : void StatWtTVI::writeBackChanges(VisBuffer2 *vb) {
885 : // Pass to next layer down
886 3499 : getVii()->writeBackChanges(vb);
887 3499 : }
888 :
889 46 : void StatWtTVI::summarizeFlagging() const {
890 46 : auto orig = (Double)_nOrigFlaggedPts/(Double)_nTotalPts*100;
891 46 : auto stwt = (Double)_nNewFlaggedPts/(Double)_nTotalPts*100;
892 46 : auto total = orig + stwt;
893 92 : LogIO log(LogOrigin("StatWtTVI", __func__));
894 : log << LogIO::NORMAL << "Originally, " << orig
895 : << "% of the data were flagged. StatWtTVI flagged an "
896 46 : << "additional " << stwt << "%." << LogIO::POST;
897 : log << LogIO::NORMAL << "TOTAL FLAGGED DATA AFTER RUNNING STATWT: "
898 46 : << total << "%" << LogIO::POST;
899 46 : log << LogIO::NORMAL << std::endl << LogIO::POST;
900 46 : if (_nOrigFlaggedPts == _nTotalPts) {
901 : log << LogIO::WARN << "IT APPEARS THAT ALL THE DATA IN THE INPUT "
902 : << "MS/SELECTION WERE FLAGGED PRIOR TO RUNNING STATWT"
903 0 : << LogIO::POST;
904 0 : log << LogIO::NORMAL << std::endl << LogIO::POST;
905 : }
906 46 : else if (_nOrigFlaggedPts + _nNewFlaggedPts == _nTotalPts) {
907 : log << LogIO::WARN << "IT APPEARS THAT STATWT FLAGGED ALL THE DATA "
908 : "IN THE REQUESTED SELECTION THAT WASN'T ORIGINALLY FLAGGED"
909 0 : << LogIO::POST;
910 0 : log << LogIO::NORMAL << std::endl << LogIO::POST;
911 : }
912 46 : String col0 = "SPECTRAL_WINDOW";
913 46 : String col1 = "SAMPLES_WITH_NON-ZERO_VARIANCE";
914 : String col2 = "SAMPLES_WHERE_REAL_PART_VARIANCE_DIFFERS_BY_>50%_FROM_"
915 46 : "IMAGINARY_PART";
916 46 : log << LogIO::NORMAL << col0 << " " << col1 << " " << col2 << LogIO::POST;
917 46 : auto n0 = col0.size();
918 46 : auto n1 = col1.size();
919 46 : auto n2 = col2.size();
920 95 : for (const auto& sample: *_samples) {
921 49 : ostringstream oss;
922 49 : oss << std::setw(n0) << sample.first << " " << std::setw(n1)
923 49 : << sample.second.first << " " << std::setw(n2)
924 49 : << sample.second.second;
925 49 : log << LogIO::NORMAL << oss.str() << LogIO::POST;
926 49 : }
927 46 : }
928 :
929 46 : void StatWtTVI::summarizeStats(Double& mean, Double& variance) const {
930 92 : LogIO log(LogOrigin("StatWtTVI", __func__));
931 46 : _logUsedChannels();
932 : try {
933 46 : mean = _wtStats->getStatistic(StatisticsData::MEAN);
934 46 : variance = _wtStats->getStatistic(StatisticsData::VARIANCE);
935 : log << LogIO::NORMAL << "The mean of the computed weights is "
936 46 : << mean << LogIO::POST;
937 : log << LogIO::NORMAL << "The variance of the computed weights is "
938 46 : << variance << LogIO::POST;
939 : log << LogIO::NORMAL << "Weights which had corresponding flags of True "
940 : << "prior to running this application were not used to compute these "
941 46 : << "stats." << LogIO::POST;
942 : }
943 0 : catch (const AipsError& x) {
944 : log << LogIO::WARN << "There was a problem calculating the mean and "
945 : << "variance of the weights computed by this application. Perhaps there "
946 : << "was something amiss with the input MS and/or the selection criteria. "
947 : << "Examples of such issues are that all the data were originally flagged "
948 : << "or that the sample size was consistently too small for computations "
949 0 : << "of variances" << LogIO::POST;
950 0 : setNaN(mean);
951 0 : setNaN(variance);
952 0 : }
953 46 : }
954 :
955 328 : void StatWtTVI::origin() {
956 : // Drive underlying ViImplementation2
957 328 : getVii()->origin();
958 : // Synchronize own VisBuffer
959 328 : configureNewSubchunk();
960 328 : _clearCache();
961 328 : }
962 :
963 3559 : void StatWtTVI::next() {
964 : // Drive underlying ViImplementation2
965 3559 : getVii()->next();
966 : // Synchronize own VisBuffer
967 3559 : configureNewSubchunk();
968 3559 : _clearCache();
969 3559 : }
970 :
971 : }
972 :
973 : }
|