Line data Source code
1 : //# VLAT.cc: Implemenation of visibility lookahead thread related functionality.
2 : //# Copyright (C) 2011
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU Library General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 : //# License for more details.
14 : //#
15 : //# You should have received a copy of the GNU Library General Public License
16 : //# along with this library; if not, write to the Free Software Foundation,
17 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning CASA should be addressed as follows:
20 : //# Internet email: CASA-request@nrao.edu.
21 : //# Postal address: CASA Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : //#
27 : //# $Id$
28 :
29 : #include <assert.h>
30 : #include <time.h>
31 : #include <sys/time.h>
32 :
33 : #include <casacore/casa/Logging/LogIO.h>
34 : #include <casacore/casa/System/AipsrcValue.h>
35 : #include <msvis/MSVis/VLAT.h>
36 : #include <msvis/MSVis/VisBufferAsync.h>
37 :
38 : #include <stdcasa/thread/AsynchronousTools.h>
39 : using namespace casacore;
40 : using namespace casa::async;
41 :
42 : #include <algorithm>
43 : #include <cstdarg>
44 : #include <functional>
45 :
46 : #include <stdcasa/UtilJ.h>
47 :
48 : using namespace casacore;
49 : using namespace casa::utilj;
50 : using namespace std;
51 : using namespace casacore;
52 : using namespace casa::asyncio;
53 :
54 : #define Log(level, ...) \
55 : {if (AsynchronousInterface::logThis (level)) \
56 : Logger::get()->log (__VA_ARGS__);};
57 :
58 : using namespace casacore;
59 : namespace casa {
60 :
61 : namespace asyncio {
62 :
63 : // *****************************
64 : // * *
65 : // * VlatAndDataImplementation *
66 : // * *
67 : // *****************************
68 :
69 0 : VlatAndData::VlatAndData ()
70 0 : : vlaData_p (NULL),
71 0 : vlat_p (NULL)
72 : {
73 0 : }
74 :
75 : // ***********************
76 : // * *
77 : // * VLAT Implementation *
78 : // * *
79 : // ***********************
80 :
81 0 : VLAT::VLAT (AsynchronousInterface * asynchronousInterface)
82 : {
83 0 : interface_p = asynchronousInterface;
84 0 : vlaData_p = interface_p->getVlaData();
85 0 : visibilityIterator_p = NULL;
86 0 : writeIterator_p = NULL;
87 0 : threadTerminated_p = false;
88 0 : }
89 :
90 0 : VLAT::~VLAT ()
91 : {
92 :
93 : // Free up storage
94 :
95 0 : for (FillerDictionary::iterator f = fillerDictionary_p.begin();
96 0 : f != fillerDictionary_p.end();
97 0 : f++){
98 0 : delete (f->second);
99 : }
100 :
101 0 : for (Fillers::iterator f = fillers_p.begin();
102 0 : f != fillers_p.end();
103 0 : f++){
104 0 : delete (* f);
105 : }
106 :
107 0 : delete visibilityIterator_p;
108 0 : delete writeIterator_p;
109 0 : }
110 :
111 : void
112 0 : VLAT::alignWriteIterator (SubChunkPair subchunk)
113 : {
114 0 : Assert (subchunk <= readSubchunk_p);
115 :
116 0 : Bool done = false;
117 :
118 0 : while (subchunk > writeSubchunk_p && ! done){
119 :
120 0 : ++ (* writeIterator_p); // advance to next subchunk in chunk
121 :
122 0 : if (writeIterator_p->more()){
123 :
124 : // Sucessfully moved on to next subchunk
125 :
126 0 : writeSubchunk_p.incrementSubChunk();
127 : }
128 : else{
129 :
130 : // End of subchunks to advance to start of next chunk
131 :
132 0 : writeIterator_p->nextChunk ();
133 :
134 0 : if (writeIterator_p->moreChunks ()){
135 :
136 : // Moved on to next chunk; position to first subchunk
137 :
138 0 : writeIterator_p->origin ();
139 :
140 0 : if (writeIterator_p->more()){
141 0 : writeSubchunk_p.incrementChunk();
142 : }
143 : else{
144 0 : done = true; // no more data
145 : }
146 : }
147 : else{
148 0 : done = true; // no more data
149 : }
150 : }
151 : }
152 :
153 0 : ThrowIf (subchunk != writeSubchunk_p,
154 : String::format ("Failed to advance write iterator to subchunk %s; last subchunk is %s",
155 : subchunk.toString().c_str(), writeSubchunk_p.toString().c_str()));
156 0 : }
157 :
158 : void
159 0 : VLAT::applyModifiers (ROVisibilityIterator * rovi, VisibilityIterator * vi)
160 : {
161 : // Apply modifiers to read iterator and to write iterator (if it exists)
162 :
163 0 : roviaModifiers_p.apply (rovi);
164 :
165 0 : if (vi != NULL){
166 0 : roviaModifiers_p.apply (vi);
167 : }
168 :
169 : // Get the channel selection information from the modified read VI and provide it to the
170 : // data object
171 :
172 0 : roviaModifiers_p.clearAndFree ();
173 :
174 0 : Block< Vector<Int> > blockNGroup;
175 0 : Block< Vector<Int> > blockStart;
176 0 : Block< Vector<Int> > blockWidth;
177 0 : Block< Vector<Int> > blockIncr;
178 0 : Block< Vector<Int> > blockSpw;
179 :
180 0 : rovi->getChannelSelection (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
181 :
182 0 : vlaData_p -> storeChannelSelection (asyncio::ChannelSelection (blockNGroup, blockStart,
183 : blockWidth, blockIncr, blockSpw));
184 0 : }
185 :
186 : void
187 0 : VLAT::checkFiller (VisBufferComponents::EnumType fillerId)
188 : {
189 0 : ThrowIf (! visibilityIterator_p -> existsColumn (fillerId),
190 : String::format ("VLAT: Column to be prefetched, %s, does not exist!",
191 : PrefetchColumns::columnName (fillerId).c_str()));
192 0 : }
193 :
194 : void
195 0 : VLAT::createFillerDictionary ()
196 : {
197 : // Create a dictionary of all the possible fillers using the
198 : // ViReadImplAsync::PrefetchColumnIds as the keys
199 :
200 0 : fillerDictionary_p.clear();
201 :
202 0 : fillerDictionary_p.add (VisBufferComponents::AllBeamOffsetsZero,
203 0 : vlatFunctor0 (& VisBufferAsync::fillAllBeamOffsetsZero));
204 0 : fillerDictionary_p.add (VisBufferComponents::AntennaMounts,
205 0 : vlatFunctor0 (& VisBufferAsync::fillAntennaMounts));
206 0 : fillerDictionary_p.add (VisBufferComponents::Ant1,
207 0 : vlatFunctor0 (& VisBuffer::fillAnt1));
208 0 : fillerDictionary_p.add (VisBufferComponents::Ant2,
209 0 : vlatFunctor0 (& VisBuffer::fillAnt2));
210 0 : fillerDictionary_p.add (VisBufferComponents::ArrayId,
211 0 : vlatFunctor0 (& VisBuffer::fillArrayId));
212 0 : fillerDictionary_p.add (VisBufferComponents::BeamOffsets,
213 0 : vlatFunctor0 (& VisBufferAsync::fillBeamOffsets));
214 0 : fillerDictionary_p.add (VisBufferComponents::Channel,
215 0 : vlatFunctor0 (& VisBuffer::fillChannel));
216 0 : fillerDictionary_p.add (VisBufferComponents::Cjones,
217 0 : vlatFunctor0 (& VisBuffer::fillCjones));
218 0 : fillerDictionary_p.add (VisBufferComponents::CorrType,
219 0 : vlatFunctor0 (& VisBuffer::fillCorrType));
220 0 : fillerDictionary_p.add (VisBufferComponents::Corrected,
221 0 : vlatFunctor1(& VisBuffer::fillVis,
222 : VisibilityIterator::Corrected));
223 0 : fillerDictionary_p.add (VisBufferComponents::CorrectedCube,
224 0 : vlatFunctor1(& VisBuffer::fillVisCube,
225 : VisibilityIterator::Corrected));
226 0 : fillerDictionary_p.add (VisBufferComponents::DataDescriptionId,
227 0 : vlatFunctor0 (& VisBuffer::fillDataDescriptionId));
228 : // fillerDictionary_p.add (VisBufferComponents::Direction1,
229 : // vlatFunctor0 (& VisBuffer::fillDirection1));
230 : // fillerDictionary_p.add (VisBufferComponents::Direction2,
231 : // vlatFunctor0 (& VisBuffer::fillDirection2));
232 0 : fillerDictionary_p.add (VisBufferComponents::Exposure,
233 0 : vlatFunctor0 (& VisBuffer::fillExposure));
234 0 : fillerDictionary_p.add (VisBufferComponents::Feed1,
235 0 : vlatFunctor0 (& VisBuffer::fillFeed1));
236 : // fillerDictionary_p.add (VisBufferComponents::Feed1_pa,
237 : // vlatFunctor0 (& VisBuffer::fillFeed1_pa));
238 0 : fillerDictionary_p.add (VisBufferComponents::Feed2,
239 0 : vlatFunctor0 (& VisBuffer::fillFeed2));
240 : // fillerDictionary_p.add (VisBufferComponents::Feed2_pa,
241 : // vlatFunctor0 (& VisBuffer::fillFeed2_pa));
242 0 : fillerDictionary_p.add (VisBufferComponents::FieldId,
243 0 : vlatFunctor0 (& VisBuffer::fillFieldId));
244 0 : fillerDictionary_p.add (VisBufferComponents::Flag,
245 0 : vlatFunctor0 (& VisBuffer::fillFlag));
246 0 : fillerDictionary_p.add (VisBufferComponents::FlagCategory,
247 0 : vlatFunctor0 (& VisBuffer::fillFlagCategory));
248 0 : fillerDictionary_p.add (VisBufferComponents::FlagCube,
249 0 : vlatFunctor0 (& VisBuffer::fillFlagCube));
250 0 : fillerDictionary_p.add (VisBufferComponents::FlagRow,
251 0 : vlatFunctor0 (& VisBuffer::fillFlagRow));
252 0 : fillerDictionary_p.add (VisBufferComponents::Freq,
253 0 : vlatFunctor0 (& VisBuffer::fillFreq));
254 0 : fillerDictionary_p.add (VisBufferComponents::ImagingWeight,
255 0 : new VlatFunctor ("ImagingWeight")); // do not fill this one
256 0 : fillerDictionary_p.add (VisBufferComponents::Model,
257 0 : vlatFunctor1(& VisBuffer::fillVis,
258 : VisibilityIterator::Model));
259 0 : fillerDictionary_p.add (VisBufferComponents::ModelCube,
260 0 : vlatFunctor1(& VisBuffer::fillVisCube,
261 : VisibilityIterator::Model));
262 0 : fillerDictionary_p.add (VisBufferComponents::NChannel,
263 0 : vlatFunctor0 (& VisBuffer::fillnChannel));
264 0 : fillerDictionary_p.add (VisBufferComponents::NCorr,
265 0 : vlatFunctor0 (& VisBuffer::fillnCorr));
266 0 : fillerDictionary_p.add (VisBufferComponents::NRow,
267 0 : vlatFunctor0 (& VisBuffer::fillnRow));
268 0 : fillerDictionary_p.add (VisBufferComponents::ObservationId,
269 0 : vlatFunctor0 (& VisBuffer::fillObservationId));
270 0 : fillerDictionary_p.add (VisBufferComponents::Observed,
271 0 : vlatFunctor1(& VisBuffer::fillVis,
272 : VisibilityIterator::Observed));
273 0 : fillerDictionary_p.add (VisBufferComponents::ObservedCube,
274 0 : vlatFunctor1(& VisBuffer::fillVisCube,
275 : VisibilityIterator::Observed));
276 0 : fillerDictionary_p.add (VisBufferComponents::PhaseCenter,
277 0 : vlatFunctor0 (& VisBuffer::fillPhaseCenter));
278 0 : fillerDictionary_p.add (VisBufferComponents::PolFrame,
279 0 : vlatFunctor0 (& VisBuffer::fillPolFrame));
280 0 : fillerDictionary_p.add (VisBufferComponents::ProcessorId,
281 0 : vlatFunctor0 (& VisBuffer::fillProcessorId));
282 0 : fillerDictionary_p.add (VisBufferComponents::ReceptorAngles,
283 0 : vlatFunctor0 (& VisBufferAsync::fillReceptorAngles));
284 0 : fillerDictionary_p.add (VisBufferComponents::Scan,
285 0 : vlatFunctor0 (& VisBuffer::fillScan));
286 0 : fillerDictionary_p.add (VisBufferComponents::Sigma,
287 0 : vlatFunctor0 (& VisBuffer::fillSigma));
288 0 : fillerDictionary_p.add (VisBufferComponents::SigmaMat,
289 0 : vlatFunctor0 (& VisBuffer::fillSigmaMat));
290 0 : fillerDictionary_p.add (VisBufferComponents::SpW,
291 0 : vlatFunctor0 (& VisBuffer::fillSpW));
292 0 : fillerDictionary_p.add (VisBufferComponents::StateId,
293 0 : vlatFunctor0 (& VisBuffer::fillStateId));
294 0 : fillerDictionary_p.add (VisBufferComponents::Time,
295 0 : vlatFunctor0 (& VisBuffer::fillTime));
296 0 : fillerDictionary_p.add (VisBufferComponents::TimeCentroid,
297 0 : vlatFunctor0 (& VisBuffer::fillTimeCentroid));
298 0 : fillerDictionary_p.add (VisBufferComponents::TimeInterval,
299 0 : vlatFunctor0 (& VisBuffer::fillTimeInterval));
300 0 : fillerDictionary_p.add (VisBufferComponents::Uvw,
301 0 : vlatFunctor0 (& VisBuffer::filluvw));
302 0 : fillerDictionary_p.add (VisBufferComponents::UvwMat,
303 0 : vlatFunctor0 (& VisBuffer::filluvwMat));
304 0 : fillerDictionary_p.add (VisBufferComponents::Weight,
305 0 : vlatFunctor0 (& VisBuffer::fillWeight));
306 0 : fillerDictionary_p.add (VisBufferComponents::WeightMat,
307 0 : vlatFunctor0 (& VisBuffer::fillWeightMat));
308 0 : fillerDictionary_p.add (VisBufferComponents::WeightSpectrum,
309 0 : vlatFunctor0 (& VisBuffer::fillWeightSpectrum));
310 :
311 : // assert (fillerDictionary_p.size() == VisBufferComponents::N_VisBufferComponents);
312 : // Every supported prefetch column needs a filler
313 :
314 : //fillerDependencies_p.add ();
315 :
316 : //fillerDependencies_p.performTransitiveClosure (fillerDictionary_p);
317 0 : }
318 :
319 : void
320 0 : VLAT::fillDatum (VlaDatum * datum)
321 : {
322 :
323 0 : VisBufferComponents::EnumType fillerId = VisBufferComponents::Unknown;
324 : try{
325 0 : VisBufferAsync * vb = datum->getVisBuffer();
326 :
327 0 : vb->clear();
328 0 : vb->setFilling (true);
329 0 : vb->attachToVisIter (* visibilityIterator_p); // invalidates vb's cache as well
330 :
331 0 : fillDatumMiscellanyBefore (datum);
332 :
333 0 : for (Fillers::iterator filler = fillers_p.begin(); filler != fillers_p.end(); filler ++){
334 :
335 : //Log (2, "Filler id=%d name=%s starting\n", (* filler)->getId(), ViReadImplAsync::prefetchColumnName((* filler)->getId()).c_str());
336 :
337 0 : fillerId = (* filler)->getId();
338 0 : checkFiller (fillerId);
339 0 : (** filler) (vb);
340 : }
341 :
342 0 : fillDatumMiscellanyAfter (datum);
343 :
344 0 : vb->detachFromVisIter ();
345 0 : vb->setFilling (false);
346 : }
347 0 : catch (...){
348 :
349 0 : if (fillerId == -1){
350 0 : Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; rethrowing\n",
351 : datum, datum->getVisBuffer());
352 : }
353 : else{
354 0 : Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; "
355 : "fillingColumn='%s'; rethrowing\n",
356 : datum, datum->getVisBuffer(),
357 : PrefetchColumns::columnName (fillerId).c_str());
358 : }
359 :
360 0 : throw;
361 0 : }
362 0 : }
363 :
364 : void
365 0 : VLAT::fillDatumMiscellanyAfter (VlaDatum * datum)
366 : {
367 0 : datum->getVisBuffer()->setVisibilityShape (visibilityIterator_p->visibilityShape ());
368 :
369 : //////datum->getVisBuffer()->setDataDescriptionId (visibilityIterator_p->getDataDescriptionId());
370 :
371 0 : datum->getVisBuffer()->setPolarizationId (visibilityIterator_p->polarizationId());
372 :
373 0 : datum->getVisBuffer()->setNCoh(visibilityIterator_p->numberCoh ());
374 :
375 0 : datum->getVisBuffer()->setNRowChunk (visibilityIterator_p->nRowChunk());
376 :
377 0 : Vector<Int> nvischan;
378 0 : Vector<Int> spw;
379 :
380 0 : visibilityIterator_p->allSelectedSpectralWindows(spw, nvischan);
381 :
382 0 : datum->getVisBuffer()->setSelectedNVisibilityChannels (nvischan);
383 0 : datum->getVisBuffer()->setSelectedSpectralWindows (spw);
384 :
385 0 : int nSpw = visibilityIterator_p->numberSpw();
386 0 : datum->getVisBuffer()->setNSpw (nSpw);
387 :
388 0 : int nRowChunk = visibilityIterator_p->nRowChunk();
389 0 : datum->getVisBuffer()->setNRowChunk (nRowChunk);
390 :
391 0 : datum->getVisBuffer()->setMSD (visibilityIterator_p->getMSD ()); // ought to be last
392 0 : }
393 :
394 : void
395 0 : VLAT::fillDatumMiscellanyBefore (VlaDatum * datum)
396 : {
397 0 : datum->getVisBuffer()->setMeasurementSet (visibilityIterator_p->getMeasurementSet());
398 0 : datum->getVisBuffer()->setMeasurementSetId (visibilityIterator_p->getMeasurementSetId(),
399 0 : datum->getSubChunkPair().second == 0);
400 :
401 0 : datum->getVisBuffer()->setNewEntityFlags (visibilityIterator_p->newArrayId(),
402 0 : visibilityIterator_p->newFieldId(),
403 0 : visibilityIterator_p->newSpectralWindow());
404 0 : datum->getVisBuffer()->setNAntennas (visibilityIterator_p->getNAntennas ());
405 0 : datum->getVisBuffer()->setMEpoch (visibilityIterator_p->getEpoch ());
406 0 : datum->getVisBuffer()->setReceptor0Angle (visibilityIterator_p->getReceptor0Angle());
407 :
408 0 : fillLsrInfo (datum);
409 :
410 0 : Vector<Double> lsrFreq, selFreq;
411 0 : visibilityIterator_p->getTopoFreqs (lsrFreq, selFreq);
412 0 : datum->getVisBuffer()->setTopoFreqs (lsrFreq, selFreq);
413 0 : }
414 :
415 :
416 : void
417 0 : VLAT::fillLsrInfo (VlaDatum * datum)
418 : {
419 0 : MPosition observatoryPositon;
420 0 : MDirection phaseCenter;
421 : Bool velocitySelection;
422 :
423 :
424 0 : Block<Int> channelGroupNumber;
425 0 : Block<Int> channelIncrement;
426 0 : Block<Int> channelStart;
427 0 : Block<Int> channelWidth;
428 :
429 0 : visibilityIterator_p->getLsrInfo (channelGroupNumber,
430 : channelIncrement,
431 : channelStart,
432 : channelWidth,
433 : observatoryPositon,
434 : phaseCenter,
435 : velocitySelection);
436 :
437 0 : datum->getVisBuffer()->setLsrInfo (channelGroupNumber,
438 : channelIncrement,
439 : channelStart,
440 : channelWidth,
441 : observatoryPositon,
442 : phaseCenter,
443 : velocitySelection);
444 0 : }
445 :
446 : void
447 0 : VLAT::flushWrittenData ()
448 : {
449 0 : for (int i = 0; i < (int) measurementSets_p.nelements() ; i++){
450 0 : measurementSets_p [i].flush();
451 : }
452 0 : }
453 :
454 : void
455 0 : VLAT::handleWrite ()
456 : {
457 : // While there is data to write out, write it out.
458 :
459 0 : Bool done = false;
460 :
461 0 : WriteQueue & writeQueue = interface_p->getWriteQueue ();
462 :
463 : do {
464 :
465 0 : WriteData * writeData = writeQueue.dequeue ();
466 :
467 0 : if (writeData != NULL){
468 :
469 0 : SubChunkPair subchunk = writeData->getSubChunkPair();
470 :
471 0 : alignWriteIterator (subchunk);
472 :
473 0 : writeData->write (writeIterator_p);
474 0 : delete writeData;
475 : }
476 : else{
477 0 : done = true;
478 : }
479 :
480 0 : } while (! done);
481 0 : }
482 :
483 :
484 : void
485 0 : VLAT::initialize (const ROVisibilityIterator & rovi)
486 : {
487 0 : ThrowIf (isStarted(), "VLAT::initialize: thread already started");
488 :
489 0 : visibilityIterator_p = new ROVisibilityIterator (rovi);
490 :
491 0 : visibilityIterator_p->originChunks (true);
492 : // force the MSIter, etc., to be rewound, reinitialized, etc.
493 0 : }
494 :
495 : void
496 0 : VLAT::initialize (const Block<MeasurementSet> & mss,
497 : const Block<Int> & sortColumns,
498 : Bool addDefaultSortCols,
499 : Double timeInterval,
500 : Bool writable)
501 : {
502 0 : ThrowIf (isStarted(), "VLAT::initialize: thread already started");
503 :
504 0 : visibilityIterator_p = new ROVisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);
505 :
506 0 : if (writable){
507 0 : writeIterator_p = new VisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);
508 0 : writeIterator_p->originChunks();
509 0 : writeIterator_p->origin ();
510 :
511 0 : measurementSets_p = mss;
512 : }
513 :
514 0 : }
515 :
516 : Bool
517 0 : VLAT::isTerminated () const
518 : {
519 0 : return threadTerminated_p;
520 : }
521 :
522 : void *
523 0 : VLAT::run ()
524 : {
525 : // Log thread initiation
526 :
527 0 : Logger::get()->registerName ("VLAT");
528 0 : String writable = writeIterator_p != NULL ? "writable" : "readonly";
529 0 : Log (1, "VLAT starting execution; tid=%d; VI is %s.\n", gettid(), writable.c_str());
530 :
531 0 : LogIO logIo (LogOrigin ("VLAT"));
532 0 : logIo << "starting execution; tid=" << gettid() << endl << LogIO::POST;
533 :
534 : // Enter run loop. The run loop will only be exited when the main thread
535 : // explicitly asks for the termination of this thread (or an uncaught
536 : // exception comes to this level).
537 :
538 :
539 : try {
540 : do{
541 0 : Log (1, "VLAT starting VI sweep\n");
542 :
543 : // Start sweeping the real VI over its entire range
544 : // (subject to the number of free buffers). The sweep
545 : // can be ended abruptly if
546 :
547 0 : sweepVi ();
548 :
549 0 : Bool startNewSweep = waitForViReset ();
550 :
551 0 : if (! startNewSweep){
552 0 : break; // Not resetting so it's time to quit
553 : }
554 :
555 0 : } while (true);
556 :
557 0 : handleWrite (); // service any pending writes
558 :
559 0 : flushWrittenData ();
560 :
561 0 : threadTerminated_p = true;
562 0 : Log (1, "VLAT stopping execution.\n");
563 0 : logIo << "stopping execution normally; tid=" << gettid() << endl << LogIO::POST;
564 :
565 0 : return NULL;
566 :
567 : }
568 0 : catch (std::exception & e){
569 :
570 0 : cerr << "VLAT thread caught exception: " << e.what() << endl;
571 0 : cerr.flush();
572 :
573 0 : Log (1, "VLAT caught exception: %s.\n", e.what());
574 0 : logIo << "caught exception; tid=" << gettid() << "-->" << e.what() << endl << LogIO::POST;
575 :
576 0 : threadTerminated_p = true;
577 0 : throw;
578 0 : }
579 0 : catch (...){
580 :
581 0 : cerr << "VLAT thread caught unknown exception: " << endl;
582 0 : cerr.flush();
583 :
584 0 : Log (1, "VLAT caught unknown exception:\n");
585 0 : logIo << "caught unknown exception; tid=" << gettid() << endl << LogIO::POST;
586 :
587 0 : threadTerminated_p = true;
588 0 : throw;
589 0 : }
590 0 : }
591 :
592 : void
593 0 : VLAT::setPrefetchColumns (const ViReadImplAsync::PrefetchColumns & columns)
594 : {
595 0 : ThrowIf (isStarted(), "VLAT::setColumns: cannot do this after thread started");
596 0 : ThrowIf (! fillers_p.empty(), "VLAT::setColumns:: has already been done");
597 :
598 0 : createFillerDictionary ();
599 :
600 0 : for (ViReadImplAsync::PrefetchColumns::const_iterator c = columns.begin();
601 0 : c != columns.end();
602 0 : c ++){
603 :
604 0 : ThrowIf (! containsKey (*c, fillerDictionary_p),
605 : String::format ("Unknown prefetch column id (%d)", *c));
606 :
607 0 : fillers_p.push_back (fillerDictionary_p [*c]->clone());
608 : }
609 :
610 : // sort (fillers_p.begin(),
611 : // fillers_p.end(),
612 : // VlatFunctor::byDecreasingPrecedence);
613 0 : }
614 :
615 : void
616 0 : VLAT::sweepVi ()
617 : {
618 :
619 : // Configure the iterator(s) to start a new sweep through the data.
620 : // Reset the subchunk counters, apply any queued modifiers and if
621 : // the write iterator exists reset it to the chunk origin (the
622 : // read iterator gets reset at the start of the sweep loop).
623 :
624 0 : readSubchunk_p.resetToOrigin ();
625 0 : writeSubchunk_p.resetToOrigin ();
626 :
627 0 : applyModifiers (visibilityIterator_p, writeIterator_p);
628 :
629 0 : if (writeIterator_p != NULL){
630 0 : writeIterator_p->originChunks (true);
631 : }
632 :
633 : // Start sweeping the data with the read only iterator. If there
634 : // is a write iterator it will write behind the RO iterator; the RW
635 : // iterator is advanced to align with the appropriate subchunk when
636 : // a request is made to write a particular subchunk.
637 :
638 : try {
639 :
640 0 : for (visibilityIterator_p->originChunks(true);
641 0 : visibilityIterator_p->moreChunks();
642 0 : visibilityIterator_p->nextChunk(), readSubchunk_p.incrementChunk ()){
643 :
644 0 : for (visibilityIterator_p->origin();
645 0 : visibilityIterator_p->more();
646 0 : ++ (* visibilityIterator_p), readSubchunk_p.incrementSubChunk ()){
647 :
648 0 : ThreadTimes startTime = ThreadTimes ();
649 :
650 0 : waitUntilFillCanStart ();
651 :
652 0 : throwIfSweepTerminated ();
653 :
654 0 : VlaDatum * vlaDatum = vlaData_p -> fillStart (readSubchunk_p, startTime);
655 :
656 0 : throwIfSweepTerminated();
657 :
658 0 : fillDatum (vlaDatum);
659 :
660 0 : throwIfSweepTerminated ();
661 :
662 0 : vlaData_p -> fillComplete (vlaDatum);
663 :
664 0 : throwIfSweepTerminated ();
665 :
666 0 : handleWrite ();
667 : }
668 : }
669 :
670 0 : Log (1, "VLAT: no more data\n");
671 :
672 0 : vlaData_p -> setNoMoreData ();
673 : }
674 0 : catch (SweepTerminated &){
675 0 : Log (1, "VLAT: VI sweep termination requested.\n");
676 0 : }
677 0 : catch (AipsError e){
678 0 : Log (1, "AipsError during sweepVi; readSubchunk=%s, writeSubChunk=%s",
679 : readSubchunk_p.toString().c_str(), writeSubchunk_p.toString().c_str());
680 0 : throw;
681 0 : }
682 0 : }
683 :
684 : void
685 0 : VLAT::terminate ()
686 : {
687 : // Called by another thread to terminate the VLAT.
688 :
689 0 : Log (2, "Terminating VLAT\n");
690 : //printBacktrace (cerr, "VLAT termination");
691 :
692 0 : Thread::terminate(); // ask thread to terminate
693 :
694 0 : interface_p->terminateLookahead (); // stop lookahead
695 0 : }
696 :
697 : void
698 0 : VLAT::throwIfSweepTerminated ()
699 : {
700 0 : if (interface_p->isSweepTerminationRequested()){
701 0 : throw SweepTerminated ();
702 : }
703 0 : }
704 :
705 : Bool
706 0 : VLAT::waitForViReset()
707 : {
708 0 : UniqueLock uniqueLock (interface_p->getMutex());
709 :
710 0 : while (! interface_p->viResetRequested () &&
711 0 : ! interface_p->isLookaheadTerminationRequested ()){
712 :
713 0 : handleWrite (); // process any pending write requests
714 :
715 : // Wait for the interface to change:
716 : //
717 : // o Buffer consumed by main thread
718 : // o A write was requested
719 : // o A sweep or thread termination was requested
720 :
721 0 : interface_p->waitForInterfaceChange (uniqueLock);
722 : }
723 :
724 0 : handleWrite (); // One more time to be sure that all writes are completed before
725 : // we either quit or rewind the iterator.
726 :
727 0 : if (interface_p->isLookaheadTerminationRequested ()){
728 0 : return false;
729 : }
730 : else{
731 :
732 0 : vlaData_p->resetBufferData ();
733 :
734 0 : roviaModifiers_p = interface_p->transferRoviaModifiers ();
735 :
736 0 : interface_p->viResetComplete ();
737 :
738 0 : return true;
739 : }
740 0 : }
741 :
742 : void
743 0 : VLAT::waitUntilFillCanStart ()
744 : {
745 0 : UniqueLock uniqueLock (interface_p->getMutex());
746 :
747 0 : while ( ! vlaData_p->fillCanStart () &&
748 0 : ! interface_p->isSweepTerminationRequested ()){
749 :
750 0 : handleWrite (); // process any pending write requests
751 :
752 : // Wait for the interface to change:
753 : //
754 : // o Buffer consumed by main thread
755 : // o A write was requested
756 : // o A sweep or thread termination was requested
757 :
758 0 : interface_p->waitForInterfaceChange (uniqueLock);
759 : }
760 0 : }
761 :
762 : void
763 0 : VlatFunctor::operator() (VisBuffer *)
764 : {
765 0 : ThrowIf (true, "No filler is defined for this VisBuffer component: " + name_p);
766 0 : }
767 :
768 :
769 : } // end namespace asyncio
770 :
771 : using namespace casacore;
772 : } // end namespace casa
|