LCOV - code coverage report
Current view: top level - msvis/MSVis - VLAT.cc (source / functions) Hit Total Coverage
Test: casacpp_coverage.info Lines: 0 356 0.0 %
Date: 2024-11-06 17:42:47 Functions: 0 25 0.0 %

          Line data    Source code
       1             : //# VLAT.cc: Implemenation of visibility lookahead thread related functionality.
       2             : //# Copyright (C) 2011
       3             : //# Associated Universities, Inc. Washington DC, USA.
       4             : //#
       5             : //# This library is free software; you can redistribute it and/or modify it
       6             : //# under the terms of the GNU Library General Public License as published by
       7             : //# the Free Software Foundation; either version 2 of the License, or (at your
       8             : //# option) any later version.
       9             : //#
      10             : //# This library is distributed in the hope that it will be useful, but WITHOUT
      11             : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      12             : //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
      13             : //# License for more details.
      14             : //#
      15             : //# You should have received a copy of the GNU Library General Public License
      16             : //# along with this library; if not, write to the Free Software Foundation,
      17             : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
      18             : //#
      19             : //# Correspondence concerning CASA should be addressed as follows:
      20             : //#        Internet email: CASA-request@nrao.edu.
      21             : //#        Postal address: CASA Project Office
      22             : //#                        National Radio Astronomy Observatory
      23             : //#                        520 Edgemont Road
      24             : //#                        Charlottesville, VA 22903-2475 USA
      25             : //#
      26             : //#
      27             : //# $Id$
      28             : 
      29             : #include <assert.h>
      30             : #include <time.h>
      31             : #include <sys/time.h>
      32             : 
      33             : #include <casacore/casa/Logging/LogIO.h>
      34             : #include <casacore/casa/System/AipsrcValue.h>
      35             : #include <msvis/MSVis/VLAT.h>
      36             : #include <msvis/MSVis/VisBufferAsync.h>
      37             : 
      38             : #include <stdcasa/thread/AsynchronousTools.h>
      39             : using namespace casacore;
      40             : using namespace casa::async;
      41             : 
      42             : #include <algorithm>
      43             : #include <cstdarg>
      44             : #include <functional>
      45             : 
      46             : #include <stdcasa/UtilJ.h>
      47             : 
      48             : using namespace casacore;
      49             : using namespace casa::utilj;
      50             : using namespace std;
      51             : using namespace casacore;
      52             : using namespace casa::asyncio;
      53             : 
      54             : #define Log(level, ...) \
      55             :         {if (AsynchronousInterface::logThis (level)) \
      56             :     Logger::get()->log (__VA_ARGS__);};
      57             : 
      58             : using namespace casacore;
      59             : namespace casa {
      60             : 
      61             : namespace asyncio {
      62             : 
      63             : //  *****************************
      64             : //  *                           *
      65             : //  * VlatAndDataImplementation *
      66             : //  *                           *
      67             : //  *****************************
      68             : 
      69           0 : VlatAndData::VlatAndData ()
      70           0 : : vlaData_p (NULL),
      71           0 :   vlat_p (NULL)
      72             : {
      73           0 : }
      74             : 
      75             : //  ***********************
      76             : //  *                     *
      77             : //  * VLAT Implementation *
      78             : //  *                     *
      79             : //  ***********************
      80             : 
      81           0 : VLAT::VLAT (AsynchronousInterface * asynchronousInterface)
      82             : {
      83           0 :     interface_p = asynchronousInterface;
      84           0 :     vlaData_p = interface_p->getVlaData();
      85           0 :     visibilityIterator_p = NULL;
      86           0 :     writeIterator_p = NULL;
      87           0 :     threadTerminated_p = false;
      88           0 : }
      89             : 
      90           0 : VLAT::~VLAT ()
      91             : {
      92             : 
      93             :     // Free up storage
      94             : 
      95           0 :     for (FillerDictionary::iterator f = fillerDictionary_p.begin();
      96           0 :             f != fillerDictionary_p.end();
      97           0 :             f++){
      98           0 :         delete (f->second);
      99             :     }
     100             : 
     101           0 :     for (Fillers::iterator f = fillers_p.begin();
     102           0 :             f != fillers_p.end();
     103           0 :             f++){
     104           0 :         delete (* f);
     105             :     }
     106             : 
     107           0 :     delete visibilityIterator_p;
     108           0 :     delete writeIterator_p;
     109           0 : }
     110             : 
     111             : void
     112           0 : VLAT::alignWriteIterator (SubChunkPair subchunk)
     113             : {
     114           0 :     Assert (subchunk <= readSubchunk_p);
     115             : 
     116           0 :     Bool done = false;
     117             : 
     118           0 :     while (subchunk > writeSubchunk_p && ! done){
     119             : 
     120           0 :         ++ (* writeIterator_p); // advance to next subchunk in chunk
     121             : 
     122           0 :         if (writeIterator_p->more()){
     123             : 
     124             :             // Sucessfully moved on to next subchunk
     125             : 
     126           0 :             writeSubchunk_p.incrementSubChunk();
     127             :         }
     128             :         else{
     129             : 
     130             :             // End of subchunks to advance to start of next chunk
     131             : 
     132           0 :             writeIterator_p->nextChunk ();
     133             : 
     134           0 :             if (writeIterator_p->moreChunks ()){
     135             : 
     136             :                 // Moved on to next chunk; position to first subchunk
     137             : 
     138           0 :                 writeIterator_p->origin ();
     139             : 
     140           0 :                 if (writeIterator_p->more()){
     141           0 :                     writeSubchunk_p.incrementChunk();
     142             :                 }
     143             :                 else{
     144           0 :                     done = true; // no more data
     145             :                 }
     146             :             }
     147             :             else{
     148           0 :                 done = true; // no more data
     149             :             }
     150             :         }
     151             :     }
     152             : 
     153           0 :     ThrowIf (subchunk != writeSubchunk_p,
     154             :              String::format ("Failed to advance write iterator to subchunk %s; last subchunk is %s",
     155             :                              subchunk.toString().c_str(), writeSubchunk_p.toString().c_str()));
     156           0 : }
     157             : 
     158             : void
     159           0 : VLAT::applyModifiers (ROVisibilityIterator * rovi, VisibilityIterator * vi)
     160             : {
     161             :     // Apply modifiers to read iterator and to write iterator (if it exists)
     162             : 
     163           0 :     roviaModifiers_p.apply (rovi);
     164             : 
     165           0 :     if (vi != NULL){
     166           0 :         roviaModifiers_p.apply (vi);
     167             :     }
     168             : 
     169             :     // Get the channel selection information from the modified read VI and provide it to the
     170             :     // data object
     171             : 
     172           0 :     roviaModifiers_p.clearAndFree ();
     173             : 
     174           0 :     Block< Vector<Int> > blockNGroup;
     175           0 :     Block< Vector<Int> > blockStart;
     176           0 :     Block< Vector<Int> > blockWidth;
     177           0 :     Block< Vector<Int> > blockIncr;
     178           0 :     Block< Vector<Int> > blockSpw;
     179             : 
     180           0 :     rovi->getChannelSelection (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
     181             : 
     182           0 :     vlaData_p -> storeChannelSelection (asyncio::ChannelSelection (blockNGroup, blockStart,
     183             :                                                                    blockWidth, blockIncr, blockSpw));
     184           0 : }
     185             : 
     186             : void
     187           0 : VLAT::checkFiller (VisBufferComponents::EnumType fillerId)
     188             : {
     189           0 :     ThrowIf (! visibilityIterator_p -> existsColumn (fillerId),
     190             :              String::format ("VLAT: Column to be prefetched, %s, does not exist!",
     191             :                             PrefetchColumns::columnName (fillerId).c_str()));
     192           0 : }
     193             : 
     194             : void
     195           0 : VLAT::createFillerDictionary ()
     196             : {
     197             :     // Create a dictionary of all the possible fillers using the
     198             :     // ViReadImplAsync::PrefetchColumnIds as the keys
     199             : 
     200           0 :     fillerDictionary_p.clear();
     201             : 
     202           0 :     fillerDictionary_p.add (VisBufferComponents::AllBeamOffsetsZero,
     203           0 :                            vlatFunctor0 (& VisBufferAsync::fillAllBeamOffsetsZero));
     204           0 :     fillerDictionary_p.add (VisBufferComponents::AntennaMounts,
     205           0 :                            vlatFunctor0 (& VisBufferAsync::fillAntennaMounts));
     206           0 :     fillerDictionary_p.add (VisBufferComponents::Ant1,
     207           0 :                            vlatFunctor0 (& VisBuffer::fillAnt1));
     208           0 :     fillerDictionary_p.add (VisBufferComponents::Ant2,
     209           0 :                            vlatFunctor0 (& VisBuffer::fillAnt2));
     210           0 :     fillerDictionary_p.add (VisBufferComponents::ArrayId,
     211           0 :                            vlatFunctor0 (& VisBuffer::fillArrayId));
     212           0 :     fillerDictionary_p.add (VisBufferComponents::BeamOffsets,
     213           0 :                            vlatFunctor0 (& VisBufferAsync::fillBeamOffsets));
     214           0 :     fillerDictionary_p.add (VisBufferComponents::Channel,
     215           0 :                            vlatFunctor0 (& VisBuffer::fillChannel));
     216           0 :     fillerDictionary_p.add (VisBufferComponents::Cjones,
     217           0 :                            vlatFunctor0 (& VisBuffer::fillCjones));
     218           0 :     fillerDictionary_p.add (VisBufferComponents::CorrType,
     219           0 :                            vlatFunctor0 (& VisBuffer::fillCorrType));
     220           0 :     fillerDictionary_p.add (VisBufferComponents::Corrected,
     221           0 :                            vlatFunctor1(& VisBuffer::fillVis,
     222             :                                         VisibilityIterator::Corrected));
     223           0 :     fillerDictionary_p.add (VisBufferComponents::CorrectedCube,
     224           0 :                            vlatFunctor1(& VisBuffer::fillVisCube,
     225             :                                         VisibilityIterator::Corrected));
     226           0 :     fillerDictionary_p.add (VisBufferComponents::DataDescriptionId,
     227           0 :                            vlatFunctor0 (& VisBuffer::fillDataDescriptionId));
     228             : //    fillerDictionary_p.add (VisBufferComponents::Direction1,
     229             : //                           vlatFunctor0 (& VisBuffer::fillDirection1));
     230             : //    fillerDictionary_p.add (VisBufferComponents::Direction2,
     231             : //                           vlatFunctor0 (& VisBuffer::fillDirection2));
     232           0 :     fillerDictionary_p.add (VisBufferComponents::Exposure,
     233           0 :                            vlatFunctor0 (& VisBuffer::fillExposure));
     234           0 :     fillerDictionary_p.add (VisBufferComponents::Feed1,
     235           0 :                            vlatFunctor0 (& VisBuffer::fillFeed1));
     236             : //    fillerDictionary_p.add (VisBufferComponents::Feed1_pa,
     237             : //                           vlatFunctor0 (& VisBuffer::fillFeed1_pa));
     238           0 :     fillerDictionary_p.add (VisBufferComponents::Feed2,
     239           0 :                            vlatFunctor0 (& VisBuffer::fillFeed2));
     240             : //    fillerDictionary_p.add (VisBufferComponents::Feed2_pa,
     241             : //                           vlatFunctor0 (& VisBuffer::fillFeed2_pa));
     242           0 :     fillerDictionary_p.add (VisBufferComponents::FieldId,
     243           0 :                            vlatFunctor0 (& VisBuffer::fillFieldId));
     244           0 :     fillerDictionary_p.add (VisBufferComponents::Flag,
     245           0 :                            vlatFunctor0 (& VisBuffer::fillFlag));
     246           0 :     fillerDictionary_p.add (VisBufferComponents::FlagCategory,
     247           0 :                            vlatFunctor0 (& VisBuffer::fillFlagCategory));
     248           0 :     fillerDictionary_p.add (VisBufferComponents::FlagCube,
     249           0 :                            vlatFunctor0 (& VisBuffer::fillFlagCube));
     250           0 :     fillerDictionary_p.add (VisBufferComponents::FlagRow,
     251           0 :                            vlatFunctor0 (& VisBuffer::fillFlagRow));
     252           0 :     fillerDictionary_p.add (VisBufferComponents::Freq,
     253           0 :                            vlatFunctor0 (& VisBuffer::fillFreq));
     254           0 :     fillerDictionary_p.add (VisBufferComponents::ImagingWeight,
     255           0 :                            new VlatFunctor ("ImagingWeight")); // do not fill this one
     256           0 :     fillerDictionary_p.add (VisBufferComponents::Model,
     257           0 :                            vlatFunctor1(& VisBuffer::fillVis,
     258             :                                         VisibilityIterator::Model));
     259           0 :     fillerDictionary_p.add (VisBufferComponents::ModelCube,
     260           0 :                            vlatFunctor1(& VisBuffer::fillVisCube,
     261             :                                         VisibilityIterator::Model));
     262           0 :     fillerDictionary_p.add (VisBufferComponents::NChannel,
     263           0 :                            vlatFunctor0 (& VisBuffer::fillnChannel));
     264           0 :     fillerDictionary_p.add (VisBufferComponents::NCorr,
     265           0 :                            vlatFunctor0 (& VisBuffer::fillnCorr));
     266           0 :     fillerDictionary_p.add (VisBufferComponents::NRow,
     267           0 :                            vlatFunctor0 (& VisBuffer::fillnRow));
     268           0 :     fillerDictionary_p.add (VisBufferComponents::ObservationId,
     269           0 :                            vlatFunctor0 (& VisBuffer::fillObservationId));
     270           0 :     fillerDictionary_p.add (VisBufferComponents::Observed,
     271           0 :                            vlatFunctor1(& VisBuffer::fillVis,
     272             :                                         VisibilityIterator::Observed));
     273           0 :     fillerDictionary_p.add (VisBufferComponents::ObservedCube,
     274           0 :                            vlatFunctor1(& VisBuffer::fillVisCube,
     275             :                                         VisibilityIterator::Observed));
     276           0 :     fillerDictionary_p.add (VisBufferComponents::PhaseCenter,
     277           0 :                            vlatFunctor0 (& VisBuffer::fillPhaseCenter));
     278           0 :     fillerDictionary_p.add (VisBufferComponents::PolFrame,
     279           0 :                            vlatFunctor0 (& VisBuffer::fillPolFrame));
     280           0 :     fillerDictionary_p.add (VisBufferComponents::ProcessorId,
     281           0 :                            vlatFunctor0 (& VisBuffer::fillProcessorId));
     282           0 :     fillerDictionary_p.add (VisBufferComponents::ReceptorAngles,
     283           0 :                            vlatFunctor0 (& VisBufferAsync::fillReceptorAngles));
     284           0 :     fillerDictionary_p.add (VisBufferComponents::Scan,
     285           0 :                            vlatFunctor0 (& VisBuffer::fillScan));
     286           0 :     fillerDictionary_p.add (VisBufferComponents::Sigma,
     287           0 :                            vlatFunctor0 (& VisBuffer::fillSigma));
     288           0 :     fillerDictionary_p.add (VisBufferComponents::SigmaMat,
     289           0 :                            vlatFunctor0 (& VisBuffer::fillSigmaMat));
     290           0 :     fillerDictionary_p.add (VisBufferComponents::SpW,
     291           0 :                            vlatFunctor0 (& VisBuffer::fillSpW));
     292           0 :     fillerDictionary_p.add (VisBufferComponents::StateId,
     293           0 :                            vlatFunctor0 (& VisBuffer::fillStateId));
     294           0 :     fillerDictionary_p.add (VisBufferComponents::Time,
     295           0 :                            vlatFunctor0 (& VisBuffer::fillTime));
     296           0 :     fillerDictionary_p.add (VisBufferComponents::TimeCentroid,
     297           0 :                            vlatFunctor0 (& VisBuffer::fillTimeCentroid));
     298           0 :     fillerDictionary_p.add (VisBufferComponents::TimeInterval,
     299           0 :                            vlatFunctor0 (& VisBuffer::fillTimeInterval));
     300           0 :     fillerDictionary_p.add (VisBufferComponents::Uvw,
     301           0 :                            vlatFunctor0 (& VisBuffer::filluvw));
     302           0 :     fillerDictionary_p.add (VisBufferComponents::UvwMat,
     303           0 :                            vlatFunctor0 (& VisBuffer::filluvwMat));
     304           0 :     fillerDictionary_p.add (VisBufferComponents::Weight,
     305           0 :                            vlatFunctor0 (& VisBuffer::fillWeight));
     306           0 :     fillerDictionary_p.add (VisBufferComponents::WeightMat,
     307           0 :                            vlatFunctor0 (& VisBuffer::fillWeightMat));
     308           0 :     fillerDictionary_p.add (VisBufferComponents::WeightSpectrum,
     309           0 :                            vlatFunctor0 (& VisBuffer::fillWeightSpectrum));
     310             : 
     311             :     // assert (fillerDictionary_p.size() == VisBufferComponents::N_VisBufferComponents);
     312             :     // Every supported prefetch column needs a filler
     313             : 
     314             :     //fillerDependencies_p.add ();
     315             : 
     316             :     //fillerDependencies_p.performTransitiveClosure (fillerDictionary_p);
     317           0 : }
     318             : 
     319             : void
     320           0 : VLAT::fillDatum (VlaDatum * datum)
     321             : {
     322             : 
     323           0 :     VisBufferComponents::EnumType fillerId = VisBufferComponents::Unknown;
     324             :     try{
     325           0 :         VisBufferAsync * vb = datum->getVisBuffer();
     326             : 
     327           0 :         vb->clear();
     328           0 :         vb->setFilling (true);
     329           0 :         vb->attachToVisIter (* visibilityIterator_p); // invalidates vb's cache as well
     330             : 
     331           0 :         fillDatumMiscellanyBefore (datum);
     332             : 
     333           0 :         for (Fillers::iterator filler = fillers_p.begin(); filler != fillers_p.end(); filler ++){
     334             : 
     335             :             //Log (2, "Filler id=%d name=%s starting\n", (* filler)->getId(), ViReadImplAsync::prefetchColumnName((* filler)->getId()).c_str());
     336             : 
     337           0 :             fillerId = (* filler)->getId();
     338           0 :             checkFiller (fillerId);
     339           0 :             (** filler) (vb);
     340             :         }
     341             : 
     342           0 :         fillDatumMiscellanyAfter (datum);
     343             : 
     344           0 :         vb->detachFromVisIter ();
     345           0 :         vb->setFilling (false);
     346             :     }
     347           0 :     catch (...){
     348             : 
     349           0 :         if (fillerId == -1){
     350           0 :             Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; rethrowing\n",
     351             :                  datum, datum->getVisBuffer());
     352             :         }
     353             :         else{
     354           0 :             Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; "
     355             :                  "fillingColumn='%s'; rethrowing\n",
     356             :                  datum, datum->getVisBuffer(),
     357             :                  PrefetchColumns::columnName (fillerId).c_str());
     358             :         }
     359             : 
     360           0 :         throw;
     361           0 :     }
     362           0 : }
     363             : 
     364             : void
     365           0 : VLAT::fillDatumMiscellanyAfter (VlaDatum * datum)
     366             : {
     367           0 :     datum->getVisBuffer()->setVisibilityShape (visibilityIterator_p->visibilityShape ());
     368             : 
     369             :     //////datum->getVisBuffer()->setDataDescriptionId (visibilityIterator_p->getDataDescriptionId());
     370             : 
     371           0 :     datum->getVisBuffer()->setPolarizationId (visibilityIterator_p->polarizationId());
     372             : 
     373           0 :     datum->getVisBuffer()->setNCoh(visibilityIterator_p->numberCoh ());
     374             : 
     375           0 :     datum->getVisBuffer()->setNRowChunk (visibilityIterator_p->nRowChunk());
     376             : 
     377           0 :     Vector<Int> nvischan;
     378           0 :     Vector<Int> spw;
     379             : 
     380           0 :     visibilityIterator_p->allSelectedSpectralWindows(spw, nvischan);
     381             : 
     382           0 :     datum->getVisBuffer()->setSelectedNVisibilityChannels (nvischan);
     383           0 :     datum->getVisBuffer()->setSelectedSpectralWindows (spw);
     384             : 
     385           0 :     int nSpw = visibilityIterator_p->numberSpw();
     386           0 :     datum->getVisBuffer()->setNSpw (nSpw);
     387             : 
     388           0 :     int nRowChunk = visibilityIterator_p->nRowChunk();
     389           0 :     datum->getVisBuffer()->setNRowChunk (nRowChunk);
     390             : 
     391           0 :     datum->getVisBuffer()->setMSD (visibilityIterator_p->getMSD ()); // ought to be last
     392           0 : }
     393             : 
     394             : void
     395           0 : VLAT::fillDatumMiscellanyBefore (VlaDatum * datum)
     396             : {
     397           0 :     datum->getVisBuffer()->setMeasurementSet (visibilityIterator_p->getMeasurementSet());
     398           0 :     datum->getVisBuffer()->setMeasurementSetId (visibilityIterator_p->getMeasurementSetId(),
     399           0 :                                                 datum->getSubChunkPair().second == 0);
     400             : 
     401           0 :     datum->getVisBuffer()->setNewEntityFlags (visibilityIterator_p->newArrayId(),
     402           0 :                                               visibilityIterator_p->newFieldId(),
     403           0 :                                               visibilityIterator_p->newSpectralWindow());
     404           0 :     datum->getVisBuffer()->setNAntennas (visibilityIterator_p->getNAntennas ());
     405           0 :     datum->getVisBuffer()->setMEpoch (visibilityIterator_p->getEpoch ());
     406           0 :     datum->getVisBuffer()->setReceptor0Angle (visibilityIterator_p->getReceptor0Angle());
     407             : 
     408           0 :     fillLsrInfo (datum);
     409             : 
     410           0 :     Vector<Double> lsrFreq, selFreq;
     411           0 :     visibilityIterator_p->getTopoFreqs (lsrFreq, selFreq);
     412           0 :     datum->getVisBuffer()->setTopoFreqs (lsrFreq, selFreq);
     413           0 : }
     414             : 
     415             : 
     416             : void
     417           0 : VLAT::fillLsrInfo (VlaDatum * datum)
     418             : {
     419           0 :     MPosition observatoryPositon;
     420           0 :     MDirection phaseCenter;
     421             :     Bool velocitySelection;
     422             : 
     423             : 
     424           0 :     Block<Int> channelGroupNumber;
     425           0 :     Block<Int> channelIncrement;
     426           0 :     Block<Int> channelStart;
     427           0 :     Block<Int> channelWidth;
     428             : 
     429           0 :     visibilityIterator_p->getLsrInfo (channelGroupNumber,
     430             :                                       channelIncrement,
     431             :                                       channelStart,
     432             :                                       channelWidth,
     433             :                                       observatoryPositon,
     434             :                                       phaseCenter,
     435             :                                       velocitySelection);
     436             : 
     437           0 :     datum->getVisBuffer()->setLsrInfo (channelGroupNumber,
     438             :                                        channelIncrement,
     439             :                                        channelStart,
     440             :                                        channelWidth,
     441             :                                        observatoryPositon,
     442             :                                        phaseCenter,
     443             :                                        velocitySelection);
     444           0 : }
     445             : 
     446             : void
     447           0 : VLAT::flushWrittenData ()
     448             : {
     449           0 :     for (int i = 0; i < (int) measurementSets_p.nelements() ; i++){
     450           0 :         measurementSets_p [i].flush();
     451             :     }
     452           0 : }
     453             : 
     454             : void
     455           0 : VLAT::handleWrite ()
     456             : {
     457             :     // While there is data to write out, write it out.
     458             : 
     459           0 :     Bool done = false;
     460             : 
     461           0 :     WriteQueue & writeQueue = interface_p->getWriteQueue ();
     462             : 
     463             :     do {
     464             : 
     465           0 :         WriteData * writeData = writeQueue.dequeue ();
     466             : 
     467           0 :         if (writeData != NULL){
     468             : 
     469           0 :             SubChunkPair subchunk = writeData->getSubChunkPair();
     470             : 
     471           0 :             alignWriteIterator (subchunk);
     472             : 
     473           0 :             writeData->write (writeIterator_p);
     474           0 :             delete writeData;
     475             :         }
     476             :         else{
     477           0 :             done = true;
     478             :         }
     479             : 
     480           0 :     } while (! done);
     481           0 : }
     482             : 
     483             : 
     484             : void
     485           0 : VLAT::initialize (const ROVisibilityIterator & rovi)
     486             : {
     487           0 :     ThrowIf (isStarted(), "VLAT::initialize: thread already started");
     488             : 
     489           0 :     visibilityIterator_p = new ROVisibilityIterator (rovi);
     490             : 
     491           0 :     visibilityIterator_p->originChunks (true);
     492             :     // force the MSIter, etc., to be rewound, reinitialized, etc.
     493           0 : }
     494             : 
     495             : void
     496           0 : VLAT::initialize (const Block<MeasurementSet> & mss,
     497             :                   const Block<Int> & sortColumns,
     498             :                   Bool addDefaultSortCols,
     499             :                   Double timeInterval,
     500             :                   Bool writable)
     501             : {
     502           0 :     ThrowIf (isStarted(), "VLAT::initialize: thread already started");
     503             : 
     504           0 :     visibilityIterator_p = new ROVisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);
     505             : 
     506           0 :     if (writable){
     507           0 :         writeIterator_p = new VisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);
     508           0 :         writeIterator_p->originChunks();
     509           0 :         writeIterator_p->origin ();
     510             : 
     511           0 :         measurementSets_p = mss;
     512             :     }
     513             : 
     514           0 : }
     515             : 
     516             : Bool
     517           0 : VLAT::isTerminated () const
     518             : {
     519           0 :     return threadTerminated_p;
     520             : }
     521             : 
     522             : void *
     523           0 : VLAT::run ()
     524             : {
     525             :     // Log thread initiation
     526             : 
     527           0 :     Logger::get()->registerName ("VLAT");
     528           0 :     String writable = writeIterator_p != NULL ? "writable" : "readonly";
     529           0 :     Log (1, "VLAT starting execution; tid=%d; VI is %s.\n", gettid(), writable.c_str());
     530             : 
     531           0 :     LogIO logIo (LogOrigin ("VLAT"));
     532           0 :     logIo << "starting execution; tid=" << gettid() << endl << LogIO::POST;
     533             : 
     534             :     // Enter run loop.  The run loop will only be exited when the main thread
     535             :     // explicitly asks for the termination of this thread (or an uncaught
     536             :     // exception comes to this level).
     537             : 
     538             : 
     539             :     try {
     540             :         do{
     541           0 :             Log (1, "VLAT starting VI sweep\n");
     542             : 
     543             :             // Start sweeping the real VI over its entire range
     544             :             // (subject to the number of free buffers).  The sweep
     545             :             // can be ended abruptly if
     546             : 
     547           0 :             sweepVi ();
     548             : 
     549           0 :             Bool startNewSweep = waitForViReset ();
     550             : 
     551           0 :             if (! startNewSweep){
     552           0 :                 break; // Not resetting so it's time to quit
     553             :             }
     554             : 
     555           0 :         } while (true);
     556             : 
     557           0 :         handleWrite (); // service any pending writes
     558             : 
     559           0 :         flushWrittenData ();
     560             : 
     561           0 :         threadTerminated_p = true;
     562           0 :         Log (1, "VLAT stopping execution.\n");
     563           0 :         logIo << "stopping execution normally; tid=" << gettid() << endl << LogIO::POST;
     564             : 
     565           0 :         return NULL;
     566             : 
     567             :     }
     568           0 :     catch (std::exception & e){
     569             : 
     570           0 :         cerr << "VLAT thread caught exception: " << e.what() << endl;
     571           0 :         cerr.flush();
     572             : 
     573           0 :         Log (1, "VLAT caught exception: %s.\n", e.what());
     574           0 :         logIo << "caught exception; tid=" << gettid() << "-->" << e.what() << endl << LogIO::POST;
     575             : 
     576           0 :         threadTerminated_p = true;
     577           0 :         throw;
     578           0 :     }
     579           0 :     catch (...){
     580             : 
     581           0 :         cerr << "VLAT thread caught unknown exception: " << endl;
     582           0 :         cerr.flush();
     583             : 
     584           0 :         Log (1, "VLAT caught unknown exception:\n");
     585           0 :         logIo << "caught unknown exception; tid=" << gettid() << endl << LogIO::POST;
     586             : 
     587           0 :         threadTerminated_p = true;
     588           0 :         throw;
     589           0 :     }
     590           0 : }
     591             : 
     592             : void
     593           0 : VLAT::setPrefetchColumns (const ViReadImplAsync::PrefetchColumns & columns)
     594             : {
     595           0 :     ThrowIf (isStarted(), "VLAT::setColumns: cannot do this after thread started");
     596           0 :     ThrowIf (! fillers_p.empty(), "VLAT::setColumns:: has already been done");
     597             : 
     598           0 :     createFillerDictionary ();
     599             : 
     600           0 :     for (ViReadImplAsync::PrefetchColumns::const_iterator c = columns.begin();
     601           0 :             c != columns.end();
     602           0 :             c ++){
     603             : 
     604           0 :         ThrowIf (! containsKey (*c, fillerDictionary_p),
     605             :                  String::format ("Unknown prefetch column id (%d)", *c));
     606             : 
     607           0 :         fillers_p.push_back (fillerDictionary_p [*c]->clone());
     608             :     }
     609             : 
     610             :     //  sort (fillers_p.begin(),
     611             :     //        fillers_p.end(),
     612             :     //        VlatFunctor::byDecreasingPrecedence);
     613           0 : }
     614             : 
     615             : void
     616           0 : VLAT::sweepVi ()
     617             : {
     618             : 
     619             :     // Configure the iterator(s) to start a new sweep through the data.
     620             :     // Reset the subchunk counters, apply any queued modifiers and if
     621             :     // the write iterator exists reset it to the chunk origin (the
     622             :     // read iterator gets reset at the start of the sweep loop).
     623             : 
     624           0 :     readSubchunk_p.resetToOrigin ();
     625           0 :     writeSubchunk_p.resetToOrigin ();
     626             : 
     627           0 :     applyModifiers (visibilityIterator_p, writeIterator_p);
     628             : 
     629           0 :     if (writeIterator_p != NULL){
     630           0 :         writeIterator_p->originChunks (true);
     631             :     }
     632             : 
     633             :     // Start sweeping the data with the read only iterator.  If there
     634             :     // is a write iterator it will write behind the RO iterator; the RW
     635             :     // iterator is advanced to align with the appropriate subchunk when
     636             :     // a request is made to write a particular subchunk.
     637             : 
     638             :     try {
     639             : 
     640           0 :         for (visibilityIterator_p->originChunks(true);
     641           0 :              visibilityIterator_p->moreChunks();
     642           0 :              visibilityIterator_p->nextChunk(), readSubchunk_p.incrementChunk ()){
     643             : 
     644           0 :             for (visibilityIterator_p->origin();
     645           0 :                  visibilityIterator_p->more();
     646           0 :                  ++ (* visibilityIterator_p), readSubchunk_p.incrementSubChunk ()){
     647             : 
     648           0 :                 ThreadTimes startTime = ThreadTimes ();
     649             : 
     650           0 :                 waitUntilFillCanStart ();
     651             : 
     652           0 :                 throwIfSweepTerminated ();
     653             : 
     654           0 :                 VlaDatum * vlaDatum = vlaData_p -> fillStart (readSubchunk_p, startTime);
     655             : 
     656           0 :                 throwIfSweepTerminated();
     657             : 
     658           0 :                 fillDatum (vlaDatum);
     659             : 
     660           0 :                 throwIfSweepTerminated ();
     661             : 
     662           0 :                 vlaData_p -> fillComplete (vlaDatum);
     663             : 
     664           0 :                 throwIfSweepTerminated ();
     665             : 
     666           0 :                 handleWrite ();
     667             :             }
     668             :         }
     669             : 
     670           0 :         Log (1, "VLAT: no more data\n");
     671             : 
     672           0 :         vlaData_p -> setNoMoreData ();
     673             :     }
     674           0 :     catch (SweepTerminated &){
     675           0 :         Log (1, "VLAT: VI sweep termination requested.\n");
     676           0 :     }
     677           0 :     catch (AipsError e){
     678           0 :         Log (1, "AipsError during sweepVi; readSubchunk=%s, writeSubChunk=%s",
     679             :              readSubchunk_p.toString().c_str(), writeSubchunk_p.toString().c_str());
     680           0 :         throw;
     681           0 :     }
     682           0 : }
     683             : 
     684             : void
     685           0 : VLAT::terminate ()
     686             : {
     687             :     // Called by another thread to terminate the VLAT.
     688             : 
     689           0 :     Log (2, "Terminating VLAT\n");
     690             :     //printBacktrace (cerr, "VLAT termination");
     691             : 
     692           0 :     Thread::terminate(); // ask thread to terminate
     693             : 
     694           0 :     interface_p->terminateLookahead (); // stop lookahead
     695           0 : }
     696             : 
     697             : void
     698           0 : VLAT::throwIfSweepTerminated ()
     699             : {
     700           0 :     if (interface_p->isSweepTerminationRequested()){
     701           0 :         throw SweepTerminated ();
     702             :     }
     703           0 : }
     704             : 
     705             : Bool
     706           0 : VLAT::waitForViReset()
     707             : {
     708           0 :     UniqueLock uniqueLock (interface_p->getMutex());
     709             : 
     710           0 :     while (! interface_p->viResetRequested () &&
     711           0 :            ! interface_p->isLookaheadTerminationRequested ()){
     712             : 
     713           0 :         handleWrite (); // process any pending write requests
     714             : 
     715             :         // Wait for the interface to change:
     716             :         //
     717             :         //   o Buffer consumed by main thread
     718             :         //   o A write was requested
     719             :         //   o A sweep or thread termination was requested
     720             : 
     721           0 :         interface_p->waitForInterfaceChange (uniqueLock);
     722             :     }
     723             : 
     724           0 :     handleWrite (); // One more time to be sure that all writes are completed before
     725             :                     // we either quit or rewind the iterator.
     726             : 
     727           0 :     if (interface_p->isLookaheadTerminationRequested ()){
     728           0 :         return false;
     729             :     }
     730             :     else{
     731             : 
     732           0 :         vlaData_p->resetBufferData ();
     733             : 
     734           0 :         roviaModifiers_p = interface_p->transferRoviaModifiers ();
     735             : 
     736           0 :         interface_p->viResetComplete ();
     737             : 
     738           0 :         return true;
     739             :     }
     740           0 : }
     741             : 
     742             : void
     743           0 : VLAT::waitUntilFillCanStart ()
     744             : {
     745           0 :     UniqueLock uniqueLock (interface_p->getMutex());
     746             : 
     747           0 :     while ( ! vlaData_p->fillCanStart () &&
     748           0 :             ! interface_p->isSweepTerminationRequested ()){
     749             : 
     750           0 :         handleWrite (); // process any pending write requests
     751             : 
     752             :         // Wait for the interface to change:
     753             :         //
     754             :         //   o Buffer consumed by main thread
     755             :         //   o A write was requested
     756             :         //   o A sweep or thread termination was requested
     757             : 
     758           0 :         interface_p->waitForInterfaceChange (uniqueLock);
     759             :     }
     760           0 : }
     761             : 
     762             : void
     763           0 : VlatFunctor::operator() (VisBuffer *)
     764             : {
     765           0 :     ThrowIf (true, "No filler is defined for this VisBuffer component: " + name_p);
     766           0 : }
     767             : 
     768             : 
     769             : } // end namespace asyncio
     770             : 
     771             : using namespace casacore;
     772             : } // end namespace casa

Generated by: LCOV version 1.16