LCOV - code coverage report
Current view: top level - msvis/MSVis - AsynchronousInterface.cc (source / functions) Hit Total Coverage
Test: casacpp_coverage.info Lines: 0 468 0.0 %
Date: 2024-12-11 20:54:31 Functions: 0 78 0.0 %

          Line data    Source code
       1             : #include "AsynchronousInterface.h"
       2             : #include "VLAT.h"
       3             : 
       4             : #include <stdcasa/thread/AsynchronousTools.h>
       5             : #include <stdcasa/UtilJ.h>
       6             : #include <casacore/casa/System/AipsrcValue.h>
       7             : #include <msvis/MSVis/VisBufferAsync.h>
       8             : #include <msvis/MSVis/VisibilityIteratorImplAsync.h>
       9             : 
      10             : #include <ostream>
      11             : #include <utility>
      12             : 
      13             : using namespace casacore;
      14             : using namespace casa::async;
      15             : using namespace casacore;
      16             : using namespace casa::utilj;
      17             : using namespace std;
      18             : 
      19             : #define Log(level, ...) \
      20             :         {if (AsynchronousInterface::logThis (level)) \
      21             :     Logger::get()->log (__VA_ARGS__);};
      22             : 
      23             : using casa::async::Mutex;
      24             : 
      25             : using namespace casacore;
      26             : namespace casa {
      27             : 
      28             : namespace asyncio {
      29             : 
      30             : Bool AsynchronousInterface::loggingInitialized_p = false;
      31             : Int AsynchronousInterface::logLevel_p = -1;
      32             : 
      33           0 : AsynchronousInterface::AsynchronousInterface (int maxNBuffers)
      34           0 : : lookaheadTerminationRequested_p (false),
      35           0 :   sweepTerminationRequested_p (false),
      36           0 :   viResetComplete_p (false),
      37           0 :   viResetRequested_p (false),
      38           0 :   vlaData_p (maxNBuffers, mutex_p),
      39           0 :   vlat_p (NULL),
      40           0 :   writeQueue_p ()
      41           0 : {}
      42             : 
      43           0 : AsynchronousInterface::~AsynchronousInterface ()
      44           0 : {}
      45             : 
      46             : void
      47           0 : AsynchronousInterface::addModifier (RoviaModifier * modifier)
      48             : {
      49           0 :     Log (1, "AsynchronousInterface::addModifier: {%s}\n", string(*modifier).c_str());
      50             : 
      51           0 :     LockGuard lg (mutex_p);
      52             : 
      53           0 :     roviaModifiers_p.add (modifier);
      54           0 : }
      55             : 
      56             : async::Mutex &
      57           0 : AsynchronousInterface::getMutex () const
      58             : {
      59           0 :     return mutex_p;
      60             : }
      61             : 
      62             : VlaData *
      63           0 : AsynchronousInterface::getVlaData ()
      64             : {
      65           0 :     return & vlaData_p;
      66             : }
      67             : 
      68             : VLAT *
      69           0 : AsynchronousInterface::getVlat ()
      70             : {
      71           0 :     return vlat_p;
      72             : }
      73             : 
      74             : WriteQueue &
      75           0 : AsynchronousInterface::getWriteQueue ()
      76             : {
      77           0 :     return writeQueue_p;
      78             : }
      79             : 
      80             : 
      81             : 
      82             : void
      83           0 : AsynchronousInterface::initialize ()
      84             : {
      85           0 :     initializeLogging ();
      86             : 
      87           0 :     vlaData_p.initialize (this);
      88             : 
      89           0 :     writeQueue_p.initialize (this);
      90             : 
      91           0 :     vlat_p = new VLAT (this);
      92           0 : }
      93             : 
      94             : Bool
      95           0 : AsynchronousInterface::initializeLogging ()
      96             : {
      97           0 :     if (loggingInitialized_p){
      98           0 :         return true;
      99             :     }
     100             : 
     101           0 :     loggingInitialized_p = true;
     102             : 
     103             :     // If the log file variable is defined then start
     104             :     // up the logger
     105             : 
     106           0 :     const String logFileVariable = "Casa_VIA_LogFile";
     107           0 :     const String logLevelVariable = "Casa_VIA_LogLevel";
     108             : 
     109           0 :     String logFilename;
     110           0 :     Bool logFileFound = AipsrcValue<String>::find (logFilename,
     111           0 :                                                    ROVisibilityIterator::getAsyncRcBase () + ".debug.logFile",
     112             :                                                    "");
     113             : 
     114           0 :     if (logFileFound &&
     115           0 :         ! logFilename.empty() &&
     116           0 :         downcase (logFilename) != "null" &&
     117           0 :         downcase (logFilename) != "none"){
     118             : 
     119           0 :         Logger::get()->start (logFilename.c_str());
     120           0 :         AipsrcValue<Int>::find (logLevel_p, ROVisibilityIterator::getAsyncRcBase () + ".debug.logLevel", 1);
     121           0 :         Logger::get()->log ("VlaData log-level is %d; async I/O: %s; nBuffers=%d\n",
     122             :                             logLevel_p,
     123           0 :                             ROVisibilityIterator::isAsynchronousIoEnabled() ? "enabled" : "disabled",
     124             :                             ViReadImplAsync::getDefaultNBuffers() );
     125             : 
     126           0 :         return true;
     127             : 
     128             :     }
     129             : 
     130           0 :     return false;
     131           0 : }
     132             : 
     133             : Bool
     134           0 : AsynchronousInterface::isLookaheadTerminationRequested () const
     135             : {
     136           0 :     return lookaheadTerminationRequested_p;
     137             : }
     138             : 
     139             : 
     140             : Bool
     141           0 : AsynchronousInterface::isSweepTerminationRequested () const
     142             : {
     143           0 :     return sweepTerminationRequested_p;
     144             : }
     145             : 
     146             : Bool
     147           0 : AsynchronousInterface::logThis (Int level)
     148             : {
     149           0 :     return loggingInitialized_p && level <= logLevel_p;
     150             : }
     151             : 
     152             : void
     153           0 : AsynchronousInterface::notifyAllInterfaceChanged () const
     154             : {
     155           0 :     interfaceDataChanged_p.notify_all();
     156           0 : }
     157             : 
     158             : void
     159           0 : AsynchronousInterface::requestViReset ()
     160             : {
     161             :     // Called by main thread to request that the VI reset to the
     162             :     // start of the MS.
     163             : 
     164           0 :     UniqueLock uniqueLock (mutex_p); // enter critical section
     165             : 
     166           0 :     Log (1, "Requesting VI reset\n");
     167             : 
     168           0 :     viResetRequested_p = true; // officially request the reset
     169           0 :     viResetComplete_p = false; // clear any previous completions
     170             : 
     171           0 :     terminateSweep ();
     172             : 
     173             :     // Wait for the request to be completed.
     174             : 
     175           0 :     Log (1, "Waiting for requesting VI reset\n");
     176             : 
     177           0 :     while (! viResetComplete_p){
     178           0 :         interfaceDataChanged_p.wait (uniqueLock);
     179             :     }
     180             : 
     181           0 :     Log (1, "Notified that VI reset has completed\n");
     182             : 
     183             :     // The VI was reset
     184           0 : }
     185             : 
     186             : 
     187             : 
     188             : 
     189             : void
     190           0 : AsynchronousInterface::terminate ()
     191             : {
     192             :     // Destroy the VLAT
     193             : 
     194           0 :     vlat_p->terminate(); // request termination
     195           0 :     vlat_p->join();      // wait for it to terminate
     196           0 :     delete vlat_p;       // free its storage
     197           0 : }
     198             : 
     199             : void
     200           0 : AsynchronousInterface::terminateLookahead ()
     201             : {
     202             :     // Called by main thread to stop the VLAT, etc.
     203             : 
     204           0 :     LockGuard lg (& mutex_p);
     205             : 
     206           0 :     lookaheadTerminationRequested_p = true;
     207             : 
     208           0 :     terminateSweep();
     209           0 : }
     210             : 
     211             : void
     212           0 : AsynchronousInterface::terminateSweep ()
     213             : {
     214             :     // Called internally to terminate VI sweeping.
     215             : 
     216           0 :     sweepTerminationRequested_p = true;   // stop filling
     217             : 
     218           0 :     notifyAllInterfaceChanged();
     219           0 : }
     220             : 
     221             : RoviaModifiers
     222           0 : AsynchronousInterface::transferRoviaModifiers ()
     223             : {
     224           0 :     return roviaModifiers_p.transferModifiers();
     225             : }
     226             : 
     227             : void
     228           0 : AsynchronousInterface::viResetComplete ()
     229             : {
     230             :     ////Assert (mutex_p.isLockedByThisThread());
     231             : 
     232           0 :     viResetRequested_p = false;
     233           0 :     sweepTerminationRequested_p = false;
     234           0 :     viResetComplete_p = true;
     235             : 
     236           0 :     notifyAllInterfaceChanged();
     237           0 : }
     238             : 
     239             : Bool
     240           0 : AsynchronousInterface::viResetRequested ()
     241             : {
     242             :     ////Assert (mutex_p.isLockedByThisThread());
     243             : 
     244           0 :     return viResetRequested_p;
     245             : }
     246             : 
     247             : void
     248           0 : AsynchronousInterface::waitForInterfaceChange (async::UniqueLock & uniqueLock) const
     249             : {
     250           0 :     interfaceDataChanged_p.wait (uniqueLock);
     251           0 : }
     252             : 
     253           0 : ChannelSelection::ChannelSelection (const Block< Vector<Int> > & blockNGroup,
     254             :                                     const Block< Vector<Int> > & blockStart,
     255             :                                     const Block< Vector<Int> > & blockWidth,
     256             :                                     const Block< Vector<Int> > & blockIncr,
     257           0 :                                     const Block< Vector<Int> > & blockSpw)
     258             : {
     259           0 :     blockNGroup_p = blockNGroup;
     260           0 :     blockStart_p = blockStart;
     261           0 :     blockWidth_p = blockWidth;
     262           0 :     blockIncr_p = blockIncr;
     263           0 :     blockSpw_p = blockSpw;
     264           0 : }
     265             : 
     266           0 : ChannelSelection::ChannelSelection (const ChannelSelection & other)
     267             : {
     268           0 :     * this = other;
     269           0 : }
     270             : 
     271             : ChannelSelection &
     272           0 : ChannelSelection::operator= (const ChannelSelection & other)
     273             : {
     274           0 :     if (this != & other){
     275             : 
     276           0 :         copyBlock (other.blockNGroup_p, blockNGroup_p);
     277           0 :         copyBlock (other.blockStart_p, blockStart_p);
     278           0 :         copyBlock (other.blockWidth_p, blockWidth_p);
     279           0 :         copyBlock (other.blockIncr_p, blockIncr_p);
     280           0 :         copyBlock (other.blockSpw_p, blockSpw_p);
     281             : 
     282             :     }
     283             : 
     284           0 :     return * this;
     285             : }
     286             : 
     287             : void
     288           0 : ChannelSelection::copyBlock (const Block <Vector<Int> > & src,
     289             :                              Block <Vector<Int> > & to) const
     290             : {
     291             :     // Since this is a Block of Vector, we need to wipe out
     292             :     // the original contents of "to"; otherwise the semantics
     293             :     // of Vector::operator= will generate an exception if there
     294             :     // is a difference in length of any of the vector elements.
     295             : 
     296           0 :     to.resize (0, true);
     297           0 :     to = src;
     298           0 : }
     299             : 
     300             : 
     301             : void
     302           0 : ChannelSelection::get (Block< Vector<Int> > & blockNGroup,
     303             :                        Block< Vector<Int> > & blockStart,
     304             :                        Block< Vector<Int> > & blockWidth,
     305             :                        Block< Vector<Int> > & blockIncr,
     306             :                        Block< Vector<Int> > & blockSpw) const
     307             : {
     308           0 :     copyBlock (blockNGroup_p, blockNGroup);
     309           0 :     copyBlock (blockStart_p, blockStart);
     310           0 :     copyBlock (blockWidth_p, blockWidth);
     311           0 :     copyBlock (blockIncr_p, blockIncr);
     312           0 :     copyBlock (blockSpw_p, blockSpw);
     313           0 : }
     314             : 
     315             : std::ostream &
     316           0 : operator<< (std::ostream & o, const RoviaModifier & m)
     317             : {
     318           0 :     m.print (o);
     319             : 
     320           0 :     return o;
     321             : }
     322             : 
     323           0 : RoviaModifiers::~RoviaModifiers ()
     324             : {
     325             : //    // Free the objects owned by the vector
     326             : //
     327             : //    for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
     328             : //        delete (* i);
     329             : //    }
     330           0 : }
     331             : 
     332             : void
     333           0 : RoviaModifiers::add (RoviaModifier * modifier)
     334             : {
     335           0 :     data_p.push_back (modifier);
     336           0 : }
     337             : 
     338             : void
     339           0 : RoviaModifiers::apply (ROVisibilityIterator * rovi)
     340             : {
     341             :     // Free the objects owned by the vector
     342             : 
     343           0 :     for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
     344           0 :         Log (1, "Applying vi modifier: %s\n", string(** i).c_str());
     345           0 :         (* i) -> apply (rovi);
     346             :     }
     347             : 
     348           0 : }
     349             : 
     350             : void
     351           0 : RoviaModifiers::clearAndFree ()
     352             : {
     353           0 :     for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
     354           0 :         delete (* i);
     355             :     }
     356             : 
     357           0 :     data_p.clear();
     358           0 : }
     359             : 
     360             : RoviaModifiers
     361           0 : RoviaModifiers::transferModifiers ()
     362             : {
     363           0 :     RoviaModifiers result;
     364             : 
     365           0 :     result.data_p.assign (data_p.begin(), data_p.end());
     366             : 
     367           0 :     data_p.clear(); // remove them from the other object but do not destroy them
     368             : 
     369           0 :     return result;
     370           0 : }
     371             : 
     372           0 : SelectChannelModifier::SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow)
     373           0 : : channelBlocks_p (false),
     374           0 :   increment_p (increment),
     375           0 :   nGroup_p (nGroup),
     376           0 :   spectralWindow_p (spectralWindow),
     377           0 :   start_p (start),
     378           0 :   width_p (width)
     379           0 : {}
     380             : 
     381           0 : SelectChannelModifier::SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
     382             :                                               const Block< Vector<Int> > & blockStart,
     383             :                                               const Block< Vector<Int> > & blockWidth,
     384             :                                               const Block< Vector<Int> > & blockIncr,
     385           0 :                                               const Block< Vector<Int> > & blockSpw)
     386           0 : : channelBlocks_p (true),
     387           0 :   channelSelection_p (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw)
     388           0 : {}
     389             : 
     390             : void
     391           0 : SelectChannelModifier::apply (ROVisibilityIterator * rovi) const
     392             : {
     393           0 :     if (! channelBlocks_p){
     394           0 :         rovi->selectChannel (nGroup_p, start_p, width_p, increment_p, spectralWindow_p);
     395             :     }
     396             :     else{
     397           0 :         Block< Vector<Int> > blockNGroup;
     398           0 :         Block< Vector<Int> > blockStart;
     399           0 :         Block< Vector<Int> > blockWidth;
     400           0 :         Block< Vector<Int> > blockIncr;
     401           0 :         Block< Vector<Int> > blockSpw;
     402             : 
     403           0 :         channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
     404           0 :         rovi->selectChannel (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
     405           0 :     }
     406           0 : }
     407             : 
     408             : void
     409           0 : SelectChannelModifier::print (ostream & os) const
     410             : {
     411           0 :     os << "SelectChannel::{";
     412             : 
     413           0 :     if (channelBlocks_p){
     414           0 :         Block< Vector<Int> > blockNGroup;
     415           0 :         Block< Vector<Int> > blockStart;
     416           0 :         Block< Vector<Int> > blockWidth;
     417           0 :         Block< Vector<Int> > blockIncr;
     418           0 :         Block< Vector<Int> > blockSpw;
     419             : 
     420           0 :         channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
     421             : 
     422           0 :         os << "nGroup=" << toCsv (blockNGroup)
     423           0 :            << ", start=" << toCsv (blockStart)
     424           0 :            << ", width=" << toCsv (blockWidth)
     425           0 :            << ", increment=" << toCsv (blockIncr)
     426           0 :            << ", spw=" << toCsv (blockSpw);
     427           0 :     }
     428             :     else {
     429           0 :         os << "nGroup=" << nGroup_p
     430           0 :            << ", start=" << start_p
     431           0 :            << ", width=" << width_p
     432           0 :            << ", increment=" << increment_p
     433           0 :            << ", spw=" << spectralWindow_p;
     434             :     }
     435           0 :     os << "}";
     436           0 : }
     437             : 
     438             : String
     439           0 : SelectChannelModifier::toCsv (const Block< Vector<Int> > & bv) const
     440             : {
     441           0 :     String result = "{";
     442             : 
     443           0 :     for (Block<Vector<Int> >::const_iterator v = bv.begin(); v != bv.end(); ++ v){
     444           0 :         if (result.size() != 1)
     445           0 :             result += ",";
     446             : 
     447           0 :         result += "{" + toCsv (* v) + "}";
     448             : 
     449             :     }
     450             : 
     451           0 :     result += "}";
     452             : 
     453           0 :     return result;
     454             : 
     455           0 : }
     456             : 
     457             : String
     458           0 : SelectChannelModifier::toCsv (const Vector<Int> & v) const
     459             : {
     460           0 :     String result = "";
     461           0 :     for (Vector<Int>::const_iterator i = v.begin(); i != v.end(); ++ i){
     462           0 :         if (! result.empty())
     463           0 :             result += ",";
     464           0 :         result +=  String::toString (* i);
     465           0 :     }
     466             : 
     467           0 :     return result;
     468           0 : }
     469             : 
     470             : 
     471           0 : SelectVelocityModifier::SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
     472           0 :                                                 MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise)
     473             : 
     474           0 : : dType_p (dType),
     475           0 :   nChan_p (nChan),
     476           0 :   precise_p (precise),
     477           0 :   rvType_p (rvType),
     478           0 :   vInc_p (vInc),
     479           0 :   vStart_p (vStart)
     480           0 : {}
     481             : 
     482             : void
     483           0 : SelectVelocityModifier::apply (ROVisibilityIterator * rovi) const
     484             : {
     485           0 :     rovi-> selectVelocity (nChan_p, vStart_p, vInc_p, rvType_p, dType_p, precise_p);
     486           0 : }
     487             : 
     488             : void
     489           0 : SelectVelocityModifier::print (std::ostream & os) const
     490             : {
     491             :     os << "SelectVelocity::{"
     492             : 
     493           0 :        << "dType=" << dType_p
     494           0 :        << ",nChan=" << nChan_p
     495           0 :        << ",precise=" << precise_p
     496           0 :        << ",rvType=" << rvType_p
     497           0 :        << ",vInc=" << vInc_p
     498           0 :        << ",vStart=" << vStart_p
     499           0 :        << "}";
     500           0 : }
     501             : 
     502           0 : SetIntervalModifier::SetIntervalModifier (Double timeInterval)
     503           0 : : timeInterval_p (timeInterval)
     504           0 : {}
     505             : 
     506             : void
     507           0 : SetIntervalModifier::apply (ROVisibilityIterator * rovi) const
     508             : {
     509           0 :     rovi -> setInterval (timeInterval_p);
     510           0 : }
     511             : 
     512             : void
     513           0 : SetIntervalModifier::print (std::ostream & os) const
     514             : {
     515           0 :     os << "SetInterval::{" << timeInterval_p << "}";
     516           0 : }
     517             : 
     518             : 
     519             : 
     520           0 : SetRowBlockingModifier::SetRowBlockingModifier (Int nRows)
     521           0 : : nRows_p (nRows)
     522           0 : {}
     523             : 
     524             : void
     525           0 : SetRowBlockingModifier::apply (ROVisibilityIterator * rovi) const
     526             : {
     527           0 :     rovi->setRowBlocking (nRows_p);
     528           0 : }
     529             : 
     530             : void
     531           0 : SetRowBlockingModifier::print (std::ostream & os) const
     532             : {
     533             :     os << "SetRowBlocking::{"
     534           0 :        << "nRows=" << nRows_p
     535           0 :        << ",nGroup=" << nGroup_p
     536           0 :        << ",spectralWindow=" << spectralWindow_p
     537           0 :        << ",start=" << start_p
     538           0 :        << ",width=" << width_p
     539           0 :        << "}";
     540           0 : }
     541             : 
     542             : 
     543             : //  **************************
     544             : //  *                        *
     545             : //  * VlaData Implementation *
     546             : //  *                        *
     547             : //  **************************
     548             : 
     549             : //Semaphore VlaData::debugBlockSemaphore_p (0); // used to block a thread for debugging
     550             : 
     551           0 : VlaData::VlaData (Int maxNBuffers, async::Mutex & mutex)
     552           0 : : MaxNBuffers_p (maxNBuffers),
     553           0 :   mutex_p (mutex)
     554             : {
     555           0 :     timing_p.fillCycle_p = DeltaThreadTimes (true);
     556           0 :     timing_p.fillOperate_p = DeltaThreadTimes (true);
     557           0 :     timing_p.fillWait_p = DeltaThreadTimes (true);
     558           0 :     timing_p.readCycle_p = DeltaThreadTimes (true);
     559           0 :     timing_p.readOperate_p = DeltaThreadTimes (true);
     560           0 :     timing_p.readWait_p = DeltaThreadTimes (true);
     561           0 :     timing_p.timeStart_p = ThreadTimes();
     562           0 : }
     563             : 
     564           0 : VlaData::~VlaData ()
     565             : {
     566           0 :     timing_p.timeStop_p = ThreadTimes();
     567             : 
     568           0 :     if (statsEnabled()){
     569           0 :         Log (1, "VlaData stats:\n%s", makeReport ().c_str());
     570             :     }
     571             : 
     572           0 :     resetBufferData ();
     573           0 : }
     574             : 
     575             : 
     576             : Int
     577           0 : VlaData::clock (Int arg, Int base)
     578             : {
     579           0 :     Int r = arg % base;
     580             : 
     581           0 :     if (r < 0){
     582           0 :         r += base;
     583             :     }
     584             : 
     585           0 :     return r;
     586             : }
     587             : 
     588             : //void
     589             : //VlaData::debugBlock ()
     590             : //{
     591             : //    //    Log (1, "VlaData::debugBlock(): Blocked\n");
     592             : //    //
     593             : //    //    debugBlockSemaphore_p.wait ();
     594             : //    //
     595             : //    //    Log (1, "VlaData::debugBlock(): Unblocked\n");
     596             : //}
     597             : 
     598             : //void
     599             : //VlaData::debugUnblock ()
     600             : //{
     601             : //    //    int v = debugBlockSemaphore_p.getValue();
     602             : //    //
     603             : //    //    if (v == 0){
     604             : //    //        Log (1, "VlaData::debugUnblock()\n");
     605             : //    //        debugBlockSemaphore_p.post ();
     606             : //    //    }
     607             : //    //    else
     608             : //    //        Log (1, "VlaData::debugUnblock(): already unblocked; v=%d\n", v);
     609             : //}
     610             : 
     611             : 
     612             : void
     613           0 : VlaData::fillComplete (VlaDatum * datum)
     614             : {
     615           0 :     LockGuard lg (mutex_p);
     616             : 
     617           0 :     if (statsEnabled()){
     618           0 :         timing_p.fill3_p = ThreadTimes();
     619           0 :         timing_p.fillWait_p += timing_p.fill2_p - timing_p.fill1_p;
     620           0 :         timing_p.fillOperate_p += timing_p.fill3_p - timing_p.fill2_p;
     621           0 :         timing_p.fillCycle_p += timing_p.fill3_p - timing_p.fill1_p;
     622             :     }
     623             : 
     624           0 :     data_p.push (datum);
     625             : 
     626           0 :     Log (2, "VlaData::fillComplete on %s\n", datum->getSubChunkPair ().toString().c_str());
     627             : 
     628           0 :     assert ((Int)data_p.size() <= MaxNBuffers_p);
     629             : 
     630           0 :     interface_p->notifyAllInterfaceChanged();
     631           0 : }
     632             : 
     633             : Bool
     634           0 : VlaData::fillCanStart () const
     635             : {
     636             :     // Caller must lock
     637             : 
     638           0 :     Bool canStart = (int) data_p.size() < MaxNBuffers_p;
     639             : 
     640           0 :     return canStart;
     641             : }
     642             : 
     643             : 
     644             : VlaDatum *
     645           0 : VlaData::fillStart (SubChunkPair subchunk, const ThreadTimes & fillStartTime)
     646             : {
     647           0 :     LockGuard lg (mutex_p);
     648             : 
     649           0 :     statsEnabled () && (timing_p.fill1_p = fillStartTime, true);
     650             : 
     651           0 :     Assert ((int) data_p.size() < MaxNBuffers_p);
     652             : 
     653           0 :     VlaDatum * datum = new VlaDatum (subchunk);
     654             : 
     655           0 :     Log (2, "VlaData::fillStart on %s\n", datum->getSubChunkPair().toString().c_str());
     656             : 
     657           0 :     if (validChunks_p.empty() || validChunks_p.back() != subchunk.chunk ())
     658           0 :         insertValidChunk (subchunk.chunk ());
     659             : 
     660           0 :     insertValidSubChunk (subchunk);
     661             : 
     662           0 :     statsEnabled () && (timing_p.fill2_p = ThreadTimes(), true);
     663             : 
     664           0 :     if (interface_p->isSweepTerminationRequested()){
     665           0 :         delete datum;
     666           0 :         datum = NULL; // datum may not be ready to fill and shouldn't be anyway
     667             :     }
     668             : 
     669           0 :     return datum;
     670           0 : }
     671             : 
     672             : asyncio::ChannelSelection
     673           0 : VlaData::getChannelSelection () const
     674             : {
     675           0 :     LockGuard lg (mutex_p);
     676             : 
     677           0 :     return channelSelection_p;
     678           0 : }
     679             : 
     680             : void
     681           0 : VlaData::initialize (const AsynchronousInterface * interface)
     682             : {
     683           0 :     interface_p = interface;
     684             : 
     685           0 :     LockGuard lg (mutex_p);
     686             : 
     687           0 :     resetBufferData ();
     688           0 : }
     689             : 
     690             : 
     691             : void
     692           0 : VlaData::insertValidChunk (Int chunkNumber)
     693             : {
     694             :     ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
     695             : 
     696           0 :     validChunks_p.push (chunkNumber);
     697             : 
     698           0 :     interface_p->notifyAllInterfaceChanged();
     699           0 : }
     700             : 
     701             : void
     702           0 : VlaData::insertValidSubChunk (SubChunkPair subchunk)
     703             : {
     704             :     ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
     705             : 
     706           0 :     validSubChunks_p.push (subchunk);
     707             : 
     708           0 :     interface_p->notifyAllInterfaceChanged();
     709           0 : }
     710             : 
     711             : //Bool
     712             : //VlaData::isSweepTerminationRequested () const
     713             : //{
     714             : //    return sweepTerminationRequested_p;
     715             : //}
     716             : 
     717             : Bool
     718           0 : VlaData::isValidChunk (Int chunkNumber) const
     719             : {
     720           0 :     bool validChunk = false;
     721             : 
     722             :     // Check to see if this is a valid chunk.  If the data structure is empty
     723             :     // then sleep for a tiny bit to allow the VLAT thread to either make more
     724             :     // chunks available for insert the sentinel value INT_MAX into the data
     725             :     // structure.
     726             : 
     727           0 :     UniqueLock uniqueLock (mutex_p);
     728             : 
     729             :     do {
     730             : 
     731           0 :         while (validChunks_p.empty()){
     732           0 :             interface_p->waitForInterfaceChange (uniqueLock);
     733             :         }
     734             : 
     735           0 :         while (! validChunks_p.empty() && validChunks_p.front() < chunkNumber){
     736           0 :             validChunks_p.pop();
     737             :         }
     738             : 
     739           0 :         if (! validChunks_p.empty())
     740           0 :             validChunk = validChunks_p.front() == chunkNumber;
     741             : 
     742           0 :     } while (validChunks_p.empty());
     743             : 
     744           0 :     Log (3, "isValidChunk (%d) --> %s\n", chunkNumber, validChunk ? "true" : "false");
     745             : 
     746           0 :     return validChunk;
     747           0 : }
     748             : 
     749             : Bool
     750           0 : VlaData::isValidSubChunk (SubChunkPair subchunk) const
     751             : {
     752           0 :     SubChunkPair s;
     753             : 
     754           0 :     bool validSubChunk = false;
     755             : 
     756             :     // Check to see if this is a valid subchunk.  If the data structure is empty
     757             :     // then sleep for a tiny bit to allow the VLAT thread to either make more
     758             :     // subchunks available for insert the sentinel value (INT_MAX, INT_MAX) into the data
     759             :     // structure.
     760             : 
     761           0 :     UniqueLock uniqueLock (mutex_p);
     762             : 
     763             :     do {
     764             : 
     765           0 :         while (validSubChunks_p.empty()){
     766           0 :             interface_p->waitForInterfaceChange (uniqueLock);
     767             :         }
     768             : 
     769           0 :         while (! validSubChunks_p.empty() && validSubChunks_p.front() < subchunk){
     770           0 :             validSubChunks_p.pop();
     771             :         }
     772             : 
     773           0 :         if (! validSubChunks_p.empty())
     774           0 :             validSubChunk = validSubChunks_p.front() == subchunk;
     775             : 
     776           0 :     } while (validSubChunks_p.empty());
     777             : 
     778           0 :     Log (3, "isValidSubChunk %s --> %s\n", subchunk.toString().c_str(), validSubChunk ? "true" : "false");
     779             : 
     780           0 :     return validSubChunk;
     781           0 : }
     782             : 
     783             : String
     784           0 : VlaData::makeReport ()
     785             : {
     786           0 :     String report;
     787             : 
     788           0 :     DeltaThreadTimes duration = (timing_p.timeStop_p - timing_p.timeStart_p); // seconds
     789           0 :     report += String::format ("\nLookahead Stats: nCycles=%d, duration=%.3f sec\n...\n",
     790           0 :                       timing_p.readWait_p.n(), duration.elapsed());
     791           0 :     report += "...ReadWait:    " + timing_p.readWait_p.formatAverage () + "\n";
     792           0 :     report += "...ReadOperate: " + timing_p.readOperate_p.formatAverage() + "\n";
     793           0 :     report += "...ReadCycle:   " + timing_p.readCycle_p.formatAverage() + "\n";
     794             : 
     795           0 :     report += "...FillWait:    " + timing_p.fillWait_p.formatAverage() + "\n";
     796           0 :     report += "...FillOperate: " + timing_p.fillOperate_p.formatAverage () + "\n";
     797           0 :     report += "...FillCycle:   " + timing_p.fillCycle_p.formatAverage () + "\n";
     798             : 
     799           0 :     Double syncCycle = timing_p.fillOperate_p.elapsedAvg() + timing_p.readOperate_p.elapsedAvg();
     800           0 :     Double asyncCycle = max (timing_p.fillCycle_p.elapsedAvg(), timing_p.readCycle_p.elapsedAvg());
     801           0 :     report += String::format ("...Sync cycle would be %6.1f ms\n", syncCycle * 1000);
     802           0 :     report += String::format ("...Speedup is %5.1f%%\n", (syncCycle / asyncCycle  - 1) * 100);
     803           0 :     report += String::format ("...Total time savings estimate is %7.3f seconds\n",
     804           0 :                       (syncCycle - asyncCycle) * timing_p.readWait_p.n());
     805             : 
     806           0 :     return report;
     807             : 
     808           0 : }
     809             : 
     810             : 
     811             : void
     812           0 : VlaData::readComplete (SubChunkPair subchunk)
     813             : {
     814           0 :     LockGuard lg (mutex_p);
     815             : 
     816           0 :     if (statsEnabled()){
     817           0 :         timing_p.read3_p = ThreadTimes();
     818           0 :         timing_p.readWait_p += timing_p.read2_p - timing_p.read1_p;
     819           0 :         timing_p.readOperate_p += timing_p.read3_p - timing_p.read2_p;
     820           0 :         timing_p.readCycle_p += timing_p.read3_p - timing_p.read1_p;
     821             :     }
     822             : 
     823           0 :     Log (2, "VlaData::readComplete on %s\n", subchunk.toString().c_str());
     824           0 : }
     825             : 
     826             : VisBufferAsync *
     827           0 : VlaData::readStart (SubChunkPair subchunk)
     828             : {
     829             :     // Called by main thread
     830             : 
     831           0 :     UniqueLock uniqueLock (mutex_p);
     832             : 
     833           0 :     statsEnabled () && (timing_p.read1_p = ThreadTimes(), true);
     834             : 
     835             :     // Wait for a subchunk's worth of data to be available.
     836             : 
     837           0 :     while (data_p.empty()){
     838           0 :         interface_p->waitForInterfaceChange (uniqueLock);
     839             :     }
     840             : 
     841             :     // Get the data off the queue and notify world of change in VlaData.
     842             : 
     843           0 :     VlaDatum * datum = data_p.front();
     844           0 :     data_p.pop ();
     845           0 :     interface_p->notifyAllInterfaceChanged();
     846             : 
     847           0 :     ThrowIf (! datum->isSubChunk (subchunk),
     848             :              String::format ("Reader wanted subchunk %s while next subchunk is %s",
     849             :                             subchunk.toString().c_str(), datum->getSubChunkPair().toString().c_str()));
     850             : 
     851           0 :     Log (2, "VlaData::readStart on %s\n", subchunk.toString().c_str());
     852             : 
     853           0 :     statsEnabled () && (timing_p.read2_p = ThreadTimes(), true);
     854             : 
     855             :     // Extract the VisBufferAsync enclosed in the datum for return to caller,
     856             :     // then destroy the rest of the datum object
     857             : 
     858           0 :     VisBufferAsync * vba = datum->releaseVisBufferAsync ();
     859           0 :     delete datum;
     860           0 :     return vba;
     861           0 : }
     862             : 
     863             : void
     864           0 : VlaData::resetBufferData ()
     865             : {
     866             :     ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
     867             : 
     868             :     // Flush any accumulated buffers
     869             : 
     870           0 :     while (! data_p.empty()){
     871           0 :         VlaDatum * datum = data_p.front();
     872           0 :         data_p.pop ();
     873           0 :         delete datum;
     874             :     }
     875             : 
     876             :     // Flush the chunk and subchunk indices
     877             : 
     878           0 :     while (! validChunks_p.empty())
     879           0 :         validChunks_p.pop();
     880             : 
     881           0 :     while (! validSubChunks_p.empty())
     882           0 :         validSubChunks_p.pop();
     883           0 : }
     884             : 
     885             : void
     886           0 : VlaData::setNoMoreData ()
     887             : {
     888           0 :     LockGuard lg (mutex_p);
     889             : 
     890           0 :     insertValidChunk (INT_MAX);
     891           0 :     insertValidSubChunk (SubChunkPair::noMoreData ());
     892           0 : }
     893             : 
     894             : Bool
     895           0 : VlaData::statsEnabled () const
     896             : {
     897             :     // Determines whether asynchronous I/O is enabled by looking for the
     898             :     // expected AipsRc value.  If not found then async i/o is disabled.
     899             : 
     900             :     Bool doStats;
     901           0 :     AipsrcValue<Bool>::find (doStats, ROVisibilityIterator::getAsyncRcBase () + ".doStats", false);
     902             : 
     903           0 :     return doStats;
     904             : }
     905             : 
     906             : void
     907           0 : VlaData::storeChannelSelection (const asyncio::ChannelSelection & channelSelection)
     908             : {
     909           0 :     LockGuard lg (mutex_p);
     910             : 
     911           0 :     channelSelection_p = channelSelection;
     912           0 : }
     913             : 
     914             : 
     915             : 
     916             : //  ***************************
     917             : //  *                         *
     918             : //  * VlaDatum Implementation *
     919             : //  *                         *
     920             : //  ***************************
     921             : 
     922           0 : VlaDatum::VlaDatum (SubChunkPair subchunk)
     923           0 : : subchunk_p (subchunk),
     924           0 :   visBuffer_p (new VisBufferAsync ())
     925           0 : {}
     926             : 
     927           0 : VlaDatum::~VlaDatum()
     928             : {
     929           0 :     delete visBuffer_p;
     930           0 : }
     931             : 
     932             : SubChunkPair
     933           0 : VlaDatum::getSubChunkPair () const
     934             : {
     935           0 :     return subchunk_p;
     936             : }
     937             : 
     938             : VisBufferAsync *
     939           0 : VlaDatum::getVisBuffer ()
     940             : {
     941           0 :     return visBuffer_p;
     942             : }
     943             : 
     944             : //const VisBufferAsync *
     945             : //VlaDatum::getVisBuffer () const
     946             : //{
     947             : //    assert (state_p == Filling || state_p == Reading);
     948             : //
     949             : //    return visBuffer_p;
     950             : //}
     951             : 
     952             : Bool
     953           0 : VlaDatum::isSubChunk (SubChunkPair subchunk) const
     954             : {
     955           0 :     return subchunk == subchunk_p;
     956             : }
     957             : 
     958             : VisBufferAsync *
     959           0 : VlaDatum::releaseVisBufferAsync ()
     960             : {
     961           0 :     VisBufferAsync * vba = visBuffer_p;
     962           0 :     visBuffer_p = NULL;
     963             : 
     964           0 :     return vba;
     965             : }
     966             : 
     967           0 : WriteQueue::WriteQueue ()
     968           0 : : interface_p (NULL)
     969           0 : {}
     970             : 
     971           0 : WriteQueue::~WriteQueue ()
     972             : {
     973           0 :     Assert (queue_p.empty());
     974           0 : }
     975             : 
     976             : WriteData *
     977           0 : WriteQueue::dequeue ()
     978             : {
     979           0 :     LockGuard lg (mutex_p);
     980             : 
     981           0 :     WriteData * result = NULL;
     982             : 
     983           0 :     if (! empty (true)){
     984             : 
     985           0 :         result = queue_p.front(); // get the first value
     986           0 :         queue_p.pop();            // remove it from the queue
     987             :     }
     988             : 
     989           0 :     return result;
     990           0 : }
     991             : 
     992             : Bool
     993           0 : WriteQueue::empty (Bool alreadyLocked)
     994             : {
     995             :     Bool isEmpty;
     996             : 
     997           0 :     if (alreadyLocked){
     998           0 :         isEmpty = queue_p.empty();
     999             :     }
    1000             :     else {
    1001           0 :         LockGuard lg (mutex_p);
    1002           0 :         isEmpty = queue_p.empty();
    1003           0 :     }
    1004             : 
    1005           0 :     return isEmpty;
    1006             : }
    1007             : 
    1008             : void
    1009           0 : WriteQueue::enqueue (WriteData * writeData)
    1010             : {
    1011           0 :     Assert (writeData != NULL);
    1012             : 
    1013           0 :     LockGuard lg (mutex_p);
    1014             : 
    1015           0 :     queue_p.push (writeData);
    1016             : 
    1017           0 :     interface_p->notifyAllInterfaceChanged ();
    1018           0 : }
    1019             : 
    1020             : void
    1021           0 : WriteQueue::initialize (const AsynchronousInterface * interface)
    1022             : {
    1023           0 :     interface_p = interface;
    1024           0 : }
    1025             : 
    1026             : } // end namespace asyncio
    1027             : 
    1028             : using namespace casacore;
    1029             : } // end namespace casa

Generated by: LCOV version 1.16