Line data Source code
1 : /*
2 : * DataAccumulator.h
3 : *
4 : * Created on: Jan 18, 2016
5 : * Author: nakazato
6 : */
7 :
8 : #ifndef SINGLEDISH_FILLER_DATAACCUMULATOR_H_
9 : #define SINGLEDISH_FILLER_DATAACCUMULATOR_H_
10 :
11 : #include <vector>
12 : #include <cassert>
13 : #include <memory>
14 : #include <algorithm>
15 : #include <numeric>
16 : #include <functional>
17 :
18 : #include <casacore/casa/BasicSL/String.h>
19 : #include <casacore/casa/Arrays/Vector.h>
20 : #include <casacore/casa/Arrays/Matrix.h>
21 : #include <casacore/casa/Arrays/ArrayMath.h>
22 : #include <casacore/casa/IO/ArrayIO.h>
23 : #include <casacore/casa/Containers/Record.h>
24 :
25 : #include <casacore/measures/Measures/Stokes.h>
26 :
27 : #include <casacore/tables/Tables/TableRecord.h>
28 : #include <singledishfiller/Filler/DataRecord.h>
29 : #include <singledishfiller/Filler/FillerUtil.h>
30 :
31 : using namespace casacore;
32 :
33 : namespace {
34 : template<class T>
35 0 : inline void resizeTo(T &array, casacore::IPosition const &shape) {
36 0 : if (array.shape() != shape) {
37 0 : array.resize(shape, false);
38 : }
39 0 : }
40 :
41 : template<class T>
42 0 : inline void setValue1(ssize_t n, T const *src, T *dst) {
43 0 : for (ssize_t i = 0; i < n; ++i) {
44 0 : dst[i] = src[i];
45 : }
46 0 : }
47 :
48 : template<class T>
49 0 : inline void setValueToMatrixColumn(casacore::Vector<T> const &src,
50 : ssize_t icolumn, casacore::Matrix<T> &dst) {
51 0 : casacore::IPosition const &shape = dst.shape();
52 0 : ssize_t const nrow = shape[0];
53 0 : ssize_t const ncolumn = shape[1];
54 0 : if (icolumn >= ncolumn) {
55 0 : throw casacore::AipsError("Specified column doesn't exist.");
56 : }
57 :
58 : casacore::Bool b1, b2;
59 0 : T *dst_p = dst.getStorage(b1);
60 0 : T *work_p = dst_p + icolumn * nrow;
61 0 : T const *src_p = src.getStorage(b2);
62 :
63 0 : setValue1(nrow, src_p, work_p);
64 :
65 0 : src.freeStorage(src_p, b2);
66 0 : dst.putStorage(dst_p, b1);
67 0 : }
68 :
69 : template<class T, class Executor>
70 0 : inline void shuffleTransposeMatrix(ssize_t n, size_t offset_src,
71 : casacore::Matrix<T> const &src, casacore::Matrix<T> &dst,
72 : std::vector<size_t> src_order = { }) {
73 0 : if (offset_src > src.ncolumn() - 1)
74 0 : throw casacore::AipsError("offset too large");
75 : casacore::Bool b1, b2;
76 0 : T const *src_p = src.getStorage(b1);
77 0 : T *dst_p = dst.getStorage(b2);
78 0 : T const *wsrc_p = src_p + offset_src * n;
79 0 : T *wdst_p = dst_p;
80 :
81 0 : Executor::execute(n, wsrc_p, wdst_p, src_order);
82 :
83 0 : src.freeStorage(src_p, b1);
84 0 : dst.putStorage(dst_p, b2);
85 0 : }
86 :
87 0 : inline void getDefaultOrder(size_t const num_order,
88 : std::vector<size_t> &order) {
89 0 : order.resize(num_order);
90 0 : std::iota(order.begin(), order.end(), 0);
91 0 : }
92 :
93 0 : inline bool setAndCheckOrder(size_t const required_size, size_t const max_value,
94 : std::vector<size_t> &order) {
95 0 : if (order.size() == 0)
96 0 : getDefaultOrder(required_size, order);
97 :
98 0 : if (order.size() < required_size) {
99 0 : return false;
100 : }
101 0 : size_t max_idx = order[0];
102 0 : for (size_t i = 1; i < order.size(); ++i) {
103 0 : max_idx = std::max(max_idx, order[i]);
104 : }
105 0 : return max_idx <= max_value;
106 : }
107 :
108 : struct ExecuteMatrix1 {
109 : template<class T>
110 0 : static void execute(ssize_t n, T const *src, T *dst,
111 : std::vector<size_t> /*in_order*/) {
112 0 : setValue1(n, src, dst);
113 0 : }
114 : };
115 :
116 : struct ExecuteMatrix2 {
117 : template<class T>
118 0 : static void execute(ssize_t n, T const *src, T *dst,
119 : std::vector<size_t> src_order) {
120 : // need to set max_value=3 for 4 pol case
121 0 : if (!setAndCheckOrder(2, 3, src_order))
122 0 : throw casacore::AipsError("got invalid order list");
123 0 : T const *row0_p = src + src_order[0] * n;
124 0 : T const *row1_p = src + src_order[1] * n;
125 0 : for (ssize_t i = 0; i < n; ++i) {
126 0 : dst[2 * i] = row0_p[i];
127 0 : dst[2 * i + 1] = row1_p[i];
128 : }
129 0 : }
130 : };
131 :
132 : struct ExecuteMatrix4X {
133 : template<class T>
134 : static void execute(ssize_t /*n*/, T const */*src*/, T */*dst*/,
135 : std::vector<size_t> /*in_order*/) {
136 : throw std::runtime_error("");
137 : }
138 : };
139 :
140 : // polarization order assumption (auto, cross, cross, auto)
141 : template<>
142 0 : inline void ExecuteMatrix4X::execute(ssize_t n, casacore::Bool const *src,
143 : casacore::Bool *dst, std::vector<size_t> src_order) {
144 0 : if (!setAndCheckOrder(4, 3, src_order))
145 0 : throw casacore::AipsError("got invalid order list");
146 0 : casacore::Bool const *row0_p = src + src_order[0] * n;
147 0 : casacore::Bool const *row1_p = src + src_order[1] * n;
148 0 : casacore::Bool const *row2_p = src + src_order[2] * n;
149 0 : casacore::Bool const *row3_p = src + src_order[3] * n;
150 0 : for (ssize_t i = 0; i < n; ++i) {
151 0 : dst[4 * i + 0] = row0_p[i];
152 0 : casacore::Bool b = row1_p[i] || row2_p[i];
153 0 : dst[4 * i + 1] = b;
154 0 : dst[4 * i + 2] = b;
155 0 : dst[4 * i + 3] = row3_p[i];
156 : }
157 0 : }
158 :
159 : struct ExecuteMatrix4 {
160 : template<class T>
161 0 : static void execute(ssize_t n, T const *src, T *dst,
162 : std::vector<size_t> src_order) {
163 0 : if (!setAndCheckOrder(4, 3, src_order))
164 0 : throw casacore::AipsError("got invalid order list");
165 0 : T const *row0_p = src + src_order[0] * n;
166 0 : T const *row1_p = src + src_order[1] * n;
167 0 : T const *row2_p = src + src_order[2] * n;
168 0 : T const *row3_p = src + src_order[3] * n;
169 0 : for (ssize_t i = 0; i < n; ++i) {
170 0 : dst[4 * i + 0] = row0_p[i];
171 0 : dst[4 * i + 1] = row1_p[i];
172 0 : dst[4 * i + 2] = row2_p[i];
173 0 : dst[4 * i + 3] = row3_p[i];
174 : }
175 0 : }
176 : };
177 :
178 0 : inline void shuffleTransposeMatrix4F2C(ssize_t n,
179 : casacore::Matrix<casacore::Float> const &src,
180 : casacore::Matrix<casacore::Complex> &dst,
181 : std::vector<size_t> src_order = { }) {
182 0 : if (!setAndCheckOrder(4, 3, src_order))
183 0 : throw casacore::AipsError("got invalid order list");
184 : casacore::Bool b1, b2;
185 0 : casacore::Float const *src_p = src.getStorage(b1);
186 0 : casacore::Complex *dst_p = dst.getStorage(b2);
187 :
188 : // polarization order assumption (auto, cross, cross, auto)
189 0 : casacore::Float const *row0_p = src_p + src_order[0] * n;
190 0 : casacore::Float const *row1_p = src_p + src_order[1] * n;
191 0 : casacore::Float const *row2_p = src_p + src_order[2] * n;
192 0 : casacore::Float const *row3_p = src_p + src_order[3] * n;
193 0 : for (ssize_t i = 0; i < n; ++i) {
194 0 : dst_p[4 * i].real(row0_p[i]);
195 0 : dst_p[4 * i].imag(0.0f);
196 0 : casacore::Float fr = row1_p[i];
197 0 : casacore::Float fi = row2_p[i];
198 0 : dst_p[4 * i + 1].real(fr);
199 0 : dst_p[4 * i + 1].imag(fi);
200 0 : dst_p[4 * i + 2].real(fr);
201 0 : dst_p[4 * i + 2].imag(-fi);
202 0 : dst_p[4 * i + 3].real(row3_p[i]);
203 0 : dst_p[4 * i + 3].imag(0.0f);
204 : }
205 :
206 0 : src.freeStorage(src_p, b1);
207 0 : dst.putStorage(dst_p, b2);
208 0 : }
209 :
210 : ////// in-place reorder of MatrixColumn
211 : //template<class T>
212 : //inline void suffleMatrixColumn(casacore::Matrix<T> const &src,
213 : // std::vector<size_t> order) {
214 : // ssize_t n = src.nrow();
215 : // if (checkOrder(1, src.ncolumn()-1, order)) throw AipsError("Invalid order");
216 : // casacore::Matrix<T> const temp = src;
217 : // casacore::Bool b1, b2;
218 : // T const *src_p = src.getStorage(b1);
219 : // T const *temp_p = temp.getStorage(b2);
220 : // if (src_p==temp_p) {
221 : // throw casacore::AipsError("Failed to generate temp storage");
222 : // }
223 : // for (ssize_t j = 0; j < order.size(); ++j) {
224 : // setValue1(n, temp_p[order[j]*n], src_p[j*n]);
225 : // }
226 : // src.putStorage(src_p, b1);
227 : // temp.freeStorage(temp_p, b2);
228 : //}
229 :
230 : // get a vector of indices (out_idx) to sort in_data in accending order
231 : template<typename T>
232 0 : inline void getSortIndex(casacore::Vector<T> in_data,
233 : std::vector<size_t> &out_idx) {
234 0 : casacore::Vector<size_t> idx_vec(in_data.size());
235 0 : casacore::indgen(idx_vec);
236 0 : idx_vec.tovector(out_idx);
237 :
238 0 : std::sort(out_idx.begin(), out_idx.end(),
239 0 : [&in_data](size_t const &i, size_t const &j) -> bool {return in_data[i] < in_data[j];});
240 0 : }
241 : }
242 :
243 : namespace casa { //# NAMESPACE CASA - BEGIN
244 : namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN
245 :
246 : class DataAccumulator;
247 :
248 : class DataChunk {
249 : public:
250 : friend DataAccumulator;
251 :
252 0 : DataChunk(casacore::String const &poltype) :
253 0 : num_pol_max_(4), num_chan_(0), data_(), flag_(), flag_row_(
254 0 : num_pol_max_, false), tsys_(), tcal_(), weight_(
255 0 : num_pol_max_, 1.0f), sigma_(weight_), poltype_(poltype), valid_pcorr_(), pcorr_type_(), get_chunk_(
256 0 : nullptr), get_num_pol_(nullptr) {
257 : POST_START;
258 :
259 0 : setPolType(poltype);
260 :
261 : POST_END;
262 0 : }
263 :
264 0 : virtual ~DataChunk() {
265 0 : }
266 :
267 0 : casacore::String getPolType() const {
268 0 : return poltype_;
269 : }
270 :
271 : void resetPolType(casacore::String const &poltype) {
272 : initialize(num_chan_);
273 : setPolType(poltype);
274 : }
275 :
276 : casacore::uInt getNumPol() const {
277 : return get_num_pol_();
278 : }
279 :
280 0 : void initialize(size_t num_chan) {
281 0 : num_chan_ = num_chan;
282 0 : casacore::IPosition const shape(2, num_chan_, num_pol_max_);
283 0 : ::resizeTo(data_, shape);
284 0 : ::resizeTo(flag_, shape);
285 0 : ::resizeTo(tsys_, shape);
286 0 : ::resizeTo(tcal_, shape);
287 0 : pcorr_type_.resize(0);
288 0 : tsys_ = -1.0f;
289 0 : tcal_ = -1.0f;
290 0 : }
291 :
292 0 : void clear() {
293 0 : num_chan_ = 0;
294 0 : pcorr_type_.resize(0);
295 0 : }
296 :
297 : bool readyToWrite() {
298 : return true;
299 : }
300 :
301 0 : bool accumulate(DataRecord const &record) {
302 : POST_START;
303 :
304 0 : if (!isValidRecord(record)) {
305 0 : return false;
306 : }
307 :
308 0 : if (num_pol_max_ < pcorr_type_.size()) {
309 0 : return false;
310 : }
311 :
312 0 : casacore::Vector<casacore::Float> const &data = record.data;
313 0 : if (num_chan_ == 0) {
314 0 : size_t num_chan = data.size();
315 0 : initialize(num_chan);
316 : }
317 :
318 : // Polarization
319 0 : casacore::Stokes::StokesTypes pcorr = record.pol;
320 : // check for consistency between pol_type and stokes
321 0 : bool consistent = false;
322 0 : for (size_t i = 0; i < valid_pcorr_.size(); ++i) {
323 0 : consistent |= (valid_pcorr_[i] == (Int) pcorr);
324 : }
325 0 : if (!consistent) {
326 0 : throw casacore::AipsError(
327 0 : "Got inconsistent polarization with poltype");
328 : }
329 : // check if pcorr is the new corr type to be added
330 0 : for (size_t i = 0; i < pcorr_type_.size(); ++i) {
331 0 : if (pcorr == pcorr_type_[i]) {
332 0 : throw casacore::AipsError(
333 : "DataChunk already has polarization "
334 0 : + casacore::Stokes::name(pcorr));
335 : }
336 : }
337 : // new polarization type
338 0 : pcorr_type_.push_back(pcorr);
339 : // Matrices in DataRecord is stacked in the order of accumulation
340 0 : size_t const colid = pcorr_type_.size() - 1;
341 :
342 0 : casacore::Vector<casacore::Bool> const &flag = record.flag;
343 0 : casacore::Bool flagrow = record.flag_row;
344 :
345 0 : if (data.shape() != flag.shape()) {
346 0 : return false;
347 : }
348 :
349 0 : casacore::Vector<casacore::Float> tsys;
350 0 : if (!record.tsys.empty()) {
351 : // std::cout << "tsys is not empty: " << record.tsys[0] << std::endl;
352 0 : tsys.assign(record.tsys);
353 : }
354 0 : casacore::Vector<casacore::Float> tcal;
355 0 : if (!record.tcal.empty()) {
356 : // std::cout << "tcal is not empty: " << record.tcal << std::endl;
357 0 : tcal.assign(record.tcal);
358 : }
359 :
360 0 : if (data.nelements() != num_chan_) {
361 0 : return false;
362 : }
363 :
364 : // Filling Matrices[chan, pol] in DataRecord. The Matrices are column (channel) major
365 : //data_.column(colid) = data;
366 0 : ::setValueToMatrixColumn(data, colid, data_);
367 : //flag_.column(colid) = flag;
368 0 : ::setValueToMatrixColumn(flag, colid, flag_);
369 0 : flag_row_[colid] = flagrow;
370 0 : if (tsys.size() == num_chan_) {
371 : //tsys_.column(polid) = tsys;
372 0 : ::setValueToMatrixColumn(tsys, colid, tsys_);
373 0 : } else if (!tsys.empty()) {
374 0 : tsys_(0, colid) = tsys[0];
375 : }
376 0 : if (tcal.size() == num_chan_) {
377 : //tcal_.column(polid) = tcal;
378 0 : ::setValueToMatrixColumn(tcal, colid, tcal_);
379 0 : } else if (!tcal.empty()) {
380 0 : tcal_(0, colid) = tcal[0];
381 : }
382 :
383 0 : return true;
384 0 : }
385 :
386 0 : bool get(MSDataRecord &record) {
387 0 : bool return_value = get_chunk_(record);
388 0 : return return_value;
389 : }
390 :
391 : private:
392 : // Functions to return if accumulated corr types are conformant set, e.g.,
393 : // liner polarization should have XX and YY for dual pol.
394 : // assumption: pcorr_type_ has StokesTypes consistent with poltype_ and
395 : // not redundant (assured by accumulate())
396 0 : bool isFullPol() const {
397 0 : return pcorr_type_.size() == 4;
398 : }
399 0 : bool isDualPol() const {
400 0 : if (pcorr_type_.size() != 2)
401 0 : return false;
402 0 : casacore::Int pcorr0 = valid_pcorr_[0];
403 0 : casacore::Int pcorr1 = valid_pcorr_[valid_pcorr_.size() - 1];
404 0 : return (pcorr_type_[0] == pcorr0 && pcorr_type_[1] == pcorr1)
405 0 : || (pcorr_type_[0] == pcorr1 && pcorr_type_[1] == pcorr0);
406 : }
407 0 : bool isSinglePol0() const {
408 0 : return (pcorr_type_.size() == 1
409 0 : && valid_pcorr_[0] == (Int) pcorr_type_[0]);
410 : }
411 0 : bool isSinglePol1() const {
412 0 : return (pcorr_type_.size() == 1
413 0 : && valid_pcorr_[valid_pcorr_.size() - 1] == (Int) pcorr_type_[0]);
414 : }
415 0 : bool isValidRecord(DataRecord const &record) {
416 0 : return !record.data.empty() && !record.flag.empty();
417 : }
418 0 : void setPolType(casacore::String const &poltype) {
419 : POST_START;
420 :
421 0 : poltype_ = poltype;
422 0 : if (poltype_ == "linear") {
423 0 : get_chunk_ = [&](MSDataRecord &r) {return DataChunk::getLinear(r);};
424 0 : get_num_pol_ = [&]() {return DataChunk::getNumPolLinear();};
425 0 : valid_pcorr_.resize(4);
426 0 : valid_pcorr_[0] = casacore::Stokes::XX;
427 0 : valid_pcorr_[1] = casacore::Stokes::XY;
428 0 : valid_pcorr_[2] = casacore::Stokes::YX;
429 0 : valid_pcorr_[3] = casacore::Stokes::YY;
430 0 : } else if (poltype_ == "circular") {
431 0 : get_chunk_ = [&](MSDataRecord &r) {return DataChunk::getCircular(r);};
432 0 : get_num_pol_ = [&]() {return DataChunk::getNumPolCircular();};
433 0 : valid_pcorr_.resize(4);
434 0 : valid_pcorr_[0] = casacore::Stokes::RR;
435 0 : valid_pcorr_[1] = casacore::Stokes::RL;
436 0 : valid_pcorr_[2] = casacore::Stokes::LR;
437 0 : valid_pcorr_[3] = casacore::Stokes::LL;
438 0 : } else if (poltype_ == "stokes") {
439 0 : get_chunk_ = [&](MSDataRecord &r) {return DataChunk::getStokes(r);};
440 0 : get_num_pol_ = [&]() {return DataChunk::getNumPolStokes();};
441 0 : valid_pcorr_.resize(4);
442 0 : valid_pcorr_[0] = casacore::Stokes::I;
443 0 : valid_pcorr_[1] = casacore::Stokes::Q;
444 0 : valid_pcorr_[2] = casacore::Stokes::U;
445 0 : valid_pcorr_[3] = casacore::Stokes::V;
446 0 : } else if (poltype_ == "linpol") {
447 0 : get_chunk_ = [&](MSDataRecord &r) {return DataChunk::getLinpol(r);};
448 0 : get_num_pol_ = [&]() {return DataChunk::getNumPolLinpol();};
449 0 : valid_pcorr_.resize(2);
450 0 : valid_pcorr_[0] = casacore::Stokes::Plinear;
451 0 : valid_pcorr_[1] = casacore::Stokes::Pangle;
452 : } else {
453 0 : throw casacore::AipsError(
454 0 : casacore::String("Invalid poltype") + poltype);
455 : }
456 :
457 : POST_END;
458 0 : }
459 : size_t const num_pol_max_;
460 : size_t num_chan_;
461 : casacore::Matrix<casacore::Float> data_;
462 : casacore::Matrix<casacore::Bool> flag_;
463 : casacore::Vector<casacore::Bool> flag_row_;
464 : casacore::Matrix<casacore::Float> tsys_;
465 : casacore::Matrix<casacore::Float> tcal_;
466 : casacore::Vector<casacore::Float> weight_;
467 : casacore::Vector<casacore::Float> sigma_;
468 :
469 : casacore::String poltype_;
470 : casacore::Vector<casacore::Int> valid_pcorr_;
471 : std::vector<casacore::Stokes::StokesTypes> pcorr_type_;
472 : std::function<bool(MSDataRecord &)> get_chunk_;
473 : std::function<casacore::uInt()> get_num_pol_;
474 :
475 : // Tsys and Tcal assignment for 2 & 4 pols. Auto-correlation components of pol should be used.
476 0 : void setTsys2(MSDataRecord &record, std::vector<size_t> order = { }) {
477 : // safeguard to avoid processing empty array
478 0 : if (tsys_.empty()) {
479 0 : return;
480 : }
481 :
482 0 : assert(tsys_.ncolumn() == num_pol_max_);
483 0 : if (!setAndCheckOrder(2, tsys_.ncolumn() - 1, order)) {
484 0 : throw casacore::AipsError("got invalid order list");
485 : }
486 0 : size_t apol0 = order[0], apol1 = order[order.size() - 1];
487 :
488 : // clear Tsys
489 0 : record.setTsysSize(0, 0);
490 :
491 0 : if (num_chan_ == 1) {
492 0 : record.setTsysSize(2, 1);
493 0 : record.tsys(0, 0) = tsys_(0, apol0);
494 0 : record.tsys(1, 0) = tsys_(0, apol1);
495 0 : } else if (casacore::anyGT(tsys_, 0.0f)) {
496 : // valid spectral Tsys data are available
497 0 : casacore::IPosition const startpos0(2, 1, apol0);
498 0 : casacore::IPosition const endpos0(2, num_chan_ - 1, apol0);
499 0 : casacore::IPosition const startpos1(2, 1, apol1);
500 0 : casacore::IPosition const endpos1(2, num_chan_ - 1, apol1);
501 0 : if (casacore::anyGT(tsys_(startpos0, endpos0), 0.0f)
502 0 : || casacore::anyGT(tsys_(startpos1, endpos1), 0.0f)) {
503 : // spectral Tsys
504 0 : record.setTsysSize(2, num_chan_);
505 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix2>(
506 0 : num_chan_, 0, tsys_, record.tsys, { apol0, apol1 });
507 : } else {
508 : // valid Tsys data are only for channel 0
509 : // so treat as scalar Tsys
510 0 : record.setTsysSize(2, 1);
511 0 : record.tsys(0, 0) = tsys_(0, apol0);
512 0 : record.tsys(1, 0) = tsys_(0, apol1);
513 : }
514 0 : }
515 : }
516 :
517 0 : void setTcal2(MSDataRecord &record, std::vector<size_t> order = { }) {
518 : // safeguard to avoid processing empty array
519 0 : if (tcal_.empty()) {
520 0 : return;
521 : }
522 :
523 0 : assert(tcal_.ncolumn() == num_pol_max_);
524 0 : if (!setAndCheckOrder(2, tcal_.ncolumn()-1, order)) {
525 0 : throw casacore::AipsError("got invalid order list");
526 : }
527 0 : size_t apol0 = order[0], apol1 = order[order.size() - 1];
528 :
529 : // clear Tcal
530 0 : record.setTcalSize(0, 0);
531 :
532 0 : if (num_chan_ == 1) {
533 0 : record.setTcalSize(2, 1);
534 0 : record.tcal(0, 0) = tcal_(0, apol0);
535 0 : record.tcal(1, 0) = tcal_(0, apol1);
536 0 : } else if (casacore::anyGT(tcal_, 0.0f)) {
537 : // valid spectral Tcal data are available
538 0 : casacore::IPosition const startpos0(2, 1, apol0);
539 0 : casacore::IPosition const endpos0(2, num_chan_ - 1, apol0);
540 0 : casacore::IPosition const startpos1(2, 1, apol1);
541 0 : casacore::IPosition const endpos1(2, num_chan_ - 1, apol1);
542 0 : if (casacore::anyGT(tcal_(startpos0, endpos0), 0.0f)
543 0 : || casacore::anyGT(tcal_(startpos1, endpos1), 0.0f)) {
544 : // spectral Tcal
545 0 : record.setTcalSize(2, num_chan_);
546 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix2>(
547 0 : num_chan_, 0, tcal_, record.tcal, { apol0, apol1 });
548 : } else {
549 : // valid Tcal data are only for channel 0
550 : // so treat as scalar Tcal
551 0 : record.setTcalSize(2, 1);
552 0 : record.tcal(0, 0) = tcal_(0, apol0);
553 0 : record.tcal(1, 0) = tcal_(0, apol1);
554 : }
555 0 : }
556 : }
557 :
558 0 : void setTsys1(MSDataRecord &record, ssize_t start_src) {
559 0 : if (tsys_.empty()) {
560 : // no Tsys data. do nothing.
561 0 : return;
562 0 : } else if (num_chan_ == 1) {
563 0 : record.setTsysSize(1, 1);
564 0 : record.tsys(0, 0) = tsys_(0, start_src);
565 0 : } else if (casacore::anyGT(tsys_, 0.0f)) {
566 0 : casacore::IPosition const startpos(2, 1, start_src);
567 0 : casacore::IPosition const endpos(2, num_chan_ - 1, start_src);
568 0 : if (casacore::anyGT(tsys_(startpos, endpos), 0.0f)) {
569 : // should be spectral Tsys
570 0 : record.setTsysSize(1, num_chan_);
571 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
572 0 : start_src, tsys_, record.tsys);
573 : } else {
574 : // valid Tsys data are only for channel 0
575 : // so treat as scalar Tsys
576 0 : record.setTsysSize(1, 1);
577 0 : record.tsys(0, 0) = tsys_(0, start_src);
578 : }
579 0 : }
580 : }
581 :
582 0 : void setTcal1(MSDataRecord &record, ssize_t start_src) {
583 0 : if (tcal_.empty()) {
584 : // no Tcal data. do nothing.
585 0 : return;
586 0 : } else if (num_chan_ == 1) {
587 0 : record.setTcalSize(1, 1);
588 0 : record.tcal(0, 0) = tcal_(0, start_src);
589 0 : } else if (casacore::anyGT(tcal_, 0.0f)) {
590 0 : casacore::IPosition const startpos(2, 1, start_src);
591 0 : casacore::IPosition const endpos(2, num_chan_ - 1, start_src);
592 0 : if (casacore::anyGT(tcal_(startpos, endpos), 0.0f)) {
593 : // should be spectral Tcal
594 0 : record.setTcalSize(1, num_chan_);
595 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
596 0 : start_src, tcal_, record.tcal);
597 : } else {
598 : // valid Tcal data are only for channel 0
599 : // so treat as scalar Tcal
600 0 : record.setTcalSize(1, 1);
601 0 : record.tcal(0, 0) = tcal_(0, start_src);
602 : }
603 0 : }
604 : }
605 :
606 0 : bool getLinear(MSDataRecord &record) {
607 : POST_START;
608 :
609 0 : casacore::Vector<casacore::Int> const col_stokes(pcorr_type_);
610 0 : std::vector<size_t> pol_idx_order;
611 0 : getSortIndex(col_stokes, pol_idx_order);
612 :
613 0 : casacore::Vector<casacore::Float> weight;
614 0 : casacore::Vector<casacore::Float> sigma;
615 0 : if (isFullPol()) {
616 : // POL 0, 1, 2, and 3
617 : // std::cout << "set data/flag" << std::endl;
618 0 : record.setComplex();
619 0 : record.setDataSize(4, num_chan_);
620 0 : shuffleTransposeMatrix4F2C(num_chan_, data_, record.complex_data,
621 : pol_idx_order);
622 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix4X>(num_chan_,
623 0 : 0, flag_, record.flag, pol_idx_order);
624 0 : record.flag_row = anyEQ(flag_row_, true);
625 : // std::cout << "weight = " << record.weight << std::endl;
626 :
627 : // std::cout << "set tsys" << std::endl;
628 0 : setTsys2(record, pol_idx_order);
629 :
630 : // std::cout << "set tcal " << tcal_ << std::endl;
631 0 : setTcal2(record, pol_idx_order);
632 :
633 0 : record.num_pol = 4;
634 0 : } else if (isDualPol()) {
635 : // POL 0 and 1
636 : // std::cout << "set data/flag" << std::endl;
637 0 : record.setFloat();
638 0 : record.setDataSize(2, num_chan_);
639 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix2>(num_chan_,
640 0 : 0, data_, record.float_data, pol_idx_order);
641 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix2>(num_chan_, 0,
642 0 : flag_, record.flag, pol_idx_order);
643 0 : record.flag_row = flag_row_[0] || flag_row_[1];
644 : // std::cout << "weight = " << record.weight << std::endl;
645 :
646 : // std::cout << "set tsys" << std::endl;
647 0 : setTsys2(record, pol_idx_order);
648 :
649 : // std::cout << "set tcal " << tcal_ << std::endl;
650 0 : setTcal2(record, pol_idx_order);
651 :
652 0 : record.num_pol = 2;
653 0 : } else if (isSinglePol0()) {
654 : // only POL 0
655 : // std::cout << "set data/flag (pol 0)" << std::endl;
656 0 : record.setFloat();
657 0 : record.setDataSize(1, num_chan_);
658 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
659 0 : 0, data_, record.float_data, pol_idx_order);
660 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix1>(num_chan_, 0,
661 0 : flag_, record.flag, pol_idx_order);
662 0 : record.flag_row = flag_row_(0);
663 :
664 0 : setTsys1(record, 0);
665 :
666 : // std::cout << "set tcal " << tcal_ << std::endl;
667 0 : setTcal1(record, 0);
668 :
669 0 : record.num_pol = 1;
670 0 : } else if (isSinglePol1()) {
671 : // only POL 1
672 : // std::cout << "set data/flag (pol 1)" << std::endl;
673 0 : record.setFloat();
674 0 : record.setDataSize(1, num_chan_);
675 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
676 0 : 0, data_, record.float_data, pol_idx_order);
677 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix1>(num_chan_, 0,
678 0 : flag_, record.flag, pol_idx_order);
679 0 : record.flag_row = flag_row_(1);
680 :
681 0 : setTsys1(record, 0);
682 :
683 : // std::cout << "set tcal " << tcal_ << std::endl;
684 0 : setTcal1(record, 0);
685 :
686 0 : record.num_pol = 1;
687 0 : } else if (pcorr_type_.size() == 0) {
688 : // std::cout << "DataChunk is not ready for get" << std::endl;
689 0 : return false;
690 : } else {// means strange combination of polarization types in DataChunk
691 0 : throw AipsError("non-conforming combination of polarization accumulated");
692 : }
693 0 : for (casacore::Int i = 0; i < record.num_pol; ++i) {
694 0 : record.corr_type[i] = col_stokes[pol_idx_order[i]];
695 : }
696 :
697 : POST_END;
698 0 : return true;
699 0 : }
700 :
701 0 : bool getCircular(MSDataRecord &record) {
702 0 : return getLinear(record);
703 : }
704 :
705 0 : bool getStokes(MSDataRecord &record) {
706 : POST_START;
707 :
708 0 : casacore::Vector<casacore::Int> const col_stokes(pcorr_type_);
709 0 : std::vector<size_t> pol_idx_order;
710 0 : getSortIndex(col_stokes, pol_idx_order);
711 :
712 0 : record.setFloat();
713 0 : if (isFullPol()) {
714 0 : record.setDataSize(4, num_chan_);
715 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix4>(num_chan_,
716 0 : 0, data_, record.float_data, pol_idx_order);
717 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix4>(num_chan_, 0,
718 0 : flag_, record.flag, pol_idx_order);
719 0 : record.flag_row = anyTrue(flag_row_);
720 :
721 0 : record.num_pol = 4;
722 0 : } else if (isSinglePol0()) {
723 0 : record.setDataSize(1, num_chan_);
724 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
725 0 : 0, data_, record.float_data, pol_idx_order);
726 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix1>(num_chan_, 0,
727 0 : flag_, record.flag, pol_idx_order);
728 0 : record.flag_row = flag_row_[0];
729 :
730 0 : record.num_pol = 1;
731 0 : } else if (pcorr_type_.size() == 0) {
732 0 : return false;
733 : } else {// means strange combination of polarization types in DataChunk
734 0 : throw AipsError("non-conforming combination of polarization accumulated");
735 : }
736 0 : for (casacore::Int i = 0; i < record.num_pol; ++i) {
737 0 : record.corr_type[i] = col_stokes[pol_idx_order[i]];
738 : }
739 :
740 : POST_END;
741 0 : return true;
742 0 : }
743 :
744 0 : bool getLinpol(MSDataRecord &record) {
745 : POST_START;
746 :
747 0 : casacore::Vector<casacore::Int> const col_stokes(pcorr_type_);
748 0 : std::vector<size_t> pol_idx_order;
749 0 : getSortIndex(col_stokes, pol_idx_order);
750 :
751 0 : record.setFloat();
752 0 : if (isDualPol()) {
753 : // POL 0 and 1
754 0 : record.setDataSize(2, num_chan_);
755 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix2>(num_chan_,
756 0 : 0, data_, record.float_data, pol_idx_order);
757 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix2>(num_chan_, 0,
758 0 : flag_, record.flag, pol_idx_order);
759 0 : record.flag_row = flag_row_[0] || flag_row_[1];
760 0 : record.num_pol = 2;
761 0 : } else if (isSinglePol0()) {
762 0 : record.setDataSize(1, num_chan_);
763 0 : shuffleTransposeMatrix<casacore::Float, ExecuteMatrix1>(num_chan_,
764 0 : 0, data_, record.float_data, pol_idx_order);
765 0 : shuffleTransposeMatrix<casacore::Bool, ExecuteMatrix1>(num_chan_, 0,
766 0 : flag_, record.flag, pol_idx_order);
767 0 : record.flag_row = flag_row_[0];
768 :
769 0 : record.num_pol = 1;
770 0 : } else if (pcorr_type_.size() == 0) {
771 0 : return false;
772 : } else {// means strange combination of polarization types in DataChunk
773 0 : throw AipsError("non-conforming combination of polarization accumulated");
774 : }
775 0 : for (casacore::Int i = 0; i < record.num_pol; ++i) {
776 0 : record.corr_type[i] = col_stokes[pol_idx_order[i]];
777 : }
778 :
779 : POST_END;
780 0 : return true;
781 0 : }
782 :
783 0 : casacore::uInt getNumPolLinear() const {
784 0 : if (isFullPol()) {
785 0 : return 4;
786 0 : } else if (isDualPol()) {
787 0 : return 2;
788 0 : } else if (isSinglePol0() || isSinglePol1()) {
789 0 : return 1;
790 : } else {
791 0 : return 0;
792 : }
793 : }
794 :
795 0 : casacore::uInt getNumPolCircular() const {
796 0 : return getNumPolLinear();
797 : }
798 :
799 0 : casacore::uInt getNumPolStokes() const {
800 0 : if (isFullPol()) {
801 0 : return 4;
802 0 : } else if (isSinglePol0()) {
803 0 : return 1;
804 : } else {
805 0 : return 0;
806 : }
807 : }
808 :
809 0 : casacore::uInt getNumPolLinpol() const {
810 0 : if (isDualPol()) {
811 0 : return 2;
812 0 : } else if (isSinglePol0()) {
813 0 : return 1;
814 : } else {
815 0 : return 0;
816 : }
817 : }
818 : };
819 :
820 : class DataAccumulator {
821 : private:
822 : struct DataAccumulatorKey {
823 : casacore::Int antenna_id;
824 : casacore::Int field_id;
825 : casacore::Int feed_id;
826 : casacore::Int spw_id;
827 : casacore::String pol_type;
828 : casacore::String intent;
829 :
830 : template<class T, class C>
831 0 : bool comp(T const &a, T const &b, C const &c) const {
832 0 : if (a < b) {
833 0 : return true;
834 0 : } else if (a == b) {
835 0 : return c();
836 : } else {
837 0 : return false;
838 : }
839 : }
840 :
841 0 : bool operator()(DataAccumulatorKey const &lhs,
842 : DataAccumulatorKey const &rhs) const {
843 0 : return comp(lhs.antenna_id, rhs.antenna_id,
844 0 : [&]() {return comp(lhs.field_id, rhs.field_id,
845 0 : [&]() {return comp(lhs.feed_id, rhs.feed_id,
846 0 : [&]() {return comp(lhs.spw_id, rhs.spw_id,
847 0 : [&]() {return comp(lhs.pol_type, rhs.pol_type,
848 0 : [&]() {return comp(lhs.intent, rhs.intent,
849 0 : []() {return false;});});});});});});
850 : }
851 : };
852 :
853 : public:
854 0 : DataAccumulator() :
855 0 : pool_(), antenna_id_(), spw_id_(), field_id_(), feed_id_(), scan_(), subscan_(), intent_(), direction_(), interval_(), indexer_(), time_(
856 0 : -1.0), is_free_() {
857 0 : }
858 :
859 0 : virtual ~DataAccumulator() {
860 : POST_START;
861 :
862 0 : for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
863 0 : delete *iter;
864 : }
865 :
866 : POST_END;
867 0 : }
868 :
869 0 : size_t getNumberOfChunks() const {
870 0 : return pool_.size();
871 : }
872 :
873 : size_t getNumberOfActiveChunks() const {
874 : return std::count_if(pool_.begin(), pool_.end(),
875 : [](DataChunk * const &c) {
876 : return c->getNumPol() > 0;
877 : });
878 : }
879 :
880 : bool queryForGet(DataRecord const &record) const {
881 : casacore::Double const time = record.time;
882 : bool is_ready = (0.0 <= time_) && !(time_ == time);
883 : return is_ready;
884 : }
885 :
886 0 : bool queryForGet(casacore::Double const &time) const {
887 0 : bool is_ready = (0.0 <= time_) && !(time_ == time);
888 0 : return is_ready;
889 : }
890 :
891 0 : void clear() {
892 0 : for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
893 0 : (*iter)->clear();
894 : }
895 0 : time_ = -1.0;
896 0 : }
897 :
898 0 : bool get(size_t ichunk, MSDataRecord &record) {
899 : POST_START;
900 :
901 0 : if (pool_.size() == 0) {
902 0 : return false;
903 0 : } else if (ichunk >= pool_.size()) {
904 0 : return false;
905 : }
906 0 : bool status = pool_[ichunk]->get(record);
907 : // std::cout << "get Chunk status = " << status << std::endl;
908 0 : if (!status) {
909 0 : record.clear();
910 0 : return status;
911 : }
912 0 : record.time = time_;
913 0 : record.pol_type = pool_[ichunk]->getPolType();
914 0 : record.antenna_id = antenna_id_[ichunk];
915 0 : record.spw_id = spw_id_[ichunk];
916 0 : record.field_id = field_id_[ichunk];
917 0 : record.feed_id = feed_id_[ichunk];
918 0 : record.scan = scan_[ichunk];
919 0 : record.subscan = subscan_[ichunk];
920 0 : record.intent = intent_[ichunk];
921 0 : record.direction = direction_[ichunk];
922 0 : record.interval = interval_[ichunk];
923 0 : record.temperature = temperature_[ichunk];
924 0 : record.pressure = pressure_[ichunk];
925 0 : record.rel_humidity = rel_humidity_[ichunk];
926 0 : record.wind_speed = wind_speed_[ichunk];
927 0 : record.wind_direction = wind_direction_[ichunk];
928 :
929 : POST_END;
930 0 : return status;
931 : }
932 :
933 0 : bool accumulate(DataRecord const &record) {
934 : POST_START;
935 :
936 0 : if (!isValidRecord(record)) {
937 : // std::cout << "record is not a valid one" << std::endl;
938 0 : return false;
939 : }
940 :
941 0 : casacore::Double time = record.time;
942 0 : if (time_ < 0.0) {
943 0 : time_ = time;
944 : }
945 0 : if (time_ != time) {
946 : // std::cout << "timestamp mismatch" << std::endl;
947 0 : return false;
948 : }
949 0 : casacore::Int antennaid = record.antenna_id;
950 0 : casacore::Int spwid = record.spw_id;
951 0 : casacore::Int fieldid = record.field_id;
952 0 : casacore::Int feedid = record.feed_id;
953 0 : casacore::Int scan = record.scan;
954 0 : casacore::Int subscan = record.subscan;
955 0 : casacore::String intent = record.intent;
956 0 : casacore::String poltype = record.pol_type;
957 0 : DataAccumulatorKey key;
958 0 : key.antenna_id = record.antenna_id;
959 0 : key.field_id = record.field_id;
960 0 : key.feed_id = record.feed_id;
961 0 : key.spw_id = record.spw_id;
962 0 : key.intent = record.intent;
963 0 : key.pol_type = record.pol_type;
964 0 : casacore::Matrix<casacore::Double> const &direction = record.direction;
965 0 : casacore::Double interval = record.interval;
966 0 : casacore::Float temperature = record.temperature;
967 0 : casacore::Float pressure = record.pressure;
968 0 : casacore::Float rel_humidity = record.rel_humidity;
969 0 : casacore::Float wind_speed = record.wind_speed;
970 0 : casacore::Float wind_direction = record.wind_direction;
971 0 : bool status = false;
972 0 : auto iter = indexer_.find(key);
973 : // std::cout << "(ant, spw, pol, pol_type, field, feed, intent) = ("
974 : // << key.antenna_id << ", " << key.spw_id << ", " << record.pol<< ", " << key.pol_type << ", " << key.field_id << ", " << key.feed_id
975 : // << ", " << key.intent << ", "
976 : // << std::endl;
977 0 : if (iter != indexer_.end()) {
978 0 : casacore::uInt index = iter->second;
979 0 : status = pool_[index]->accumulate(record);
980 0 : if (status) {
981 0 : antenna_id_[index] = antennaid;
982 0 : spw_id_[index] = spwid;
983 0 : field_id_[index] = fieldid;
984 0 : feed_id_[index] = feedid;
985 0 : scan_[index] = scan;
986 0 : subscan_[index] = subscan;
987 0 : intent_[index] = intent;
988 0 : direction_[index].assign(direction);
989 0 : interval_[index] = interval;
990 0 : temperature_[index] = temperature;
991 0 : pressure_[index] = pressure;
992 0 : rel_humidity_[index] = rel_humidity;
993 0 : wind_speed_[index] = wind_speed;
994 0 : wind_direction_[index] = wind_direction;
995 : }
996 : } else {
997 0 : pool_.push_back(new DataChunk(poltype));
998 0 : antenna_id_.push_back(-1);
999 0 : spw_id_.push_back(-1);
1000 0 : field_id_.push_back(-1);
1001 0 : feed_id_.push_back(-1);
1002 0 : scan_.push_back(-1);
1003 0 : subscan_.push_back(-1);
1004 0 : intent_.push_back("");
1005 0 : direction_.push_back(casacore::Matrix<casacore::Double>());
1006 0 : interval_.push_back(-1.0);
1007 0 : temperature_.push_back(0.0f);
1008 0 : pressure_.push_back(0.0f);
1009 0 : rel_humidity_.push_back(0.0f);
1010 0 : wind_speed_.push_back(0.0f);
1011 0 : wind_direction_.push_back(0.0f);
1012 0 : casacore::uInt index = pool_.size() - 1;
1013 0 : indexer_[key] = index;
1014 0 : status = pool_[index]->accumulate(record);
1015 0 : if (status) {
1016 0 : antenna_id_[index] = antennaid;
1017 0 : spw_id_[index] = spwid;
1018 0 : field_id_[index] = fieldid;
1019 0 : feed_id_[index] = feedid;
1020 0 : scan_[index] = scan;
1021 0 : subscan_[index] = subscan;
1022 0 : intent_[index] = intent;
1023 0 : direction_[index].assign(direction);
1024 0 : interval_[index] = interval;
1025 0 : temperature_[index] = temperature;
1026 0 : pressure_[index] = pressure;
1027 0 : rel_humidity_[index] = rel_humidity;
1028 0 : wind_speed_[index] = wind_speed;
1029 0 : wind_direction_[index] = wind_direction;
1030 : }
1031 : }
1032 :
1033 : // std::cout << "status = " << status << std::endl;
1034 : // std::cout << "key (a" << key.antenna_id << ",f" << key.field_id << ",s"
1035 : // << key.spw_id << ",i" << key.intent << ",p" << key.pol_type << ",d"
1036 : // << key.feed_id << "(index " << indexer_[key] << "): TIME="
1037 : // << time_ << " INTERVAL=" << interval << " polno=" << record.pol << std::endl;
1038 : POST_END;
1039 0 : return status;
1040 0 : }
1041 :
1042 : casacore::String getPolType(size_t ichunk) const {
1043 : assert(ichunk < pool_.size());
1044 : return pool_[ichunk]->getPolType();
1045 : }
1046 :
1047 : casacore::uInt getNumPol(size_t ichunk) const {
1048 : assert(ichunk < pool_.size());
1049 : return pool_[ichunk]->getNumPol();
1050 : }
1051 :
1052 : private:
1053 0 : bool isValidRecord(DataRecord const &record) {
1054 : // std::cout << record.time << " " << record.interval << " "
1055 : // << record.antenna_id << " " << record.field_id << " " << record.feed_id
1056 : // << " " << record.spw_id << " " << record.scan << " " << record.subscan
1057 : // << " " << record.direction << std::endl;
1058 0 : return record.time > 0.0 && record.interval > 0.0
1059 0 : && record.antenna_id >= 0 && record.field_id >= 0
1060 0 : && record.feed_id >= 0 && record.spw_id >= 0 && record.scan >= 0
1061 0 : && record.subscan >= 0 && !record.direction.empty();
1062 : }
1063 :
1064 : std::vector<DataChunk *> pool_;
1065 : std::vector<casacore::Int> antenna_id_;
1066 : std::vector<casacore::Int> spw_id_;
1067 : std::vector<casacore::Int> field_id_;
1068 : std::vector<casacore::Int> feed_id_;
1069 : std::vector<casacore::Int> scan_;
1070 : std::vector<casacore::Int> subscan_;
1071 : std::vector<casacore::String> intent_;
1072 : std::vector<casacore::Matrix<casacore::Double> > direction_;
1073 : std::vector<casacore::Double> interval_;
1074 : std::vector<casacore::Float> temperature_;
1075 : std::vector<casacore::Float> pressure_;
1076 : std::vector<casacore::Float> rel_humidity_;
1077 : std::vector<casacore::Float> wind_speed_;
1078 : std::vector<casacore::Float> wind_direction_;
1079 : std::map<DataAccumulatorKey, casacore::uInt, DataAccumulatorKey> indexer_;
1080 : casacore::Double time_;
1081 : std::vector<bool> is_free_;
1082 : };
1083 :
1084 : } //# NAMESPACE SDFILLER - END
1085 : } //# NAMESPACE CASA - END
1086 :
1087 : #endif /* SINGLEDISH_FILLER_DATAACCUMULATOR_H_ */
|