Line data Source code
1 : /*
2 : * VisibilityIteratorAsync.h
3 : *
4 : * Created on: Nov 1, 2010
5 : * Author: jjacobs
6 : */
7 :
8 : #if ! defined (VisibilityIteratorAsync_H_)
9 : #define VisibilityIteratorAsync_H_
10 :
11 : #include <stack>
12 : #include <set>
13 :
14 : #include <msvis/MSVis/VisibilityIteratorImpl.h>
15 : #include <msvis/MSVis/AsynchronousInterface.h>
16 : #include <stdcasa/UtilJ.h>
17 : // Shorthand throw statements used as the whole body of the stub methods in ViReadImplAsync. Each throws utilj::AipsErrorTrace with a fixed message plus __PRETTY_FUNCTION__ (a GCC/Clang extension), __FILE__ and __LINE__.
18 : #define NotImplementedROVIA throw utilj::AipsErrorTrace (casacore::String ("Method not legal in ROVIA: ") + __PRETTY_FUNCTION__, __FILE__, __LINE__)
19 : #define NotPrefetched throw utilj::AipsErrorTrace (casacore::String ("Column not prefetched for async I/O: ") + __PRETTY_FUNCTION__, __FILE__, __LINE__)
20 :
21 : namespace casa {
22 :
23 : // Forward declarations. ViReadImplAsync holds VisBufferAsync, VisBufferAsyncWrapper, asyncio::AsynchronousInterface, asyncio::VlaData and asyncio::VLAT by pointer only.
24 : class VisBufferAsync;
25 : class VisBufferAsyncWrapper;
26 :
27 : namespace asyncio {
28 :
29 : class AsynchronousInterface;
30 : class VlaDatum;
31 : class VlaData;
32 : class VLAT;
33 :
34 : } // end namespace asyncio
35 : // Read-side iterator implementation for asynchronous I/O, derived from VisibilityIteratorReadImpl. Most data accessors declared here are stubs whose body is a NotPrefetched or NotImplementedROVIA throw.
36 : class ViReadImplAsync : public VisibilityIteratorReadImpl {
37 : // This needs to be changed back to ROVisibilityIterator at some point
38 : // after feasibility testing
39 : // Classes granted access to the non-public members of this class.
40 : friend class Rovia_Test;
41 : friend class ROVisIterator;
42 : friend class VisIterator;
43 : friend class ViWriteImplAsync;
44 :
45 : public:
46 :
47 : typedef casa::asyncio::PrefetchColumns PrefetchColumns; // shorthand for the asyncio type
48 :
49 : // static VisibilityIteratorReadImpl *
50 : // create (const ROVisibilityIterator &,
51 : // const PrefetchColumns & prefetchColumns,
52 : // casacore::Int nReadAheadBuffers = -1);
53 : // Constructor taking a block of MeasurementSets; its parameter list is the same as that of construct() in the protected section.
54 : ViReadImplAsync (const casacore::Block<casacore::MeasurementSet> & mss,
55 : const PrefetchColumns & prefetchColumns,
56 : const casacore::Block<casacore::Int> & sortColumns,
57 : const casacore::Bool addDefaultSortCols,
58 : casacore::Double timeInterval,
59 : casacore::Bool writable);
60 : // Constructor taking the columns to prefetch and an existing read implementation (presumably used to copy its setup -- confirm in the implementation file).
61 : ViReadImplAsync (const PrefetchColumns & prefetchColumns,
62 : const VisibilityIteratorReadImpl & other,
63 : casacore::Bool writable);
64 :
65 :
66 : ~ViReadImplAsync ();
67 :
68 : VisibilityIteratorReadImpl * clone () const;
69 :
70 : void attachVisBuffer (VisBuffer & vb);
71 : void detachVisBuffer (VisBuffer & vb);
72 : void getChannelSelection(casacore::Block< casacore::Vector<casacore::Int> >& ,
73 : casacore::Block< casacore::Vector<casacore::Int> >& ,
74 : casacore::Block< casacore::Vector<casacore::Int> >& ,
75 : casacore::Block< casacore::Vector<casacore::Int> >& ,
76 : casacore::Block< casacore::Vector<casacore::Int> >& );
77 : PrefetchColumns getPrefetchColumns () const;
78 : VisBuffer * getVisBuffer ();
79 :
80 :
81 : // casacore::Int getDataDescriptionId () const;
82 : // const casacore::MeasurementSet & getMeasurementSet();
83 : // const casacore::Int getMeasurementSetId ();
84 : // casacore::Int getNAntennas () const;
85 : casacore::MEpoch getEpoch () const;
86 : // casacore::Vector<casacore::Float> getReceptor0Angle ();
87 :
88 : /////void linkWithRovi (VisibilityIteratorReadImpl * rovi);
89 : // Iteration control (declared here, defined elsewhere).
90 : bool more () const;
91 : bool moreChunks () const;
92 : ViReadImplAsync & nextChunk ();
93 : void origin ();
94 : void originChunks ();
95 : virtual void advance ();
96 :
97 : void setPrefetchColumns (const PrefetchColumns & prefetchColumns);
98 :
99 : VisibilityIteratorReadImpl& selectChannel(casacore::Int nGroup=1,
100 : casacore::Int start=0,
101 : casacore::Int width=0,
102 : casacore::Int increment=1,
103 : casacore::Int spectralWindow=-1);
104 :
105 : VisibilityIteratorReadImpl& selectChannel(const casacore::Block< casacore::Vector<casacore::Int> >& blockNGroup,
106 : const casacore::Block< casacore::Vector<casacore::Int> >& blockStart,
107 : const casacore::Block< casacore::Vector<casacore::Int> >& blockWidth,
108 : const casacore::Block< casacore::Vector<casacore::Int> >& blockIncr,
109 : const casacore::Block< casacore::Vector<casacore::Int> >& blockSpw);
110 :
111 :
112 : VisibilityIteratorReadImpl&
113 : selectVelocity(casacore::Int ,
114 : const casacore::MVRadialVelocity& ,
115 : const casacore::MVRadialVelocity& ,
116 : casacore::MRadialVelocity::Types ,
117 : casacore::MDoppler::Types , casacore::Bool );
118 : void setInterval(casacore::Double timeInterval);
119 : void setRowBlocking(casacore::Int nRow);
120 :
121 : // These functions generate a list of the IDs (from PrefetchColumnIDs enum)
122 : // of the columns to prefetch. For the variable arg calls, terminate with a
123 : // -1.
124 : // NOTE(review): no list-generating or variable-argument functions are declared in this class; the comment above looks stale -- confirm and remove or relocate.
125 : static int getDefaultNBuffers ();
126 :
127 : // The functions below make no sense (at first glance) for asynchronous operation and are implemented
128 : // to throw a casacore::AipsError if called. ROVIA is designed to have all the data accessed through the
129 : // associated VisBufferAsync. Any method which tries to access data through the ROVIA makes no
130 : // sense. Also anything that tries to change the characteristics of underlying ROVI is not currently
131 : // permitted. During integration some methods may be found to be more important to the use of ROVIA
132 : // and a way may be found to implement them in ROVIA.
133 : // Inline bodies of the form { NotPrefetched; } or { NotImplementedROVIA; } expand to a throw of utilj::AipsErrorTrace; members with a trailing commented-out body are only declared here.
134 : casacore::Bool allBeamOffsetsZero() const; // { NotPrefetched; }
135 0 : void allSelectedSpectralWindows(casacore::Vector<casacore::Int>& , casacore::Vector<casacore::Int>& ) { NotPrefetched; }
136 0 : casacore::Vector<casacore::Int>& antenna1(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
137 0 : casacore::Vector<casacore::Int>& antenna2(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
138 : const casacore::Vector<casacore::String>& antennaMounts() const; // { NotPrefetched; }
139 0 : casacore::Vector<casacore::MDirection> azel(casacore::Double ) const { NotImplementedROVIA; }
140 0 : casacore::MDirection azel0(casacore::Double ) const { NotImplementedROVIA; }
141 0 : casacore::Vector<casacore::Int>& channel(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
142 0 : casacore::Int channelGroupSize() const { NotImplementedROVIA; }
143 0 : casacore::Int channelIndex() const { NotImplementedROVIA; }
144 0 : casacore::Vector<casacore::SquareMatrix<casacore::Complex,2> >& CJones(casacore::Vector<casacore::SquareMatrix<casacore::Complex,2> >& ) const { NotPrefetched; }
145 0 : casacore::Vector<casacore::Int>& corrType(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
146 0 : casacore::Int dataDescriptionId() const { NotPrefetched; }
147 : casacore::Bool existsWeightSpectrum() const { NotImplementedROVIA; }
148 0 : casacore::Vector<casacore::Double>& exposure(casacore::Vector<casacore::Double>& /*expo*/) const { NotPrefetched; }
149 0 : casacore::Vector<casacore::Int>& feed1(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
150 0 : casacore::Vector<casacore::Int>& feed2(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
151 : //casacore::Vector<casacore::Float> feed_pa(casacore::Double ) const { NotImplementedROVIA; }
152 0 : casacore::Int fieldId() const { NotPrefetched; }
153 0 : casacore::Array<casacore::Bool>& flagCategory(casacore::Array<casacore::Bool>& /*flagCategories*/) const { NotPrefetched; }
154 0 : casacore::Cube<casacore::Float>& floatData(casacore::Cube<casacore::Float>& /*fcube*/) const { NotPrefetched; }
155 0 : void getFloatDataColumn (casacore::Cube<casacore::Float>& /*data*/) const { NotImplementedROVIA; }
156 0 : void getFloatDataColumn(const casacore::Slicer& /*slicer*/, casacore::Cube<casacore::Float>& /*data*/) const { NotImplementedROVIA; }
157 : void getInterpolatedFloatDataFlagWeight() const { NotImplementedROVIA; }
158 : void getInterpolatedVisFlagWeight(DataColumn /*whichOne*/) const { NotImplementedROVIA; }
159 0 : casacore::Int arrayId() const { NotPrefetched; }
160 0 : casacore::String fieldName() const { NotImplementedROVIA; }
161 0 : casacore::String sourceName() const { NotImplementedROVIA; }
162 0 : casacore::Cube<casacore::Bool>& flag(casacore::Cube<casacore::Bool>& ) const { NotPrefetched; }
163 0 : casacore::Matrix<casacore::Bool>& flag(casacore::Matrix<casacore::Bool>& ) const { NotPrefetched; }
164 0 : casacore::Vector<casacore::Bool>& flagRow(casacore::Vector<casacore::Bool>& ) const { NotPrefetched; }
165 0 : casacore::Vector<casacore::Double>& frequency(casacore::Vector<casacore::Double>& ) const { NotPrefetched; }
166 : const casacore::Cube<casacore::RigidVector<casacore::Double, 2> >& getBeamOffsets() const;// { NotImplementedROVIA; }
167 : casacore::Int getDataDescriptionId () const { NotPrefetched; }
168 : casacore::MEpoch getMEpoch () const { NotImplementedROVIA; }
169 : const casacore::MeasurementSet & getMeasurementSet() const { NotImplementedROVIA; }
170 : casacore::Int getMeasurementSetId() const { NotImplementedROVIA; }
171 : casacore::Int getNAntennas () const { NotImplementedROVIA; }
172 : casacore::Vector<casacore::Float> getReceptor0Angle () { NotImplementedROVIA; }
173 : casacore::Vector<casacore::uInt> getRowIds () const { NotImplementedROVIA; }
174 0 : casacore::Double hourang(casacore::Double ) const { NotImplementedROVIA; }
175 0 : casacore::Vector<casacore::Double>& lsrFrequency(casacore::Vector<casacore::Double>& ) const { NotImplementedROVIA; }
176 : void lsrFrequency(const casacore::Int& , casacore::Vector<casacore::Double>& , casacore::Bool& ) { NotImplementedROVIA; }
177 0 : const casacore::MeasurementSet& ms() const { NotImplementedROVIA; }
178 : const casacore::MSColumns& msColumns() const;
179 : casacore::Int msId() const;
180 0 : casacore::Int nCorr() const { NotPrefetched; }
181 0 : casacore::Int nRow() const { NotPrefetched; }
182 : casacore::Int nRowChunk() const;
183 0 : casacore::Int nSubInterval() const { NotImplementedROVIA; }
184 0 : casacore::Bool newArrayId() const { NotImplementedROVIA; }
185 0 : casacore::Bool newFieldId() const { NotImplementedROVIA; }
186 0 : casacore::Bool newMS() const { return msIter_p.more();} // NOTE(review): the only inline accessor here that does not throw; it returns msIter_p.more() rather than a new-MS indicator -- confirm this is intended
187 0 : casacore::Bool newSpectralWindow() const { NotImplementedROVIA; }
188 0 : casacore::Int numberCoh() { NotPrefetched; }
189 0 : casacore::Int numberDDId() { NotPrefetched; }
190 0 : casacore::Int numberPol() { NotPrefetched; }
191 : casacore::Int numberSpw();
192 0 : casacore::Vector<casacore::Int>& observationId(casacore::Vector<casacore::Int>& /*obsIDs*/) const { NotPrefetched; }
193 0 : casacore::Vector<casacore::Float> parang(casacore::Double ) const { NotImplementedROVIA; }
194 0 : const casacore::Float& parang0(casacore::Double ) const { NotImplementedROVIA; }
195 0 : const casacore::MDirection& phaseCenter() const { NotPrefetched; }
196 0 : casacore::Int polFrame() const { NotPrefetched; }
197 0 : casacore::Vector<casacore::Int>& processorId(casacore::Vector<casacore::Int>& /*procIDs*/) const { NotPrefetched; }
198 0 : casacore::Int polarizationId() const { NotPrefetched; }
199 : const casacore::Cube<casacore::Double>& receptorAngles() const; // { NotImplementedROVIA; }
200 0 : casacore::Vector<casacore::rownr_t>& rowIds(casacore::Vector<casacore::rownr_t>& ) const { NotImplementedROVIA; }
201 0 : casacore::Vector<casacore::Int>& scan(casacore::Vector<casacore::Int>& ) const { NotPrefetched; }
202 0 : casacore::Vector<casacore::Float>& sigma(casacore::Vector<casacore::Float>& ) const { NotPrefetched; }
203 0 : casacore::Matrix<casacore::Float>& sigmaMat(casacore::Matrix<casacore::Float>& ) const { NotPrefetched; }
204 0 : void slurp() const { NotImplementedROVIA; }
205 0 : casacore::Int spectralWindow() const { NotPrefetched; }
206 0 : casacore::Vector<casacore::Int>& stateId(casacore::Vector<casacore::Int>& /*stateIds*/) const { NotPrefetched; }
207 0 : casacore::Vector<casacore::Double>& time(casacore::Vector<casacore::Double>& ) const { NotPrefetched; }
208 0 : casacore::Vector<casacore::Double>& timeCentroid(casacore::Vector<casacore::Double>& /*t*/) const { NotPrefetched; }
209 0 : casacore::Vector<casacore::Double>& timeInterval(casacore::Vector<casacore::Double>& ) const { NotPrefetched; }
210 0 : casacore::Vector<casacore::RigidVector<casacore::Double,3> >& uvw(casacore::Vector<casacore::RigidVector<casacore::Double,3> >& ) const { NotPrefetched; }
211 0 : casacore::Matrix<casacore::Double>& uvwMat(casacore::Matrix<casacore::Double>& ) const { NotPrefetched; }
212 0 : VisibilityIteratorReadImpl& velInterpolation(const casacore::String& ) { NotImplementedROVIA; }
213 0 : casacore::Cube<casacore::Complex>& visibility(casacore::Cube<casacore::Complex>& , DataColumn ) const { NotPrefetched; }
214 0 : casacore::Matrix<CStokesVector>& visibility(casacore::Matrix<CStokesVector>& , DataColumn ) const { NotPrefetched; }
215 0 : casacore::IPosition visibilityShape() const { NotImplementedROVIA; }
216 0 : casacore::Vector<casacore::Float>& weight(casacore::Vector<casacore::Float>& ) const { NotPrefetched; }
217 0 : casacore::Matrix<casacore::Float>& weightMat(casacore::Matrix<casacore::Float>& ) const { NotPrefetched; }
218 0 : casacore::Cube<casacore::Float>& weightSpectrum(casacore::Cube<casacore::Float>& ) const { NotPrefetched; }
219 :
220 : static casacore::String prefetchColumnName (casacore::Int id); // for debug only
221 :
222 : protected:
223 :
224 : // Use the factory method "create" instead of calling the constructor
225 : // directly. This allows disabling the feature.
226 : // NOTE(review): in this class "create" appears only as commented-out code at the top of the public section, and the constructors listed below are commented out as well -- the advice above may be stale.
227 : // ViReadImplAsync (const casacore::MeasurementSet & ms,
228 : // const PrefetchColumns & prefetchColumns,
229 : // const casacore::Block<casacore::Int> & sortColumns,
230 : // casacore::Double timeInterval=0,
231 : // casacore::Int nReadAheadBuffers = 2);
232 : // ViReadImplAsync (const casacore::MeasurementSet & ms,
233 : // const PrefetchColumns & prefetchColumns,
234 : // const casacore::Block<casacore::Int> & sortColumns,
235 : // const casacore::Bool addDefaultSortCols,
236 : // casacore::Double timeInterval=0,
237 : // casacore::Int nReadAheadBuffers = 2);
238 : //
239 : // // Same as previous constructor, but with multiple MSs to iterate over.
240 : //
241 : // ViReadImplAsync (const casacore::Block<casacore::MeasurementSet> & mss,
242 : // const PrefetchColumns & prefetchColumns,
243 : // const casacore::Block<casacore::Int> & sortColumns,
244 : // casacore::Double timeInterval=0,
245 : // casacore::Int nReadAheadBuffers = 2);
246 :
247 : // Protected constructor taking an existing ROVisibilityIterator; nReadAheadBuffers defaults to -1 (its meaning is not visible here -- presumably "use the default", confirm).
248 : ViReadImplAsync (const ROVisibilityIterator & rovi,
249 : const PrefetchColumns & prefetchColumns,
250 : casacore::Int nReadAheadBuffers = -1);
251 :
252 : PrefetchColumns augmentPrefetchColumns (const PrefetchColumns & prefetchColumnsBase);
253 : void construct(const casacore::Block<casacore::MeasurementSet> & mss,
254 : const PrefetchColumns & prefetchColumns,
255 : const casacore::Block<casacore::Int> & sortColumns,
256 : const casacore::Bool addDefaultSortCols,
257 : casacore::Double timeInterval,
258 : casacore::Bool writable);
259 :
260 :
261 : void fillVisBuffer();
262 : const casacore::MeasurementSet & getMs() const;
263 : void readComplete ();
264 : void saveMss (const casacore::Block<casacore::MeasurementSet> & mss);
265 : void saveMss (const casacore::MeasurementSet & ms);
266 : //void startVlat ();
267 :
268 : private:
269 : // NOTE(review): the [own] / [use] tags on the pointers below presumably mean "owned by this object" vs. "borrowed" -- confirm against the destructor.
270 : asyncio::AsynchronousInterface * interface_p; // [own]
271 : casacore::Int msId_p;
272 : PrefetchColumns prefetchColumns_p;
273 : std::stack<VisBufferAsyncWrapper *> vbaWrapperStack_p;
274 : VisBufferAsync * visBufferAsync_p;
275 : asyncio::VlaData * vlaData_p; // [use]
276 : asyncio::VLAT * vlat_p; // [use]
277 :
278 : void dumpPrefetchColumns () const;
279 : void updateMsd ();
280 : // Copy constructor and copy assignment are declared private with no definition visible here (presumably to forbid copying).
281 : ViReadImplAsync (const ViReadImplAsync & MSI);
282 : ViReadImplAsync & operator=(const ViReadImplAsync &MSI);
283 :
284 : }; // end class ViReadImplAsync
285 : // Forward declaration of the type taken by ViWriteImplAsync::queueWriteData().
286 : namespace asyncio {
287 : class WriteData;
288 : } // end namespace asyncio
289 : // Write-side companion of ViReadImplAsync, derived from VisibilityIteratorWriteImpl. NOTE(review): the set*/put* methods presumably hand their data to queueWriteData() -- confirm in the implementation file.
290 : class ViWriteImplAsync : public VisibilityIteratorWriteImpl {
291 :
292 : public:
293 :
294 : typedef casa::asyncio::PrefetchColumns PrefetchColumns;
295 :
296 : ViWriteImplAsync (VisibilityIterator * vi);
297 : ViWriteImplAsync (const PrefetchColumns & prefetchColumns,
298 : const VisibilityIteratorWriteImpl & other,
299 : VisibilityIterator * vi);
300 :
301 : ~ViWriteImplAsync ();
302 :
303 : VisibilityIteratorWriteImpl * clone () const;
304 : // Store a model given as a record; iscomponentlist defaults to true and incremental to false.
305 : void putModel(const casacore::RecordInterface& rec, casacore::Bool iscomponentlist=true, casacore::Bool incremental=false);
306 :
307 :
308 : // Set/modify the flags in the data.
309 : // This will flag all channels in the original data that contributed to
310 : // the output channel in the case of channel averaging.
311 : // All polarizations have the same flag value.
312 : void setFlag(const casacore::Matrix<casacore::Bool>& flag);
313 : // Set/modify the flags in the data.
314 : // This sets the flags as found in the casacore::MS, casacore::Cube(npol,nchan,nrow),
315 : // where nrow is the number of rows in the current iteration (given by
316 : // nRow()).
317 : void setFlag(const casacore::Cube<casacore::Bool>& flag);
318 : // Set/modify the flag category array.
319 : void setFlagCategory (const casacore::Array<casacore::Bool> & flagCategory);
320 :
321 : // Set/modify the flag row column; dimension casacore::Vector(nrow)
322 : void setFlagRow(const casacore::Vector<casacore::Bool>& rowflags);
323 : // Set/modify the visibilities.
324 : // This is possibly only for a 'reference' casacore::MS which has a new DATA column.
325 : // The first axis of the matrix should equal the selected number of channels
326 : // in the original MS.
327 : // If the casacore::MS does not contain all polarizations, only the parallel
328 : // hand polarizations are used.
329 : void setVis(const casacore::Matrix<CStokesVector>& vis, DataColumn whichOne);
330 : // Set/modify the visibilities
331 : // This sets the data as found in the casacore::MS, casacore::Cube(npol,nchan,nrow).
332 : void setVis(const casacore::Cube<casacore::Complex>& vis, DataColumn whichOne);
333 : // Set the visibility and flags, and interpolate from velocities if needed
334 : void setVisAndFlag(const casacore::Cube<casacore::Complex>& vis, const casacore::Cube<casacore::Bool>& flag,
335 : DataColumn whichOne);
336 : // Set/modify the weights
337 : void setWeight(const casacore::Vector<casacore::Float>& wt);
338 : // Set/modify the weightMat
339 : void setWeightMat(const casacore::Matrix<casacore::Float>& wtmat);
340 : // Set/modify the weightSpectrum
341 : void setWeightSpectrum(const casacore::Cube<casacore::Float>& wtsp);
342 : // Set/modify the Sigma
343 : void setSigma(const casacore::Vector<casacore::Float>& sig);
344 : // Set/modify the ncorr x nrow SigmaMat.
345 : void setSigmaMat(const casacore::Matrix<casacore::Float>& sigmat);
346 :
347 : protected:
348 : // Helpers for derived classes: getReadImpl() returns a ViReadImplAsync pointer; queueWriteData() takes an asyncio::WriteData by const reference.
349 : ViReadImplAsync * getReadImpl();
350 : void queueWriteData (const asyncio::WriteData & data);
351 :
352 : private:
353 : // (no private members are declared)
354 :
355 : }; // end class ViWriteImplAsync
356 :
357 : // ROVIA is a short alias for ViReadImplAsync.
358 : typedef ViReadImplAsync ROVIA;
359 :
360 : } // end namespace casa
361 :
362 : #endif // VisibilityIteratorAsync_H_
|