Line data Source code
1 : //# FlagMSHandler.h: This file contains the implementation of the FlagMSHandler class.
2 : //#
3 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
4 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All rights reserved.
5 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
6 : //#
7 : //# This library is free software; you can redistribute it and/or
8 : //# modify it under the terms of the GNU Lesser General Public
9 : //# License as published by the Free software Foundation; either
10 : //# version 2.1 of the License, or (at your option) any later version.
11 : //#
12 : //# This library is distributed in the hope that it will be useful,
13 : //# but WITHOUT ANY WARRANTY, without even the implied warranty of
14 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 : //# Lesser General Public License for more details.
16 : //#
17 : //# You should have received a copy of the GNU Lesser General Public
18 : //# License along with this library; if not, write to the Free Software
19 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20 : //# MA 02111-1307 USA
21 : //# $Id: $
22 :
23 : #include <flagging/Flagging/FlagMSHandler.h>
24 :
25 : // Measurement Set selection
26 : #include <casacore/ms/MeasurementSets/MeasurementSet.h>
27 : #include <casacore/ms/MSSel/MSSelection.h>
28 : #include <casacore/ms/MeasurementSets/MSAntennaColumns.h>
29 : #include <casacore/ms/MeasurementSets/MSFieldColumns.h>
30 : #include <casacore/ms/MeasurementSets/MSPolColumns.h>
31 : #include <casacore/ms/MeasurementSets/MSSpWindowColumns.h>
32 : #include <casacore/ms/MeasurementSets/MSProcessorColumns.h>
33 :
34 : #include <msvis/MSVis/ViFrequencySelection.h>
35 :
36 : #include <mstransform/TVI/ChannelAverageTVI.h>
37 : #include <msvis/MSVis/LayeredVi2Factory.h>
38 : #include <synthesis/TransformMachines2/VisModelData.h>
39 :
40 : using namespace casacore;
41 : namespace casa { //# NAMESPACE CASA - BEGIN
42 :
43 : ////////////////////////////////////
44 : /// FlagMSHandler implementation ///
45 : ////////////////////////////////////
46 :
47 :
48 : // -----------------------------------------------------------------------
49 : // Default constructor
50 : // -----------------------------------------------------------------------
51 0 : FlagMSHandler::FlagMSHandler(string tablename, uShort iterationApproach, Double timeInterval):
52 0 : FlagDataHandler(tablename,iterationApproach,timeInterval)
53 : {
54 0 : selectedMeasurementSet_p = NULL;
55 0 : originalMeasurementSet_p = NULL;
56 0 : visibilityIterator_p = NULL;
57 0 : tableTye_p = MEASUREMENT_SET;
58 0 : processorTableExist_p = false;
59 0 : }
60 :
61 :
62 : // -----------------------------------------------------------------------
63 : // Default destructor
64 : // -----------------------------------------------------------------------
65 0 : FlagMSHandler::~FlagMSHandler()
66 : {
67 0 : logger_p->origin(LogOrigin("FlagDataHandler",__FUNCTION__,WHERE));
68 0 : *logger_p << LogIO::DEBUG2 << "FlagMSHandler::~FlagMSHandler()" << LogIO::POST;
69 :
70 : // Delete MS objects
71 0 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
72 0 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
73 :
74 : // Delete VisibilityIterator
75 0 : if (visibilityIterator_p) delete visibilityIterator_p;
76 0 : }
77 :
78 :
79 : // -----------------------------------------------------------------------
80 : // Open Measurement Set
81 : // -----------------------------------------------------------------------
82 : bool
83 0 : FlagMSHandler::open()
84 : {
85 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
86 :
87 0 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
88 : //originalMeasurementSet_p = new MeasurementSet(tablename_p,Table::Update);
89 :
90 0 : if(Table::isWritable(tablename_p))
91 : {
92 0 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Update);
93 : }
94 : else
95 : {
96 0 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Old);
97 : }
98 :
99 :
100 : // Activate Memory Resident Sub-tables for everything but Pointing, Syscal and History
101 0 : originalMeasurementSet_p->setMemoryResidentSubtables (MrsEligibility::defaultEligible());
102 :
103 : // Read antenna names and diameters from Antenna table
104 0 : MSAntennaColumns antennaSubTable(originalMeasurementSet_p->antenna());
105 0 : antennaNames_p = new Vector<String>(antennaSubTable.name().getColumn());
106 0 : antennaDiameters_p = new Vector<Double>(antennaSubTable.dishDiameter().getColumn());
107 0 : antennaPositions_p = new ScalarMeasColumn<MPosition>(antennaSubTable.positionMeas());
108 :
109 : // File the baseline to Ant1xAnt2 map
110 0 : String baseline;
111 0 : std::pair<Int,Int> ant1ant2;
112 0 : for (Int ant1Idx=0;ant1Idx < static_cast<Int>(antennaNames_p->size());ant1Idx++)
113 : {
114 0 : for (Int ant2Idx=ant1Idx+1;ant2Idx < static_cast<Int>(antennaNames_p->size());ant2Idx++)
115 : {
116 0 : ant1ant2.first = ant1Idx;
117 0 : ant1ant2.second = ant2Idx;
118 0 : baseline = antennaNames_p->operator()(ant1Idx) + "&&" + antennaNames_p->operator()(ant2Idx);
119 0 : baselineToAnt1Ant2_p[baseline] = ant1ant2;
120 0 : Ant1Ant2ToBaseline_p[ant1ant2] = baseline;
121 : }
122 : }
123 :
124 : // Read field names
125 : {
126 0 : MSFieldColumns *fieldSubTable = new MSFieldColumns(originalMeasurementSet_p->field());
127 0 : fieldNames_p = new Vector<String>(fieldSubTable->name().getColumn());
128 0 : delete fieldSubTable;
129 : }
130 :
131 : // Read polarizations
132 0 : MSPolarizationColumns polarizationSubTable(originalMeasurementSet_p->polarization());
133 0 : ArrayColumn<Int> corrTypeColum = polarizationSubTable.corrType();
134 0 : corrProducts_p = new std::vector<String>();
135 0 : for (uInt polRow_idx=0;polRow_idx<corrTypeColum.nrow();polRow_idx++)
136 : {
137 0 : Array<Int> polRow = corrTypeColum(polRow_idx);
138 0 : for (uInt corr_i=0;corr_i<polRow.size();corr_i++)
139 : {
140 0 : switch (polRow(IPosition(1,corr_i)))
141 : {
142 0 : case Stokes::I:
143 : {
144 0 : *logger_p << LogIO::DEBUG1 << " Correlation product I found, which should correspond to ALMA WVR data - skipping" << LogIO::POST;
145 : // jagonzal (CAS-4234): We need this to Sanitize correlation expressions
146 0 : corrProducts_p->push_back("I");
147 0 : break;
148 : }
149 0 : case Stokes::Q:
150 : {
151 0 : *logger_p << LogIO::DEBUG1 << " Correlation product Q found" << LogIO::POST;
152 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("Q")) == corrProducts_p->end()) corrProducts_p->push_back("Q");
153 0 : break;
154 : }
155 0 : case Stokes::U:
156 : {
157 0 : *logger_p << LogIO::DEBUG1 << " Correlation product U found" << LogIO::POST;
158 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("U")) == corrProducts_p->end()) corrProducts_p->push_back("U");
159 0 : break;
160 : }
161 0 : case Stokes::V:
162 : {
163 0 : *logger_p << LogIO::DEBUG1 << " Correlation product V found" << LogIO::POST;
164 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("V")) == corrProducts_p->end()) corrProducts_p->push_back("V");
165 0 : break;
166 : }
167 0 : case Stokes::XX:
168 : {
169 0 : *logger_p << LogIO::DEBUG1 << " Correlation product XX found" << LogIO::POST;
170 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XX")) == corrProducts_p->end()) corrProducts_p->push_back("XX");
171 0 : break;
172 : }
173 0 : case Stokes::YY:
174 : {
175 0 : *logger_p << LogIO::DEBUG1 << " Correlation product YY found" << LogIO::POST;
176 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YY")) == corrProducts_p->end()) corrProducts_p->push_back("YY");
177 0 : break;
178 : }
179 0 : case Stokes::XY:
180 : {
181 0 : *logger_p << LogIO::DEBUG1 << " Correlation product XY found" << LogIO::POST;
182 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XY")) == corrProducts_p->end()) corrProducts_p->push_back("XY");
183 0 : break;
184 : }
185 0 : case Stokes::YX:
186 : {
187 0 : *logger_p << LogIO::DEBUG1 << " Correlation product YX found" << LogIO::POST;
188 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YX")) == corrProducts_p->end()) corrProducts_p->push_back("YX");
189 0 : break;
190 : }
191 0 : case Stokes::RR:
192 : {
193 0 : *logger_p << LogIO::DEBUG1 << " Correlation product RR found" << LogIO::POST;
194 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RR")) == corrProducts_p->end()) corrProducts_p->push_back("RR");
195 0 : break;
196 : }
197 0 : case Stokes::LL:
198 : {
199 0 : *logger_p << LogIO::DEBUG1 << " Correlation product LL found" << LogIO::POST;
200 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LL")) == corrProducts_p->end()) corrProducts_p->push_back("LL");
201 0 : break;
202 : }
203 0 : case Stokes::RL:
204 : {
205 0 : *logger_p << LogIO::DEBUG1 << " Correlation product RL found" << LogIO::POST;
206 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RL")) == corrProducts_p->end()) corrProducts_p->push_back("RL");
207 0 : break;
208 : }
209 0 : case Stokes::LR:
210 : {
211 0 : *logger_p << LogIO::DEBUG1 << " Correlation product LR found" << LogIO::POST;
212 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LR")) == corrProducts_p->end()) corrProducts_p->push_back("LR");
213 0 : break;
214 : }
215 0 : default:
216 : {
217 0 : *logger_p << LogIO::WARN << " Correlation product unknown found: " << polRow(IPosition(1,corr_i)) << LogIO::POST;
218 0 : break;
219 : }
220 : }
221 : }
222 0 : }
223 :
224 : // Read reference frequencies per SPW
225 0 : MSSpWindowColumns spwSubTable(originalMeasurementSet_p->spectralWindow());
226 0 : ScalarColumn<Double> refFrequencies = spwSubTable.refFrequency();
227 0 : lambdaMap_p = new lambdaMap();
228 0 : for (uInt spwidx=0;spwidx<refFrequencies.nrow();spwidx++)
229 : {
230 0 : (*lambdaMap_p)[spwidx]=C::c/refFrequencies(spwidx);
231 0 : *logger_p << LogIO::DEBUG1 << " spwidx: " << spwidx << " lambda: " << (*lambdaMap_p)[spwidx] << LogIO::POST;
232 : }
233 :
234 0 : return true;
235 0 : }
236 :
237 :
238 : // -----------------------------------------------------------------------
239 : // Close Measurement Set
240 : // -----------------------------------------------------------------------
241 : bool
242 0 : FlagMSHandler::close()
243 : {
244 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
245 :
246 0 : if (selectedMeasurementSet_p)
247 : {
248 : // Flush and unlock MS
249 0 : selectedMeasurementSet_p->flush();
250 0 : selectedMeasurementSet_p->relinquishAutoLocks(true);
251 0 : selectedMeasurementSet_p->unlock();
252 :
253 : // Post stats
254 0 : if (stats_p)
255 : {
256 0 : *logger_p << LogIO::NORMAL << " Total Flag Cube accesses: " << cubeAccessCounter_p << LogIO::POST;
257 : }
258 : }
259 :
260 0 : return true;
261 : }
262 :
263 :
264 : // -----------------------------------------------------------------------
265 : // Generate selected Measurement Set
266 : // -----------------------------------------------------------------------
267 : bool
268 0 : FlagMSHandler::selectData()
269 : {
270 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
271 :
272 : // Create Measurement Selection object
273 0 : const String dummyExpr = String("");
274 0 : if (measurementSetSelection_p) {
275 0 : delete measurementSetSelection_p;
276 0 : measurementSetSelection_p = NULL;
277 : }
278 :
279 : // Set the MS Selection error handler to catch antenna names that are
280 : // not present in the MS in an expression that contains valid antenna names.
281 : // Catch also invalid intent or spw values in a valid expression.
282 : // This will issue a WARNING and not fail.
283 0 : MSSelectionLogError mssLEA,mssLES,mssLESPW;
284 0 : measurementSetSelection_p = new MSSelection();
285 :
286 0 : measurementSetSelection_p->setErrorHandler(MSSelection::ANTENNA_EXPR, &mssLEA);
287 0 : measurementSetSelection_p->setErrorHandler(MSSelection::STATE_EXPR, &mssLES);
288 0 : measurementSetSelection_p->setErrorHandler(MSSelection::SPW_EXPR, &mssLESPW, true);
289 :
290 0 : measurementSetSelection_p->reset(
291 0 : *originalMeasurementSet_p,
292 0 : MSSelection::PARSE_NOW,
293 0 : (const String)timeSelection_p,
294 0 : (const String)baselineSelection_p,
295 0 : (const String)fieldSelection_p,
296 0 : (const String)spwSelection_p,
297 0 : (const String)uvwSelection_p,
298 : dummyExpr, // taqlExpr
299 0 : (const String)polarizationSelection_p,
300 0 : (const String)scanSelection_p,
301 0 : (const String)arraySelection_p,
302 0 : (const String)scanIntentSelection_p,
303 0 : (const String)observationSelection_p);
304 :
305 : // Apply Measurement Selection to a copy of the original Measurement Set
306 0 : MeasurementSet auxMeasurementSet = MeasurementSet(*originalMeasurementSet_p);
307 0 : measurementSetSelection_p->getSelectedMS(auxMeasurementSet, String(""));
308 0 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
309 0 : selectedMeasurementSet_p = new MeasurementSet(auxMeasurementSet);
310 :
311 : // Check if selected MS has rows...
312 0 : if (selectedMeasurementSet_p->nrow() == 0)
313 : {
314 0 : *logger_p << LogIO::WARN << "Selected Measurement Set doesn't have any rows " << LogIO::POST;
315 : }
316 :
317 : // More debugging information from MS-Selection
318 0 : if (!arraySelection_p.empty())
319 : {
320 0 : *logger_p << LogIO::DEBUG1 << " Selected array ids are " << measurementSetSelection_p->getSubArrayList() << LogIO::POST;
321 : }
322 :
323 0 : if (!observationSelection_p.empty())
324 : {
325 0 : *logger_p << LogIO::DEBUG1 << " Selected observation ids are " << measurementSetSelection_p->getObservationList() << LogIO::POST;
326 : }
327 :
328 0 : if (!fieldSelection_p.empty())
329 : {
330 0 : *logger_p << LogIO::DEBUG1 << " Selected field ids are " << measurementSetSelection_p->getFieldList() << LogIO::POST;
331 : }
332 :
333 0 : if (!scanSelection_p.empty())
334 : {
335 0 : *logger_p << LogIO::DEBUG1 << " Selected scan ids are " << measurementSetSelection_p->getScanList() << LogIO::POST;
336 : }
337 :
338 0 : if (!scanIntentSelection_p.empty())
339 : {
340 0 : *logger_p << LogIO::DEBUG1 << " Selected scan intent ids are " << measurementSetSelection_p->getStateObsModeList() << LogIO::POST;
341 : }
342 :
343 0 : if (!timeSelection_p.empty())
344 : {
345 0 : *logger_p << LogIO::DEBUG1 << " Selected time range is " << measurementSetSelection_p->getTimeList() << LogIO::POST;
346 : }
347 :
348 0 : if (!spwSelection_p.empty())
349 : {
350 0 : *logger_p << LogIO::NORMAL << " Selected spw-channels ids are " << measurementSetSelection_p->getChanList() << LogIO::POST;
351 0 : if (measurementSetSelection_p->getChanList().size())
352 : {
353 0 : inrowSelection_p = true;
354 : }
355 : }
356 :
357 0 : if (!baselineSelection_p.empty())
358 : {
359 0 : *logger_p << LogIO::DEBUG1 << " Selected antenna1 ids are " << measurementSetSelection_p->getAntenna1List() << LogIO::POST;
360 0 : *logger_p << LogIO::DEBUG1 << " Selected antenna2 ids are " << measurementSetSelection_p->getAntenna2List() << LogIO::POST;
361 0 : *logger_p << LogIO::DEBUG1 << " Selected baselines are " << measurementSetSelection_p->getBaselineList() << LogIO::POST;
362 : }
363 :
364 0 : if (!uvwSelection_p.empty())
365 : {
366 0 : *logger_p << LogIO::DEBUG1 << " Selected uv range is " << measurementSetSelection_p->getUVList() << LogIO::POST;
367 : }
368 :
369 0 : if (!polarizationSelection_p.empty())
370 : {
371 0 : ostringstream polarizationListToPrint (ios::in | ios::out);
372 0 : polarizationListToPrint << measurementSetSelection_p->getPolMap();
373 0 : *logger_p << LogIO::DEBUG1 << " Selected correlation ids are " << polarizationListToPrint.str() << LogIO::POST;
374 0 : }
375 :
376 : // There is a new selected MS so iterators have to be regenerated
377 0 : iteratorGenerated_p = false;
378 0 : chunksInitialized_p = false;
379 0 : buffersInitialized_p = false;
380 0 : stopIteration_p = false;
381 :
382 0 : return true;
383 0 : }
384 :
385 :
386 : // -----------------------------------------------------------------------
387 : // Parse MSSelection expression
388 : // -----------------------------------------------------------------------
389 : bool
390 0 : FlagMSHandler::parseExpression(MSSelection &parser)
391 : {
392 0 : parser.toTableExprNode(selectedMeasurementSet_p);
393 0 : return true;
394 : }
395 :
396 :
397 : // -----------------------------------------------------------------------
398 : // Swap MS to check what is the maximum RAM memory needed
399 : // -----------------------------------------------------------------------
400 : void
401 0 : FlagMSHandler::preSweep()
402 : {
403 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
404 :
405 0 : for (visibilityIterator_p->originChunks(); visibilityIterator_p->moreChunks();visibilityIterator_p->nextChunk())
406 : {
407 : // Iterate over vis buffers
408 0 : for (visibilityIterator_p->origin(); visibilityIterator_p->more();visibilityIterator_p->next())
409 : {
410 :
411 0 : if (mapScanStartStop_p)
412 : {
413 0 : generateScanStartStopMap();
414 : }
415 : }
416 : }
417 :
418 0 : if (mapScanStartStop_p)
419 : {
420 0 : *logger_p << LogIO::NORMAL << " " << scanStartStopMap_p->size() <<" Scans found in MS" << LogIO::POST;
421 : }
422 :
423 0 : return;
424 : }
425 :
426 :
427 : // -----------------------------------------------------------------------
428 : // Generate Visibility Iterator with a given sort order and time interval
429 : // -----------------------------------------------------------------------
430 : bool
431 0 : FlagMSHandler::generateIterator()
432 : {
433 0 : if (!iteratorGenerated_p)
434 : {
435 :
436 0 : if (asyncio_enabled_p)
437 : {
438 : // Set preFetchColumns
439 0 : prefetchColumns_p = new VisBufferComponents2();
440 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagCube);
441 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagRow);
442 0 : prefetchColumns_p->operator +=(VisBufferComponent2::NRows);
443 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FieldId);
444 :
445 0 : preFetchColumns();
446 :
447 : // Then create and initialize RO Async iterator
448 0 : if (visibilityIterator_p) delete visibilityIterator_p;
449 0 : visibilityIterator_p = new vi::VisibilityIterator2(*selectedMeasurementSet_p,
450 0 : vi::SortColumns (sortOrder_p, true),
451 0 : true,prefetchColumns_p,timeInterval_p);
452 : }
453 : else
454 : {
455 : //The vector where all the factories will be stacked
456 0 : std::vector<vi::ViiLayerFactory*> factories;
457 :
458 : //Create a VI Factory that access directly the MS (or the selected MS)
459 : vi::IteratingParameters ipar(timeInterval_p,
460 0 : vi::SortColumns (sortOrder_p, true));
461 0 : std::unique_ptr<vi::VisIterImpl2LayerFactory> directMsVIFactory;
462 0 : directMsVIFactory.reset(new vi::VisIterImpl2LayerFactory(selectedMeasurementSet_p,ipar, true));
463 : //Apply the channel selections only on the layer that access the MS directly
464 0 : if(enableChanAvg_p)
465 0 : applyChannelSelection(directMsVIFactory.get());
466 0 : factories.push_back(directMsVIFactory.get());
467 :
468 : //Add a time averaging layer if so requested
469 0 : std::unique_ptr<vi::AveragingVi2LayerFactory> timeAvgFactory;
470 0 : if(enableTimeAvg_p)
471 : {
472 : // Time averaging in clip mode uses the Time Averaging Iterator
473 0 : vi::AveragingParameters parameters(timeAverageBin_p, 0,vi::SortColumns(sortOrder_p, false),
474 0 : timeAvgOptions_p,0.0,NULL,true);
475 0 : timeAvgFactory.reset(new vi::AveragingVi2LayerFactory(parameters));
476 0 : factories.push_back(timeAvgFactory.get());
477 0 : }
478 :
479 : //Create a ChannelAverageTVI Factory if so requested
480 0 : std::unique_ptr<vi::ChannelAverageTVILayerFactory> chanAvgFactory;
481 0 : if(enableChanAvg_p)
482 : {
483 0 : chanAvgFactory.reset(new vi::ChannelAverageTVILayerFactory(chanAvgOptions_p));
484 0 : factories.push_back(chanAvgFactory.get());
485 : }
486 :
487 : //Create the final visibility iterator
488 0 : Vector<vi::ViiLayerFactory*> const factoriesV(factories);
489 0 : visibilityIterator_p = new vi::VisibilityIterator2(factoriesV);
490 0 : }
491 :
492 : // Do quack pre-sweep
493 0 : if (mapScanStartStop_p)
494 0 : preSweep();
495 :
496 : // Set the table data manager (ISM and SSM) cache size to the full column size, for
497 : // the columns ANTENNA1, ANTENNA2, FEED1, FEED2, TIME, INTERVAL, FLAG_ROW, SCAN_NUMBER and UVW
498 0 : if (slurp_p) visibilityIterator_p->slurp();
499 :
500 : // Apply channel selection
501 : // CAS-3959: Channel selection is now going to be handled at the FlagAgent level
502 : // applyChannelSelection(visibilityIterator_p);
503 :
504 : // Group all the time stamps in one single buffer
505 : // NOTE: Otherwise we have to iterate over Visibility Buffers
506 : // that contain all the rows with the same time step.
507 0 : if (groupTimeSteps_p)
508 : {
509 : // Set row blocking to a huge number
510 0 : uLong maxChunkRows = selectedMeasurementSet_p->nrow();
511 0 : visibilityIterator_p->setRowBlocking(maxChunkRows);
512 :
513 0 : *logger_p << LogIO::NORMAL << "Setting row blocking to number of rows in selected table: " << maxChunkRows << LogIO::POST;
514 :
515 :
516 : }
517 :
518 : // Get Visibility Buffer reference from VisibilityIterator
519 0 : visibilityBuffer_p = visibilityIterator_p->getVisBuffer();
520 :
521 : // Get the TYPE column of the PROCESSOR sub-table
522 0 : if (loadProcessorTable_p){
523 0 : processorTable();
524 0 : loadProcessorTable_p = false;
525 : }
526 :
527 0 : iteratorGenerated_p = true;
528 : }
529 0 : chunksInitialized_p = false;
530 0 : buffersInitialized_p = false;
531 0 : stopIteration_p = false;
532 0 : processedRows_p = 0;
533 :
534 0 : return true;
535 : }
536 :
537 :
538 : // -----------------------------------------------------------------------
539 : // Apply channel selection for asyn or normal iterator
540 : // NOTE (first implementation): We always have to do this, even if there is no SPW:channel selection
541 : // NOTE (after Dic 2011 testing): As far as I know spw selection does not have to be *, it can be empty,
542 : // therefore this step will in practice do nothing , because the spw and channel lists are empty too.
543 : // -----------------------------------------------------------------------
544 : void
545 0 : FlagMSHandler::applyChannelSelection(vi::VisIterImpl2LayerFactory *viFactory)
546 : {
547 0 : vi::FrequencySelectionUsingChannels channelSelector;
548 :
549 : // Apply channel selection (in row selection cannot be done with MSSelection)
550 : // NOTE: Each row of the Matrix has the following elements: SpwID StartCh StopCh Step
551 0 : Matrix<Int> spwchan = measurementSetSelection_p->getChanList();
552 0 : IPosition shape = spwchan.shape();
553 0 : uInt nSelections = shape[0];
554 : Int spw,channelStart,channelStop,channelStep,channelWidth;
555 0 : for(uInt selection_i=0;selection_i<nSelections;selection_i++)
556 : {
557 : // NOTE: selectChannel needs channelStart,channelWidth,channelStep
558 0 : spw = spwchan(selection_i,0);
559 0 : channelStart = spwchan(selection_i,1);
560 0 : channelStop = spwchan(selection_i,2);
561 0 : channelStep = spwchan(selection_i,3);
562 0 : channelWidth = channelStop-channelStart+1;
563 0 : channelSelector.add (spw, channelStart, channelWidth,channelStep);
564 : }
565 :
566 0 : auto freqSel = std::make_shared<vi::FrequencySelections>();
567 0 : freqSel->add(channelSelector);
568 0 : viFactory->setFrequencySelections(freqSel);
569 :
570 0 : return;
571 0 : }
572 :
573 :
574 : // -----------------------------------------------------------------------
575 : // Move to next chunk
576 : // -----------------------------------------------------------------------
577 : bool
578 0 : FlagMSHandler::nextChunk()
579 : {
580 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
581 :
582 0 : chunkCounts_p = 0;
583 0 : bool moreChunks = false;
584 0 : if (stopIteration_p)
585 : {
586 0 : moreChunks = false;
587 : }
588 : else
589 : {
590 0 : if (!chunksInitialized_p)
591 : {
592 0 : if (!iteratorGenerated_p) generateIterator();
593 0 : visibilityIterator_p->originChunks();
594 0 : chunksInitialized_p = true;
595 0 : buffersInitialized_p = false;
596 0 : chunkNo_p++;
597 0 : bufferNo_p = 0;
598 0 : moreChunks = true;
599 : }
600 : else
601 : {
602 0 : visibilityIterator_p->nextChunk();
603 0 : if (visibilityIterator_p->moreChunks())
604 : {
605 0 : buffersInitialized_p = false;
606 0 : moreChunks = true;
607 0 : chunkNo_p++;
608 0 : bufferNo_p = 0;
609 : }
610 : }
611 : }
612 :
613 0 : if (!moreChunks)
614 : {
615 0 : *logger_p << LogIO::NORMAL << "==================================================================================== " << LogIO::POST;
616 : }
617 :
618 0 : return moreChunks;
619 : }
620 :
621 :
622 : // -----------------------------------------------------------------------
623 : // Move to next buffer
624 : // -----------------------------------------------------------------------
625 : bool
626 0 : FlagMSHandler::nextBuffer()
627 : {
628 0 : bool moreBuffers = false;
629 0 : if (stopIteration_p) {
630 0 : return moreBuffers;
631 : }
632 :
633 0 : if (!buffersInitialized_p)
634 : {
635 0 : visibilityIterator_p->origin();
636 0 : buffersInitialized_p = true;
637 0 : if (!asyncio_enabled_p) preFetchColumns();
638 0 : if (mapAntennaPairs_p) generateAntennaPairMap();
639 0 : if (mapSubIntegrations_p) generateSubIntegrationMap();
640 0 : if (mapPolarizations_p) generatePolarizationsMap();
641 0 : if (mapAntennaPointing_p) generateAntennaPointingMap();
642 :
643 0 : moreBuffers = true;
644 0 : flushFlags_p = false;
645 0 : flushFlagRow_p = false;
646 0 : bufferNo_p++;
647 : }
648 : else
649 : {
650 : // WARNING: ++ operator is defined for VisibilityIterator class ("advance" function)
651 : // but if you define a VisibilityIterator pointer, then ++ operator does not call
652 : // the advance function but increments the pointers.
653 0 : visibilityIterator_p->next();
654 :
655 : // WARNING: We iterate and afterwards check if the iterator is valid
656 0 : if (visibilityIterator_p->more())
657 : {
658 0 : if (!asyncio_enabled_p) preFetchColumns();
659 0 : if (mapAntennaPairs_p) generateAntennaPairMap();
660 0 : if (mapSubIntegrations_p) generateSubIntegrationMap();
661 0 : if (mapPolarizations_p) generatePolarizationsMap();
662 0 : if (mapAntennaPointing_p) generateAntennaPointingMap();
663 0 : moreBuffers = true;
664 0 : flushFlags_p = false;
665 0 : flushFlagRow_p = false;
666 0 : bufferNo_p++;
667 : }
668 : }
669 :
670 :
671 : // Set new common flag cube
672 0 : if (moreBuffers)
673 : {
674 : // Get flag (WARNING: We have to modify the shape of the cube before re-assigning it)
675 0 : Cube<Bool> curentFlagCube= visibilityBuffer_p->flagCube();
676 0 : modifiedFlagCube_p.resize(curentFlagCube.shape());
677 0 : modifiedFlagCube_p = curentFlagCube;
678 0 : originalFlagCube_p.resize(curentFlagCube.shape());
679 0 : originalFlagCube_p = curentFlagCube;
680 :
681 : // Get flag row (WARNING: We have to modify the shape of the cube before re-assigning it)
682 0 : Vector<Bool> curentflagRow= visibilityBuffer_p->flagRow();
683 0 : modifiedFlagRow_p.resize(curentflagRow.shape());
684 0 : modifiedFlagRow_p = curentflagRow;
685 0 : originalFlagRow_p.resize(curentflagRow.shape());
686 0 : originalFlagRow_p = curentflagRow;
687 :
688 : // Compute total number of flags per buffer to be used for generating the agents stats
689 0 : Int64 currentBufferCounts = curentFlagCube.shape().product();
690 0 : chunkCounts_p += currentBufferCounts;
691 0 : progressCounts_p += currentBufferCounts;
692 0 : msCounts_p += currentBufferCounts;
693 :
694 : // Print chunk characteristics
695 0 : if (bufferNo_p == 1)
696 : {
697 0 : processedRows_p += visibilityIterator_p->nRowsInChunk();
698 0 : double progress = 100.0* ((double) processedRows_p / (double) selectedMeasurementSet_p->nrow());
699 : // Send only a limited number of lines to INFO level, 'other' lines to debug
700 0 : if (checkDoChunkLine(progress)) {
701 0 : *logger_p << LogIO::NORMAL;
702 : } else {
703 0 : *logger_p << LogIO::DEBUG1;
704 : }
705 0 : logger_p->origin(LogOrigin("FlagMSHandler",""));
706 :
707 0 : String corrs = "[ ";
708 0 : for (uInt corr_i=0;corr_i<(uInt) visibilityBuffer_p->nCorrelations();corr_i++)
709 : {
710 0 : corrs += (*polarizationIndexMap_p)[corr_i] + " ";
711 : }
712 0 : corrs += "]";
713 :
714 0 : *logger_p << "Chunk = " << chunkNo_p << " [progress: " << (Int)progress << "%]"
715 0 : ", Observation = " << visibilityBuffer_p->observationId()[0] <<
716 0 : ", Array = " << visibilityBuffer_p->arrayId()[0] <<
717 0 : ", Scan = " << visibilityBuffer_p->scan()[0] <<
718 0 : ", Field = " << visibilityBuffer_p->fieldId()(0) << " (" << fieldNames_p->operator()(visibilityBuffer_p->fieldId()) << ")"
719 0 : ", Spw = " << visibilityBuffer_p->spectralWindows()(0) <<
720 0 : ", Channels = " << visibilityBuffer_p->nChannels() <<
721 : ", Corrs = " << corrs <<
722 0 : ", Total Rows = " << visibilityIterator_p->nRowsInChunk() << LogIO::POST;
723 0 : }
724 0 : }
725 :
726 0 : return moreBuffers;
727 : }
728 :
729 : // -----------------------------------------------------------------------
730 : // Generate scan start stop map
731 : // -----------------------------------------------------------------------
732 : void
733 0 : FlagMSHandler::generateScanStartStopMap()
734 : {
735 :
736 : Int scan;
737 : Double start,stop;
738 0 : Vector<Int> scans;
739 0 : Vector<Double> times;
740 :
741 0 : Cube<Bool> flags;
742 : uInt scanStartRow;
743 : uInt scanStopRow;
744 : uInt ncorrs,nchannels,nrows;
745 : Bool stopSearch;
746 :
747 0 : if (scanStartStopMap_p == NULL) scanStartStopMap_p = new scanStartStopMap();
748 :
749 0 : scans = visibilityIterator_p->getVisBuffer()->scan();
750 0 : times = visibilityIterator_p->getVisBuffer()->time();
751 :
752 : // Check if anything is flagged in this buffer
753 0 : scanStartRow = 0;
754 0 : scanStopRow = times.size()-1;
755 0 : if (mapScanStartStopFlagged_p)
756 : {
757 0 : flags = visibilityIterator_p->getVisBuffer()->flagCube();
758 0 : IPosition shape = flags.shape();
759 0 : ncorrs = shape[0];
760 0 : nchannels = shape[1];
761 0 : nrows = shape[2];
762 :
763 : // Look for effective scan start
764 0 : stopSearch = false;
765 0 : for (uInt row_i=0;row_i<nrows;row_i++)
766 : {
767 0 : if (stopSearch) break;
768 :
769 0 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
770 : {
771 0 : if (stopSearch) break;
772 :
773 0 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
774 : {
775 0 : if (stopSearch) break;
776 :
777 0 : if (!flags(corr_i,channel_i,row_i))
778 : {
779 0 : scanStartRow = row_i;
780 0 : stopSearch = true;
781 : }
782 : }
783 : }
784 : }
785 :
786 : // If none of the rows were un-flagged we don't continue checking from the end
787 : // As a consequence of this some scans may not be present in the map, and have
788 : // to be skipped in the flagging process because they are already flagged.
789 0 : if (!stopSearch) return;
790 :
791 : // Look for effective scan stop
792 0 : stopSearch = false;
793 0 : for (uInt row_i=0;row_i<nrows;row_i++)
794 : {
795 0 : if (stopSearch) break;
796 :
797 0 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
798 : {
799 0 : if (stopSearch) break;
800 :
801 0 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
802 : {
803 0 : if (stopSearch) break;
804 :
805 0 : if (!flags(corr_i,channel_i,nrows-1-row_i))
806 : {
807 0 : scanStopRow = nrows-1-row_i;
808 0 : stopSearch = true;
809 : }
810 : }
811 : }
812 : }
813 0 : }
814 :
815 : // Check scan start/stop times
816 0 : scan = scans[0];
817 0 : start = times[scanStartRow];
818 0 : stop = times[scanStopRow];
819 :
820 0 : if (scanStartStopMap_p->find(scan) == scanStartStopMap_p->end())
821 : {
822 0 : (*scanStartStopMap_p)[scan].push_back(start);
823 0 : (*scanStartStopMap_p)[scan].push_back(stop);
824 : }
825 : else
826 : {
827 : // Check if we have a better start time
828 0 : if ((*scanStartStopMap_p)[scan][0] > start)
829 : {
830 0 : (*scanStartStopMap_p)[scan][0] = start;
831 : }
832 : // Check if we have a better stop time
833 0 : if ((*scanStartStopMap_p)[scan][1] < stop)
834 : {
835 0 : (*scanStartStopMap_p)[scan][1] = stop;
836 : }
837 : }
838 :
839 0 : return;
840 0 : }
841 :
842 :
843 : // -----------------------------------------------------------------------
844 : // Flush flags to MS
845 : // -----------------------------------------------------------------------
846 : bool
847 0 : FlagMSHandler::flushFlags()
848 : {
849 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
850 0 : *logger_p << LogIO::DEBUG1
851 : << "flushing modified flags cube of shape: " << modifiedFlagCube_p.shape()
852 0 : << LogIO::POST;
853 :
854 0 : if (flushFlags_p)
855 : {
856 :
857 0 : visibilityIterator_p->writeFlag(modifiedFlagCube_p);
858 0 : flushFlags_p = false;
859 : }
860 :
861 0 : if ((flushFlagRow_p) and (!inrowSelection_p))
862 : {
863 0 : visibilityIterator_p->writeFlagRow(modifiedFlagRow_p);
864 0 : flushFlagRow_p = false;
865 : }
866 :
867 0 : return true;
868 : }
869 :
870 :
871 : // -----------------------------------------------------------------------
872 : // Flush flags to MS
873 : // -----------------------------------------------------------------------
874 : String
875 0 : FlagMSHandler::getTableName()
876 : {
877 0 : return originalMeasurementSet_p->tableName();
878 : }
879 :
880 : // -----------------------------------------------------------------------
881 : // Check what data columns exist
882 : // -----------------------------------------------------------------------
883 : bool
884 0 : FlagMSHandler::checkIfColumnExists(String column)
885 : {
886 0 : return originalMeasurementSet_p->tableDesc().isColumn(column);
887 : }
888 :
889 : // -----------------------------------------------------------------------
890 : // Whether the per-chunk progress log line should be produced
891 : // -----------------------------------------------------------------------
892 : bool
893 0 : FlagMSHandler::checkDoChunkLine(double progress)
894 : {
895 0 : bool result = false;
896 : // per-chunk progress report lines - every 'chunkLineThresholdInc_p' % and in
897 : // any case always the beginning (~0%) end (100% rows)
898 0 : if ((progress >= chunkLineThreshold_p) or (progress >= 100.0)) {
899 0 : chunkLineThreshold_p += chunkLineThresholdInc_p;
900 0 : result = true;
901 : }
902 0 : result |= (1 == chunkNo_p) or (processedRows_p == selectedMeasurementSet_p->nrow());
903 :
904 0 : return result;
905 : }
906 :
907 : // ----------------------------------------------------------------------------
908 : // Signal true when a per-agent partial (ongoing run) summary has to be printed
909 : // ----------------------------------------------------------------------------
910 : bool
911 0 : FlagMSHandler::summarySignal()
912 : {
913 0 : Double progress = 100.0* ((Double) processedRows_p / (Double) selectedMeasurementSet_p->nrow());
914 :
915 : // The summaries will be printed with INFO level. Do not bother building per-agent
916 : // (for potentially many agents) summaries if logger priority not enough
917 0 : auto signal = ((progress >= summaryThreshold_p) and (logger_p->priority() <= LogMessage::NORMAL));
918 0 : if (signal) {
919 0 : summaryThreshold_p += summaryThresholdInc_p;
920 : }
921 0 : return signal;
922 : }
923 :
924 : // -----------------------------------------------------------------------
925 : // Get the PROCESSOR sub-table
926 : // -----------------------------------------------------------------------
927 : bool
928 0 : FlagMSHandler::processorTable()
929 : {
930 0 : MSProcessor msSubtable = selectedMeasurementSet_p->processor();
931 :
932 : // The rows in the PROCESSOR table correspond to the PROCESSOR_ID column
933 : // in main table
934 0 : if (msSubtable.nrow() == 0){
935 0 : *logger_p << LogIO::WARN << "PROCESSOR sub-table is empty. Assuming CORRELATOR type."
936 0 : << LogIO::POST;
937 0 : processorTableExist_p = false;
938 : }
939 : else {
940 0 : MSProcessorColumns tableCols(msSubtable);
941 0 : ScalarColumn<String> typeCol = tableCols.type();
942 0 : processorTableExist_p = true;
943 :
944 : /* Create a look-up boolean column to tell if a row is of type CORRELATOR.
945 : isCorrelatorType_p = true when PROCESSOR_ID is of TYPE CORRELATOR
946 : */
947 0 : isCorrelatorType_p.resize(msSubtable.nrow(),False);
948 :
949 : // Assign true to row in look-up table that have TYPE==CORRELATOR
950 0 : for (uInt pid=0; pid<msSubtable.nrow(); pid++){
951 :
952 0 : String proc_type = typeCol.asString(pid);
953 :
954 0 : if (proc_type.compare("CORRELATOR") == 0){
955 0 : isCorrelatorType_p(pid) = True;
956 : }
957 : else
958 0 : isCorrelatorType_p(pid) = False;
959 :
960 0 : }
961 0 : }
962 :
963 0 : return true;
964 :
965 0 : }
966 :
967 : /*
968 : * Check if a VIRTUAL MODEL column exists
969 : */
970 : bool
971 0 : FlagMSHandler::checkIfSourceModelColumnExists()
972 : {
973 0 : Vector<Int> fieldids;
974 :
975 0 : if (casa::refim::VisModelData::hasAnyModel(*selectedMeasurementSet_p, fieldids)){
976 0 : return true;
977 : }
978 :
979 0 : return false;
980 0 : }
981 :
982 : } //# NAMESPACE CASA - END
983 :
|