Line data Source code
1 : //# FlagMSHandler.h: This file contains the implementation of the FlagMSHandler class.
2 : //#
3 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
4 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All rights reserved.
5 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
6 : //#
7 : //# This library is free software; you can redistribute it and/or
8 : //# modify it under the terms of the GNU Lesser General Public
9 : //# License as published by the Free software Foundation; either
10 : //# version 2.1 of the License, or (at your option) any later version.
11 : //#
12 : //# This library is distributed in the hope that it will be useful,
13 : //# but WITHOUT ANY WARRANTY, without even the implied warranty of
14 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 : //# Lesser General Public License for more details.
16 : //#
17 : //# You should have received a copy of the GNU Lesser General Public
18 : //# License along with this library; if not, write to the Free Software
19 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20 : //# MA 02111-1307 USA
21 : //# $Id: $
22 :
23 : #include <flagging/Flagging/FlagMSHandler.h>
24 :
25 : // Measurement Set selection
26 : #include <casacore/ms/MeasurementSets/MeasurementSet.h>
27 : #include <casacore/ms/MSSel/MSSelection.h>
28 : #include <casacore/ms/MeasurementSets/MSAntennaColumns.h>
29 : #include <casacore/ms/MeasurementSets/MSFieldColumns.h>
30 : #include <casacore/ms/MeasurementSets/MSPolColumns.h>
31 : #include <casacore/ms/MeasurementSets/MSSpWindowColumns.h>
32 : #include <casacore/ms/MeasurementSets/MSProcessorColumns.h>
33 :
34 : #include <msvis/MSVis/ViFrequencySelection.h>
35 :
36 : #include <mstransform/TVI/ChannelAverageTVI.h>
37 : #include <msvis/MSVis/LayeredVi2Factory.h>
38 : #include <synthesis/TransformMachines2/VisModelData.h>
39 :
40 : using namespace casacore;
41 : namespace casa { //# NAMESPACE CASA - BEGIN
42 :
43 : ////////////////////////////////////
44 : /// FlagMSHandler implementation ///
45 : ////////////////////////////////////
46 :
47 :
48 : // -----------------------------------------------------------------------
49 : // Default constructor
50 : // -----------------------------------------------------------------------
51 1495 : FlagMSHandler::FlagMSHandler(string tablename, uShort iterationApproach, Double timeInterval):
52 1495 : FlagDataHandler(tablename,iterationApproach,timeInterval)
53 : {
54 1495 : selectedMeasurementSet_p = NULL;
55 1495 : originalMeasurementSet_p = NULL;
56 1495 : visibilityIterator_p = NULL;
57 1495 : tableTye_p = MEASUREMENT_SET;
58 1495 : processorTableExist_p = false;
59 1495 : }
60 :
61 :
62 : // -----------------------------------------------------------------------
63 : // Default destructor
64 : // -----------------------------------------------------------------------
65 2990 : FlagMSHandler::~FlagMSHandler()
66 : {
67 1495 : logger_p->origin(LogOrigin("FlagDataHandler",__FUNCTION__,WHERE));
68 1495 : *logger_p << LogIO::DEBUG2 << "FlagMSHandler::~FlagMSHandler()" << LogIO::POST;
69 :
70 : // Delete MS objects
71 1495 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
72 1495 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
73 :
74 : // Delete VisibilityIterator
75 1495 : if (visibilityIterator_p) delete visibilityIterator_p;
76 2990 : }
77 :
78 :
79 : // -----------------------------------------------------------------------
80 : // Open Measurement Set
81 : // -----------------------------------------------------------------------
82 : bool
83 1495 : FlagMSHandler::open()
84 : {
85 1495 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
86 :
87 1495 : if (originalMeasurementSet_p) delete originalMeasurementSet_p;
88 : //originalMeasurementSet_p = new MeasurementSet(tablename_p,Table::Update);
89 :
90 1495 : if(Table::isWritable(tablename_p))
91 : {
92 1495 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Update);
93 : }
94 : else
95 : {
96 0 : originalMeasurementSet_p = new MeasurementSet(tablename_p, TableLock(TableLock::AutoNoReadLocking) ,Table::Old);
97 : }
98 :
99 :
100 : // Activate Memory Resident Sub-tables for everything but Pointing, Syscal and History
101 1495 : originalMeasurementSet_p->setMemoryResidentSubtables (MrsEligibility::defaultEligible());
102 :
103 : // Read antenna names and diameters from Antenna table
104 1495 : MSAntennaColumns antennaSubTable(originalMeasurementSet_p->antenna());
105 1495 : antennaNames_p = new Vector<String>(antennaSubTable.name().getColumn());
106 1495 : antennaDiameters_p = new Vector<Double>(antennaSubTable.dishDiameter().getColumn());
107 1495 : antennaPositions_p = new ScalarMeasColumn<MPosition>(antennaSubTable.positionMeas());
108 :
109 : // File the baseline to Ant1xAnt2 map
110 1495 : String baseline;
111 1495 : std::pair<Int,Int> ant1ant2;
112 25163 : for (Int ant1Idx=0;ant1Idx < static_cast<Int>(antennaNames_p->size());ant1Idx++)
113 : {
114 321207 : for (Int ant2Idx=ant1Idx+1;ant2Idx < static_cast<Int>(antennaNames_p->size());ant2Idx++)
115 : {
116 297539 : ant1ant2.first = ant1Idx;
117 297539 : ant1ant2.second = ant2Idx;
118 297539 : baseline = antennaNames_p->operator()(ant1Idx) + "&&" + antennaNames_p->operator()(ant2Idx);
119 297539 : baselineToAnt1Ant2_p[baseline] = ant1ant2;
120 297539 : Ant1Ant2ToBaseline_p[ant1ant2] = baseline;
121 : }
122 : }
123 :
124 : // Read field names
125 : {
126 1495 : MSFieldColumns *fieldSubTable = new MSFieldColumns(originalMeasurementSet_p->field());
127 1495 : fieldNames_p = new Vector<String>(fieldSubTable->name().getColumn());
128 1495 : delete fieldSubTable;
129 : }
130 :
131 : // Read polarizations
132 1495 : MSPolarizationColumns polarizationSubTable(originalMeasurementSet_p->polarization());
133 1495 : ArrayColumn<Int> corrTypeColum = polarizationSubTable.corrType();
134 1495 : corrProducts_p = new std::vector<String>();
135 3240 : for (uInt polRow_idx=0;polRow_idx<corrTypeColum.nrow();polRow_idx++)
136 : {
137 1745 : Array<Int> polRow = corrTypeColum(polRow_idx);
138 6338 : for (uInt corr_i=0;corr_i<polRow.size();corr_i++)
139 : {
140 4593 : switch (polRow(IPosition(1,corr_i)))
141 : {
142 59 : case Stokes::I:
143 : {
144 59 : *logger_p << LogIO::DEBUG1 << " Correlation product I found, which should correspond to ALMA WVR data - skipping" << LogIO::POST;
145 : // jagonzal (CAS-4234): We need this to Sanitize correlation expressions
146 59 : corrProducts_p->push_back("I");
147 59 : break;
148 : }
149 0 : case Stokes::Q:
150 : {
151 0 : *logger_p << LogIO::DEBUG1 << " Correlation product Q found" << LogIO::POST;
152 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("Q")) == corrProducts_p->end()) corrProducts_p->push_back("Q");
153 0 : break;
154 : }
155 0 : case Stokes::U:
156 : {
157 0 : *logger_p << LogIO::DEBUG1 << " Correlation product U found" << LogIO::POST;
158 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("U")) == corrProducts_p->end()) corrProducts_p->push_back("U");
159 0 : break;
160 : }
161 0 : case Stokes::V:
162 : {
163 0 : *logger_p << LogIO::DEBUG1 << " Correlation product V found" << LogIO::POST;
164 0 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("V")) == corrProducts_p->end()) corrProducts_p->push_back("V");
165 0 : break;
166 : }
167 403 : case Stokes::XX:
168 : {
169 403 : *logger_p << LogIO::DEBUG1 << " Correlation product XX found" << LogIO::POST;
170 403 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XX")) == corrProducts_p->end()) corrProducts_p->push_back("XX");
171 403 : break;
172 : }
173 235 : case Stokes::YY:
174 : {
175 235 : *logger_p << LogIO::DEBUG1 << " Correlation product YY found" << LogIO::POST;
176 235 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YY")) == corrProducts_p->end()) corrProducts_p->push_back("YY");
177 235 : break;
178 : }
179 50 : case Stokes::XY:
180 : {
181 50 : *logger_p << LogIO::DEBUG1 << " Correlation product XY found" << LogIO::POST;
182 50 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("XY")) == corrProducts_p->end()) corrProducts_p->push_back("XY");
183 50 : break;
184 : }
185 50 : case Stokes::YX:
186 : {
187 50 : *logger_p << LogIO::DEBUG1 << " Correlation product YX found" << LogIO::POST;
188 50 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("YX")) == corrProducts_p->end()) corrProducts_p->push_back("YX");
189 50 : break;
190 : }
191 1251 : case Stokes::RR:
192 : {
193 1251 : *logger_p << LogIO::DEBUG1 << " Correlation product RR found" << LogIO::POST;
194 1251 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RR")) == corrProducts_p->end()) corrProducts_p->push_back("RR");
195 1251 : break;
196 : }
197 1233 : case Stokes::LL:
198 : {
199 1233 : *logger_p << LogIO::DEBUG1 << " Correlation product LL found" << LogIO::POST;
200 1233 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LL")) == corrProducts_p->end()) corrProducts_p->push_back("LL");
201 1233 : break;
202 : }
203 656 : case Stokes::RL:
204 : {
205 656 : *logger_p << LogIO::DEBUG1 << " Correlation product RL found" << LogIO::POST;
206 656 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("RL")) == corrProducts_p->end()) corrProducts_p->push_back("RL");
207 656 : break;
208 : }
209 656 : case Stokes::LR:
210 : {
211 656 : *logger_p << LogIO::DEBUG1 << " Correlation product LR found" << LogIO::POST;
212 656 : if (find(corrProducts_p->begin(),corrProducts_p->end(),String("LR")) == corrProducts_p->end()) corrProducts_p->push_back("LR");
213 656 : break;
214 : }
215 0 : default:
216 : {
217 0 : *logger_p << LogIO::WARN << " Correlation product unknown found: " << polRow(IPosition(1,corr_i)) << LogIO::POST;
218 0 : break;
219 : }
220 : }
221 : }
222 1745 : }
223 :
224 : // Read reference frequencies per SPW
225 2990 : MSSpWindowColumns spwSubTable(originalMeasurementSet_p->spectralWindow());
226 1495 : ScalarColumn<Double> refFrequencies = spwSubTable.refFrequency();
227 1495 : lambdaMap_p = new lambdaMap();
228 12008 : for (uInt spwidx=0;spwidx<refFrequencies.nrow();spwidx++)
229 : {
230 10513 : (*lambdaMap_p)[spwidx]=C::c/refFrequencies(spwidx);
231 10513 : *logger_p << LogIO::DEBUG1 << " spwidx: " << spwidx << " lambda: " << (*lambdaMap_p)[spwidx] << LogIO::POST;
232 : }
233 :
234 2990 : return true;
235 1495 : }
236 :
237 :
238 : // -----------------------------------------------------------------------
239 : // Close Measurement Set
240 : // -----------------------------------------------------------------------
241 : bool
242 0 : FlagMSHandler::close()
243 : {
244 0 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
245 :
246 0 : if (selectedMeasurementSet_p)
247 : {
248 : // Flush and unlock MS
249 0 : selectedMeasurementSet_p->flush();
250 0 : selectedMeasurementSet_p->relinquishAutoLocks(true);
251 0 : selectedMeasurementSet_p->unlock();
252 :
253 : // Post stats
254 0 : if (stats_p)
255 : {
256 0 : *logger_p << LogIO::NORMAL << " Total Flag Cube accesses: " << cubeAccessCounter_p << LogIO::POST;
257 : }
258 : }
259 :
260 0 : return true;
261 : }
262 :
263 :
264 : // -----------------------------------------------------------------------
265 : // Generate selected Measurement Set
266 : // -----------------------------------------------------------------------
267 : bool
268 1095 : FlagMSHandler::selectData()
269 : {
270 1095 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
271 :
272 : // Create Measurement Selection object
273 1095 : const String dummyExpr = String("");
274 1095 : if (measurementSetSelection_p) {
275 0 : delete measurementSetSelection_p;
276 0 : measurementSetSelection_p = NULL;
277 : }
278 :
279 : // Set the MS Selection error handler to catch antenna names that are
280 : // not present in the MS in an expression that contains valid antenna names.
281 : // Catch also invalid intent or spw values in a valid expression.
282 : // This will issue a WARNING and not fail.
283 1095 : MSSelectionLogError mssLEA,mssLES,mssLESPW;
284 1095 : measurementSetSelection_p = new MSSelection();
285 :
286 1095 : measurementSetSelection_p->setErrorHandler(MSSelection::ANTENNA_EXPR, &mssLEA);
287 1095 : measurementSetSelection_p->setErrorHandler(MSSelection::STATE_EXPR, &mssLES);
288 1095 : measurementSetSelection_p->setErrorHandler(MSSelection::SPW_EXPR, &mssLESPW, true);
289 :
290 2190 : measurementSetSelection_p->reset(
291 1095 : *originalMeasurementSet_p,
292 1095 : MSSelection::PARSE_NOW,
293 2190 : (const String)timeSelection_p,
294 2190 : (const String)baselineSelection_p,
295 2190 : (const String)fieldSelection_p,
296 2190 : (const String)spwSelection_p,
297 2190 : (const String)uvwSelection_p,
298 : dummyExpr, // taqlExpr
299 2190 : (const String)polarizationSelection_p,
300 2190 : (const String)scanSelection_p,
301 2190 : (const String)arraySelection_p,
302 2190 : (const String)scanIntentSelection_p,
303 2190 : (const String)observationSelection_p);
304 :
305 : // Apply Measurement Selection to a copy of the original Measurement Set
306 1095 : MeasurementSet auxMeasurementSet = MeasurementSet(*originalMeasurementSet_p);
307 1107 : measurementSetSelection_p->getSelectedMS(auxMeasurementSet, String(""));
308 1083 : if (selectedMeasurementSet_p) delete selectedMeasurementSet_p;
309 1083 : selectedMeasurementSet_p = new MeasurementSet(auxMeasurementSet);
310 :
311 : // Check if selected MS has rows...
312 1083 : if (selectedMeasurementSet_p->nrow() == 0)
313 : {
314 0 : *logger_p << LogIO::WARN << "Selected Measurement Set doesn't have any rows " << LogIO::POST;
315 : }
316 :
317 : // More debugging information from MS-Selection
318 1083 : if (!arraySelection_p.empty())
319 : {
320 1 : *logger_p << LogIO::DEBUG1 << " Selected array ids are " << measurementSetSelection_p->getSubArrayList() << LogIO::POST;
321 : }
322 :
323 1083 : if (!observationSelection_p.empty())
324 : {
325 7 : *logger_p << LogIO::DEBUG1 << " Selected observation ids are " << measurementSetSelection_p->getObservationList() << LogIO::POST;
326 : }
327 :
328 1083 : if (!fieldSelection_p.empty())
329 : {
330 10 : *logger_p << LogIO::DEBUG1 << " Selected field ids are " << measurementSetSelection_p->getFieldList() << LogIO::POST;
331 : }
332 :
333 1083 : if (!scanSelection_p.empty())
334 : {
335 72 : *logger_p << LogIO::DEBUG1 << " Selected scan ids are " << measurementSetSelection_p->getScanList() << LogIO::POST;
336 : }
337 :
338 1083 : if (!scanIntentSelection_p.empty())
339 : {
340 3 : *logger_p << LogIO::DEBUG1 << " Selected scan intent ids are " << measurementSetSelection_p->getStateObsModeList() << LogIO::POST;
341 : }
342 :
343 1083 : if (!timeSelection_p.empty())
344 : {
345 8 : *logger_p << LogIO::DEBUG1 << " Selected time range is " << measurementSetSelection_p->getTimeList() << LogIO::POST;
346 : }
347 :
348 1083 : if (!spwSelection_p.empty())
349 : {
350 162 : *logger_p << LogIO::NORMAL << " Selected spw-channels ids are " << measurementSetSelection_p->getChanList() << LogIO::POST;
351 162 : if (measurementSetSelection_p->getChanList().size())
352 : {
353 162 : inrowSelection_p = true;
354 : }
355 : }
356 :
357 1083 : if (!baselineSelection_p.empty())
358 : {
359 48 : *logger_p << LogIO::DEBUG1 << " Selected antenna1 ids are " << measurementSetSelection_p->getAntenna1List() << LogIO::POST;
360 48 : *logger_p << LogIO::DEBUG1 << " Selected antenna2 ids are " << measurementSetSelection_p->getAntenna2List() << LogIO::POST;
361 48 : *logger_p << LogIO::DEBUG1 << " Selected baselines are " << measurementSetSelection_p->getBaselineList() << LogIO::POST;
362 : }
363 :
364 1083 : if (!uvwSelection_p.empty())
365 : {
366 2 : *logger_p << LogIO::DEBUG1 << " Selected uv range is " << measurementSetSelection_p->getUVList() << LogIO::POST;
367 : }
368 :
369 1083 : if (!polarizationSelection_p.empty())
370 : {
371 0 : ostringstream polarizationListToPrint (ios::in | ios::out);
372 0 : polarizationListToPrint << measurementSetSelection_p->getPolMap();
373 0 : *logger_p << LogIO::DEBUG1 << " Selected correlation ids are " << polarizationListToPrint.str() << LogIO::POST;
374 0 : }
375 :
376 : // There is a new selected MS so iterators have to be regenerated
377 1083 : iteratorGenerated_p = false;
378 1083 : chunksInitialized_p = false;
379 1083 : buffersInitialized_p = false;
380 1083 : stopIteration_p = false;
381 :
382 1083 : return true;
383 1143 : }
384 :
385 :
386 : // -----------------------------------------------------------------------
387 : // Parse MSSelection expression
388 : // -----------------------------------------------------------------------
389 : bool
390 2994 : FlagMSHandler::parseExpression(MSSelection &parser)
391 : {
392 2994 : parser.toTableExprNode(selectedMeasurementSet_p);
393 2991 : return true;
394 : }
395 :
396 :
397 : // -----------------------------------------------------------------------
398 : // Swap MS to check what is the maximum RAM memory needed
399 : // -----------------------------------------------------------------------
400 : void
401 17 : FlagMSHandler::preSweep()
402 : {
403 17 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
404 :
405 168 : for (visibilityIterator_p->originChunks(); visibilityIterator_p->moreChunks();visibilityIterator_p->nextChunk())
406 : {
407 : // Iterate over vis buffers
408 6650 : for (visibilityIterator_p->origin(); visibilityIterator_p->more();visibilityIterator_p->next())
409 : {
410 :
411 6499 : if (mapScanStartStop_p)
412 : {
413 6499 : generateScanStartStopMap();
414 : }
415 : }
416 : }
417 :
418 17 : if (mapScanStartStop_p)
419 : {
420 17 : *logger_p << LogIO::NORMAL << " " << scanStartStopMap_p->size() <<" Scans found in MS" << LogIO::POST;
421 : }
422 :
423 17 : return;
424 : }
425 :
426 :
427 : // -----------------------------------------------------------------------
428 : // Generate Visibility Iterator with a given sort order and time interval
429 : // -----------------------------------------------------------------------
430 : bool
431 1056 : FlagMSHandler::generateIterator()
432 : {
433 1056 : if (!iteratorGenerated_p)
434 : {
435 :
436 1056 : if (asyncio_enabled_p)
437 : {
438 : // Set preFetchColumns
439 0 : prefetchColumns_p = new VisBufferComponents2();
440 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagCube);
441 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FlagRow);
442 0 : prefetchColumns_p->operator +=(VisBufferComponent2::NRows);
443 0 : prefetchColumns_p->operator +=(VisBufferComponent2::FieldId);
444 :
445 0 : preFetchColumns();
446 :
447 : // Then create and initialize RO Async iterator
448 0 : if (visibilityIterator_p) delete visibilityIterator_p;
449 0 : visibilityIterator_p = new vi::VisibilityIterator2(*selectedMeasurementSet_p,
450 0 : vi::SortColumns (sortOrder_p, true),
451 0 : true,prefetchColumns_p,timeInterval_p);
452 : }
453 : else
454 : {
455 : //The vector where all the factories will be stacked
456 1056 : std::vector<vi::ViiLayerFactory*> factories;
457 :
458 : //Create a VI Factory that access directly the MS (or the selected MS)
459 : vi::IteratingParameters ipar(timeInterval_p,
460 1056 : vi::SortColumns (sortOrder_p, true));
461 1056 : std::unique_ptr<vi::VisIterImpl2LayerFactory> directMsVIFactory;
462 1056 : directMsVIFactory.reset(new vi::VisIterImpl2LayerFactory(selectedMeasurementSet_p,ipar, true));
463 : //Apply the channel selections only on the layer that access the MS directly
464 1056 : if(enableChanAvg_p)
465 21 : applyChannelSelection(directMsVIFactory.get());
466 1056 : factories.push_back(directMsVIFactory.get());
467 :
468 : //Add a time averaging layer if so requested
469 1056 : std::unique_ptr<vi::AveragingVi2LayerFactory> timeAvgFactory;
470 1056 : if(enableTimeAvg_p)
471 : {
472 : // Time averaging in clip mode uses the Time Averaging Iterator
473 0 : vi::AveragingParameters parameters(timeAverageBin_p, 0,vi::SortColumns(sortOrder_p, false),
474 17 : timeAvgOptions_p,0.0,NULL,true);
475 17 : timeAvgFactory.reset(new vi::AveragingVi2LayerFactory(parameters));
476 17 : factories.push_back(timeAvgFactory.get());
477 17 : }
478 :
479 : //Create a ChannelAverageTVI Factory if so requested
480 1056 : std::unique_ptr<vi::ChannelAverageTVILayerFactory> chanAvgFactory;
481 1056 : if(enableChanAvg_p)
482 : {
483 21 : chanAvgFactory.reset(new vi::ChannelAverageTVILayerFactory(chanAvgOptions_p));
484 21 : factories.push_back(chanAvgFactory.get());
485 : }
486 :
487 : //Create the final visibility iterator
488 1056 : Vector<vi::ViiLayerFactory*> const factoriesV(factories);
489 1056 : visibilityIterator_p = new vi::VisibilityIterator2(factoriesV);
490 1056 : }
491 :
492 : // Do quack pre-sweep
493 1056 : if (mapScanStartStop_p)
494 17 : preSweep();
495 :
496 : // Set the table data manager (ISM and SSM) cache size to the full column size, for
497 : // the columns ANTENNA1, ANTENNA2, FEED1, FEED2, TIME, INTERVAL, FLAG_ROW, SCAN_NUMBER and UVW
498 1056 : if (slurp_p) visibilityIterator_p->slurp();
499 :
500 : // Apply channel selection
501 : // CAS-3959: Channel selection is now going to be handled at the FlagAgent level
502 : // applyChannelSelection(visibilityIterator_p);
503 :
504 : // Group all the time stamps in one single buffer
505 : // NOTE: Otherwise we have to iterate over Visibility Buffers
506 : // that contain all the rows with the same time step.
507 1056 : if (groupTimeSteps_p)
508 : {
509 : // Set row blocking to a huge number
510 81 : uLong maxChunkRows = selectedMeasurementSet_p->nrow();
511 81 : visibilityIterator_p->setRowBlocking(maxChunkRows);
512 :
513 81 : *logger_p << LogIO::NORMAL << "Setting row blocking to number of rows in selected table: " << maxChunkRows << LogIO::POST;
514 :
515 :
516 : }
517 :
518 : // Get Visibility Buffer reference from VisibilityIterator
519 1056 : visibilityBuffer_p = visibilityIterator_p->getVisBuffer();
520 :
521 : // Get the TYPE column of the PROCESSOR sub-table
522 1056 : if (loadProcessorTable_p){
523 7 : processorTable();
524 7 : loadProcessorTable_p = false;
525 : }
526 :
527 1056 : iteratorGenerated_p = true;
528 : }
529 1056 : chunksInitialized_p = false;
530 1056 : buffersInitialized_p = false;
531 1056 : stopIteration_p = false;
532 1056 : processedRows_p = 0;
533 :
534 1056 : return true;
535 : }
536 :
537 :
538 : // -----------------------------------------------------------------------
539 : // Apply channel selection for asyn or normal iterator
540 : // NOTE (first implementation): We always have to do this, even if there is no SPW:channel selection
541 : // NOTE (after Dic 2011 testing): As far as I know spw selection does not have to be *, it can be empty,
542 : // therefore this step will in practice do nothing , because the spw and channel lists are empty too.
543 : // -----------------------------------------------------------------------
544 : void
545 21 : FlagMSHandler::applyChannelSelection(vi::VisIterImpl2LayerFactory *viFactory)
546 : {
547 21 : vi::FrequencySelectionUsingChannels channelSelector;
548 :
549 : // Apply channel selection (in row selection cannot be done with MSSelection)
550 : // NOTE: Each row of the Matrix has the following elements: SpwID StartCh StopCh Step
551 21 : Matrix<Int> spwchan = measurementSetSelection_p->getChanList();
552 21 : IPosition shape = spwchan.shape();
553 21 : uInt nSelections = shape[0];
554 : Int spw,channelStart,channelStop,channelStep,channelWidth;
555 28 : for(uInt selection_i=0;selection_i<nSelections;selection_i++)
556 : {
557 : // NOTE: selectChannel needs channelStart,channelWidth,channelStep
558 7 : spw = spwchan(selection_i,0);
559 7 : channelStart = spwchan(selection_i,1);
560 7 : channelStop = spwchan(selection_i,2);
561 7 : channelStep = spwchan(selection_i,3);
562 7 : channelWidth = channelStop-channelStart+1;
563 7 : channelSelector.add (spw, channelStart, channelWidth,channelStep);
564 : }
565 :
566 21 : auto freqSel = std::make_shared<vi::FrequencySelections>();
567 21 : freqSel->add(channelSelector);
568 21 : viFactory->setFrequencySelections(freqSel);
569 :
570 42 : return;
571 21 : }
572 :
573 :
574 : // -----------------------------------------------------------------------
575 : // Move to next chunk
576 : // -----------------------------------------------------------------------
577 : bool
578 12345 : FlagMSHandler::nextChunk()
579 : {
580 12345 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
581 :
582 12345 : chunkCounts_p = 0;
583 12345 : bool moreChunks = false;
584 12345 : if (stopIteration_p)
585 : {
586 0 : moreChunks = false;
587 : }
588 : else
589 : {
590 12345 : if (!chunksInitialized_p)
591 : {
592 1056 : if (!iteratorGenerated_p) generateIterator();
593 1056 : visibilityIterator_p->originChunks();
594 1056 : chunksInitialized_p = true;
595 1056 : buffersInitialized_p = false;
596 1056 : chunkNo_p++;
597 1056 : bufferNo_p = 0;
598 1056 : moreChunks = true;
599 : }
600 : else
601 : {
602 11289 : visibilityIterator_p->nextChunk();
603 11289 : if (visibilityIterator_p->moreChunks())
604 : {
605 10233 : buffersInitialized_p = false;
606 10233 : moreChunks = true;
607 10233 : chunkNo_p++;
608 10233 : bufferNo_p = 0;
609 : }
610 : }
611 : }
612 :
613 12345 : if (!moreChunks)
614 : {
615 1056 : *logger_p << LogIO::NORMAL << "==================================================================================== " << LogIO::POST;
616 : }
617 :
618 12345 : return moreChunks;
619 : }
620 :
621 :
622 : // -----------------------------------------------------------------------
623 : // Move to next buffer
624 : // -----------------------------------------------------------------------
625 : bool
626 602089 : FlagMSHandler::nextBuffer()
627 : {
628 602089 : bool moreBuffers = false;
629 602089 : if (stopIteration_p) {
630 0 : return moreBuffers;
631 : }
632 :
633 602089 : if (!buffersInitialized_p)
634 : {
635 11289 : visibilityIterator_p->origin();
636 11289 : buffersInitialized_p = true;
637 11289 : if (!asyncio_enabled_p) preFetchColumns();
638 11289 : if (mapAntennaPairs_p) generateAntennaPairMap();
639 11289 : if (mapSubIntegrations_p) generateSubIntegrationMap();
640 11289 : if (mapPolarizations_p) generatePolarizationsMap();
641 11289 : if (mapAntennaPointing_p) generateAntennaPointingMap();
642 :
643 11289 : moreBuffers = true;
644 11289 : flushFlags_p = false;
645 11289 : flushFlagRow_p = false;
646 11289 : bufferNo_p++;
647 : }
648 : else
649 : {
650 : // WARNING: ++ operator is defined for VisibilityIterator class ("advance" function)
651 : // but if you define a VisibilityIterator pointer, then ++ operator does not call
652 : // the advance function but increments the pointers.
653 590800 : visibilityIterator_p->next();
654 :
655 : // WARNING: We iterate and afterwards check if the iterator is valid
656 590800 : if (visibilityIterator_p->more())
657 : {
658 579511 : if (!asyncio_enabled_p) preFetchColumns();
659 579511 : if (mapAntennaPairs_p) generateAntennaPairMap();
660 579511 : if (mapSubIntegrations_p) generateSubIntegrationMap();
661 579511 : if (mapPolarizations_p) generatePolarizationsMap();
662 579511 : if (mapAntennaPointing_p) generateAntennaPointingMap();
663 579511 : moreBuffers = true;
664 579511 : flushFlags_p = false;
665 579511 : flushFlagRow_p = false;
666 579511 : bufferNo_p++;
667 : }
668 : }
669 :
670 :
671 : // Set new common flag cube
672 602089 : if (moreBuffers)
673 : {
674 : // Get flag (WARNING: We have to modify the shape of the cube before re-assigning it)
675 590800 : Cube<Bool> curentFlagCube= visibilityBuffer_p->flagCube();
676 590800 : modifiedFlagCube_p.resize(curentFlagCube.shape());
677 590800 : modifiedFlagCube_p = curentFlagCube;
678 590800 : originalFlagCube_p.resize(curentFlagCube.shape());
679 590800 : originalFlagCube_p = curentFlagCube;
680 :
681 : // Get flag row (WARNING: We have to modify the shape of the cube before re-assigning it)
682 590800 : Vector<Bool> curentflagRow= visibilityBuffer_p->flagRow();
683 590800 : modifiedFlagRow_p.resize(curentflagRow.shape());
684 590800 : modifiedFlagRow_p = curentflagRow;
685 590800 : originalFlagRow_p.resize(curentflagRow.shape());
686 590800 : originalFlagRow_p = curentflagRow;
687 :
688 : // Compute total number of flags per buffer to be used for generating the agents stats
689 590800 : Int64 currentBufferCounts = curentFlagCube.shape().product();
690 590800 : chunkCounts_p += currentBufferCounts;
691 590800 : progressCounts_p += currentBufferCounts;
692 590800 : msCounts_p += currentBufferCounts;
693 :
694 : // Print chunk characteristics
695 590800 : if (bufferNo_p == 1)
696 : {
697 11289 : processedRows_p += visibilityIterator_p->nRowsInChunk();
698 11289 : double progress = 100.0* ((double) processedRows_p / (double) selectedMeasurementSet_p->nrow());
699 : // Send only a limited number of lines to INFO level, 'other' lines to debug
700 11289 : if (checkDoChunkLine(progress)) {
701 5695 : *logger_p << LogIO::NORMAL;
702 : } else {
703 5594 : *logger_p << LogIO::DEBUG1;
704 : }
705 11289 : logger_p->origin(LogOrigin("FlagMSHandler",""));
706 :
707 11289 : String corrs = "[ ";
708 46632 : for (uInt corr_i=0;corr_i<(uInt) visibilityBuffer_p->nCorrelations();corr_i++)
709 : {
710 35343 : corrs += (*polarizationIndexMap_p)[corr_i] + " ";
711 : }
712 11289 : corrs += "]";
713 :
714 11289 : *logger_p << "Chunk = " << chunkNo_p << " [progress: " << (Int)progress << "%]"
715 11289 : ", Observation = " << visibilityBuffer_p->observationId()[0] <<
716 11289 : ", Array = " << visibilityBuffer_p->arrayId()[0] <<
717 11289 : ", Scan = " << visibilityBuffer_p->scan()[0] <<
718 33867 : ", Field = " << visibilityBuffer_p->fieldId()(0) << " (" << fieldNames_p->operator()(visibilityBuffer_p->fieldId()) << ")"
719 11289 : ", Spw = " << visibilityBuffer_p->spectralWindows()(0) <<
720 11289 : ", Channels = " << visibilityBuffer_p->nChannels() <<
721 : ", Corrs = " << corrs <<
722 79023 : ", Total Rows = " << visibilityIterator_p->nRowsInChunk() << LogIO::POST;
723 11289 : }
724 590800 : }
725 :
726 602089 : return moreBuffers;
727 : }
728 :
729 : // -----------------------------------------------------------------------
730 : // Generate scan start stop map
731 : // -----------------------------------------------------------------------
732 : void
733 6499 : FlagMSHandler::generateScanStartStopMap()
734 : {
735 :
736 : Int scan;
737 : Double start,stop;
738 6499 : Vector<Int> scans;
739 6499 : Vector<Double> times;
740 :
741 6499 : Cube<Bool> flags;
742 : uInt scanStartRow;
743 : uInt scanStopRow;
744 : uInt ncorrs,nchannels,nrows;
745 : Bool stopSearch;
746 :
747 6499 : if (scanStartStopMap_p == NULL) scanStartStopMap_p = new scanStartStopMap();
748 :
749 6499 : scans = visibilityIterator_p->getVisBuffer()->scan();
750 6499 : times = visibilityIterator_p->getVisBuffer()->time();
751 :
752 : // Check if anything is flagged in this buffer
753 6499 : scanStartRow = 0;
754 6499 : scanStopRow = times.size()-1;
755 6499 : if (mapScanStartStopFlagged_p)
756 : {
757 300 : flags = visibilityIterator_p->getVisBuffer()->flagCube();
758 300 : IPosition shape = flags.shape();
759 300 : ncorrs = shape[0];
760 300 : nchannels = shape[1];
761 300 : nrows = shape[2];
762 :
763 : // Look for effective scan start
764 300 : stopSearch = false;
765 24351 : for (uInt row_i=0;row_i<nrows;row_i++)
766 : {
767 24288 : if (stopSearch) break;
768 :
769 1524570 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
770 : {
771 1500756 : if (stopSearch) break;
772 :
773 4501320 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
774 : {
775 3001038 : if (stopSearch) break;
776 :
777 3000801 : if (!flags(corr_i,channel_i,row_i))
778 : {
779 237 : scanStartRow = row_i;
780 237 : stopSearch = true;
781 : }
782 : }
783 : }
784 : }
785 :
786 : // If none of the rows were un-flagged we don't continue checking from the end
787 : // As a consequence of this some scans may not be present in the map, and have
788 : // to be skipped in the flagging process because they are already flagged.
789 300 : if (!stopSearch) return;
790 :
791 : // Look for effective scan stop
792 237 : stopSearch = false;
793 474 : for (uInt row_i=0;row_i<nrows;row_i++)
794 : {
795 474 : if (stopSearch) break;
796 :
797 474 : for (uInt channel_i=0;channel_i<nchannels;channel_i++)
798 : {
799 474 : if (stopSearch) break;
800 :
801 474 : for (uInt corr_i=0;corr_i<ncorrs;corr_i++)
802 : {
803 474 : if (stopSearch) break;
804 :
805 237 : if (!flags(corr_i,channel_i,nrows-1-row_i))
806 : {
807 237 : scanStopRow = nrows-1-row_i;
808 237 : stopSearch = true;
809 : }
810 : }
811 : }
812 : }
813 300 : }
814 :
815 : // Check scan start/stop times
816 6436 : scan = scans[0];
817 6436 : start = times[scanStartRow];
818 6436 : stop = times[scanStopRow];
819 :
820 6436 : if (scanStartStopMap_p->find(scan) == scanStartStopMap_p->end())
821 : {
822 84 : (*scanStartStopMap_p)[scan].push_back(start);
823 84 : (*scanStartStopMap_p)[scan].push_back(stop);
824 : }
825 : else
826 : {
827 : // Check if we have a better start time
828 6352 : if ((*scanStartStopMap_p)[scan][0] > start)
829 : {
830 0 : (*scanStartStopMap_p)[scan][0] = start;
831 : }
832 : // Check if we have a better stop time
833 6352 : if ((*scanStartStopMap_p)[scan][1] < stop)
834 : {
835 982 : (*scanStartStopMap_p)[scan][1] = stop;
836 : }
837 : }
838 :
839 6436 : return;
840 6499 : }
841 :
842 :
843 : // -----------------------------------------------------------------------
844 : // Flush flags to MS
845 : // -----------------------------------------------------------------------
846 : bool
847 488003 : FlagMSHandler::flushFlags()
848 : {
849 488003 : logger_p->origin(LogOrigin("FlagMSHandler",__FUNCTION__,WHERE));
850 488003 : *logger_p << LogIO::DEBUG1
851 : << "flushing modified flags cube of shape: " << modifiedFlagCube_p.shape()
852 488003 : << LogIO::POST;
853 :
854 488003 : if (flushFlags_p)
855 : {
856 :
857 406388 : visibilityIterator_p->writeFlag(modifiedFlagCube_p);
858 406388 : flushFlags_p = false;
859 : }
860 :
861 488003 : if ((flushFlagRow_p) and (!inrowSelection_p))
862 : {
863 382165 : visibilityIterator_p->writeFlagRow(modifiedFlagRow_p);
864 382165 : flushFlagRow_p = false;
865 : }
866 :
867 488003 : return true;
868 : }
869 :
870 :
871 : // -----------------------------------------------------------------------
872 : // Flush flags to MS
873 : // -----------------------------------------------------------------------
874 : String
875 931 : FlagMSHandler::getTableName()
876 : {
877 931 : return originalMeasurementSet_p->tableName();
878 : }
879 :
880 : // -----------------------------------------------------------------------
881 : // Check what data columns exist
882 : // -----------------------------------------------------------------------
883 : bool
884 202 : FlagMSHandler::checkIfColumnExists(String column)
885 : {
886 202 : return originalMeasurementSet_p->tableDesc().isColumn(column);
887 : }
888 :
889 : // -----------------------------------------------------------------------
890 : // Whether the per-chunk progress log line should be produced
891 : // -----------------------------------------------------------------------
892 : bool
893 11289 : FlagMSHandler::checkDoChunkLine(double progress)
894 : {
895 11289 : bool result = false;
896 : // per-chunk progress report lines - every 'chunkLineThresholdInc_p' % and in
897 : // any case always the beginning (~0%) end (100% rows)
898 11289 : if ((progress >= chunkLineThreshold_p) or (progress >= 100.0)) {
899 5452 : chunkLineThreshold_p += chunkLineThresholdInc_p;
900 5452 : result = true;
901 : }
902 11289 : result |= (1 == chunkNo_p) or (processedRows_p == selectedMeasurementSet_p->nrow());
903 :
904 11289 : return result;
905 : }
906 :
907 : // ----------------------------------------------------------------------------
908 : // Signal true when a per-agent partial (ongoing run) summary has to be printed
909 : // ----------------------------------------------------------------------------
910 : bool
911 11289 : FlagMSHandler::summarySignal()
912 : {
913 11289 : Double progress = 100.0* ((Double) processedRows_p / (Double) selectedMeasurementSet_p->nrow());
914 :
915 : // The summaries will be printed with INFO level. Do not bother building per-agent
916 : // (for potentially many agents) summaries if logger priority not enough
917 11289 : auto signal = ((progress >= summaryThreshold_p) and (logger_p->priority() <= LogMessage::NORMAL));
918 11289 : if (signal) {
919 5452 : summaryThreshold_p += summaryThresholdInc_p;
920 : }
921 11289 : return signal;
922 : }
923 :
924 : // -----------------------------------------------------------------------
925 : // Get the PROCESSOR sub-table
926 : // -----------------------------------------------------------------------
927 : bool
928 7 : FlagMSHandler::processorTable()
929 : {
930 7 : MSProcessor msSubtable = selectedMeasurementSet_p->processor();
931 :
932 : // The rows in the PROCESSOR table correspond to the PROCESSOR_ID column
933 : // in main table
934 7 : if (msSubtable.nrow() == 0){
935 3 : *logger_p << LogIO::WARN << "PROCESSOR sub-table is empty. Assuming CORRELATOR type."
936 3 : << LogIO::POST;
937 3 : processorTableExist_p = false;
938 : }
939 : else {
940 4 : MSProcessorColumns tableCols(msSubtable);
941 4 : ScalarColumn<String> typeCol = tableCols.type();
942 4 : processorTableExist_p = true;
943 :
944 : /* Create a look-up boolean column to tell if a row is of type CORRELATOR.
945 : isCorrelatorType_p = true when PROCESSOR_ID is of TYPE CORRELATOR
946 : */
947 4 : isCorrelatorType_p.resize(msSubtable.nrow(),False);
948 :
949 : // Assign true to row in look-up table that have TYPE==CORRELATOR
950 10 : for (uInt pid=0; pid<msSubtable.nrow(); pid++){
951 :
952 6 : String proc_type = typeCol.asString(pid);
953 :
954 6 : if (proc_type.compare("CORRELATOR") == 0){
955 2 : isCorrelatorType_p(pid) = True;
956 : }
957 : else
958 4 : isCorrelatorType_p(pid) = False;
959 :
960 6 : }
961 4 : }
962 :
963 7 : return true;
964 :
965 7 : }
966 :
967 : /*
968 : * Check if a VIRTUAL MODEL column exists
969 : */
970 : bool
971 8 : FlagMSHandler::checkIfSourceModelColumnExists()
972 : {
973 8 : Vector<Int> fieldids;
974 :
975 8 : if (casa::refim::VisModelData::hasAnyModel(*selectedMeasurementSet_p, fieldids)){
976 4 : return true;
977 : }
978 :
979 4 : return false;
980 8 : }
981 :
982 : } //# NAMESPACE CASA - END
983 :
|