Line data Source code
1 : //# FlagMSHandler.h: This file contains the implementation of the FlagMSHandler class.
2 : //#
3 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
4 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All rights reserved.
5 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
6 : //#
7 : //# This library is free software; you can redistribute it and/or
8 : //# modify it under the terms of the GNU Lesser General Public
9 : //# License as published by the Free software Foundation; either
10 : //# version 2.1 of the License, or (at your option) any later version.
11 : //#
12 : //# This library is distributed in the hope that it will be useful,
13 : //# but WITHOUT ANY WARRANTY, without even the implied warranty of
14 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 : //# Lesser General Public License for more details.
16 : //#
17 : //# You should have received a copy of the GNU Lesser General Public
18 : //# License along with this library; if not, write to the Free Software
19 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20 : //# MA 02111-1307 USA
21 : //# $Id: $
22 :
23 : #include <flagging/Flagging/FlagMSHandler.h>
24 :
25 : // Measurement Set selection
26 : #include <casacore/ms/MeasurementSets/MeasurementSet.h>
27 : #include <casacore/ms/MSSel/MSSelection.h>
28 : #include <casacore/ms/MeasurementSets/MSAntennaColumns.h>
29 : #include <casacore/ms/MeasurementSets/MSFieldColumns.h>
30 : #include <casacore/ms/MeasurementSets/MSPolColumns.h>
31 : #include <casacore/ms/MeasurementSets/MSSpWindowColumns.h>
32 : #include <casacore/ms/MeasurementSets/MSProcessorColumns.h>
33 :
34 : #include <msvis/MSVis/ViFrequencySelection.h>
35 :
36 : #include <mstransform/TVI/ChannelAverageTVI.h>
37 : #include <msvis/MSVis/LayeredVi2Factory.h>
38 : #include <synthesis/TransformMachines2/VisModelData.h>
39 :
40 : using namespace casacore;
41 : namespace casa { //# NAMESPACE CASA - BEGIN
42 :
43 : ////////////////////////////////////
44 : /// FlagMSHandler implementation ///
45 : ////////////////////////////////////
46 :
47 :
48 : // -----------------------------------------------------------------------
49 : // Default constructor
50 : // -----------------------------------------------------------------------
51 911 : FlagMSHandler::FlagMSHandler(string tablename, uShort iterationApproach, Double timeInterval):
52 911 : FlagDataHandler(tablename,iterationApproach,timeInterval)
53 : {
54 911 : selectedMeasurementSet_p = NULL;
55 911 : originalMeasurementSet_p = NULL;
56 911 : visibilityIterator_p = NULL;
57 911 : tableTye_p = MEASUREMENT_SET;
58 911 : processorTableExist_p = false;
59 911 : }
60 :
61 :
62 : // -----------------------------------------------------------------------
63 : // Default destructor
64 : // -----------------------------------------------------------------------
65 1822 : FlagMSHandler::~FlagMSHandler()
66 : {
67 911 : logger_p->origin(LogOrigin("FlagDataHandler",__FUNCTION__,WHERE));
68 911 : *logger_p << LogIO::DEBUG2 << "FlagMSHandler::~FlagMSHandler()" << LogIO::POST;
69 :
70 : // Delete MS objects
71 911 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
72 911 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
73 :
74 : // Delete VisibilityIterator
75 911 : if (visibilityIterator_p) delete visibilityIterator_p;
76 1822 : }
77 :
78 :
79 : // -----------------------------------------------------------------------
80 : // Open Measurement Set
81 : // -----------------------------------------------------------------------
82 : bool
83 911 : FlagMSHandler::open()
84 : {
85 911 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
86 :
87 911 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
88 : //originalMeasurementSet_p = new MeasurementSet(tablename_p,Table::Update);
89 :
90 911 : if(Table::isWritable(tablename_p))
91 : {
92 911 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Update);
93 : }
94 : else
95 : {
96 0 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Old);
97 : }
98 :
99 :
100 : // Activate Memory Resident Sub-tables for everything but Pointing, Syscal and History
101 911 : originalMeasurementSet_p->setMemoryResidentSubtables (MrsEligibility::defaultEligible());
102 :
103 : // Read antenna names and diameters from Antenna table
104 911 : MSAntennaColumns antennaSubTable(originalMeasurementSet_p->antenna());
105 911 : antennaNames_p = new Vector<String>(antennaSubTable.name().getColumn());
106 911 : antennaDiameters_p = new Vector<Double>(antennaSubTable.dishDiameter().getColumn());
107 911 : antennaPositions_p = new ScalarMeasColumn<MPosition>(antennaSubTable.positionMeas());
108 :
109 : // File the baseline to Ant1xAnt2 map
110 911 : String baseline;
111 911 : std::pair<Int,Int> ant1ant2;
112 15073 : for (Int ant1Idx=0;ant1Idx < static_cast<Int>(antennaNames_p->size());ant1Idx++)
113 : {
114 198116 : for (Int ant2Idx=ant1Idx+1;ant2Idx < static_cast<Int>(antennaNames_p->size());ant2Idx++)
115 : {
116 183954 : ant1ant2.first = ant1Idx;
117 183954 : ant1ant2.second = ant2Idx;
118 183954 : baseline = antennaNames_p->operator()(ant1Idx) + "&&" + antennaNames_p->operator()(ant2Idx);
119 183954 : baselineToAnt1Ant2_p[baseline] = ant1ant2;
120 183954 : Ant1Ant2ToBaseline_p[ant1ant2] = baseline;
121 : }
122 : }
123 :
124 : // Read field names
125 : {
126 911 : MSFieldColumns *fieldSubTable = new MSFieldColumns(originalMeasurementSet_p->field());
127 911 : fieldNames_p = new Vector<String>(fieldSubTable->name().getColumn());
128 911 : delete fieldSubTable;
129 : }
130 :
131 : // Read polarizations
132 911 : MSPolarizationColumns polarizationSubTable(originalMeasurementSet_p->polarization());
133 911 : ArrayColumn<Int> corrTypeColum = polarizationSubTable.corrType();
134 911 : corrProducts_p = new std::vector<String>();
135 1939 : for (uInt polRow_idx=0;polRow_idx<corrTypeColum.nrow();polRow_idx++)
136 : {
137 1028 : Array<Int> polRow = corrTypeColum(polRow_idx);
138 3818 : for (uInt corr_i=0;corr_i<polRow.size();corr_i++)
139 : {
140 2790 : switch (polRow(IPosition(1,corr_i)))
141 : {
142 31 : case Stokes::I:
143 : {
144 31 : *logger_p << LogIO::DEBUG1 << " Correlation product I found, which should correspond to ALMA WVR data - skipping" << LogIO::POST;
145 : // jagonzal (CAS-4234): We need this to Sanitize correlation expressions
146 31 : corrProducts_p->push_back("I");
147 31 : break;
148 : }
149 0 : case Stokes::Q:
150 : {
151 0 : *logger_p << LogIO::DEBUG1 << " Correlation product Q found" << LogIO::POST;
152 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("Q")) == corrProducts_p->end()) corrProducts_p->push_back("Q");
153 0 : break;
154 : }
155 0 : case Stokes::U:
156 : {
157 0 : *logger_p << LogIO::DEBUG1 << " Correlation product U found" << LogIO::POST;
158 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("U")) == corrProducts_p->end()) corrProducts_p->push_back("U");
159 0 : break;
160 : }
161 0 : case Stokes::V:
162 : {
163 0 : *logger_p << LogIO::DEBUG1 << " Correlation product V found" << LogIO::POST;
164 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("V")) == corrProducts_p->end()) corrProducts_p->push_back("V");
165 0 : break;
166 : }
167 156 : case Stokes::XX:
168 : {
169 156 : *logger_p << LogIO::DEBUG1 << " Correlation product XX found" << LogIO::POST;
170 156 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XX")) == corrProducts_p->end()) corrProducts_p->push_back("XX");
171 156 : break;
172 : }
173 77 : case Stokes::YY:
174 : {
175 77 : *logger_p << LogIO::DEBUG1 << " Correlation product YY found" << LogIO::POST;
176 77 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YY")) == corrProducts_p->end()) corrProducts_p->push_back("YY");
177 77 : break;
178 : }
179 12 : case Stokes::XY:
180 : {
181 12 : *logger_p << LogIO::DEBUG1 << " Correlation product XY found" << LogIO::POST;
182 12 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XY")) == corrProducts_p->end()) corrProducts_p->push_back("XY");
183 12 : break;
184 : }
185 12 : case Stokes::YX:
186 : {
187 12 : *logger_p << LogIO::DEBUG1 << " Correlation product YX found" << LogIO::POST;
188 12 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YX")) == corrProducts_p->end()) corrProducts_p->push_back("YX");
189 12 : break;
190 : }
191 824 : case Stokes::RR:
192 : {
193 824 : *logger_p << LogIO::DEBUG1 << " Correlation product RR found" << LogIO::POST;
194 824 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RR")) == corrProducts_p->end()) corrProducts_p->push_back("RR");
195 824 : break;
196 : }
197 806 : case Stokes::LL:
198 : {
199 806 : *logger_p << LogIO::DEBUG1 << " Correlation product LL found" << LogIO::POST;
200 806 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LL")) == corrProducts_p->end()) corrProducts_p->push_back("LL");
201 806 : break;
202 : }
203 436 : case Stokes::RL:
204 : {
205 436 : *logger_p << LogIO::DEBUG1 << " Correlation product RL found" << LogIO::POST;
206 436 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RL")) == corrProducts_p->end()) corrProducts_p->push_back("RL");
207 436 : break;
208 : }
209 436 : case Stokes::LR:
210 : {
211 436 : *logger_p << LogIO::DEBUG1 << " Correlation product LR found" << LogIO::POST;
212 436 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LR")) == corrProducts_p->end()) corrProducts_p->push_back("LR");
213 436 : break;
214 : }
215 0 : default:
216 : {
217 0 : *logger_p << LogIO::WARN << " Correlation product unknown found: " << polRow(IPosition(1,corr_i)) << LogIO::POST;
218 0 : break;
219 : }
220 : }
221 : }
222 1028 : }
223 :
224 : // Read reference frequencies per SPW
225 1822 : MSSpWindowColumns spwSubTable(originalMeasurementSet_p->spectralWindow());
226 911 : ScalarColumn<Double> refFrequencies = spwSubTable.refFrequency();
227 911 : lambdaMap_p = new lambdaMap();
228 6224 : for (uInt spwidx=0;spwidx<refFrequencies.nrow();spwidx++)
229 : {
230 5313 : (*lambdaMap_p)[spwidx]=C::c/refFrequencies(spwidx);
231 5313 : *logger_p << LogIO::DEBUG1 << " spwidx: " << spwidx << " lambda: " << (*lambdaMap_p)[spwidx] << LogIO::POST;
232 : }
233 :
234 1822 : return true;
235 911 : }
236 :
237 :
238 : // -----------------------------------------------------------------------
239 : // Close Measurement Set
240 : // -----------------------------------------------------------------------
241 : bool
242 0 : FlagMSHandler::close()
243 : {
244 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
245 :
246 0 : if (selectedMeasurementSet_p)
247 : {
248 : // Flush and unlock MS
249 0 : selectedMeasurementSet_p->flush();
250 0 : selectedMeasurementSet_p->relinquishAutoLocks(true);
251 0 : selectedMeasurementSet_p->unlock();
252 :
253 : // Post stats
254 0 : if (stats_p)
255 : {
256 0 : *logger_p << LogIO::NORMAL << " Total Flag Cube accesses: " << cubeAccessCounter_p << LogIO::POST;
257 : }
258 : }
259 :
260 0 : return true;
261 : }
262 :
263 :
264 : // -----------------------------------------------------------------------
265 : // Generate selected Measurement Set
266 : // -----------------------------------------------------------------------
267 : bool
268 795 : FlagMSHandler::selectData()
269 : {
270 795 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
271 :
272 : // Create Measurement Selection object
273 795 : const String dummyExpr = String("");
274 795 : if (measurementSetSelection_p) {
275 0 : delete measurementSetSelection_p;
276 0 : measurementSetSelection_p = NULL;
277 : }
278 :
279 : // Set the MS Selection error handler to catch antenna names that are
280 : // not present in the MS in an expression that contains valid antenna names.
281 : // Catch also invalid intent or spw values in a valid expression.
282 : // This will issue a WARNING and not fail.
283 795 : MSSelectionLogError mssLEA,mssLES,mssLESPW;
284 795 : measurementSetSelection_p = new MSSelection();
285 :
286 795 : measurementSetSelection_p->setErrorHandler(MSSelection::ANTENNA_EXPR, &mssLEA);
287 795 : measurementSetSelection_p->setErrorHandler(MSSelection::STATE_EXPR, &mssLES);
288 795 : measurementSetSelection_p->setErrorHandler(MSSelection::SPW_EXPR, &mssLESPW, true);
289 :
290 1590 : measurementSetSelection_p->reset(
291 795 : *originalMeasurementSet_p,
292 795 : MSSelection::PARSE_NOW,
293 1590 : (const String)timeSelection_p,
294 1590 : (const String)baselineSelection_p,
295 1590 : (const String)fieldSelection_p,
296 1590 : (const String)spwSelection_p,
297 1590 : (const String)uvwSelection_p,
298 : dummyExpr, // taqlExpr
299 1590 : (const String)polarizationSelection_p,
300 1590 : (const String)scanSelection_p,
301 1590 : (const String)arraySelection_p,
302 1590 : (const String)scanIntentSelection_p,
303 1590 : (const String)observationSelection_p);
304 :
305 : // Apply Measurement Selection to a copy of the original Measurement Set
306 795 : MeasurementSet auxMeasurementSet = MeasurementSet(*originalMeasurementSet_p);
307 795 : measurementSetSelection_p->getSelectedMS(auxMeasurementSet, String(""));
308 795 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
309 795 : selectedMeasurementSet_p = new MeasurementSet(auxMeasurementSet);
310 :
311 : // Check if selected MS has rows...
312 795 : if (selectedMeasurementSet_p->nrow() == 0)
313 : {
314 0 : *logger_p << LogIO::WARN << "Selected Measurement Set doesn't have any rows " << LogIO::POST;
315 : }
316 :
317 : // More debugging information from MS-Selection
318 795 : if (!arraySelection_p.empty())
319 : {
320 1 : *logger_p << LogIO::DEBUG1 << " Selected array ids are " << measurementSetSelection_p->getSubArrayList() << LogIO::POST;
321 : }
322 :
323 795 : if (!observationSelection_p.empty())
324 : {
325 5 : *logger_p << LogIO::DEBUG1 << " Selected observation ids are " << measurementSetSelection_p->getObservationList() << LogIO::POST;
326 : }
327 :
328 795 : if (!fieldSelection_p.empty())
329 : {
330 10 : *logger_p << LogIO::DEBUG1 << " Selected field ids are " << measurementSetSelection_p->getFieldList() << LogIO::POST;
331 : }
332 :
333 795 : if (!scanSelection_p.empty())
334 : {
335 38 : *logger_p << LogIO::DEBUG1 << " Selected scan ids are " << measurementSetSelection_p->getScanList() << LogIO::POST;
336 : }
337 :
338 795 : if (!scanIntentSelection_p.empty())
339 : {
340 2 : *logger_p << LogIO::DEBUG1 << " Selected scan intent ids are " << measurementSetSelection_p->getStateObsModeList() << LogIO::POST;
341 : }
342 :
343 795 : if (!timeSelection_p.empty())
344 : {
345 5 : *logger_p << LogIO::DEBUG1 << " Selected time range is " << measurementSetSelection_p->getTimeList() << LogIO::POST;
346 : }
347 :
348 795 : if (!spwSelection_p.empty())
349 : {
350 134 : *logger_p << LogIO::NORMAL << " Selected spw-channels ids are " << measurementSetSelection_p->getChanList() << LogIO::POST;
351 134 : if (measurementSetSelection_p->getChanList().size())
352 : {
353 134 : inrowSelection_p = true;
354 : }
355 : }
356 :
357 795 : if (!baselineSelection_p.empty())
358 : {
359 31 : *logger_p << LogIO::DEBUG1 << " Selected antenna1 ids are " << measurementSetSelection_p->getAntenna1List() << LogIO::POST;
360 31 : *logger_p << LogIO::DEBUG1 << " Selected antenna2 ids are " << measurementSetSelection_p->getAntenna2List() << LogIO::POST;
361 31 : *logger_p << LogIO::DEBUG1 << " Selected baselines are " << measurementSetSelection_p->getBaselineList() << LogIO::POST;
362 : }
363 :
364 795 : if (!uvwSelection_p.empty())
365 : {
366 2 : *logger_p << LogIO::DEBUG1 << " Selected uv range is " << measurementSetSelection_p->getUVList() << LogIO::POST;
367 : }
368 :
369 795 : if (!polarizationSelection_p.empty())
370 : {
371 0 : ostringstream polarizationListToPrint (ios::in | ios::out);
372 0 : polarizationListToPrint << measurementSetSelection_p->getPolMap();
373 0 : *logger_p << LogIO::DEBUG1 << " Selected correlation ids are " << polarizationListToPrint.str() << LogIO::POST;
374 0 : }
375 :
376 : // There is a new selected MS so iterators have to be regenerated
377 795 : iteratorGenerated_p = false;
378 795 : chunksInitialized_p = false;
379 795 : buffersInitialized_p = false;
380 795 : stopIteration_p = false;
381 :
382 795 : return true;
383 795 : }
384 :
385 :
386 : // -----------------------------------------------------------------------
387 : // Parse MSSelection expression
388 : // -----------------------------------------------------------------------
389 : bool
390 402 : FlagMSHandler::parseExpression(MSSelection &parser)
391 : {
392 402 : parser.toTableExprNode(selectedMeasurementSet_p);
393 400 : return true;
394 : }
395 :
396 :
397 : // -----------------------------------------------------------------------
398 : // Swap MS to check what is the maximum RAM memory needed
399 : // -----------------------------------------------------------------------
400 : void
401 14 : FlagMSHandler::preSweep()
402 : {
403 14 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
404 :
405 156 : for (visibilityIterator_p->originChunks(); visibilityIterator_p->moreChunks();visibilityIterator_p->nextChunk())
406 : {
407 : // Iterate over vis buffers
408 6542 : for (visibilityIterator_p->origin(); visibilityIterator_p->more();visibilityIterator_p->next())
409 : {
410 :
411 6400 : if (mapScanStartStop_p)
412 : {
413 6400 : generateScanStartStopMap();
414 : }
415 : }
416 : }
417 :
418 14 : if (mapScanStartStop_p)
419 : {
420 14 : *logger_p << LogIO::NORMAL << " " << scanStartStopMap_p->size() <<" Scans found in MS" << LogIO::POST;
421 : }
422 :
423 14 : return;
424 : }
425 :
426 :
427 : // -----------------------------------------------------------------------
428 : // Generate Visibility Iterator with a given sort order and time interval
429 : // -----------------------------------------------------------------------
430 : bool
431 768 : FlagMSHandler::generateIterator()
432 : {
433 768 : if (!iteratorGenerated_p)
434 : {
435 :
436 768 : if (asyncio_enabled_p)
437 : {
438 : // Set preFetchColumns
439 0 : prefetchColumns_p = new VisBufferComponents2();
440 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagCube);
441 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagRow);
442 0 : prefetchColumns_p->operator +=(VisBufferComponent2::NRows);
443 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FieldId);
444 :
445 0 : preFetchColumns();
446 :
447 : // Then create and initialize RO Async iterator
448 0 : if (visibilityIterator_p) delete visibilityIterator_p;
449 0 : visibilityIterator_p = new vi::VisibilityIterator2(*selectedMeasurementSet_p,
450 0 : vi::SortColumns (sortOrder_p, true),
451 0 : true,prefetchColumns_p,timeInterval_p);
452 : }
453 : else
454 : {
455 : //The vector where all the factories will be stacked
456 768 : std::vector<vi::ViiLayerFactory*> factories;
457 :
458 : //Create a VI Factory that access directly the MS (or the selected MS)
459 : vi::IteratingParameters ipar(timeInterval_p,
460 768 : vi::SortColumns (sortOrder_p, true));
461 768 : std::unique_ptr<vi::VisIterImpl2LayerFactory> directMsVIFactory;
462 768 : directMsVIFactory.reset(new vi::VisIterImpl2LayerFactory(selectedMeasurementSet_p,ipar, true));
463 : //Apply the channel selections only on the layer that access the MS directly
464 768 : if(enableChanAvg_p)
465 21 : applyChannelSelection(directMsVIFactory.get());
466 768 : factories.push_back(directMsVIFactory.get());
467 :
468 : //Add a time averaging layer if so requested
469 768 : std::unique_ptr<vi::AveragingVi2LayerFactory> timeAvgFactory;
470 768 : if(enableTimeAvg_p)
471 : {
472 : // Time averaging in clip mode uses the Time Averaging Iterator
473 0 : vi::AveragingParameters parameters(timeAverageBin_p, 0,vi::SortColumns(sortOrder_p, false),
474 17 : timeAvgOptions_p,0.0,NULL,true);
475 17 : timeAvgFactory.reset(new vi::AveragingVi2LayerFactory(parameters));
476 17 : factories.push_back(timeAvgFactory.get());
477 17 : }
478 :
479 : //Create a ChannelAverageTVI Factory if so requested
480 768 : std::unique_ptr<vi::ChannelAverageTVILayerFactory> chanAvgFactory;
481 768 : if(enableChanAvg_p)
482 : {
483 21 : chanAvgFactory.reset(new vi::ChannelAverageTVILayerFactory(chanAvgOptions_p));
484 21 : factories.push_back(chanAvgFactory.get());
485 : }
486 :
487 : //Create the final visibility iterator
488 768 : Vector<vi::ViiLayerFactory*> const factoriesV(factories);
489 768 : visibilityIterator_p = new vi::VisibilityIterator2(factoriesV);
490 768 : }
491 :
492 : // Do quack pre-sweep
493 768 : if (mapScanStartStop_p)
494 14 : preSweep();
495 :
496 : // Set the table data manager (ISM and SSM) cache size to the full column size, for
497 : // the columns ANTENNA1, ANTENNA2, FEED1, FEED2, TIME, INTERVAL, FLAG_ROW, SCAN_NUMBER and UVW
498 768 : if (slurp_p) visibilityIterator_p->slurp();
499 :
500 : // Apply channel selection
501 : // CAS-3959: Channel selection is now going to be handled at the FlagAgent level
502 : // applyChannelSelection(visibilityIterator_p);
503 :
504 : // Group all the time stamps in one single buffer
505 : // NOTE: Otherwise we have to iterate over Visibility Buffers
506 : // that contain all the rows with the same time step.
507 768 : if (groupTimeSteps_p)
508 : {
509 : // Set row blocking to a huge number
510 70 : uLong maxChunkRows = selectedMeasurementSet_p->nrow();
511 70 : visibilityIterator_p->setRowBlocking(maxChunkRows);
512 :
513 70 : *logger_p << LogIO::NORMAL << "Setting row blocking to number of rows in selected table: " << maxChunkRows << LogIO::POST;
514 :
515 :
516 : }
517 :
518 : // Get Visibility Buffer reference from VisibilityIterator
519 768 : visibilityBuffer_p = visibilityIterator_p->getVisBuffer();
520 :
521 : // Get the TYPE column of the PROCESSOR sub-table
522 768 : if (loadProcessorTable_p){
523 5 : processorTable();
524 5 : loadProcessorTable_p = false;
525 : }
526 :
527 768 : iteratorGenerated_p = true;
528 : }
529 768 : chunksInitialized_p = false;
530 768 : buffersInitialized_p = false;
531 768 : stopIteration_p = false;
532 768 : processedRows_p = 0;
533 :
534 768 : return true;
535 : }
536 :
537 :
538 : // -----------------------------------------------------------------------
539 : // Apply channel selection for asyn or normal iterator
540 : // NOTE (first implementation): We always have to do this, even if there is no SPW:channel selection
541 : // NOTE (after Dic 2011 testing): As far as I know spw selection does not have to be *, it can be empty,
542 : // therefore this step will in practice do nothing , because the spw and channel lists are empty too.
543 : // -----------------------------------------------------------------------
544 : void
545 21 : FlagMSHandler::applyChannelSelection(vi::VisIterImpl2LayerFactory *viFactory)
546 : {
547 21 : vi::FrequencySelectionUsingChannels channelSelector;
548 :
549 : // Apply channel selection (in row selection cannot be done with MSSelection)
550 : // NOTE: Each row of the Matrix has the following elements: SpwID StartCh StopCh Step
551 21 : Matrix<Int> spwchan = measurementSetSelection_p->getChanList();
552 21 : IPosition shape = spwchan.shape();
553 21 : uInt nSelections = shape[0];
554 : Int spw,channelStart,channelStop,channelStep,channelWidth;
555 28 : for(uInt selection_i=0;selection_i<nSelections;selection_i++)
556 : {
557 : // NOTE: selectChannel needs channelStart,channelWidth,channelStep
558 7 : spw = spwchan(selection_i,0);
559 7 : channelStart = spwchan(selection_i,1);
560 7 : channelStop = spwchan(selection_i,2);
561 7 : channelStep = spwchan(selection_i,3);
562 7 : channelWidth = channelStop-channelStart+1;
563 7 : channelSelector.add (spw, channelStart, channelWidth,channelStep);
564 : }
565 :
566 21 : auto freqSel = std::make_shared<vi::FrequencySelections>();
567 21 : freqSel->add(channelSelector);
568 21 : viFactory->setFrequencySelections(freqSel);
569 :
570 42 : return;
571 21 : }
572 :
573 :
574 : // -----------------------------------------------------------------------
575 : // Move to next chunk
576 : // -----------------------------------------------------------------------
577 : bool
578 7193 : FlagMSHandler::nextChunk()
579 : {
580 7193 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
581 :
582 7193 : chunkCounts_p = 0;
583 7193 : bool moreChunks = false;
584 7193 : if (stopIteration_p)
585 : {
586 0 : moreChunks = false;
587 : }
588 : else
589 : {
590 7193 : if (!chunksInitialized_p)
591 : {
592 768 : if (!iteratorGenerated_p) generateIterator();
593 768 : visibilityIterator_p->originChunks();
594 768 : chunksInitialized_p = true;
595 768 : buffersInitialized_p = false;
596 768 : chunkNo_p++;
597 768 : bufferNo_p = 0;
598 768 : moreChunks = true;
599 : }
600 : else
601 : {
602 6425 : visibilityIterator_p->nextChunk();
603 6425 : if (visibilityIterator_p->moreChunks())
604 : {
605 5657 : buffersInitialized_p = false;
606 5657 : moreChunks = true;
607 5657 : chunkNo_p++;
608 5657 : bufferNo_p = 0;
609 : }
610 : }
611 : }
612 :
613 7193 : if (!moreChunks)
614 : {
615 768 : *logger_p << LogIO::NORMAL << "==================================================================================== " << LogIO::POST;
616 : }
617 :
618 7193 : return moreChunks;
619 : }
620 :
621 :
622 : // -----------------------------------------------------------------------
623 : // Move to next buffer
624 : // -----------------------------------------------------------------------
625 : bool
626 348796 : FlagMSHandler::nextBuffer()
627 : {
628 348796 : bool moreBuffers = false;
629 348796 : if (stopIteration_p) {
630 0 : return moreBuffers;
631 : }
632 :
633 348796 : if (!buffersInitialized_p)
634 : {
635 6425 : visibilityIterator_p->origin();
636 6425 : buffersInitialized_p = true;
637 6425 : if (!asyncio_enabled_p) preFetchColumns();
638 6425 : if (mapAntennaPairs_p) generateAntennaPairMap();
639 6425 : if (mapSubIntegrations_p) generateSubIntegrationMap();
640 6425 : if (mapPolarizations_p) generatePolarizationsMap();
641 6425 : if (mapAntennaPointing_p) generateAntennaPointingMap();
642 :
643 6425 : moreBuffers = true;
644 6425 : flushFlags_p = false;
645 6425 : flushFlagRow_p = false;
646 6425 : bufferNo_p++;
647 : }
648 : else
649 : {
650 : // WARNING: ++ operator is defined for VisibilityIterator class ("advance" function)
651 : // but if you define a VisibilityIterator pointer, then ++ operator does not call
652 : // the advance function but increments the pointers.
653 342371 : visibilityIterator_p->next();
654 :
655 : // WARNING: We iterate and afterwards check if the iterator is valid
656 342371 : if (visibilityIterator_p->more())
657 : {
658 335946 : if (!asyncio_enabled_p) preFetchColumns();
659 335946 : if (mapAntennaPairs_p) generateAntennaPairMap();
660 335946 : if (mapSubIntegrations_p) generateSubIntegrationMap();
661 335946 : if (mapPolarizations_p) generatePolarizationsMap();
662 335946 : if (mapAntennaPointing_p) generateAntennaPointingMap();
663 335946 : moreBuffers = true;
664 335946 : flushFlags_p = false;
665 335946 : flushFlagRow_p = false;
666 335946 : bufferNo_p++;
667 : }
668 : }
669 :
670 :
671 : // Set new common flag cube
672 348796 : if (moreBuffers)
673 : {
674 : // Get flag (WARNING: We have to modify the shape of the cube before re-assigning it)
675 342371 : Cube<Bool> curentFlagCube= visibilityBuffer_p->flagCube();
676 342371 : modifiedFlagCube_p.resize(curentFlagCube.shape());
677 342371 : modifiedFlagCube_p = curentFlagCube;
678 342371 : originalFlagCube_p.resize(curentFlagCube.shape());
679 342371 : originalFlagCube_p = curentFlagCube;
680 :
681 : // Get flag row (WARNING: We have to modify the shape of the cube before re-assigning it)
682 342371 : Vector<Bool> curentflagRow= visibilityBuffer_p->flagRow();
683 342371 : modifiedFlagRow_p.resize(curentflagRow.shape());
684 342371 : modifiedFlagRow_p = curentflagRow;
685 342371 : originalFlagRow_p.resize(curentflagRow.shape());
686 342371 : originalFlagRow_p = curentflagRow;
687 :
688 : // Compute total number of flags per buffer to be used for generating the agents stats
689 342371 : Int64 currentBufferCounts = curentFlagCube.shape().product();
690 342371 : chunkCounts_p += currentBufferCounts;
691 342371 : progressCounts_p += currentBufferCounts;
692 342371 : msCounts_p += currentBufferCounts;
693 :
694 : // Print chunk characteristics
695 342371 : if (bufferNo_p == 1)
696 : {
697 6425 : processedRows_p += visibilityIterator_p->nRowsInChunk();
698 6425 : double progress = 100.0* ((double) processedRows_p / (double) selectedMeasurementSet_p->nrow());
699 : // Send only a limited number of lines to INFO level, 'other' lines to debug
700 6425 : if (checkDoChunkLine(progress)) {
701 4008 : *logger_p << LogIO::NORMAL;
702 : } else {
703 2417 : *logger_p << LogIO::DEBUG1;
704 : }
705 6425 : logger_p->origin(LogOrigin("FlagMSHandler",""));
706 :
707 6425 : String corrs = "[ ";
708 27327 : for (uInt corr_i=0;corr_i<(uInt) visibilityBuffer_p->nCorrelations();corr_i++)
709 : {
710 20902 : corrs += (*polarizationIndexMap_p)[corr_i] + " ";
711 : }
712 6425 : corrs += "]";
713 :
714 6425 : *logger_p << "Chunk = " << chunkNo_p << " [progress: " << (Int)progress << "%]"
715 6425 : ", Observation = " << visibilityBuffer_p->observationId()[0] <<
716 6425 : ", Array = " << visibilityBuffer_p->arrayId()[0] <<
717 6425 : ", Scan = " << visibilityBuffer_p->scan()[0] <<
718 19275 : ", Field = " << visibilityBuffer_p->fieldId()(0) << " (" << fieldNames_p->operator()(visibilityBuffer_p->fieldId()) << ")"
719 6425 : ", Spw = " << visibilityBuffer_p->spectralWindows()(0) <<
720 6425 : ", Channels = " << visibilityBuffer_p->nChannels() <<
721 : ", Corrs = " << corrs <<
722 44975 : ", Total Rows = " << visibilityIterator_p->nRowsInChunk() << LogIO::POST;
723 6425 : }
724 342371 : }
725 :
726 348796 : return moreBuffers;
727 : }
728 :
729 : // -----------------------------------------------------------------------
730 : // Generate scan start stop map
731 : // -----------------------------------------------------------------------
732 : void
733 6400 : FlagMSHandler::generateScanStartStopMap()
734 : {
735 :
736 : Int scan;
737 : Double start,stop;
738 6400 : Vector<Int> scans;
739 6400 : Vector<Double> times;
740 :
741 6400 : Cube<Bool> flags;
742 : uInt scanStartRow;
743 : uInt scanStopRow;
744 : uInt ncorrs,nchannels,nrows;
745 : Bool stopSearch;
746 :
747 6400 : if (scanStartStopMap_p == NULL) scanStartStopMap_p = new scanStartStopMap();
748 :
749 6400 : scans = visibilityIterator_p->getVisBuffer()->scan();
750 6400 : times = visibilityIterator_p->getVisBuffer()->time();
751 :
752 : // Check if anything is flagged in this buffer
753 6400 : scanStartRow = 0;
754 6400 : scanStopRow = times.size()-1;
755 6400 : if (mapScanStartStopFlagged_p)
756 : {
757 300 : flags = visibilityIterator_p->getVisBuffer()->flagCube();
758 300 : IPosition shape = flags.shape();
759 300 : ncorrs = shape[0];
760 300 : nchannels = shape[1];
761 300 : nrows = shape[2];
762 :
763 : // Look for effective scan start
764 300 : stopSearch = false;
765 24351 : for (uInt row_i=0;row_i<nrows;row_i++)
766 : {
767 24288 : if (stopSearch) break;
768 :
769 1524570 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
770 : {
771 1500756 : if (stopSearch) break;
772 :
773 4501320 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
774 : {
775 3001038 : if (stopSearch) break;
776 :
777 3000801 : if (!flags(corr_i,channel_i,row_i))
778 : {
779 237 : scanStartRow = row_i;
780 237 : stopSearch = true;
781 : }
782 : }
783 : }
784 : }
785 :
786 : // If none of the rows were un-flagged we don't continue checking from the end
787 : // As a consequence of this some scans may not be present in the map, and have
788 : // to be skipped in the flagging process because they are already flagged.
789 300 : if (!stopSearch) return;
790 :
791 : // Look for effective scan stop
792 237 : stopSearch = false;
793 474 : for (uInt row_i=0;row_i<nrows;row_i++)
794 : {
795 474 : if (stopSearch) break;
796 :
797 474 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
798 : {
799 474 : if (stopSearch) break;
800 :
801 474 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
802 : {
803 474 : if (stopSearch) break;
804 :
805 237 : if (!flags(corr_i,channel_i,nrows-1-row_i))
806 : {
807 237 : scanStopRow = nrows-1-row_i;
808 237 : stopSearch = true;
809 : }
810 : }
811 : }
812 : }
813 300 : }
814 :
815 : // Check scan start/stop times
816 6337 : scan = scans[0];
817 6337 : start = times[scanStartRow];
818 6337 : stop = times[scanStopRow];
819 :
820 6337 : if (scanStartStopMap_p->find(scan) == scanStartStopMap_p->end())
821 : {
822 75 : (*scanStartStopMap_p)[scan].push_back(start);
823 75 : (*scanStartStopMap_p)[scan].push_back(stop);
824 : }
825 : else
826 : {
827 : // Check if we have a better start time
828 6262 : if ((*scanStartStopMap_p)[scan][0] > start)
829 : {
830 0 : (*scanStartStopMap_p)[scan][0] = start;
831 : }
832 : // Check if we have a better stop time
833 6262 : if ((*scanStartStopMap_p)[scan][1] < stop)
834 : {
835 892 : (*scanStartStopMap_p)[scan][1] = stop;
836 : }
837 : }
838 :
839 6337 : return;
840 6400 : }
841 :
842 :
843 : // -----------------------------------------------------------------------
844 : // Flush flags to MS
845 : // -----------------------------------------------------------------------
846 : bool
847 270586 : FlagMSHandler::flushFlags()
848 : {
849 270586 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
850 270586 : *logger_p << LogIO::DEBUG1
851 : << "flushing modified flags cube of shape: " << modifiedFlagCube_p.shape()
852 270586 : << LogIO::POST;
853 :
854 270586 : if (flushFlags_p)
855 : {
856 :
857 229451 : visibilityIterator_p->writeFlag(modifiedFlagCube_p);
858 229451 : flushFlags_p = false;
859 : }
860 :
861 270586 : if ((flushFlagRow_p) and (!inrowSelection_p))
862 : {
863 222200 : visibilityIterator_p->writeFlagRow(modifiedFlagRow_p);
864 222200 : flushFlagRow_p = false;
865 : }
866 :
867 270586 : return true;
868 : }
869 :
870 :
871 : // -----------------------------------------------------------------------
872 : // Flush flags to MS
873 : // -----------------------------------------------------------------------
874 : String
875 309 : FlagMSHandler::getTableName()
876 : {
877 309 : return originalMeasurementSet_p->tableName();
878 : }
879 :
880 : // -----------------------------------------------------------------------
881 : // Check what data columns exist
882 : // -----------------------------------------------------------------------
883 : bool
884 175 : FlagMSHandler::checkIfColumnExists(String column)
885 : {
886 175 : return originalMeasurementSet_p->tableDesc().isColumn(column);
887 : }
888 :
889 : // -----------------------------------------------------------------------
890 : // Whether the per-chunk progress log line should be produced
891 : // -----------------------------------------------------------------------
892 : bool
893 6425 : FlagMSHandler::checkDoChunkLine(double progress)
894 : {
895 6425 : bool result = false;
896 : // per-chunk progress report lines - every 'chunkLineThresholdInc_p' % and in
897 : // any case always the beginning (~0%) end (100% rows)
898 6425 : if ((progress >= chunkLineThreshold_p) or (progress >= 100.0)) {
899 3858 : chunkLineThreshold_p += chunkLineThresholdInc_p;
900 3858 : result = true;
901 : }
902 6425 : result |= (1 == chunkNo_p) or (processedRows_p == selectedMeasurementSet_p->nrow());
903 :
904 6425 : return result;
905 : }
906 :
907 : // ----------------------------------------------------------------------------
908 : // Signal true when a per-agent partial (ongoing run) summary has to be printed
909 : // ----------------------------------------------------------------------------
910 : bool
911 6425 : FlagMSHandler::summarySignal()
912 : {
913 6425 : Double progress = 100.0* ((Double) processedRows_p / (Double) selectedMeasurementSet_p->nrow());
914 :
915 : // The summaries will be printed with INFO level. Do not bother building per-agent
916 : // (for potentially many agents) summaries if logger priority not enough
917 6425 : auto signal = ((progress >= summaryThreshold_p) and (logger_p->priority() <= LogMessage::NORMAL));
918 6425 : if (signal) {
919 3858 : summaryThreshold_p += summaryThresholdInc_p;
920 : }
921 6425 : return signal;
922 : }
923 :
924 : // -----------------------------------------------------------------------
925 : // Get the PROCESSOR sub-table
926 : // -----------------------------------------------------------------------
927 : bool
928 5 : FlagMSHandler::processorTable()
929 : {
930 5 : MSProcessor msSubtable = selectedMeasurementSet_p->processor();
931 :
932 : // The rows in the PROCESSOR table correspond to the PROCESSOR_ID column
933 : // in main table
934 5 : if (msSubtable.nrow() == 0){
935 2 : *logger_p << LogIO::WARN << "PROCESSOR sub-table is empty. Assuming CORRELATOR type."
936 2 : << LogIO::POST;
937 2 : processorTableExist_p = false;
938 : }
939 : else {
940 3 : MSProcessorColumns tableCols(msSubtable);
941 3 : ScalarColumn<String> typeCol = tableCols.type();
942 3 : processorTableExist_p = true;
943 :
944 : /* Create a look-up boolean column to tell if a row is of type CORRELATOR.
945 : isCorrelatorType_p = true when PROCESSOR_ID is of TYPE CORRELATOR
946 : */
947 3 : isCorrelatorType_p.resize(msSubtable.nrow(),False);
948 :
949 : // Assign true to row in look-up table that have TYPE==CORRELATOR
950 8 : for (uInt pid=0; pid<msSubtable.nrow(); pid++){
951 :
952 5 : String proc_type = typeCol.asString(pid);
953 :
954 5 : if (proc_type.compare("CORRELATOR") == 0){
955 2 : isCorrelatorType_p(pid) = True;
956 : }
957 : else
958 3 : isCorrelatorType_p(pid) = False;
959 :
960 5 : }
961 3 : }
962 :
963 5 : return true;
964 :
965 5 : }
966 :
967 : /*
968 : * Check if a VIRTUAL MODEL column exists
969 : */
970 : bool
971 8 : FlagMSHandler::checkIfSourceModelColumnExists()
972 : {
973 8 : Vector<Int> fieldids;
974 :
975 8 : if (casa::refim::VisModelData::hasAnyModel(*selectedMeasurementSet_p, fieldids)){
976 4 : return true;
977 : }
978 :
979 4 : return false;
980 8 : }
981 :
982 : } //# NAMESPACE CASA - END
983 :
|